I'm new to TPL Dataflow and was looking for a block that pushes an object on a timer - specifically, one that produces a heartbeat at every interval. I was unable to find anything out of the box, so I decided to create a wrapper class around ActionBlock that gives me the desired functionality. I also wanted to use the async/await pattern in the implementation.
Given the following heartbeat object:
/// <summary>
/// Message object carrying the time at which a heartbeat was produced.
/// </summary>
public class Heartbeat
{
/// <summary>
/// Time stamp of the heartbeat. The name indicates a UTC value, but the
/// property itself does not enforce any <see cref="DateTimeKind"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the name is misspelled ("Hearbeat"). Renaming it would change
/// the public member name, and presumably the serialized property name as well
/// - confirm before renaming.
/// </remarks>
public DateTime HearbeatTimeUtc { get; set; }
}
The wrapper class:
/// <summary>
/// Wraps an <see cref="ActionBlock{TInput}"/> and feeds it items that are
/// produced periodically by a caller-supplied factory (for example heartbeats).
/// </summary>
/// <typeparam name="TInput">Type of the items pushed into the block.</typeparam>
public class TimerActionBlock<TInput>
{
    private readonly ActionBlock<TInput> _actionBlock;

    /// <summary>
    /// Creates the underlying action block.
    /// </summary>
    /// <param name="func">Asynchronous handler invoked for every item.</param>
    /// <param name="excptionFunc">
    /// Asynchronous handler invoked with the item and the exception when
    /// <paramref name="func"/> throws. If this handler throws as well, the
    /// exception escapes the block's delegate and the block faults.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="func"/> or <paramref name="excptionFunc"/> is null.
    /// </exception>
    public TimerActionBlock(Func<TInput, Task> func,
        Func<TInput, Exception, Task> excptionFunc)
    {
        if (func == null)
        {
            throw new ArgumentNullException(nameof(func));
        }

        if (excptionFunc == null)
        {
            throw new ArgumentNullException(nameof(excptionFunc));
        }

        _actionBlock = new ActionBlock<TInput>(async input =>
        {
            try
            {
                await func(input);
            }
            catch (Exception e)
            {
                await excptionFunc(input, e);
            }
        });
    }

    /// <summary>
    /// Starts a background loop that waits <paramref name="dueTime"/>, then
    /// repeatedly obtains an item from <paramref name="inputFunc"/>, sends it
    /// to the action block and waits <paramref name="period"/>.
    /// </summary>
    /// <param name="inputFunc">Factory producing the next item to send.</param>
    /// <param name="scheduler">Scheduler on which the loop is started.</param>
    /// <param name="dueTime">Delay before the first item is produced.</param>
    /// <param name="period">Delay between two consecutive items.</param>
    /// <param name="cancellationToken">
    /// Stops the loop. It is honored during the initial delay, while sending
    /// and during the periodic delay; cancellation is not reported as an error.
    /// </param>
    /// <param name="onError">
    /// Invoked with the <see cref="AggregateException"/> of the loop when it
    /// fails. May be null, in which case failures are observed and dropped.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="inputFunc"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public void Start(Func<Task<TInput>> inputFunc,
        TaskScheduler scheduler,
        TimeSpan dueTime,
        TimeSpan period,
        CancellationToken cancellationToken,
        Action<Exception> onError)
    {
        if (inputFunc == null)
        {
            throw new ArgumentNullException(nameof(inputFunc));
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        // StartNew with an asynchronous delegate yields a Task<Task> whose outer
        // task completes at the first await. Unwrap() exposes the real loop so
        // that the fault continuation actually sees its exceptions.
        // LongRunning is deliberately not requested: a dedicated thread would be
        // released at the first await anyway.
        Task
            .Factory
            .StartNew(
                () => PumpAsync(inputFunc, dueTime, period, cancellationToken),
                cancellationToken,
                TaskCreationOptions.DenyChildAttach,
                scheduler)
            .Unwrap()
            .ContinueWith(
                task =>
                {
                    // Reading Exception marks the failure as observed even
                    // when no callback was supplied.
                    var error = task.Exception;
                    if (onError != null)
                    {
                        onError(error);
                    }
                },
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
    }

    /// <summary>
    /// Produces and sends items until cancellation is requested or the block
    /// stops accepting input.
    /// </summary>
    private async Task PumpAsync(Func<Task<TInput>> inputFunc,
        TimeSpan dueTime,
        TimeSpan period,
        CancellationToken cancellationToken)
    {
        // No ConfigureAwait(false) here: continuations should stay on the
        // scheduler the caller passed to Start.
        try
        {
            await Task.Delay(dueTime, cancellationToken);

            while (!cancellationToken.IsCancellationRequested)
            {
                var input = await inputFunc();

                var accepted = await _actionBlock.SendAsync(input, cancellationToken);
                if (!accepted)
                {
                    // The block has permanently declined input, so sending again
                    // is futile. Awaiting Completion rethrows the block's fault
                    // (if any) so that it reaches the error callback.
                    await _actionBlock.Completion;
                    return;
                }

                await Task.Delay(period, cancellationToken);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Requested shutdown: end the loop quietly.
        }
    }
}
Calling code would look something like the following:
// Handler for each heartbeat: print it as JSON.
Func<Heartbeat, Task> printHeartbeat = beat =>
{
    Console.WriteLine(JsonConvert.SerializeObject(beat));
    return Task.CompletedTask;
};

// Handler for a failure inside the heartbeat handler: print the exception.
Func<Heartbeat, Exception, Task> printFailure = (beat, error) =>
{
    Console.WriteLine(error);
    return Task.CompletedTask;
};

var timerActionBlock = new TimerActionBlock<Heartbeat>(printHeartbeat, printFailure);

// Factory that stamps every new heartbeat with the current UTC time.
Func<Task<Heartbeat>> createHeartbeat = () =>
{
    var beat = new Heartbeat { HearbeatTimeUtc = DateTime.UtcNow };
    return Task.FromResult(beat);
};

// First heartbeat after 10 seconds, then one every second; the loop is
// started with a token that is never cancelled.
timerActionBlock.Start(
    inputFunc: createHeartbeat,
    scheduler: TaskScheduler.Default,
    dueTime: TimeSpan.FromSeconds(10),
    period: TimeSpan.FromMilliseconds(1000),
    cancellationToken: CancellationToken.None,
    onError: Console.WriteLine);