I use the following development environment.
- Flink : 2.0
- MySQL : 8.0.4
- JDK : 17.0.2
And I develop python flink API code, which inserts a simple data into MySQL.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
from pyflink.datastream.stream_execution_environment import RuntimeExecutionMode
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)
env.add_jars('file:///Users/joseph/Downloads/flink-connector-jdbc-core-4.0.0-2.0.jar',
'file:///Users/joseph/Downloads/flink-connector-jdbc-mysql-4.0.0-2.0.jar',
'file:///Users/joseph/Downloads/mysql-connector-j-8.4.0.jar')
type_info = Types.ROW([Types.STRING(), Types.INT()])
ds = env.from_collection([('GM', 300), ('Volvo', 400)], type_info=type_info)
insert_sql = 'insert into Car (brand, price) values (?, ?)'
jdbc_exe_option = JdbcExecutionOptions.builder() \
.with_batch_interval_ms(1000) \
.with_batch_size(200) \
.with_max_retries(5) \
.build()
jdbc_conn_option = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
.with_url('jdbc:mysql://localhost:3306/etlmysql') \
.with_driver_name('com.mysql.cj.jdbc.Driver') \
.with_user_name('root') \
.with_password('p@$$w0rd') \
.build()
sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)
ds.add_sink(sink)
env.execute()
env.close()
But the error message are thrown from JdbcSink.sink line.
File "c:\VSCode_Workspace\flink-python\com\aaa\flink\flink-jdbc-test.py", line 32, in <module>
sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\datastream\connectors\jdbc.py", line 60, in sink
j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\util\exceptions.py", line 162, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o71.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
For your information, I attach Java code of the same flink MySQL connector.
JdbcExecutionOptions jdbcExeOption = JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build();
JdbcConnectionOptions jdbcConnOption = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/etlmysql")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("p@$$w0rd")
.build();
JdbcSink<Car> jdbcSink = new JdbcSinkBuilder<Car>()
.withQueryStatement(
"INSERT INTO car VALUES(?,?)",
(statement, car) -> {
statement.setString(1, car.getBrand());
statement.setInt(2, car.getPrice());
})
.withExecutionOptions(jdbcExeOption)
.buildAtLeastOnce(jdbcConnOption);
stream.sinkTo(jdbcSink);
As you see, java JdbcSink connector class has different shape from python JdbcSink connector. In Java code, jdbcSink object is generated from JdbcSinkBuilder class, but in python it is not. I think these errors are due to API version mismatch. Any idea to solve these errors?