6
\$\begingroup\$

I'm new to TPL Dataflow and was looking for an action block to basically push an object on a timer - specifically to produce heartbeats on every interval. I was unable to find anything out of the box so I decided to create a wrapper class on action block that would give me the desired functionality. I also wanted to include async/await pattern in the implementation.

Given the following heartbeat object:

public class Heartbeat
{
    public DateTime HearbeatTimeUtc { get; set; }
}

The wrapper class:

public class TimerActionBlock<TInput>
{
    private readonly ActionBlock<TInput> _actionBlock;

    public TimerActionBlock(Func<TInput, Task> func,
        Func<TInput, Exception, Task> excptionFunc)
    {
        _actionBlock = new ActionBlock<TInput>(async input =>
        {
            try
            {
                await func(input);
            }
            catch (Exception e)
            {
                await excptionFunc(input, e);
            }
        });
    }

    public void Start(Func<Task<TInput>> inputFunc, 
        TaskScheduler scheduler,
        TimeSpan dueTime,
        TimeSpan period,
        CancellationToken cancellationToken,
        Action<Exception> onError)
    {
        Task
            .Factory
            .StartNew(async () =>
                {
                    await Task.Delay(dueTime);

                    while (true)
                    {
                        var input = await inputFunc();

                        await _actionBlock.SendAsync(input);

                        var task = Task.Delay(period,
                            cancellationToken);

                        try
                        {
                            await task;
                        }
                        catch (TaskCanceledException)
                        {
                            return;
                        }
                    }
                }, 
                cancellationToken, 
                TaskCreationOptions.LongRunning, 
                scheduler)
            .ContinueWith(task => { onError(task.Exception); }, TaskContinuationOptions.OnlyOnFaulted);
    }
}

Calling code would something like the following:

var timerActionBlock = new TimerActionBlock<Heartbeat>(heartbeat =>
    {
        Console.WriteLine(JsonConvert.SerializeObject(heartbeat));
        return Task.CompletedTask;
    },
    (heartbeat, exception) =>
    {
        Console.WriteLine(exception);
        return Task.CompletedTask;
    });

timerActionBlock
    .Start(() => Task.FromResult(new Heartbeat
        {
            HearbeatTimeUtc = DateTime.UtcNow
        }), 
        TaskScheduler.Default, 
        TimeSpan.FromSeconds(10), 
        TimeSpan.FromMilliseconds(1000), 
        CancellationToken.None, 
        Console.WriteLine);
\$\endgroup\$
2
  • \$\begingroup\$ TPL DataFlow is about creating a data mesh and having data flow. I'm not sure I understand the need to have an action block that just fires on a timer. When you say you want the timer to push an object on a timer do you want this to be the source of the Data Flow? If so then Action block isn't the correct block to base it on. With more information I could maybe help provide an answer \$\endgroup\$ Commented Dec 31, 2019 at 17:02
  • \$\begingroup\$ You might also look at the periodic stuff in this library that I created: github.com/BrannonKing/Kts.ActorsLite \$\endgroup\$ Commented Jan 30, 2020 at 16:46

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.