Deep Dive into Workflow Execution in Microsoft Agent Framework

From Synapse Analytics, Power BI, Spark, Microsoft Fabric, ASP.NET Core and, recently, Agentic AI on .NET, I try to explore, learn and share all aspects of the Microsoft Data Stack in this blog.

Disclaimer: This article is highly technical and assumes that you have a strong understanding of the concepts of Edges and Executors in MAF. If you are not comfortable with Executors and Edges and how they control the flow of execution in MAF Workflows, I would highly recommend reviewing this article and also my article on the topic.

Use Case

Let's assume the following use case.

We have an input number N that needs to be identified as a prime or a non-prime number.

If it is a prime number, compute its square root and send the value through Route A.

If it is not a prime number, send it through Route B without its square root.

If the number is unidentifiable or unknown, route the number to an Email dispatcher.

I know some of you might argue that there is no real need to use an Agentic approach to solve a problem like this. In fact, this workflow could easily be implemented using traditional programmable logic or a simple procedural flow.

The purpose of this article is not to demonstrate the most optimal implementation of a real-life use case. Instead, the goal of this article is to demonstrate how Agentic Workflows can be designed and orchestrated within MAF using multiple execution paths and conditional routing.

SetUp

Create a new C# console application and add the following packages

dotnet add package Azure
dotnet add package Azure.AI.OpenAI
dotnet add package Microsoft.Agents.AI
dotnet add package Microsoft.Extensions.AI
dotnet add package Microsoft.Extensions.Configuration
dotnet add package Microsoft.Extensions.DependencyInjection
dotnet add package Microsoft.Extensions.Logging
dotnet add package Microsoft.Agents.AI.Workflows

Add appsettings.json to the project

{
  "AppSettings": {
    "Chat_DeploymentName": "Deployment Name",
    "EndPoint": "Azure OpenAI endpoint",
    "ApiKey": "Azure OpenAI API key"
  }
}

Code

Now that all the underlying artifacts are in place, create a DI service registry.

ServiceCollection servicecollection = new ServiceCollection();

Then add the following code to read the settings from appsettings.json and register the ChatClient.

var configuration = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("appsettings.json", optional: false)
    .Build();

var credential = new AzureKeyCredential(configuration["AppSettings:ApiKey"]!);

// Register the chat client under the key "ChatClient" so the agents can resolve it later.
servicecollection.AddKeyedChatClient("ChatClient", sp => new AzureOpenAIClient(
        new Uri(configuration["AppSettings:EndPoint"]!), credential)
    .GetChatClient(configuration["AppSettings:Chat_DeploymentName"]!)
    .AsIChatClient());

Now that the basic set up is ready, create two agents.

The first agent identifies whether the input number is a prime number, and the second agent calculates the square root of a number that the first agent identified as prime.

PrimeNumberIdentifierAgent >>

servicecollection.AddSingleton<AIAgent>(sp =>

{
    Func<ChatClientAgentOptions> func = () =>
   {
       return new ChatClientAgentOptions
       {
           ChatOptions = new ChatOptions
           {
               Instructions = "You are a prime number detection assistant that identifies if the number is prime or not a prime",
               ResponseFormat = ChatResponseFormat.ForJsonSchema<DetectionResult>()
           },
           Name = "TypeDetectionAgent",
           Id = "1"
       };
   };

    return new ChatClientAgent(sp.GetKeyedService<IChatClient>("ChatClient"), options: func());

});

SquareRootGeneratorAgent >>

  servicecollection.AddSingleton<AIAgent>(sp =>

  {
      Func<ChatClientAgentOptions> func = () =>
     {
         return new ChatClientAgentOptions
         {
             ChatOptions = new ChatOptions
             {
                 Instructions = "You are a square root generator assistant. You will return the square root of the given prime number.",
                 ResponseFormat = Microsoft.Extensions.AI.ChatResponseFormat.ForJsonSchema<_Response>()
             },
             Name = "SquareRootAgent",
             Id = "2"
         };
     };
      return new ChatClientAgent(sp.GetKeyedService<IChatClient>("ChatClient"), options: func());
  });

Create a Dependency Injection (DI) container, serviceprovider, from the services registered in servicecollection and fetch the list of registered agents.

ServiceProvider serviceprovider = servicecollection.BuildServiceProvider();

var AIAgents = serviceprovider.GetServices<AIAgent>();
List<AIAgent> lst = new(AIAgents);

AIAgent TypeDetectionAgent = lst.FirstOrDefault(args => args.Id == "1");

AIAgent SquareRootAgent = lst.FirstOrDefault(args => args.Id == "2");

NumberType

Declare an Enum type and set its options to PrimeNumber, NotPrimeNumber and UnSure.

 public enum NumberType
 {
     PrimeNumber,
     NotPrimeNumber,   
     UnSure
 }

InputNumber

Define an object for InputNumber with the following properties.

 internal sealed class InputNumber
 {
     [JsonPropertyName("id")]
     public string Id { get; set; } = String.Empty;

     [JsonPropertyName("inputnumber")]
     public string Value { get; set; } = String.Empty;
 }

DetectionResult

Define a class to represent the result of detection operation for the Input number along with the underlying reasons.

 public sealed class DetectionResult
 {
     [JsonPropertyName("numberType")]
     [JsonConverter(typeof(JsonStringEnumConverter))]
     public NumberType numberType { get; set; }

     [JsonPropertyName("reason")]
     public string Reason { get; set; } = string.Empty;

     [JsonPropertyName("id")]
     public string Id { get; set; } = string.Empty;
 }

_Response

Class to represent the output of the executor

public sealed class _Response
{
    [JsonPropertyName("decision")]
    public string Decision { get; set; } = string.Empty;

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;
}

ScopeName

ScopeName is required to identify shared state across Executors. It ensures that Executors can access the same data in the State during the workflow execution.

internal static class NumbervalueConstants
{
    public const string NumbervalueScope = "Numbervalue";
}

Note: NumbervalueScope can also be declared as a plain string variable instead of a constant inside a static class.
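To make the role of the scope name concrete, here is a toy model in plain C#. It is only a sketch of the idea that a State entry is addressed by scope name plus key. It is not how MAF stores state internally, and the dictionary and the "OtherScope" name are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy model (assumption): state entries are addressed by (scope, key), not by key alone.
var state = new Dictionary<(string Scope, string Key), string>();

string id = Guid.NewGuid().ToString();

// One executor writes the input number under the shared scope name.
state[("Numbervalue", id)] = "7";

// Another executor that knows the same scope name and key can read it back.
bool found = state.TryGetValue(("Numbervalue", id), out string? value);

// The same key under a different (hypothetical) scope name is a different entry.
bool foundElsewhere = state.TryGetValue(("OtherScope", id), out _);

Console.WriteLine($"same scope: {found} ({value}), other scope: {foundElsewhere}");
```

This is why both the writing and the reading Executor refer to the same NumbervalueScope constant rather than typing the scope name twice.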

Executors

For our use case, start by defining two Executors.

The first Executor will write to the State and the second one will read from it.

This is done by overriding the HandleAsync method of the Executor and using the IWorkflowContext passed to it. You can find more details of Executor structure here.

TypeDetectionExecutor

The first Executor will detect if the input number is a prime number or not through the PrimeNumberIdentifierAgent declared earlier.

As I explained in my previous article, an Executor should inherit from the Executor base class and override the HandleAsync method, which exposes the IWorkflowContext interface.

Create an Executor class that accepts an input of type ChatMessage and returns an output of type DetectionResult (defined earlier).

PrimeNumberIdentifierAgent (declared earlier) is injected into the Executor class through constructor injection as typeDetectionAgent.

internal sealed class TypeDetectionExecutor : Executor<ChatMessage, DetectionResult>
{
    private readonly AIAgent _typeDetectionAgent;
    private List<ChatMessage> messages = new();

    public TypeDetectionExecutor(AIAgent typeDetectionAgent) : base("TypeDetectionExecutor")
    {
        this._typeDetectionAgent = typeDetectionAgent;
    }

    [MessageHandler]
    public override async ValueTask<DetectionResult> HandleAsync(
        ChatMessage message,
        IWorkflowContext context,
        CancellationToken cancellationToken = default
    )
    {
        messages.Add(message);
        var newNumber = new InputNumber { Id = Guid.NewGuid().ToString(), Value = (message.Text) };
        await context.QueueStateUpdateAsync(
            newNumber.Id,
            newNumber,
            scopeName: NumbervalueConstants.NumbervalueScope,
            cancellationToken
        );
        var response = await this._typeDetectionAgent.RunAsync(
            message,
            cancellationToken: cancellationToken
        );
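        // Deserialize returns null for a JSON "null" payload, so fail early instead of dereferencing it.
        var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text)
            ?? throw new InvalidOperationException("Agent returned an empty detection result.");
        detectionResult.Id = newNumber.Id;
        return detectionResult;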
    }
}

Note: There are two properties, Id and Value, in the InputNumber object. Both are set in the overridden HandleAsync method above, which also queues an update of the State with these values under the NumbervalueScope scope defined earlier.

The code below shows how it is implemented.

 messages.Add(message);
 var newNumber = new InputNumber
 {
     Id = Guid.NewGuid().ToString(),
     Value = (message.Text)
 };
 await context.QueueStateUpdateAsync(newNumber.Id, newNumber, scopeName: NumbervalueConstants.NumbervalueScope, cancellationToken);

In the next step, invoke the TypeDetectionAgent. Its JSON response carries the values for the properties defined in the DetectionResult object.

var response = await this._typeDetectionAgent.RunAsync(message, cancellationToken: cancellationToken);

The response text is then deserialized into an object of type DetectionResult.

var detectionResult = JsonSerializer.Deserialize<DetectionResult>(response.Text);

The id value returned by the agent has no bearing on the id stored in the State. The id written to the State is of the form Id = Guid.NewGuid().ToString().

Hence we have to explicitly set the value of the Id property of the DetectionResult object in TypeDetectionExecutor before returning it.

detectionResult.Id = newNumber.Id;
return detectionResult;
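The deserialization step can be tried in isolation with System.Text.Json. The snippet below is a minimal sketch: the JSON string is a made-up agent reply (a real reply depends on the model), and the NumberType and DetectionResult types are repeated only so that the snippet compiles on its own.

```csharp
#nullable enable
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical agent reply; the "id" in it is whatever the model chose to emit.
string agentJson = "{\"numberType\":\"PrimeNumber\",\"reason\":\"7 has no divisors other than 1 and itself\",\"id\":\"abc\"}";

DetectionResult result = JsonSerializer.Deserialize<DetectionResult>(agentJson)
    ?? throw new InvalidOperationException("Empty response.");

// Overwrite the agent-supplied id with the key that was used for the State entry.
string stateKey = Guid.NewGuid().ToString();
result.Id = stateKey;

Console.WriteLine($"{result.numberType} | {result.Reason} | id replaced: {result.Id == stateKey}");

public enum NumberType { PrimeNumber, NotPrimeNumber, UnSure }

public sealed class DetectionResult
{
    [JsonPropertyName("numberType")]
    [JsonConverter(typeof(JsonStringEnumConverter))]
    public NumberType numberType { get; set; }

    [JsonPropertyName("reason")]
    public string Reason { get; set; } = string.Empty;

    [JsonPropertyName("id")]
    public string Id { get; set; } = string.Empty;
}
```

The JsonStringEnumConverter attribute is what lets the string "PrimeNumber" in the reply map onto the NumberType enum.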

Define an Executor that returns the square root of the given prime number through the SquareRootAgent.

The Executor should accept a DetectionResult as its input and, after checking NumberType, read the State based on the values of Id and ScopeName. Id and NumberType are two properties defined in the DetectionResult object.

But before that, we need to define a method that checks the value of the NumberType property in the DetectionResult object.

CheckCondition

The CheckCondition method creates and returns a condition function. That function checks whether the object it receives is of type DetectionResult and, if it is, whether its NumberType matches the NumberType value passed to CheckCondition. It returns true only when both checks pass.

 private static Func<object?, bool> CheckCondition(NumberType Type) =>
 detectionResult =>
     detectionResult is DetectionResult result &&
     result.numberType == Type;

Note : Recall that there is a NumberType property in the DetectionResult object.

 [JsonPropertyName("numberType")]
 [JsonConverter(typeof(JsonStringEnumConverter))]
 public NumberType numberType { get; set; }

Later in the article we will use the function CheckCondition while setting up the workflow.
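To see how the returned function behaves before it is wired into a workflow, it can be called directly. The following is a small sketch with a stripped-down DetectionResult (only the numberType property), which is an assumption made to keep the snippet self-contained.

```csharp
#nullable enable
using System;

// Build the condition functions once, the same way they are later passed to the switch.
Func<object?, bool> isPrime = CheckCondition(NumberType.PrimeNumber);
Func<object?, bool> isNotPrime = CheckCondition(NumberType.NotPrimeNumber);

var primeResult = new DetectionResult { numberType = NumberType.PrimeNumber };

bool matchesPrime = isPrime(primeResult);       // right type, matching NumberType
bool matchesNotPrime = isNotPrime(primeResult); // right type, different NumberType
bool matchesString = isPrime("7");              // not a DetectionResult at all
bool matchesNull = isPrime(null);               // null never matches the type pattern

Console.WriteLine($"{matchesPrime} {matchesNotPrime} {matchesString} {matchesNull}");

static Func<object?, bool> CheckCondition(NumberType type) =>
    message => message is DetectionResult result && result.numberType == type;

enum NumberType { PrimeNumber, NotPrimeNumber, UnSure }

sealed class DetectionResult
{
    public NumberType numberType { get; set; }
}
```

Because the function takes object?, a message of any other type simply fails the check instead of throwing.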

Let's move ahead and define an Executor that returns the square root value of the prime number.

SquareRootExecutor

This Executor calculates the square root of the given number only if that number has been identified as a prime number by the TypeDetectionExecutor.

SquareRootAgent (declared earlier) is injected into the Executor class through constructor injection as squarerootnumberexecutionAssistantAgent.

internal sealed class SquareRootExecutor : Executor<DetectionResult, _Response>
{
    private readonly AIAgent _squarerootnumberexecutionAssistantAgent;

    public SquareRootExecutor(AIAgent squarerootnumberexecutionAssistantAgent)
        : base("SquareRootExecutor")
    {
        this._squarerootnumberexecutionAssistantAgent = squarerootnumberexecutionAssistantAgent;
    }

    [MessageHandler]
    public override async ValueTask<_Response> HandleAsync(
        DetectionResult result,
        IWorkflowContext context,
        CancellationToken cancellationToken = default
    )
    {
        if (result.numberType != NumberType.PrimeNumber)
        {
            throw new InvalidOperationException("This executor should only handle prime numbers.");
        }

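        // Look up the original input that TypeDetectionExecutor stored under this id.
        var number =
            await context.ReadStateAsync<InputNumber>(
                result.Id,
                scopeName: NumbervalueConstants.NumbervalueScope,
                cancellationToken
            ) ?? throw new InvalidOperationException("Number not found.");

        var agentResponse = await this._squarerootnumberexecutionAssistantAgent.RunAsync(
            number.Value,
            cancellationToken: cancellationToken
        );

        // Fail early if the agent reply deserializes to null instead of dereferencing it.
        var response_ = JsonSerializer.Deserialize<_Response>(agentResponse.Text)
            ?? throw new InvalidOperationException("Agent returned an empty response.");
        response_.Id = result.Id;
        return response_;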
    }
}

The above approach is similar to that of TypeDetectionExecutor.

The only difference is that TypeDetectionExecutor has ChatMessage as its input and DetectionResult as its output.

In the case of SquareRootExecutor, DetectionResult is the input and the output is a _Response deserialized from the agent's reply. The number passed to the agent is read from the State.

internal sealed class SquareRootExecutor : Executor<DetectionResult, _Response>

In the following line of code, retrieve the value from the State through result.Id and if found, use it as an input to the SquareRootAgent in form of number.Value.

var number = await context.ReadStateAsync<InputNumber>(result.Id, scopeName: NumbervalueConstants.NumbervalueScope, cancellationToken)
?? throw new InvalidOperationException("Number not found.");

var response_ = JsonSerializer.Deserialize<_Response>(
            (
                await this._squarerootnumberexecutionAssistantAgent.RunAsync(
                    number.Value,
                    cancellationToken: cancellationToken
                )
            ).Text
        );

As was the case with TypeDetectionExecutor, here also we explicitly set the value for the Id property in the _Response object.

 response_.Id = result.Id;

In the next step, we require an Executor that can act on the input classified as a non-prime number by the TypeDetectionExecutor. In this case there is no need for an Agent, as TypeDetectionExecutor has already classified the input.

So all we have to do is create an Executor that takes DetectionResult as its input, checks its NumberType value and reads the original number from the State.

NonPrimeNumberExecutor

This Executor yields output when the input number is a non-prime number.

internal sealed class NonPrimeNumberExecutor() : Executor<DetectionResult>("NonPrimeNumberExecutor")
 {
     [YieldsOutput(typeof(string))]
     public override async ValueTask HandleAsync(DetectionResult result, IWorkflowContext context, CancellationToken cancellationToken = default)
     {
         if (result.numberType == NumberType.NotPrimeNumber)
         {
             var number = await context.ReadStateAsync<InputNumber>(result.Id, scopeName: NumbervalueConstants.NumbervalueScope, cancellationToken);

             // number can be null if nothing was stored under this id, hence the null-conditional access.
             await context.YieldOutputAsync($"Details for {number?.Value} sent to Route B: {result.Reason}", cancellationToken);
         }
         else
         {
             throw new InvalidOperationException("This executor should only handle non prime numbers.");
         }
     }
 }

UnsureExecutor

This Executor yields output when the input number is unknown.

internal sealed partial class UnsureExecutor() : Executor<DetectionResult>("UnsureExecutor")
{
    [YieldsOutput(typeof(string))]
    public override async ValueTask HandleAsync(DetectionResult result, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        if (result.numberType == NumberType.UnSure)
        {
            var number = await context.ReadStateAsync<InputNumber>(result.Id, scopeName: NumbervalueConstants.NumbervalueScope, cancellationToken);
            await context.YieldOutputAsync($"Input marked as unsure: {result.Reason}. Email sent for the input value: {number?.Value}", cancellationToken);
        }
        else
        {
            throw new ArgumentException("This executor should only handle uncertain inputs");
        }
    }
}

If you understand TypeDetectionExecutor and SquareRootExecutor, understanding NonPrimeNumberExecutor and UnsureExecutor isn't that difficult.

The input to these two Executors is a DetectionResult, and they add their output to the workflow output queue through YieldOutputAsync.

Now that we have all the required executors, objects, and agents, it’s time to add them to the workflow and define their execution flow.

WorkFlow

                      typeDetectionExecutor
                                |
           -----------------------------------------
           |                    |                  |
           v                    v                  v
  PrimeNumber          NotPrimeNumber          Default
           |                    |                  |
           v                    v                  v
primeNumberExecutor     Executor_RouteB      Executor_Unsure
           |
           |
           v
    Executor_RouteA

Our entry point to the workflow will be the TypeDetectionExecutor.

var typeDetectionExecutor = new TypeDetectionExecutor(TypeDetectionAgent);
WorkflowBuilder builder = new(typeDetectionExecutor);

Next, create instances of all the Executors that will be used inside the workflow to define Edges and Output.

 var primeNumberExecutor = new SquareRootExecutor(SquareRootAgent);
 var SendDetails_Executor_RouteA = new PrimeNoExecutor();
 var SendDetails_Executor_RouteB = new NonPrimeNumberExecutor();
 var SendEmail_Executor_Unsure = new UnsureExecutor();
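PrimeNoExecutor is the only Executor in this list whose code is not listed above. A minimal sketch, assuming it mirrors NonPrimeNumberExecutor but receives the _Response produced by SquareRootExecutor, could look like the following. The output text and the use of response.Decision are assumptions made for illustration, and the snippet relies on the _Response, InputNumber and NumbervalueConstants types defined earlier.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

// Hypothetical sketch: follows the same pattern as NonPrimeNumberExecutor,
// but its input is the _Response returned by SquareRootExecutor.
internal sealed class PrimeNoExecutor() : Executor<_Response>("PrimeNoExecutor")
{
    [YieldsOutput(typeof(string))]
    public override async ValueTask HandleAsync(_Response response, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // response.Id was set by SquareRootExecutor to the key of the State entry.
        var number = await context.ReadStateAsync<InputNumber>(
            response.Id,
            scopeName: NumbervalueConstants.NumbervalueScope,
            cancellationToken);

        // Assumed message format for Route A.
        await context.YieldOutputAsync(
            $"Details for {number?.Value} sent to Route A: {response.Decision}",
            cancellationToken);
    }
}
```

Whatever its exact body, this Executor must accept _Response as input, because that is the type SquareRootExecutor returns and the Edge between the two passes it along.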

Define Switches and Conditions.

builder.AddSwitch(typeDetectionExecutor, switchBuilder =>
        switchBuilder
            .AddCase(CheckCondition(NumberType.PrimeNumber), primeNumberExecutor)
            .AddCase(CheckCondition(NumberType.NotPrimeNumber), SendDetails_Executor_RouteB)
            .WithDefault(SendEmail_Executor_Unsure))
    .AddEdge(primeNumberExecutor, SendDetails_Executor_RouteA)
    .WithOutputFrom(SendDetails_Executor_RouteA, SendDetails_Executor_RouteB, SendEmail_Executor_Unsure);

The typeDetectionExecutor acts as the source Executor of the switch in the Workflow.

Earlier in the article I mentioned that we will use the CheckCondition method which returns true/false based on the value of NumberType.

The CheckCondition method is used to validate the conditions associated with each AddCase(...) statement within the switch statement.

The AddCase(...) function above expects two parameters:

The first parameter is of type Func<T, bool>, where the Func accepts the incoming message as an input object of type T. In our case the message is a DetectionResult, and the Func is the one returned by CheckCondition, which yields a Boolean value.

The second parameter is an Executor or IEnumerable<Executor>.
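The routing decision itself can be modelled in a few lines of plain C#. The sketch below is not MAF code: it assumes the cases are tried in the order they were added, that the first matching condition wins, and that the default target is used when none match. The executor names appear only as strings.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Ordered cases (assumption: first condition that returns true picks the target).
var cases = new List<(Func<object?, bool> Predicate, string Target)>
{
    (CheckCondition(NumberType.PrimeNumber), "primeNumberExecutor"),
    (CheckCondition(NumberType.NotPrimeNumber), "SendDetails_Executor_RouteB"),
};
const string DefaultTarget = "SendEmail_Executor_Unsure";

foreach (NumberType type in Enum.GetValues<NumberType>())
{
    var message = new DetectionResult { numberType = type };
    Console.WriteLine($"{type} -> {Route(message)}");
}

string Route(object? message)
{
    foreach (var (predicate, target) in cases)
    {
        if (predicate(message)) return target;
    }
    return DefaultTarget; // plays the role of WithDefault(...)
}

static Func<object?, bool> CheckCondition(NumberType type) =>
    message => message is DetectionResult result && result.numberType == type;

enum NumberType { PrimeNumber, NotPrimeNumber, UnSure }

sealed class DetectionResult
{
    public NumberType numberType { get; set; }
}
```

In this model an UnSure result reaches the email Executor without a dedicated case, simply because no condition matched it.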

In the next step, define an Edge from primeNumberExecutor to SendDetails_Executor_RouteA.

.AddEdge(primeNumberExecutor, SendDetails_Executor_RouteA)

The workflow executes the primeNumberExecutor only when the following case is true

.AddCase(
    CheckCondition(NumberType.PrimeNumber),
    primeNumberExecutor
)

and if not, the switch routes the message to one of the other cases. The chain ends with WithOutputFrom(...).

 .WithOutputFrom(SendDetails_Executor_RouteA, SendDetails_Executor_RouteB, SendEmail_Executor_Unsure)

WithOutputFrom(...) registers the Executors that yield output.

In our case it will be SendDetails_Executor_RouteA, SendDetails_Executor_RouteB and SendEmail_Executor_Unsure.

Now you might ask why SendDetails_Executor_RouteA is part of AddEdge(...) and also part of WithOutputFrom(...).

Let me try to explain:

AddEdge(...) defines the execution path.

When the given number is a prime number, we have to find the square root of that number. That is done through the primeNumberExecutor. The execution then needs to flow to the SendDetails_Executor_RouteA Executor as its next step.

Since the switch only routes execution to primeNumberExecutor, we explicitly define an edge using AddEdge(...) to instruct the workflow engine about the next Executor that should be executed in the workflow path.

Note: AddEdge(...) only defines connections between Executors. To produce an output we have to use WithOutputFrom(...).

But then why haven't we defined AddEdge(...) for SendEmail_Executor_Unsure or SendDetails_Executor_RouteB?

This is because AddCase(...) and WithDefault(...) already create the execution routes to those Executors. Both of these flows directly reach their final Executors and do not require the additional execution step that primeNumberExecutor needs.

Once the execution of primeNumberExecutor completes, the workflow still needs to continue to SendDetails_Executor_RouteA.

Therefore, we explicitly define this continuation path using AddEdge(...), and the output from SendDetails_Executor_RouteA is captured through WithOutputFrom(...).

Execution

var workflow = builder.Build();

string input = "Input Number";

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new ChatMessage(ChatRole.User, input));

await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent outputEvent)
    {
        Console.WriteLine($"{outputEvent}");
    }
}

Conclusion

First of all, thank you for making it all the way here :). I understand the complexity involved in setting up workflows in Microsoft Agent Framework.

Through this article, I have tried to the best of my abilities to explain the overall execution flow of workflows in MAF. At the beginning, the implementation looks complicated and overwhelming, but once you understand the essence of the steps involved, the entire flow becomes much easier to follow.

Another crucial aspect of Workflow execution in MAF is Checkpointing, which I will deep dive into in my next article.

Till then, Ciao!!!