
I use the following development environment.

  • Flink : 2.0
  • MySQL : 8.0.4
  • JDK : 17.0.2

I wrote PyFlink DataStream API code that inserts some simple rows into MySQL:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink
from pyflink.datastream.stream_execution_environment import RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

env.add_jars('file:///Users/joseph/Downloads/flink-connector-jdbc-core-4.0.0-2.0.jar',
             'file:///Users/joseph/Downloads/flink-connector-jdbc-mysql-4.0.0-2.0.jar',
             'file:///Users/joseph/Downloads/mysql-connector-j-8.4.0.jar')

type_info = Types.ROW([Types.STRING(), Types.INT()])
ds = env.from_collection([('GM', 300), ('Volvo', 400)], type_info=type_info)

insert_sql = 'insert into Car (brand, price) values (?, ?)'

jdbc_exe_option = JdbcExecutionOptions.builder() \
                .with_batch_interval_ms(1000) \
                .with_batch_size(200) \
                .with_max_retries(5) \
                .build()

jdbc_conn_option = JdbcConnectionOptions.JdbcConnectionOptionsBuilder() \
                   .with_url('jdbc:mysql://localhost:3306/etlmysql') \
                   .with_driver_name('com.mysql.cj.jdbc.Driver') \
                   .with_user_name('root') \
                   .with_password('p@$$w0rd') \
                   .build()

sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)

ds.add_sink(sink)

env.execute()
env.close()

But the following error is thrown at the JdbcSink.sink line:

File "c:\VSCode_Workspace\flink-python\com\aaa\flink\flink-jdbc-test.py", line 32, in <module>
    sink = JdbcSink.sink(insert_sql, type_info, jdbc_conn_option, jdbc_exe_option)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\datastream\connectors\jdbc.py", line 60, in sink
    j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder',
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\pyflink\util\exceptions.py", line 162, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "C:\VSCode_Workspace\.venv-etl\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o71.getDeclaredMethod.
: java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)
        at java.base/java.lang.Class.getDeclaredMethod(Class.java:2675)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)      
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
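In the exception message, the parameter list ([I) uses the JVM's Class.getName() notation, in which [I means int[]. Below is a small, hypothetical helper (not part of PyFlink) that turns such a NoSuchMethodException message into a readable owner class and signature. It is only a sketch and handles just the parameter formats Class.getName() produces.

```python
# Element codes that Class.getName() uses for primitive array components.
_PRIMITIVE_CODES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean",
}


def readable_type(class_name: str) -> str:
    """Convert a Class.getName() string such as '[I' into 'int[]'."""
    dims = len(class_name) - len(class_name.lstrip("["))
    if dims == 0:
        return class_name  # plain names like 'int' or 'java.lang.String'
    elem = class_name[dims:]
    if elem.startswith("L") and elem.endswith(";"):
        base = elem[1:-1]  # object array, e.g. '[Ljava.lang.String;'
    elif elem in _PRIMITIVE_CODES:
        base = _PRIMITIVE_CODES[elem]
    else:
        raise ValueError(f"unrecognized array type name: {class_name!r}")
    return base + "[]" * dims


def explain_no_such_method(message: str) -> tuple[str, str]:
    """Split 'owner.method(params)' into (owner, 'method(readable params)')."""
    # Drop an optional 'java.lang.NoSuchMethodException: ' prefix.
    message = message.split("NoSuchMethodException: ", 1)[-1].strip()
    head, _, rest = message.partition("(")
    if not rest.endswith(")"):
        raise ValueError(f"unexpected message format: {message!r}")
    owner, _, method = head.rpartition(".")
    params = [readable_type(p) for p in rest[:-1].split(",") if p]
    return owner, f"{method}({', '.join(params)})"


if __name__ == "__main__":
    msg = (": java.lang.NoSuchMethodException: org.apache.flink.connector.jdbc."
           "internal.JdbcOutputFormat.createRowJdbcStatementBuilder([I)")
    print(explain_no_such_method(msg))
```

For the message above, this yields the owner org.apache.flink.connector.jdbc.internal.JdbcOutputFormat and the signature createRowJdbcStatementBuilder(int[]).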

For reference, here is Java code that uses the same Flink MySQL connector:

JdbcExecutionOptions jdbcExeOption = JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build();
                
JdbcConnectionOptions jdbcConnOption = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/etlmysql")
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername("root")
        .withPassword("p@$$w0rd")
        .build();
        
JdbcSink<Car> jdbcSink = new JdbcSinkBuilder<Car>()
        .withQueryStatement(
                "INSERT INTO car VALUES(?,?)",
                (statement, car) -> {
                        statement.setString(1, car.getBrand());
                        statement.setInt(2, car.getPrice());
                })
        .withExecutionOptions(jdbcExeOption)
        .buildAtLeastOnce(jdbcConnOption);
               
        
stream.sinkTo(jdbcSink);

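As you can see, the Java JdbcSink API has a different shape from the Python one: in Java, the JdbcSink object is created through the JdbcSinkBuilder class, whereas in Python it is created by the static JdbcSink.sink method. I think the error is caused by an API version mismatch. The traceback shows that PyFlink's JdbcSink.sink looks up JdbcOutputFormat.createRowJdbcStatementBuilder(int[]) by reflection, and that method is not found in the JdbcOutputFormat class loaded from the 4.0.0-2.0 connector jars. Any idea how to solve this?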
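To confirm the mismatch, a diagnostic along these lines could list which create* methods the loaded JdbcOutputFormat class actually declares. This is an untested sketch: it assumes pyflink.util.java_utils.load_java_class is available in PyFlink 2.0 and that env.add_jars(...) from the script above has already put the connector jars on the context class loader. The helper names are hypothetical.

```python
def matching_method_signatures(java_class, name_prefix: str) -> list[str]:
    """List declared methods whose name starts with name_prefix.

    java_class is a py4j proxy for a java.lang.Class; each entry is
    'name(paramTypes)' with parameter types in Class.getName() form.
    """
    found = []
    for method in java_class.getDeclaredMethods():
        name = method.getName()
        if name.startswith(name_prefix):
            params = ",".join(p.getName() for p in method.getParameterTypes())
            found.append(f"{name}({params})")
    return sorted(found)


def inspect_jdbc_output_format(name_prefix: str = "create") -> list[str]:
    # Assumption: a PyFlink gateway is running and the connector jars are
    # already on the context class loader (e.g. after env.add_jars(...)).
    from pyflink.util.java_utils import load_java_class

    clz = load_java_class("org.apache.flink.connector.jdbc.internal.JdbcOutputFormat")
    return matching_method_signatures(clz, name_prefix)
```

If createRowJdbcStatementBuilder([I) is missing from the result of inspect_jdbc_output_format(), the Python wrapper and the connector jars disagree about this internal method.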

  • Could someone kindly let me know when the next version of flink-connector-jdbc-core will be released? Commented Sep 2 at 0:23
