Best practices
0 votes
2 replies
30 views

I'm maintaining a Flink application and I'm confused about which classes need to be POJOs (serializable) for Flink to preserve state compatibility between different versions of the app. What I ...
Marco • 54
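
For reference, Flink uses its PojoSerializer (the one that supports state schema evolution across app versions) only for classes that are public, have a public no-argument constructor, and expose every non-transient field either publicly or through public getters/setters; anything else falls back to Kryo and loses evolution support. A minimal sketch, with a hypothetical class name:

// Hypothetical state type that satisfies Flink's POJO rules.
public class SensorReading {
    public long sensorId;       // fields must be public, or have public getters/setters
    public double value;

    public SensorReading() {}   // a public no-argument constructor is required
}
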
1 vote
1 answer
47 views

The Flink docs mention channels and gates. I am having difficulty inferring what a channel and a gate are and how they differ. Are these merely logical abstractions, or is there also a one-to-one ...
keezar • 111
1 vote
2 answers
41 views

Flink allows you to define requirements for CPU cores using fine-grained resource management. I am wondering whether this CPU request is strictly enforced or best-effort. Example: a task manager has 4 CPU ...
keezar • 111
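
For context, fine-grained resource requirements are declared per slot sharing group, following the pattern from the Flink docs; whether the CPU request is enforced or merely best-effort depends on the deployment (Kubernetes, for instance, can translate it into container limits). A sketch with an illustrative group name and values, where stream is a placeholder DataStream:

import org.apache.flink.api.common.operators.SlotSharingGroup;

// Declare a slot sharing group with explicit resource requirements
// (group name and numbers are illustrative).
SlotSharingGroup ssg = SlotSharingGroup.newBuilder("ssg-a")
        .setCpuCores(1.0)          // CPU cores requested per slot of this group
        .setTaskHeapMemoryMB(256)  // task heap memory per slot
        .build();

stream.map(value -> value).slotSharingGroup(ssg);
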
-3 votes
1 answer
157 views

Issue: the Flink application throws "Thread 'jobmanager-io-thread-25' produced an uncaught exception. java.lang.OutOfMemoryError: Direct buffer memory" and terminates after running for 2-3 days. No matter ...
Strange • 1,514
Advice
0 votes
0 replies
82 views

I’m running a Flink DataStream job that reads events from a Kafka topic and writes them into an Apache Iceberg table using the REST catalog (Lakekeeper). Authentication to the REST catalog is ...
Andrey • 47
-1 votes
1 answer
45 views

I have a Flink job with multiple downstream operators, and I want to route tuples to them based on a condition. Side outputs are advertised for this use case in the Flink documentation. However, when sending ...
keezar • 111
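
For reference, side outputs can only be emitted through a ProcessFunction-style Context (not from a plain map or filter); the tag must be created as an anonymous subclass so its type information is retained, and the side stream must be fetched from the operator that emitted it. A minimal sketch with hypothetical names, where input is a placeholder DataStream<String>:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The {} makes this an anonymous subclass, preserving the element type.
final OutputTag<String> rejected = new OutputTag<String>("rejected") {};

SingleOutputStreamOperator<String> main = input
        .process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("ok")) {
                    out.collect(value);          // main output
                } else {
                    ctx.output(rejected, value); // routed to the side output
                }
            }
        });

// Must be called on the operator that emitted the side output.
DataStream<String> rejectedStream = main.getSideOutput(rejected);
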
Advice
0 votes
1 reply
54 views

We are using Flink's AsyncIO function with futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we ...
Sidharth Ramalingam
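
For context, with a capacity of 1 the async operator admits only one in-flight request at a time, and a blocking stub serializes the calls on top of that. The usual pattern is a non-blocking call that completes the ResultFuture from the client's callback, combined with a larger capacity. A sketch, where the gRPC client, its callAsync method, and the Request/Response types are all hypothetical, and requests is a placeholder stream:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class GrpcAsyncFunction extends RichAsyncFunction<Request, Response> {
    private transient MyGrpcClient client; // hypothetical non-blocking gRPC client

    @Override
    public void open(Configuration parameters) {
        client = new MyGrpcClient();
    }

    @Override
    public void asyncInvoke(Request req, ResultFuture<Response> resultFuture) {
        // Never block here; complete the future from the client's callback thread.
        client.callAsync(req) // assumed to return CompletableFuture<Response>
              .thenAccept(resp -> resultFuture.complete(Collections.singleton(resp)));
    }
}

// The last argument bounds the number of in-flight requests; 100 is illustrative.
DataStream<Response> out = AsyncDataStream.unorderedWait(
        requests, new GrpcAsyncFunction(), 5, TimeUnit.SECONDS, 100);
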
0 votes
1 answer
49 views

Async operations & Future callbacks were added when the State API was upgraded to v2. Will it be thread-safe to call the timer service & Collector from that callback? Example: final var ...
thekey.kim
0 votes
1 answer
65 views

I'm using a ValueState with TTL and I want to understand the difference (if any) in the checkpointed state size/memory between two scenarios. First scenario: I create/obtain the ValueState but never ...
Marco • 54
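
For reference, TTL is configured on the state descriptor, and how much expired-but-unread state lingers in checkpoints depends on which cleanup strategies are enabled. A minimal sketch of the setup, assuming Flink 1.x (names and durations illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))                              // time-to-live
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // when TTL is refreshed
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupFullSnapshot()  // drop expired entries when taking full snapshots
        .build();

ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("counter", Long.class);
desc.enableTimeToLive(ttlConfig);
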
1 vote
2 answers
58 views

In my Flink app, I found this log: Field EnrichmentInfo#groupIds will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the ...
Marco • 54
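
For context, that log line means the field's type could not be analyzed as a POJO and will be serialized with Kryo as a GenericType. A sketch of one way to surface every such fallback at job submission rather than through runtime logs, assuming the pre-2.0 ExecutionConfig API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fail fast if any type would fall back to Kryo's GenericType serialization.
env.getConfig().disableGenericTypes();
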
-4 votes
1 answer
87 views

We recently experimented with Flink, in particular BATCH execution mode, to set up an ETL job processing a bounded data set. It works quite well, but I'd like to get some clarifications about my ...
JackatWaterloo
0 votes
0 answers
65 views

I'm using Flink CDC + Apache Hudi in Flink to sync data from MySQL to AWS S3. My Flink job looks like: parallelism = 1 env = StreamExecutionEnvironment.get_execution_environment(config) ...
Rinze • 834
0 votes
1 answer
49 views

I have a Table API pipeline that does a 1-minute Tumbling Count aggregation over a set of 15 columns. FlameGraph shows that most of the CPU (~40%) goes into serializing each row, despite using ...
Tomás Cerdá
0 votes
1 answer
48 views

I have 2 tables, both using the Kafka connector to read data. I join these sources and write the data to another Kafka topic. We checkpoint every 10 minutes, so when the job restarts, we use execution.savepoint.path ...
hitesh • 389
0 votes
1 answer
69 views

I have the following config set for my job 'table.exec.sink.upsert-materialize': 'NONE', 'table.exec.mini-batch.enabled': true, 'table.exec.mini-batch.allow-latency'...
hitesh • 389
0 votes
1 answer
59 views

I'm trying to write data from a Flink SQL job to a DynamoDB table, but I'm consistently getting the following error: Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The ...
Chiara Capuano
0 votes
0 answers
31 views

I have a topic in Kafka with the following messages: {"time":"2025-07-31T17:25:31.483425Z","EventID":4624,"ComputerName":"workstation"} {"time"...
Adrian Cincu
0 votes
1 answer
48 views

I would like to better understand how buffer debloating works in Apache Flink. Assume that I have a Flink job structured like a pipeline (A -> B -> C -> D -> E) aligned ...
Marco • 54
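
For reference, buffer debloating is controlled by a pair of TaskManager options that dynamically shrink in-flight network buffers toward a target amount of data per subtask. A sketch of the relevant keys (values illustrative; on a real cluster they belong in the TaskManager configuration file rather than in job code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Enable debloating and aim for roughly 1 second of in-flight data per subtask.
conf.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
conf.setString("taskmanager.network.memory.buffer-debloat.target", "1 s");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
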
0 votes
0 answers
33 views

I’m working with Apache Flink 1.16 on an ETL job that reads data from Kafka and writes the output to HDFS in Parquet format. I’m using a FileSink with BulkFormat (ParquetAvroWriters), a ...
Ronmeir • 23
0 votes
1 answer
33 views

I'm using Flink SQL to join data from multiple Kafka topics. Sometimes the resulting join is non-trivial to debug, and I want to log the state access involved in the join — specifically, I’d like to ...
hitesh • 389
2 votes
0 answers
148 views

I'm working on a Flink batch job that reads JSON strings from Kafka, converts them into Avro GenericRecords, and writes them to Parquet using AvroParquetWriters. The JSON messages look like this: {"...
Tuan Duy
0 votes
0 answers
54 views

(Screenshots of the upstream and downstream operators omitted.) The Bytes Received of the downstream operator equals the Bytes Sent of the upstream operator, but the Records counts differ. I looked ...
GW David
0 votes
0 answers
130 views

We have deployed a Flink pipeline using the Flink Kubernetes operator in application mode. We have enabled the Flink autoscaler with the following configuration. I see that the adaptive scheduler is able to adjust ...
Sumit Nekar
0 votes
0 answers
27 views

I'm working on a streaming application using Apache Flink. I receive real-time stock return events in this format: StockReturn{ticker='XYZ', returnValue=..., timestamp=...} I want to update the ...
TFERHAN
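
For reference, the standard way to keep a running value per ticker is keyed state inside a KeyedProcessFunction. A minimal sketch, assuming a StockReturn POJO with public ticker and returnValue fields and a placeholder stream named returns:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

returns.keyBy(r -> r.ticker)
       .process(new KeyedProcessFunction<String, StockReturn, StockReturn>() {
           private transient ValueState<Double> lastReturn; // one value per ticker

           @Override
           public void open(Configuration parameters) {
               lastReturn = getRuntimeContext().getState(
                       new ValueStateDescriptor<>("last-return", Double.class));
           }

           @Override
           public void processElement(StockReturn r, Context ctx, Collector<StockReturn> out)
                   throws Exception {
               lastReturn.update(r.returnValue); // update the per-key state
               out.collect(r);
           }
       });
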
0 votes
1 answer
47 views

I am using Flink to consume events from Kafka, with a sliding window assigner (1-hour window sliding every 5 minutes), and then the result has to be written to Cassandra. I want to understand how this works ...
swapnil jain
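
For context, a 1-hour window sliding every 5 minutes assigns each event to 12 overlapping windows, and one result per key fires each time the watermark passes a window's end; that is what then reaches the sink. The assignment itself, sketched with a hypothetical event type, a hypothetical MyAggregate function, and a placeholder stream named events:

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

events.keyBy(e -> e.key)
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
      // MyAggregate is a hypothetical AggregateFunction; one result is emitted
      // per key per window firing, i.e. every 5 minutes per key.
      .aggregate(new MyAggregate());
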
1 vote
1 answer
75 views

I have 2 source tables in my streaming Flink application - sessions and orders. Each order belongs to a certain session. Both tables can be updated (not only appended to), but we know that sessions ...
EgorBr • 74
0 votes
0 answers
96 views

I’m currently testing a Flink pipeline with the following architecture: 3 TaskManagers Each TaskManager has 4 slots The pipeline structure is: Source → Map → KeyBy → KeyedProcessFunction → Sink ...
Trung Hiếu Nguyễn
1 vote
0 answers
372 views

I've been spinning my wheels for a couple of days. I think the documentation does not match the API from what I see, and the Javadoc links appear to be broken. I want to make a sink that saves JSON coming ...
Edv Beq • 1,020
0 votes
1 answer
53 views

I’m building a streaming application using PyFlink version 1.19.1 and Python 3.9.6, running on Amazon EMR in YARN cluster mode. When I submit my PyFlink job to the cluster, I notice that the operator ...
Vishal Kamlapure
0 votes
0 answers
30 views

Issue with Flink Application Mode args: custom command not executing as expected. Description: Apache Flink version: 1.20.1, Java version: 17, platform: AWS EKS (Kubernetes). I'm encountering an issue ...
BILAL ASLAM
1 vote
0 answers
50 views

We're using the following JDBCSource within a Flink application that connects to Snowflake and retrieves some data. So essentially, after starting the job, wait 5 seconds and begin fetching rows every ...
ashur • 4,375
0 votes
0 answers
76 views

I want to migrate an existing streaming application (based on the Medallion architecture) to Flink 2.0; may I get advice/best practices at the architectural level? Challenge (data flow): stream from Source A ...
Yun Xing
1 vote
1 answer
112 views

I need help fixing the errors below when adding a window with the Table API. The environment uses flinkVersion = "1.20.0". I'm trying to add a Tumble window to a view, expecting to apply the LISTAGG() function ...
Madhusudhan Reddy
0 votes
1 answer
161 views

While running Flink 2.0.0 on Microsoft Windows 11 Enterprise with Java 17, I get the error below: Improperly specified VM option 'MaxMetaspaceSize=268435456 ' Error: Could not create the Java ...
abu shaikh
0 votes
0 answers
158 views

I use the following development environment: Flink 2.0, MySQL 8.0.4, JDK 17.0.2. I am developing PyFlink code which inserts a simple row into MySQL. from pyflink.common import Types from ...
Joseph Hwang • 1,433
0 votes
1 answer
30 views

For a Flink Kafka SQL source definition: CREATE TEMPORARY TABLE person ( payload STRING, `headers` MAP<STRING, BYTES> METADATA, `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', ...
王子1986 • 3,659
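
For reference, a complete DDL of that shape can be submitted through the Table API. In the sketch below the connector options are illustrative placeholders, while 'headers' and 'timestamp' are the Kafka SQL connector's documented metadata keys:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

tEnv.executeSql(
    "CREATE TEMPORARY TABLE person (" +
    "  payload STRING," +
    "  `headers` MAP<STRING, BYTES> METADATA," +                   // Kafka record headers
    "  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'" + // Kafka record timestamp
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'person'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'raw'" +
    ")");
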
2 votes
0 answers
110 views

I'm looking to migrate a Flink 1.20 job to Flink 2.x. The main goal of this migration is to mitigate performance issues stemming from a large state store size. From what I understand, in order to use ...
francis • 6,419
0 votes
0 answers
145 views

I'm trying to develop a Flink Java API Maven program which inserts several documents into MongoDB. I use the following Flink environment. Flink: 2.0, MongoDB: 8.0, OS: Windows 11, JDK: 17.0.2. pom.xml: ...
Joseph Hwang • 1,433
0 votes
2 answers
62 views

We hand the message to a thread pool immediately after the current operator receives it, and pass the Collector object to the thread pool. It worked well after Flink started; then we found that the next ...
skycaptain
0 votes
1 answer
37 views

In the minimal working example below (executed with Flink 1.20), using an over_window in batch mode always yields the following error message, although the time column is TIMESTAMP(3): org....
Malte Winckler
0 votes
0 answers
126 views

We have a Flink RMQ source connector that is ingesting messages from a queue. We have been facing problems lately with ingestion during peak traffic hours. We have not seen this issue in the past. The ...
Hariharan Janakiraman
0 votes
1 answer
128 views

I'm trying to use state as a cache for my Flink job. I will have a counter stored in state for each key (I'm using a keyed stream); then I have to validate my event against this counter to decide if ...
mvr • 3
1 vote
1 answer
77 views

I'm reading a small sample of data stored in Kafka, but instead of applying watermarks directly to the source, I do some processing on the data and then extract the event timestamps. I then apply event ...
K.M • 39
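
For context, assignTimestampsAndWatermarks works on any DataStream, not just sources, so assigning watermarks after a processing step is legitimate; with small samples, the usual pitfall is that some parallel subtasks see no data and stall the watermark. A sketch with a hypothetical event type and a placeholder stream named enriched:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Enriched> withWatermarks = enriched.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Enriched>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, previous) -> event.eventTimeMillis)
                // Lets the watermark advance even if some subtasks receive no events.
                .withIdleness(Duration.ofSeconds(30)));
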
0 votes
1 answer
71 views

I'm using Flink 1.20.0 and trying to submit a PyFlink job starting from an existing savepoint. I execute on the command line: flink run --fromSavepoint s3a://.../1a4e1e73910e5d953183b8eb1cd6eb84/chk-1 -...
Rinze • 834
0 votes
0 answers
26 views

I have a Flink job that reads messages from a topic and sends them to another topic, very basic in terms of Flink. public class RoutingJob { private static final Logger LOG = LoggerFactory.getLogger(...
Pavanraotk • 1,145
0 votes
0 answers
78 views

I am writing a consumer which consumes data records from Kinesis streams using Flink 1.20, but despite following all the instructions mentioned in the Flink 1.20 docs, I am not able to digest the ...
Rohan Sharma
0 votes
1 answer
95 views

I'm trying to use Flink CDC to capture data changes from MySQL and update the Hudi table in S3. My PyFlink job looks like: env = StreamExecutionEnvironment.get_execution_environment(config) env....
Rinze • 834
-1 votes
1 answer
45 views

I have a Flink job which streams data to Azure using the Hadoop FS. Currently I'm able to push the data and create a new file, but I want to roll to a new file when there is a date change (like from 2025-03-...
Prajyod Kumar
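
For reference, the FileSink's bucket assigner already provides date-based rolling: a DateTimeBucketAssigner starts a new bucket (directory), and therefore new files, whenever the formatted date changes. A sketch with an illustrative path and format, where stream is a placeholder DataStream<String>:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

FileSink<String> sink = FileSink
        .forRowFormat(new Path("abfs://container@account.dfs.core.windows.net/out"),
                      new SimpleStringEncoder<String>("UTF-8"))
        // One bucket per calendar day; records arriving after midnight go to a new one.
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
        .build();

stream.sinkTo(sink);
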
1 vote
1 answer
54 views

Context: I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written, which Flink reads from and executes. I have connected the ...
Sai Ashrritth Patnana
1 vote
0 answers
39 views

Setup: Flink 1.19.1, Python 3.10, Apache Flink 1.19.1 Python package, Java 11, Kafka Flink connector 3.4.0. I am creating a KafkaSource that's reading from a Kafka topic where messages have ...
Abhishek Bhrushundi
