0

I'm trying to develop a Flink Java API Maven program that inserts several documents into MongoDB. I use the following environment.

  • Flink: 2.0
  • MongoDB: 8.0
  • OS: Windows 11
  • JDK: 17.0.2

pom.xml:

<!-- Flink MongoDB connector. NOTE(review): the "-1.20" suffix means this artifact
     is compiled against Flink 1.20. Flink 2.0 removed the deprecated
     org.apache.flink.api.connector.sink2.Sink$InitContext (FLIP-372), which is
     exactly the class in the NoClassDefFoundError below — this dependency is not
     compatible with a Flink 2.0 runtime; a connector release built for Flink 2.x
     (or downgrading the runtime to 1.20) is required. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-mongodb</artifactId>
  <version>2.0.0-1.20</version>
</dependency>
<!-- BSON library used by the serialization lambda (org.bson.Document / BsonDocument). -->
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>bson</artifactId>
  <version>5.4.0</version>
</dependency>
<!-- NOTE(review): mongo-java-driver 3.x is the legacy driver; the Flink connector
     already depends on the modern mongodb-driver-sync, and mixing the 3.x legacy
     driver with bson 5.4.0 risks classpath conflicts — presumably this dependency
     can be removed; verify against the connector's transitive dependencies. -->
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongo-java-driver</artifactId>
  <version>3.12.14</version>
</dependency>

My Java source:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;
import org.bson.Document;

import com.mongodb.client.model.InsertOneModel;

/**
 * Minimal Flink streaming job that writes three (word, count) tuples into the
 * MongoDB collection {@code test_db.test_coll} through the Flink MongoDB sink.
 *
 * <p>NOTE(review): the reported {@code NoClassDefFoundError: Sink$InitContext}
 * is a runtime classpath issue (connector built for Flink 1.20 running on
 * Flink 2.0), not a defect in this code — confirm dependency versions.
 */
public class Mongo_Write {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single task slot keeps the local run simple and deterministic.
        env.setParallelism(1);

        // Bounded in-memory input: three word/count pairs.
        final List<Tuple2<String, Integer>> records = new ArrayList<>();
        records.add(new Tuple2<>("Hello", 1));
        records.add(new Tuple2<>("Hi", 2));
        records.add(new Tuple2<>("Hey", 3));

        final DataStream<Tuple2<String, Integer>> source = env.fromData(records);

        // Each tuple becomes a one-field BSON document {f0: f1}, wrapped in an
        // InsertOneModel. The serialization schema is a SAM interface, so a
        // serializable lambda is sufficient.
        final MongoSink<Tuple2<String, Integer>> mongoSink =
                MongoSink.<Tuple2<String, Integer>>builder()
                        .setUri("mongodb://127.0.0.1:27017")
                        .setDatabase("test_db")
                        .setCollection("test_coll")
                        .setBatchSize(1000)
                        .setBatchIntervalMs(1000)
                        .setMaxRetries(3)
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .setSerializationSchema((element, ctx) -> {
                            Document bson = new Document(element.f0, element.f1);
                            return new InsertOneModel<>(BsonDocument.parse(bson.toJson()));
                        })
                        .build();

        source.sinkTo(mongoSink);
        env.execute("Flink MongoDB Test");
        env.close();
    }
}

But the following errors are thrown when I run the program:

Exception in thread "main" java.lang.RuntimeException: Could not serialize stream nodes
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1579)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.<init>(StreamGraph.java:1563)       
        at org.apache.flink.streaming.api.graph.StreamGraph.serializeUserDefinedInstances(StreamGraph.java:1412)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:99)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1993)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1872)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)     
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1846)
        at com.aaa.flink.Mongo_Write.main(Mongo_Write.java:46)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeOperatorFactories(StreamGraph.java:1596)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1575)
        ... 8 more
Caused by: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1608)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/connector/sink2/Sink$InitContext
        at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
        at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3402)
        at java.base/java.lang.Class.getDeclaredMethod(Class.java:2673)
        at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1655)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:544)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:515)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:318)
        at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:515)
        at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:409)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1147)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1582)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1539)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1448)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1191)
        at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:354)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:502)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1605)
        ... 4 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink$InitContext
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        ... 22 more

Do you think these errors are due to a version mismatch? Or am I missing other jar files in the pom.xml? How can I solve these errors?

1
  • Kindly inform me of the release date of a flink-connector-mongodb jar that resolves this issue. Commented Sep 2 at 0:26

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.