I'm trying to develop Flink java api maven program which inserts several documents into MongoDB. I use the following Flink environment.
- Flink: 2.0
- MongoDB: 8.0
- OS: Windows 11
- JDK: 17.0.2
pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>2.0.0-1.20</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.14</version>
</dependency>
My java sources
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;
import org.bson.Document;
import com.mongodb.client.model.InsertOneModel;
public class Mongo_Write {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("Hello", 1));
data.add(new Tuple2<>("Hi", 2));
data.add(new Tuple2<>("Hey", 3));
DataStream<Tuple2<String, Integer>> stream = env.fromData(data);
MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
.setUri("mongodb://127.0.0.1:27017")
.setDatabase("test_db")
.setCollection("test_coll")
.setBatchSize(1000)
.setBatchIntervalMs(1000)
.setMaxRetries(3)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setSerializationSchema(
(input, context)
-> {
Document doc = new Document(input.f0, input.f1);
return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
})
.build();
stream.sinkTo(sink);
env.execute("Flink MongoDB Test");
env.close();
}
}
But the unknown errors are thrown from the sources.
Exception in thread "main" java.lang.RuntimeException: Could not serialize stream nodes
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1579)
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.<init>(StreamGraph.java:1563)
at org.apache.flink.streaming.api.graph.StreamGraph.serializeUserDefinedInstances(StreamGraph.java:1412)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1993)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1872)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1846)
at com.aaa.flink.Mongo_Write.main(Mongo_Write.java:46)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeOperatorFactories(StreamGraph.java:1596)
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1575)
... 8 more
Caused by: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1608)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/connector/sink2/Sink$InitContext
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3402)
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2673)
at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1655)
at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:544)
at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:515)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:318)
at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:515)
at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:409)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1147)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1582)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1539)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1448)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1191)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:354)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:502)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1605)
... 4 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink$InitContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
... 22 more
Do you think these errors are due to version mismatch? Or do I miss another jars files from the pom.xml file? How to solve these errors?