Durable Workflows In Microsoft Agent Framework

My earlier article on workflows in MAF was focused on implementing them within a Console application. Those types of workflows run entirely in memory through an in-process runner.
Another article focused on setting up Azure Durable agents with Docker Durable Task Scheduler (DTS) Emulator. In that article, we had to expose and manage custom API endpoints to trigger and control the agent orchestration process which requires additional implementation.
But with workflows running as durable workflows there is no need to define custom endpoints for workflow invocation. The durable workflow runtime automatically handles workflow execution , persistence, checkpointing and orchestration behind the scenes.
In this article we will see how Durable workflows can be implemented in MAF.
Use Case
The use case in this article is a more simplified version of the use case that I used in my article that was an introduction to the workflow execution. The reason I simplified the use case is for two reasons
The use case was pretty complex and I received feedback from some readers suggesting that it made them quite confused.
The use case extensively used switch/conditions to set up conditional executors which unfortunately does not work in Durable workflows at the time of this writing. I have reported the issue on MAF GitHub.
Link : https://github.com/microsoft/agent-framework/issues/6722
A simplified use case in this article
PrimeNumberDetector >> SquareRootCalculator >> NotificationSender
PrimeNumberDetector : Detects if a given number is a prime number.
SquareRootCalculator : Calculates the square root of the given number.
NotificationSender : Sends the findings of the above two executors.
Think of it as a serial workflow execution.
SetUp
Ensure that you have DTS running. You can refer to this article on how to setup Durable Task Scheduler (DTS) on Docker .
Create a new Azure Function project and add the following references.
dotnet add package Azure;
dotnet add package Azure.AI.OpenAI;
dotnet add package Microsoft.Agents.AI;
dotnet add package Microsoft.Extensions.AI;
dotnet add package Microsoft.Extensions.Configuration;
dotnet add package Microsoft.Extensions.DependencyInjection;
dotnet add package Microsoft.Extensions.Hosting;
dotnet add package Microsoft.Agents.AI.Hosting.AzureFunctions--prerelase;
dotnet add package Microsoft.Azure.Functions.Worker.Builder;
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore;
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged;
Of the above ,ensure that you don't miss to reference the following two libraries in the project.
Microsoft.Azure.Functions.Worker.Extensions.DurableTask;
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged;
Otherwise you will face issue that is highlighted in the following GitHub post.
https://github.com/microsoft/agent-framework/issues/5927
Not referencing the above two libraries, will result in you having to declare a dummy orchestrator.
public static class MyDummyOrchestrator
{
[Function(nameof(MyDummyOrchestrator))]
public static Task RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
return Task.CompletedTask;
}
}
This is because the function worker fails to find an entry point of execution.
These are the major packages and version numbers that I have referenced in the project.
Code
Implementation is pretty straightforward.
After the above artifacts in place, add the following code to read the settings from appsettings.json in Program.cs of the project.
var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false)
.Build();
Read the credentials and register a Chatclient as a keyedservice.
var credential = new AzureKeyCredential(configuration["AppSettings:ApiKey"]);
ServiceCollection servicecollection = new();
builder.Services.AddKeyedChatClient(
"ChatClient",
(
sp =>
new AzureOpenAIClient(new Uri(configuration["AppSettings:EndPoint"]), credential)
.GetChatClient(configuration["AppSettings:Chat_DeploymentName"])
.AsIChatClient()
)
);
In the next step, register the ChatClientAgent in the hosted DI container to identify if the given number is a Prime number or a Non Prime number.
servicecollection.AddSingleton<ChatClientAgent>(sp =>
{
Func<ChatClientAgentOptions> func = () =>
{
return new ChatClientAgentOptions
{
ChatOptions = new ChatOptions
{
Instructions = "You are a helpful agent. You check if a given number is a prime number or a non prime number",
ResponseFormat = Microsoft.Extensions.AI.ChatResponseFormat.ForJsonSchema<DetectionResult>()
},
Name = "Number Detector",
Id = "1"
};
};
return new ChatClientAgent(sp.GetKeyedService<IChatClient>("ChatClient"), options: func());
});
Register another ChatClientAgent in the hosted DI container that calculates the square root of the provided number.
servicecollection.AddSingleton<ChatClientAgent>(sp =>
{
Func<ChatClientAgentOptions> func = () =>
{
return new ChatClientAgentOptions
{
ChatOptions = new ChatOptions
{
Instructions = "You are a helpful assistant.You calculate the square root of the provided number",
ResponseFormat = Microsoft.Extensions.AI.ChatResponseFormat.ForJsonSchema<_Response>()
},
Name = "SquareRootCalculator",
Id = "2"
};
};
return new ChatClientAgent(sp.GetKeyedService<IChatClient>("ChatClient"), options: func());
});
For the ResponseFormat for the above two agents create the underlying class structure.
public sealed class DetectionResult
{
[JsonPropertyName("numberType")]
[JsonConverter(typeof(JsonStringEnumConverter))]
public NumberType numberType { get; set; }
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("inputnumber")]
public string InputNumber { get; set; } = string.Empty;
}
public sealed class _Response
{
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("inputnumber")]
public string InputNumber { get; set; } = string.Empty;
[JsonPropertyName("squareroot")]
public string SquareRoot { get; set; } = string.Empty;
}
The NumberType property in the DetectionResult class is an Enum type.
public enum NumberType
{
PrimeNumber,
NotPrimeNumber,
UnSure
}
The Input object maps to the user input.
internal sealed class Input
{
[JsonPropertyName("id")]
public string Id { get; set; } = String.Empty;
[JsonPropertyName("inputnumber")]
public string InputNumber { get; set; } = String.Empty;
}
Scopes to be used for shared states
internal static class NumbervalueConstants
{
public const string NumbervalueScope = "Numbervalue";
public const string NumberTypeScope = "Numbertype";
}
TypeDetectionExecutor >>
The TypeDetectionExecutor identifies if given number is a Prime number or a Non Prime number through the first agent Number Detector declared earlier.
Input to the executor is the Input object and output is the _Response object.
internal sealed class TypeDetectionExecutor : Executor<Input, DetectionResult>
{
private readonly AIAgent _typeDetectionAgent;
public TypeDetectionExecutor(AIAgent typeDetectionAgent) : base("TypeDetectionExecutor")
{
this._typeDetectionAgent = typeDetectionAgent;
}
[MessageHandler]
public override async ValueTask<DetectionResult> HandleAsync(Input input, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var Input = new Input { Id = Guid.NewGuid().ToString(), InputNumber = (input.InputNumber) };
await context.QueueStateUpdateAsync(Input.Id, Input, scopeName: NumbervalueConstants.NumbervalueScope);
var output = await _typeDetectionAgent.RunAsync(input.InputNumber);
var detectionResult = JsonSerializer.Deserialize<DetectionResult>(output.Text);
detectionResult.Id = Input.Id;
return detectionResult;
}
}
We store the Input object as value with Input.Id being the key to the shared state through the NumbervalueScope.
await context.QueueStateUpdateAsync(Input.Id,Input, scopeName: NumbervalueConstants.NumbervalueScope);
SquareRootCalculatorExecutor >>
This executor calculates the square root of the given value through the second agent SquareRootCalculator declared earlier.
Input to executor is DetectionResult object and output is _Response object.
internal sealed class SquareRootCalculatorExecutor : Executor<DetectionResult, _Response>
{
public readonly AIAgent _squarerootagent;
public SquareRootCalculatorExecutor(AIAgent squarerootagent) : base("SquareRootCalculatorExecutor")
{
this._squarerootagent = squarerootagent;
}
[MessageHandler]
public override async ValueTask<_Response> HandleAsync(DetectionResult detectionResult, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var input = await context.ReadStateAsync<Input>(detectionResult.Id, scopeName: NumbervalueConstants.NumbervalueScope);
var output = await this._squarerootagent.RunAsync(input.InputNumber);
var response = JsonSerializer.Deserialize<_Response>(output.Text);
response.InputNumber = input.InputNumber;
response.Id = input.Id;
await context.QueueStateUpdateAsync(input.Id, detectionResult, scopeName: NumbervalueConstants.NumberTypeScope);
return response;
}
}
In the above code, we read input through the shared state NumbervalueScope
var input = await context.ReadStateAsync<Input>(detectionResult.Id, scopeName: NumbervalueConstants.NumbervalueScope);
which then acts as the input to the agent as follows
var output = await this._squarerootagent.RunAsync(input.InputNumber);
Store detectionResult to the shared state through scope NumberTypeScope
await context.QueueStateUpdateAsync(input.Id, detectionResult, scopeName: NumbervalueConstants.NumberTypeScope);
SendNumberNotificationExecutor >>
The Input to the executor is _Response object. The executor YieldsOutput of type string.
internal sealed class SendNumberNotificationExecutor() : Executor<_Response>("SendNumberNotificationExecutor")
{
[YieldsOutput(typeof(string))]
public override async ValueTask HandleAsync(_Response response, IWorkflowContext context, CancellationToken cancellationToken = default)
{
var numbertype = await context.ReadStateAsync<DetectionResult>(response.Id, scopeName: NumbervalueConstants.NumberTypeScope);
await context.YieldOutputAsync($"The value {response.InputNumber} is a {numbertype.numberType} and its square root is {response.SquareRoot}");
}
}
Now that we have all the underlying structure and objects ready, time to piece them together.
Build service and fetch the agent from ServiceProvider as a ChatClientAgent.
ServiceProvider serviceProvider = servicecollection.BuildServiceProvider();
var agent = serviceProvider.GetServices<ChatClientAgent>();
List<ChatClientAgent> chatclientagent = new(agent);
Assign the chatclientagents to the executors.
var typeDetectionExecutor = new TypeDetectionExecutor(chatclientagent[0]);
var squareRootcalculatorExecutor = new SquareRootCalculatorExecutor(chatclientagent[1]);
Output of the workflow will be through the SendNumberNotificationExecutor
SendNumberNotificationExecutor sendNumberNotificationExecutor = new();
Build the workflow and assign the Edges
WorkflowBuilder builder = new(typeDetectionExecutor);
Workflow workflow = builder.AddEdge(typeDetectionExecutor, squareRootcalculatorExecutor)
.AddEdge(squareRootcalculatorExecutor, sendNumberNotificationExecutor)
.WithOutputFrom(sendNumberNotificationExecutor).WithName("NumberDetector").Build();
NumberDetector is the workflow name and acts as the endpoint to be invoked through http invoke method.
Set the host and assign the workflow to it.
sing IHost app = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableWorkflows(options => options.AddWorkflow(workflow))
Build();
app.Run();
That's all.. Go ahead and test it through PowerShell or Curl.
I tested it through PowerShell using the following script.
$json = '{"inputnumber":"13"}'
Invoke-RestMethod `
-Uri "http://localhost:7001/api/workflows/NumberDetector/run" `
-Method Post `
-ContentType "text/json" `
-Body $json
Timeline >>
History >>
Flow >>
You might wonder how does workflow know about property inputnumber sent through the request .
Basically the workflow auto identifies the property of the input object to the first executor in the workflow.
In this case the first executor to the workflow is TypeDetectionExecutor that has Input object which is the input to the executor which in turn has a property named InputNumber(JsonPropertyName("inputnumber")).
This is how the workflow identifies the property. Following is the code snippet for the clarification.
TypeDetectionExecutor :
internal sealed class TypeDetectionExecutor : Executor<Input, DetectionResult>
Input :
internal sealed class Input
{
[JsonPropertyName("id")]
public string Id { get; set; } = String.Empty;
[JsonPropertyName("inputnumber")]
public string InputNumber { get; set; } = String.Empty;
}
Execution >>
Conclusion
Durable Functions are a great way to build reliable, long-running and stateful workflows without having to manage the complexity of checkpoints or recovery mechanisms manually.
Through this article I tried to showcase how to design and develop Durable workflows for seamless process execution. I hope this article was helpful enough to get you started on Durable Workflows.
Thanks for reading !!!



