0

I am wondering whether it is possible to distinguish elements from offset time in Apache Flink sliding windows. Let me explain it in more details.

I want to take elements from eg.: 13:00 to 13:59:59. However, I also added an offset/slide so I have also elements from 12:30 so I can calculate things based on the values from previous hour, but I do not want to count them again (it was already counted during previous window). Is it possible out of the box to skip calculation of the elements from previous hour?

My stream definition:

var myStream= env.fromSource(source, watermarkStrategy, "data")
    .keyBy(MyType::getKey)
    .connect(otherStream)
    .process(new EnrichWithMyTypeProcessFunction())
    .assignTimestampsAndWatermarks(watermarkStrategyWithMyType)
    .keyBy(elem -> elem.getKey())
    .window(SlidingEventTimeWindows.of(Time.minutes(90), Time.minutes(60), Time.minutes(-30)))
    .trigger(EventTimeTrigger.create())
    .sideOutputLateData(lateDataTag)
    .process(new CalculateFromHourFunction());

Thanks for any hint!

1 Answer 1

1

I think you could do this using a 30 minute slide, and no .trigger(). In your CalculateFromHourFunction you'll see all of the elements from the 90 minute window (12:30 to 13:59:59.999, in your example), so you can do whatever calculation you need that informs the 1 hour window with values from the preceding 30 minutes.

This isn't the most efficient approach, since each window element would be collected and processed twice, once for the 30 minute pre-window, and a second time for the 1 hour actual window. You could do better with a custom KeyedProcessFunction, where you do your own windowing and use state to store the 30 minute pre-window result, but that would be a lot more complex.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.