3,390 questions
Best practices
0
votes
2
replies
30
views
Which classes must be POJOs/serializable in Apache Flink? When should I use env.registerType()?
I'm maintaining a Flink application and I'm confused about which classes need to be POJOs (serializable) for Flink to preserve state compatibility between different versions of the app.
What I ...
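As background for the entry above, a rough sketch (Flink 1.x APIs, class names invented) of what qualifies as a Flink POJO and where env.registerType() fits; this is an illustration, not the asker's code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegistrationSketch {

    // A Flink POJO: public class, public no-arg constructor, fields that are
    // public or reachable via getters/setters. POJOs use the PojoSerializer,
    // which supports state schema evolution. (Hypothetical type.)
    public static class MyEvent {
        public String key;
        public long timestamp;
        public MyEvent() {}
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // registerType() only pre-registers the class with the serialization
        // stack (POJO or Kryo); it does not turn a non-POJO into a POJO.
        env.registerType(MyEvent.class);
    }
}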
1
vote
1
answer
47
views
Flink: Channels vs. Gates
The Flink docs mention channels and gates.
I am having difficulty inferring what a channel is, what a gate is, and how they differ. Are these merely logical abstractions or is there also a one-to-one ...
1
vote
2
answers
41
views
Does Flink strictly enforce CPU requests with fine-grained resource management?
Flink allows defining requirements for CPU cores using fine-grained resource management.
I am wondering whether this CPU request is strictly enforced or best-effort.
Example:
A task manager has 4 CPU ...
-3
votes
1
answer
157
views
Flink Job Manager Direct Buffer Memory gets exhausted when checkpointing enabled
Issue:
The Flink application throws "Thread 'jobmanager-io-thread-25' produced an uncaught exception. java.lang.OutOfMemoryError: Direct buffer memory" and terminates after running for 2-3 days.
No matter ...
Advice
0
votes
0
replies
82
views
Flink Iceberg job loses authentication with REST catalog (Keycloak OAuth2) after short time
I’m running a Flink DataStream job that reads events from a Kafka topic and writes them into an Apache Iceberg table using the REST catalog (Lakekeeper).
Authentication to the REST catalog is ...
-1
votes
1
answer
45
views
Does Flink throughput decrease proportionally with the number of side outputs?
I have a Flink job with multiple downstream operators I want to route tuples to based on a condition. Side outputs are advertised for this use case in the Flink documentation.
However, when sending ...
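For context on the entry above, a minimal sketch of the side-output pattern being measured; the tag name and routing condition are made up:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    // Anonymous subclass keeps the element type information. (Hypothetical tag.)
    static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

    static DataStream<String> route(DataStream<String> input) {
        SingleOutputStreamOperator<String> main =
                input.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("ok")) {
                            out.collect(value);          // main output
                        } else {
                            ctx.output(REJECTED, value); // side output
                        }
                    }
                });
        return main.getSideOutput(REJECTED);             // consume the side output
    }
}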
Advice
0
votes
1
replies
54
views
Flink - Async IO Threads required
We are using Flink's AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we ...
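For context on the entry above, a minimal sketch of AsyncDataStream with capacity 1 and a blocking call moved onto a separate executor; the function and the stubbed gRPC call are hypothetical:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookupSketch {
    static class GrpcLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            // A blocking stub must not run on the operator thread; hand it to
            // an executor and complete the ResultFuture from the callback.
            CompletableFuture
                    .supplyAsync(() -> callBlockingStub(key))
                    .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
        }

        private String callBlockingStub(String key) {
            return key + "-enriched"; // placeholder for the real gRPC call
        }
    }

    static DataStream<String> enrich(DataStream<String> input) {
        // capacity = 1: at most one in-flight async request per subtask.
        return AsyncDataStream.unorderedWait(
                input, new GrpcLookup(), 5, TimeUnit.SECONDS, 1);
    }
}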
0
votes
1
answer
49
views
Are Flink's timer service & Collector thread-safe?
Question
Async operations and Future callbacks were added as the State API was upgraded to v2. Is it thread-safe to call the timer service and the Collector from that callback?
Example
final var ...
0
votes
1
answer
65
views
Does an uninitialized ValueState occupy memory in checkpoints in Flink?
I'm using a ValueState with TTL and I want to understand the difference (if any) in the checkpointed state size/memory between two scenarios:
First scenario
I create/obtain the ValueState but never ...
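For context on the entry above, a minimal sketch of the setup being compared: a TTL-enabled ValueState that is registered for every key but only updated for some elements (all names invented):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TtlStateSketch extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> desc =
                new ValueStateDescriptor<>("counter", Long.class);
        desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).build());
        counter = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        if (value.startsWith("count")) {                 // scenario 2: state is written
            Long current = counter.value();              // null until update() is called
            counter.update(current == null ? 1L : current + 1);
        }                                                // scenario 1: state stays uninitialized
        out.collect(value);
    }
}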
1
vote
2
answers
58
views
Flink GenericType List serialization
In my Flink app, I found this log:
Field EnrichmentInfo#groupIds will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the ...
-4
votes
1
answer
87
views
Questions about Apache Flink internals, BATCH execution mode
We recently experimented with Flink, in particular BATCH execution mode, to set up an ETL job processing a bounded dataset. It works quite well, but I'd like to get some clarifications about my ...
0
votes
0
answers
65
views
Flink CDC + Hudi isn't working as expected; a log says state is cleared
I'm using Flink CDC + Apache Hudi in Flink to sync data from MySQL to AWS S3. My Flink job looks like:
parallelism = 1
env = StreamExecutionEnvironment.get_execution_environment(config)
...
0
votes
1
answer
49
views
High CPU usage from RowData serialization in Flink Table API despite ObjectReuse optimization
I have a Table API pipeline that does a 1-minute Tumbling Count aggregation over a set of 15 columns. FlameGraph shows that most of the CPU (~40%) goes into serializing each row, despite using ...
0
votes
1
answer
48
views
How to add a new column to a Flink SQL job that can restore from an existing savepoint or checkpoint
I have 2 tables, both using the Kafka connector to read data. I join these sources and write the data to another Kafka topic. We checkpoint every 10 minutes, so when the job restarts, we use execution.savepoint.path ...
0
votes
1
answer
69
views
Flink SQL with mini-batch seems to trigger only on checkpoint
I have the following config set for my job
'table.exec.sink.upsert-materialize': 'NONE',
'table.exec.mini-batch.enabled': true,
'table.exec.mini-batch.allow-latency'...
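For context on the entry above, the mini-batch options involved can also be set through the Table API configuration; the latency and size values below are placeholders, not the asker's:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Mini-batch buffers records and flushes either when the allowed
        // latency elapses or when the batch size is reached.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.size", "1000");
    }
}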
0
votes
1
answer
59
views
Flink DynamoDB Sink: "The provided key element does not match the schema"
I'm trying to write data from a Flink SQL job to a DynamoDB table, but I'm consistently getting the following error:
Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The ...
0
votes
0
answers
31
views
Kafka - Flink -> Complex Event Processing
I have a topic in Kafka with the following messages:
{"time":"2025-07-31T17:25:31.483425Z","EventID":4624,"ComputerName":"workstation"}
{"time&...
0
votes
1
answer
48
views
Flink's Buffer debloating mechanism
I would like to understand better the functioning of Buffer Debloating in Apache Flink.
Assume that:
I have a Flink job structured like a pipeline (A -> B -> C -> D -> E)
aligned ...
0
votes
0
answers
33
views
Compacted files are written to the lowest datetime bucket among the source part files instead of the original part file's system time
I’m working with Apache Flink 1.16 on an ETL job that reads data from Kafka and writes the output to HDFS in Parquet format. I’m using a FileSink with BulkFormat (ParquetAvroWriters), a ...
0
votes
1
answer
33
views
How to Log State Access (get/put) in Flink SQL Join Operators with Operator Metadata?
I'm using Flink SQL to join data from multiple Kafka topics. Sometimes the resulting join is non-trivial to debug, and I want to log the state access involved in the join — specifically, I’d like to ...
2
votes
0
answers
148
views
Flink FlatMap "Could not forward element to next operator"
I'm working on a Flink batch job that reads JSON strings from Kafka, converts them into Avro GenericRecords, and writes them to Parquet using AvroParquetWriters.
The JSON messages look like this:
{...
0
votes
0
answers
54
views
The Records Received of the downstream operator is not equal to the sum of the Records Sent of the upstream operator
[Screenshots: upstream operator and downstream operator metrics]
The Bytes Received of the downstream operator is equal to the Bytes Sent of the upstream operator, but the Records counts differ. I looked ...
0
votes
0
answers
130
views
Flink autoscaler not scaling down task manager pods
We have deployed a Flink pipeline using the Flink Kubernetes operator in application mode. We have enabled the Flink autoscaler with the following configuration. I see that the adaptive scheduler is able to adjust ...
0
votes
0
answers
27
views
How to update online mean and covariance per portfolio in Flink with streaming returns?
I'm working on a streaming application using Apache Flink. I receive real-time stock return events in this format:
StockReturn{ticker='XYZ', returnValue=..., timestamp=...}
I want to update the ...
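For context on the entry above, the usual online (Welford-style) update for a running mean and pairwise covariance looks roughly like this; in a Flink job the accumulator would typically live in keyed state per portfolio (all names are hypothetical):

// Running mean of two return series and their sample covariance, updated one
// observation at a time. Numerically stabler than summing raw products.
public class OnlineStatsSketch {
    long n;
    double meanX, meanY;
    double coMoment; // running sum of (x - prev meanX) * (y - new meanY)

    public void add(double x, double y) {
        n++;
        double dx = x - meanX;        // deviation from the previous mean of x
        meanX += dx / n;
        meanY += (y - meanY) / n;
        coMoment += dx * (y - meanY); // uses the already-updated mean of y
    }

    public double covariance() {
        return n > 1 ? coMoment / (n - 1) : 0.0; // sample covariance
    }
}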
0
votes
1
answer
47
views
Sliding Window Assigner in Apache Flink
I am using Flink to consume events from Kafka with a sliding window assigner (a 1-hour window sliding every 5 minutes), and then it has to write the result to Cassandra.
I want to understand how this works ...
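For context on the entry above, a minimal sketch of that assigner (event type invented): each element is assigned to size/slide = 12 overlapping windows, and each window fires once the watermark passes its end:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class SlidingWindowSketch {
    // Hypothetical event type.
    public static class MyEvent {
        public String key;
        public long timestamp;
    }

    static WindowedStream<MyEvent, String, TimeWindow> windowed(DataStream<MyEvent> input) {
        return input
                .keyBy(e -> e.key)
                // 1-hour windows, sliding every 5 minutes
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));
    }
}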
1
vote
1
answer
75
views
How to perform an Interval Join in Flink SQL on upsert (non-append-only) tables?
I have 2 source tables in my streaming Flink application - sessions and orders. Each order belongs to a certain session. Both tables can be updated (not only appended to), but we know that sessions ...
0
votes
0
answers
96
views
Flink pipeline latency increases over time despite low processing time per task
I’m currently testing a Flink pipeline with the following architecture:
3 TaskManagers
Each TaskManager has 4 slots
The pipeline structure is:
Source → Map → KeyBy → KeyedProcessFunction → Sink
...
1
vote
0
answers
372
views
How to write a Postgres sink with Apache Flink 2.0
I've been spinning my wheels for a couple of days. I think the documentation does not match the API from what I see - and the Javadoc links appear to be broken.
I want to make a sink that saves JSON coming ...
0
votes
1
answer
53
views
How to assign custom operator names in PyFlink (v1.19.1) similar to .name() in Java Flink?
I’m building a streaming application using PyFlink version 1.19.1 and Python 3.9.6, running on Amazon EMR in YARN cluster mode.
When I submit my PyFlink job to the cluster, I notice that the operator ...
0
votes
0
answers
30
views
Issue with Flink Application Mode args – Custom Command Not Executing as Expected
Description:
Apache Flink Version: 1.20.1
Java Version: 17
Platform: AWS EKS (Kubernetes)
I'm encountering an issue ...
1
vote
0
answers
50
views
Flink JDBCSource guarantees
We're using the following JDBCSource within a Flink application that connects to Snowflake and retrieves some data.
So essentially, after starting the job it waits for 5 seconds and begins fetching rows every ...
0
votes
0
answers
76
views
How to migrate my current streaming application (Spark, delta lake, medallion architecture) to Flink 2.0
I want to migrate an existing streaming application (based on the Medallion architecture) to Flink 2.0; may I get advice/best practices at the architectural level?
Challenge(Data Flow):
Stream from Source A ...
1
vote
1
answer
112
views
Issue while adding Tumble Window/Watermark with TIMESTAMP AS event_time to view
Need help fixing the errors below while adding a window with the Table API.
Env is with flinkVersion = "1.20.0"
I'm trying to add a Tumble window to a view and expect to apply the LISTAGG() function ...
0
votes
1
answer
161
views
Improperly specified VM option 'MaxMetaspaceSize=268435456 '
While running Flink 2.0.0 on Microsoft Windows 11 Enterprise with Java 17, I get the error below:
Improperly specified VM option 'MaxMetaspaceSize=268435456
'
Error: Could not create the Java ...
0
votes
0
answers
158
views
Issue with PyFlink API code for inserting data into MySQL
I use the following development environment.
Flink : 2.0
MySQL : 8.0.4
JDK : 17.0.2
I am developing Python Flink API code which inserts a simple piece of data into MySQL.
from pyflink.common import Types
from ...
0
votes
1
answer
30
views
Always missing one latest record when using count over partition
For the Flink Kafka SQL source definition:
CREATE TEMPORARY TABLE person
(
payload STRING,
`headers` MAP<STRING, BYTES> METADATA,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
...
2
votes
0
answers
110
views
Flink 2.x, enabling async state when using a KeyedCoProcessFunction
I'm looking to migrate a Flink 1.20 job to Flink 2.x. The main goal of this migration is to mitigate performance issues stemming from a large state store size.
From what I understand in order to use ...
0
votes
0
answers
145
views
java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink$InitContext
I'm trying to develop a Flink Java API Maven program which inserts several documents into MongoDB. I use the following Flink environment.
Flink: 2.0
MongoDB: 8.0
OS: Windows 11
JDK: 17.0.2
pom.xml:
...
0
votes
2
answers
62
views
collector.collect (not in the main thread) seems to skip data somehow
We put the message into a thread pool immediately after the current operator received it, and pass the collector object to the thread pool. It worked well after Flink started; then we found that the next ...
0
votes
1
answer
37
views
Over Windows in Batch Mode
In the minimal working example below (executed with Flink v1.20), the use of an over_window in batch mode always yields the following error message, although the time column is TIMESTAMP(3):
org....
0
votes
0
answers
126
views
RabbitMQ Source connector in apache Flink is a bottleneck for our streaming app
We have a Flink RMQ source connector that is ingesting messages from a queue. We have been facing problems lately with ingestion during peak traffic hours. We have not seen this issue in the past. The ...
0
votes
1
answer
128
views
Using Flink's state as a cache
I'm trying to use state as a cache for my Flink job. I will have a counter stored in state for each key (I'm using a keyed stream), then I have to validate my event against this counter to decide if ...
1
vote
1
answer
77
views
Early(?) Triggering of Flink's Event Time Windows and Non-Deterministic Results
I'm reading a small sample of data stored in Kafka, but instead of applying watermarks directly to the source, I do some processing on the data and then extract the event timestamps.
I then apply event ...
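For context on the entry above, a minimal sketch of assigning timestamps and watermarks downstream of the processing step rather than at the source; the record type and the out-of-orderness bound are placeholders:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class DownstreamWatermarkSketch {
    // Hypothetical record carrying an extracted event-time field.
    public static class Enriched {
        public String key;
        public long eventTimeMillis;
    }

    static DataStream<Enriched> withWatermarks(DataStream<Enriched> parsed) {
        return parsed.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Enriched>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((e, ts) -> e.eventTimeMillis));
    }
}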
0
votes
1
answer
71
views
Unable to start a pyFlink job from savepoint
I'm using Flink 1.20.0 and trying to submit a PyFlink job and start it from an existing savepoint. I execute on the command line:
flink run --fromSavepoint s3a://.../1a4e1e73910e5d953183b8eb1cd6eb84/chk-1 -...
0
votes
0
answers
26
views
Flink unit test is failing to send message to kafka
I have a Flink job that reads messages from one topic and sends them to another topic, very basic in terms of Flink.
public class RoutingJob {
private static final Logger LOG = LoggerFactory.getLogger(...
0
votes
0
answers
78
views
Kinesis Stream Consumption not working with Flink
I am writing a consumer which consumes data records from Kinesis streams using Flink 1.20, but despite following all the instructions mentioned in the Flink 1.20 docs, I am not able to ingest the ...
0
votes
1
answer
95
views
Unable to sink Hudi table to S3 in Flink
I'm trying to use Flink CDC to capture data changes from MySQL and update the Hudi table in S3.
My PyFlink job looks like:
env = StreamExecutionEnvironment.get_execution_environment(config)
env....
-1
votes
1
answer
45
views
How to change the file name with the updated date in a Flink job
I have a Flink job which streams data to Azure using the Hadoop FS.
Currently I'm able to push the data and create a new file, but I want to roll to a new file when there is a date change (like from 2025-03-...
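For context on the entry above, one common approach (assuming the FileSink from flink-connector-files is an option; the path and date format are placeholders) is to bucket by date, so a date change starts new part files in a new bucket:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class DailyBucketSinkSketch {
    static FileSink<String> build(String basePath) {
        return FileSink
                // row format with a simple string encoder (placeholder)
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<String>("UTF-8"))
                // one bucket directory per day; a new day starts new part files
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                .build();
    }
}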
1
vote
1
answer
54
views
How do sqlExecute queries run in Apache Flink when triggered via a ProcessFunction? How are the SQL tasks managed?
Context:
So, I am trying to build a Flink application that runs rules dynamically. I have a rule stream to which SQL rules are written, which Flink reads and executes. I have connected the ...
1
vote
0
answers
39
views
Watermark strategy passed to env.from_source not being used
Setup:
flink 1.19.1
python 3.10
Apache flink 1.19.1 python package
java 11
Kafka flink connector 3.4.0
I am creating a KafkaSource that's reading from a Kafka topic where messages have ...