Skip to main content
deleted 44 characters in body
Source Link
James Z
  • 12.3k
  • 10
  • 27
  • 48

I tryI'm trying to develop Flink java api maven program which inserts several documents into MongoDB. And I use the following Flink environment.

Flink : 2.0
MongoDB : 8.0
OS : Windows 11
JDK : 17.0.2
  • Flink: 2.0
  • MongoDB: 8.0
  • OS: Windows 11
  • JDK: 17.0.2

The belows are myMy java sources

Do you think these errors are due to version mismatch? Or Dodo I miss another jars files from the pom.xml file? Kindly inform me howHow to solve these errors.?

I try to develop Flink java api maven program which inserts several documents into MongoDB. And I use the following Flink environment.

Flink : 2.0
MongoDB : 8.0
OS : Windows 11
JDK : 17.0.2

The belows are my java sources

Do you think these errors are due to version mismatch? Or Do I miss another jars files from the pom.xml file? Kindly inform me how to solve these errors.

I'm trying to develop Flink java api maven program which inserts several documents into MongoDB. I use the following Flink environment.

  • Flink: 2.0
  • MongoDB: 8.0
  • OS: Windows 11
  • JDK: 17.0.2

My java sources

Do you think these errors are due to version mismatch? Or do I miss another jars files from the pom.xml file? How to solve these errors?

Source Link
Joseph Hwang
  • 1.4k
  • 7
  • 42
  • 76

java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink$InitContext

I try to develop Flink java api maven program which inserts several documents into MongoDB. And I use the following Flink environment.

Flink : 2.0
MongoDB : 8.0
OS : Windows 11
JDK : 17.0.2

pom.xml :

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-mongodb</artifactId>
  <version>2.0.0-1.20</version>
</dependency>
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>bson</artifactId>
  <version>5.4.0</version>
</dependency>
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongo-java-driver</artifactId>
  <version>3.12.14</version>
</dependency>

The belows are my java sources

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;
import org.bson.Document;

import com.mongodb.client.model.InsertOneModel;

public class Mongo_Write {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("Hello", 1));
        data.add(new Tuple2<>("Hi", 2));
        data.add(new Tuple2<>("Hey", 3));
                
        DataStream<Tuple2<String, Integer>> stream = env.fromData(data);
            
        MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
               .setUri("mongodb://127.0.0.1:27017")
               .setDatabase("test_db")
               .setCollection("test_coll")
               .setBatchSize(1000)
               .setBatchIntervalMs(1000)
               .setMaxRetries(3)
               .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
               .setSerializationSchema(
                       (input, context) 
                            -> {
                                Document doc = new Document(input.f0, input.f1);
                                return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                           })
                .build();
                       
        stream.sinkTo(sink);
        env.execute("Flink MongoDB Test");
        env.close();
    }
}

But the unknown errors are thrown from the sources.

Exception in thread "main" java.lang.RuntimeException: Could not serialize stream nodes
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1579)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.<init>(StreamGraph.java:1563)       
        at org.apache.flink.streaming.api.graph.StreamGraph.serializeUserDefinedInstances(StreamGraph.java:1412)
        at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:99)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1993)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1872)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)     
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1846)
        at com.aaa.flink.Mongo_Write.main(Mongo_Write.java:46)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeOperatorFactories(StreamGraph.java:1596)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.serializeStreamNodes(StreamGraph.java:1575)
        ... 8 more
Caused by: java.lang.RuntimeException: Could not serialize stream node Sink: Writer-3
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1608)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/connector/sink2/Sink$InitContext
        at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
        at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3402)
        at java.base/java.lang.Class.getDeclaredMethod(Class.java:2673)
        at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1655)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:544)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:515)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:318)
        at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:515)
        at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:409)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1147)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1582)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1539)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1448)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1191)
        at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:354)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:502)
        at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
        at org.apache.flink.streaming.api.graph.StreamGraph$UserDefinedObjectsHolder.lambda$serializeOperatorFactoriesAsync$2(StreamGraph.java:1605)
        ... 4 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink$InitContext
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        ... 22 more

Do you think these errors are due to version mismatch? Or Do I miss another jars files from the pom.xml file? Kindly inform me how to solve these errors.