Checkpoints in Microsoft Agent Framework Workflows

Imagine you have a Microsoft Agent Framework (MAF) workflow that continuously processes a high-volume stream of incoming data. The data flows through multiple executors responsible for validation, summarization, and downstream actions.
Now suppose that, after processing thousands of events, you realize that a particular subset of the stream failed critical checks, and you want to re-execute those events.
But there is a catch: the original stream data is no longer available.
The workflow processed the data in real time without workflow checkpoints. The incoming data may well be available in persistent storage, but reprocessing it would mean recreating the stream at the input source, which can be challenging and cumbersome.
Now imagine a different scenario in which an operation goes through a sequence of executors and one of the executors fails during execution. In such a case, the input data associated with the failed operation cannot simply be replayed or reprocessed from the beginning, because the original execution context may no longer be available.
This becomes even more challenging in long-running or streaming workflows, where the operation may have already passed through multiple intermediate stages before the failure occurred. Restarting the entire workflow from scratch can lead to unnecessary recomputation, duplicate processing, increased operational cost, and possible data inconsistencies.
These are among the most important challenges in streaming AI workflows and distributed agent systems. What you need in such scenarios is a native, built-in workflow capability to rerun the process for those events without recreating them manually. This is where MAF checkpoints help.
To demonstrate MAF checkpoints we will use the use case from my earlier article.
CheckpointManager
The most important component in MAF checkpointing is the CheckpointManager.
It is responsible for storing and retrieving workflow checkpoints. It is passed as one of the parameters to RunStreamingAsync when the workflow is executed, which enables the workflow to record checkpoints during execution.
The CheckpointManager can either serialize checkpoints as JSON or keep them in memory. Use the JSON option in case you want to write the checkpoint details to persistent storage.
In this article we will create a workflow checkpoint in JSON format and store it on the filesystem.
Structure
The checkpoint manager creates an index.jsonl file that maintains a mapping between session IDs and checkpoint IDs.
index.jsonl serves as the reference for all executions, and checkpoint entries are appended to it as the workflow runs through multiple executions.
First Execution:
Second Execution:
Third Execution:
If you look at the third session, you will notice that it has more checkpoint IDs than the previous two executions. This is because the input data for the third execution had to go through an additional executor.
The number of checkpoint IDs created corresponds to the number of executors the workflow passes through.
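To make the session-to-checkpoint mapping concrete, here is a minimal sketch that groups index entries by session and counts the checkpoints in each. It is not MAF code: the property names sessionId and checkpointId, the sample IDs, and the helper GroupBySession are assumptions made for illustration, so check them against your own index.jsonl before relying on them.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical index.jsonl content: one JSON object per line.
// The property names "sessionId" and "checkpointId" are assumptions.
string[] indexLines =
{
    "{\"sessionId\":\"session-1\",\"checkpointId\":\"cp-a\"}",
    "{\"sessionId\":\"session-1\",\"checkpointId\":\"cp-b\"}",
    "{\"sessionId\":\"session-2\",\"checkpointId\":\"cp-c\"}",
    "{\"sessionId\":\"session-2\",\"checkpointId\":\"cp-d\"}",
    "{\"sessionId\":\"session-2\",\"checkpointId\":\"cp-e\"}",
};

// Groups checkpoint IDs by session ID, keeping the order in which they were appended.
static Dictionary<string, List<string>> GroupBySession(IEnumerable<string> lines)
{
    var result = new Dictionary<string, List<string>>();
    foreach (string line in lines)
    {
        if (string.IsNullOrWhiteSpace(line)) { continue; } // tolerate blank lines
        using JsonDocument doc = JsonDocument.Parse(line);
        string? sessionId = doc.RootElement.GetProperty("sessionId").GetString();
        string? checkpointId = doc.RootElement.GetProperty("checkpointId").GetString();
        if (sessionId is null || checkpointId is null) { continue; }
        if (!result.TryGetValue(sessionId, out List<string>? checkpointIds))
        {
            checkpointIds = new List<string>();
            result[sessionId] = checkpointIds;
        }
        checkpointIds.Add(checkpointId);
    }
    return result;
}

Dictionary<string, List<string>> bySession = GroupBySession(indexLines);
foreach (var pair in bySession)
{
    Console.WriteLine($"{pair.Key}: {pair.Value.Count} checkpoint(s)");
}
```

In this sample, the second session has one more entry than the first, which mirrors the case where an input passes through an additional executor.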
Code
As mentioned earlier, we will reuse the example from my previous article and fit the checkpoint manager into that code.
We create a FileSystemJsonCheckpointStore object, which persists the checkpoint data on disk in JSON format.
string input = "13";
DirectoryInfo dirinfo = new DirectoryInfo("Json Directory location");
var store = new FileSystemJsonCheckpointStore(dirinfo);
Then create an instance of the CheckpointManager from the store.
CheckpointManager checkpointManager = CheckpointManager.CreateJson(store);
Finally, pass that CheckpointManager instance to InProcessExecution.
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, input), checkpointManager);
Assume that we executed the workflow for an input value of 13, which produced the following output along with multiple checkpoints in JSON format.
In the next execution we rerun the workflow for an input value of "a". As expected, it results in the following output:
This resulted in the creation of two checkpoint files.
We now want to re-execute the workflow for the value 13 through checkpoints. The session ID for that execution is 49d7868b625943a59a804c6c3363e1b7 and the checkpoint IDs are:
854b727b737b4d31be82e265564d66ab
55ffcd9eacc54efca3ba38e23ead83bf
7e2007b7c29c4bc2884b67e6d92fcc91
First, check whether index.jsonl exists in the directory and, if it does, make sure the file is not empty.
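// Directory that holds index.jsonl and the checkpoint JSON files.
DirectoryInfo dirinfo = new DirectoryInfo("Json Directory location");
string filepath = Path.Combine(dirinfo.FullName, "index.jsonl");
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
// CheckpointDetails is a user-defined class mirroring one index.jsonl entry (its definition is not shown here).
var list = new List<CheckpointDetails>();
if (File.Exists(filepath))
{
    // index.jsonl holds one JSON object per line.
    foreach (var line in File.ReadLines(filepath))
    {
        // Skip blank lines, which would otherwise fail to deserialize.
        if (string.IsNullOrWhiteSpace(line)) { continue; }
        var item = JsonSerializer.Deserialize<CheckpointDetails>(line, options);
        if (item != null)
        {
            list.Add(item);
        }
    }
}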
If the file isn't empty, create an instance of FileSystemJsonCheckpointStore against the underlying checkpoint directory.
if (list.Count > 0)
{
    var store_checkpoint = new FileSystemJsonCheckpointStore(dirinfo);
    // SessionId and CheckpointId identify the checkpoint to resume from
    // (they are parameters of the complete function shown later).
    CheckpointInfo chk = new CheckpointInfo(SessionId, CheckpointId);
    CheckpointManager checkpointManagers = CheckpointManager.CreateJson(store_checkpoint);
    await using StreamingRun runa = await InProcessExecution.ResumeStreamingAsync(workflow, chk, checkpointManagers);
    await runa.RestoreCheckpointAsync(chk, CancellationToken.None).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in runa.WatchStreamAsync().ConfigureAwait(false))
    {
        // Stop once the resumed super step has completed.
        if (evt is SuperStepCompletedEvent)
        {
            break;
        }
        if (evt is ExecutorInvokedEvent outputEvent)
        {
            // The message type is not known here, so read its Reason property via reflection.
            var property = outputEvent.Data?.GetType().GetProperty("Reason");
            var reason = property?.GetValue(outputEvent.Data);
            Console.WriteLine(reason);
        }
    }
    store_checkpoint.Dispose();
}
The iteration is required so that InProcessExecution processes all the recorded checkpoints for the provided Session ID.
You might wonder why the following check is needed
if (chk.SessionId != sessionId) { continue; }
when the checkpoints for a given session ID are already filtered by the following line of code.
List<CheckpointInfo> checkpoints_t = [.. await store_checkpoint.RetrieveIndexAsync(sessionId!)];
This is because of a bug in RetrieveIndexAsync.
I have raised this issue on the official Microsoft Agent Framework GitHub repository. A fix is in progress, and the issue can be tracked here: https://github.com/microsoft/agent-framework/issues/5942
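To show what the guard does in isolation, here is a minimal sketch of filtering retrieved entries by session ID. It deliberately avoids the MAF types: the tuples stand in for CheckpointInfo, the sample IDs and the helper ForSession are hypothetical, and the assumption is that the lookup may return entries belonging to other sessions.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the entries returned by the store's index lookup.
// Tuples are used instead of the real CheckpointInfo type so the sketch runs on its own.
var retrieved = new List<(string SessionId, string CheckpointId)>
{
    ("session-1", "cp-a"),
    ("session-2", "cp-c"), // an entry from another session that slipped through
    ("session-1", "cp-b"),
};

// Keeps only the entries that belong to the requested session.
static List<(string SessionId, string CheckpointId)> ForSession(
    IEnumerable<(string SessionId, string CheckpointId)> entries, string sessionId)
{
    var matching = new List<(string SessionId, string CheckpointId)>();
    foreach (var entry in entries)
    {
        if (entry.SessionId != sessionId) { continue; } // the guard discussed above
        matching.Add(entry);
    }
    return matching;
}

foreach (var item in ForSession(retrieved, "session-1"))
{
    Console.WriteLine(item.CheckpointId); // prints cp-a, then cp-b
}
```

The guard costs almost nothing when the lookup is already correct, so it can stay in place until the fix is released.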
The output of the re-execution for session ID 49d7868b625943a59a804c6c3363e1b7 should be:
Complete Function
// Requires System.IO and System.Text.Json in addition to the Microsoft Agent Framework workflow namespaces.
public static async Task CreateCheckpointAsync(DirectoryInfo dirinfo, Workflow workflow, string SessionId, string CheckpointId)
{
    string filepath = Path.Combine(dirinfo.FullName, "index.jsonl");
    var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
    // CheckpointDetails is a user-defined class mirroring one index.jsonl entry (its definition is not shown here).
    var list = new List<CheckpointDetails>();
    if (File.Exists(filepath))
    {
        // index.jsonl holds one JSON object per line.
        foreach (var line in File.ReadLines(filepath))
        {
            // Skip blank lines, which would otherwise fail to deserialize.
            if (string.IsNullOrWhiteSpace(line)) { continue; }
            var item = JsonSerializer.Deserialize<CheckpointDetails>(line, options);
            if (item != null)
            {
                list.Add(item);
            }
        }
    }
    // Resume only when the index actually contains entries.
    if (list.Count > 0)
    {
        var store_checkpoint = new FileSystemJsonCheckpointStore(dirinfo);
        CheckpointInfo chk = new CheckpointInfo(SessionId, CheckpointId);
        CheckpointManager checkpointManagers = CheckpointManager.CreateJson(store_checkpoint);
        await using StreamingRun runa = await InProcessExecution.ResumeStreamingAsync(workflow, chk, checkpointManagers);
        await runa.RestoreCheckpointAsync(chk, CancellationToken.None).ConfigureAwait(false);
        await foreach (WorkflowEvent evt in runa.WatchStreamAsync().ConfigureAwait(false))
        {
            // Stop once the resumed super step has completed.
            if (evt is SuperStepCompletedEvent)
            {
                break;
            }
            if (evt is ExecutorInvokedEvent outputEvent)
            {
                // The message type is not known here, so read its Reason property via reflection.
                var property = outputEvent.Data?.GetType().GetProperty("Reason");
                var reason = property?.GetValue(outputEvent.Data);
                Console.WriteLine(reason);
            }
        }
        store_checkpoint.Dispose();
    }
}
Now let's pick a checkpoint to execute from. This is our workflow path.
Since we want to execute the checkpoint for an input value of 13, the workflow will follow the PrimeNumber path. As a result, there are two executors involved in this execution path.
PrimeNo_SquareRootExecutor
SendDetails_Executor_RouteA
If you open the checkpoint files, you will notice a property named stepNumber that helps identify the order in which the executors ran.
The most important property in the checkpoint file is queuedMessages. It helps identify which executor is next in line for execution within the workflow pipeline.
In the checkpoint file for step 0, SquareRootExecutor appears under queuedMessages,
and in the checkpoint file for step 1, PrimeNo_SendDetails_RouteA_Executor appears under queuedMessages.
Note: In checkpoint files, the executor name corresponds to the actual executor name defined in the code, rather than the object name used to reference it.
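If you want to inspect these two properties programmatically rather than by opening each file, a minimal sketch could look like the one below. Only the property names stepNumber and queuedMessages come from the checkpoint files; the nesting shown here (both at the root, with queuedMessages as an object keyed by executor name) and the helper Describe are assumptions, so adjust the lookups to match your actual files.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical, heavily trimmed checkpoint content. Only the property names
// "stepNumber" and "queuedMessages" come from the article; the nesting is assumed.
string checkpointJson =
    "{ \"stepNumber\": 1, \"queuedMessages\": { \"PrimeNo_SendDetails_RouteA_Executor\": [] } }";

// Returns the step number and the names of the executors that are queued next.
static (int Step, List<string> NextExecutors) Describe(string json)
{
    using JsonDocument doc = JsonDocument.Parse(json);
    JsonElement root = doc.RootElement;
    int step = root.GetProperty("stepNumber").GetInt32();
    var next = new List<string>();
    if (root.TryGetProperty("queuedMessages", out JsonElement queued)
        && queued.ValueKind == JsonValueKind.Object)
    {
        // Assumption: each property name under queuedMessages is an executor name.
        foreach (JsonProperty property in queued.EnumerateObject())
        {
            next.Add(property.Name);
        }
    }
    return (step, next);
}

var (stepNo, executors) = Describe(checkpointJson);
Console.WriteLine($"step {stepNo}: next = {string.Join(", ", executors)}");
```

Running the same helper over every checkpoint file of a session and sorting by step number gives a quick picture of the path the input took.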
To test the functionality, let's make a change to the checkpoint file. In the checkpoint file corresponding to step 1, I will modify the reason property.
I will append the additional text "Hello from Mumbai" to the value of the reason property.
When I reference this checkpoint file in the checkpoint manager, the output should be
"The square root of 13 is a non-integer value and approximately equals 3.6055512755.Hello from Mumbai"
instead of just
"The square root of 13 is a non-integer value and approximately equals 3.6055512755"
which came from the original execution.
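I made this edit by hand, but the same change can be scripted. The following is a minimal sketch using System.Text.Json.Nodes that appends text to every string property named reason, wherever it is nested. The sample JSON, its payload wrapper, and the helper AppendToReason are assumptions for illustration; the real checkpoint file contains far more than this.

```csharp
using System;
using System.Linq;
using System.Text.Json.Nodes;

// Hypothetical, trimmed checkpoint content. The "payload" wrapper is an assumption;
// only the reason property and its text come from the article.
string json =
    "{ \"stepNumber\": 1, \"payload\": { \"reason\": " +
    "\"The square root of 13 is a non-integer value and approximately equals 3.6055512755\" } }";

// Appends a suffix to every string property named "reason" (case-insensitive),
// at any nesting depth, and returns how many values were changed.
static int AppendToReason(JsonNode? node, string suffix)
{
    int changed = 0;
    if (node is JsonObject obj)
    {
        // ToList() snapshots the properties so the object can be modified safely.
        foreach (var kv in obj.ToList())
        {
            bool isReason = string.Equals(kv.Key, "reason", StringComparison.OrdinalIgnoreCase);
            if (isReason && kv.Value is JsonValue value && value.TryGetValue(out string? text))
            {
                obj[kv.Key] = text + suffix;
                changed++;
            }
            else
            {
                changed += AppendToReason(kv.Value, suffix);
            }
        }
    }
    else if (node is JsonArray array)
    {
        foreach (JsonNode? child in array)
        {
            changed += AppendToReason(child, suffix);
        }
    }
    return changed;
}

JsonNode? root = JsonNode.Parse(json);
int edits = AppendToReason(root, ".Hello from Mumbai");
Console.WriteLine($"{edits} value(s) changed");
Console.WriteLine(root!["payload"]!["reason"]!.GetValue<string>());
```

To apply this to a real checkpoint, read the file with File.ReadAllText, run the helper, and write root.ToJsonString() back with File.WriteAllText. Work on a copy of the file first, since a malformed checkpoint may not restore.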
Execution >>
var workflow = builder.Build();
string input = "a";
DirectoryInfo dirinfo = new DirectoryInfo("Your checkpoint folder");
await CreateCheckpointAsync(dirinfo, workflow, "SessionID", "CheckpointID");
Very Important
Running a workflow from a checkpoint reference creates its own checkpoint file. The execution above resulted in the creation of a new checkpoint file.
New checkpoint file
The step number recorded in this newly created checkpoint file is 0, with the reason property value picked up from the checkpoint file referenced in the previous execution.
Conclusion:
In conclusion, checkpoints in MAF act as persisted snapshots of runtime state that can be restored for point-in-time execution.
Although there isn't any solid documentation on checkpoints in MAF for C#, I explored the functionality to the best of my abilities. As a result, there may still be some inaccuracies or gaps in the explanation :)
On a side note, back when I worked extensively with SQL Server, I often ran CHECKPOINT commands manually to flush dirty pages from memory to disk (a complicated topic). Though not quite the same as checkpoints in MAF, it reminded me of SQL Server's checkpoint concept.
Thanks for reading !!!



