Orchestrations de flux de travail Microsoft Agent Framework - Simultanées

L’orchestration simultanée permet à plusieurs agents de travailler sur la même tâche en parallèle. Chaque agent traite l’entrée indépendamment, et leurs résultats sont collectés et agrégés. Cette approche est bien adaptée aux scénarios où diverses perspectives ou solutions sont précieuses, telles que le brainstorming, le raisonnement d’ensemble ou les systèmes de vote.

Orchestration simultanée

Ce que vous allez apprendre

  • Comment définir plusieurs agents avec une expertise différente
  • Comment orchestrer ces agents pour fonctionner simultanément sur une seule tâche
  • Comment collecter et traiter les résultats

Dans l’orchestration simultanée, plusieurs agents travaillent simultanément sur la même tâche et indépendamment, fournissant diverses perspectives sur la même entrée.

Configurer le client Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Avertissement

DefaultAzureCredential est pratique pour le développement, mais nécessite une considération minutieuse en production. En production, envisagez d’utiliser des informations d’identification spécifiques (par exemple ManagedIdentityCredential) pour éviter les problèmes de latence, la détection involontaire des informations d’identification et les risques de sécurité potentiels liés aux mécanismes de secours.

Définir vos agents

Créez plusieurs agents spécialisés qui fonctionnent simultanément sur la même tâche :

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurer l’orchestration simultanée

Construisez le flux de travail à l’aide de AgentWorkflowBuilder pour exécuter les agents en parallèle.

// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);

Exécuter le flux de travail simultané et collecter les résultats

Exécutez les événements de flux de travail et de processus de tous les agents en cours d’exécution simultanément :

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Exemple de sortie

French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!

===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!

Concepts clés

  • Exécution parallèle : tous les agents traitent l’entrée simultanément et indépendamment
  • AgentWorkflowBuilder.BuildConcurrent() : crée un flux de travail simultané à partir d’une collection d’agents
  • Agrégation automatique : les résultats de tous les agents sont automatiquement collectés dans le résultat final
  • Streaming d’événements : surveillance en temps réel de la progression de l’agent via AgentResponseUpdateEvent
  • Perspectives variées : chaque agent apporte son expertise unique au même problème

Les agents sont des entités spécialisées qui peuvent traiter des tâches. Le code suivant définit trois agents : un expert en recherche, un expert marketing et un expert juridique.

import os

from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = chat_client.as_agent(
    instructions=(
        "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
        " opportunities, and risks."
    ),
    name="researcher",
)

marketer = chat_client.as_agent(
    instructions=(
        "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
        " aligned to the prompt."
    ),
    name="marketer",
)

legal = chat_client.as_agent(
    instructions=(
        "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
        " based on the prompt."
    ),
    name="legal",
)

Configurer l’orchestration simultanée

La ConcurrentBuilder classe vous permet de construire un workflow pour exécuter plusieurs agents en parallèle. Vous passez la liste des agents en tant que participants.

from agent_framework.orchestrations import ConcurrentBuilder

# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Exécuter le flux de travail simultané et collecter les résultats

L'agrégateur par défaut produit un seul AgentResponse contenant un message d'assistant par participant :

from agent_framework import AgentResponse

# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Aggregated Results =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")

Exemple de sortie

===== Final Aggregated Results =====
------------------------------------------------------------

[researcher]:
**Insights:**

- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
    likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
    higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------

[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
    sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------

[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**

**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
    regarding e-bike classification, speed limits, power output, and safety features.

Avancé : exécuteurs d’agents personnalisés

L’orchestration simultanée prend en charge les exécuteurs personnalisés qui encapsulent les agents avec une logique supplémentaire. Cela est utile lorsque vous avez besoin d’un meilleur contrôle sur la façon dont les agents sont initialisés et comment ils traitent les demandes :

Définir des exécuteurs d’agent personnalisés

from agent_framework import (
    AgentExecutorRequest,
    AgentExecutorResponse,
    Agent,
    Executor,
    WorkflowContext,
    handler,
)

class ResearcherExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
                " opportunities, and risks."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

class MarketerExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
                " aligned to the prompt."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

Créer un flux de travail avec des exécuteurs personnalisés

chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)

workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Avancé : Aggregator personnalisé

Par défaut, l’orchestration simultanée agrège toutes les réponses des agents en un seul AgentResponse contenant un message d’assistant par participant. Vous pouvez remplacer ce comportement par un agrégateur personnalisé qui traite les résultats d’une manière spécifique :

Définir un agrégateur personnalisé

from agent_framework import AgentExecutorResponse

# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
    instructions=(
        "You are a helpful assistant that consolidates multiple domain expert outputs "
        "into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
    ),
    name="summarizer",
)

# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
    # Extract one final assistant message per agent
    expert_sections: list[str] = []
    for r in results:
        try:
            messages = getattr(r.agent_response, "messages", [])
            final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
            expert_sections.append(f"{r.executor_id}:\n{final_text}")
        except Exception as e:
            expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")

    # Ask the model to synthesize a concise summary of the experts' outputs
    prompt = "\n\n".join(expert_sections)
    response = await summarizer_agent.run(prompt)
    # Return the model's final assistant text as the completion result
    return response.messages[-1].text if response.messages else ""

Créer un flux de travail avec un aggregator personnalisé

workflow = (
    ConcurrentBuilder(participants=[researcher, marketer, legal])
    .with_aggregator(summarize_results)
    .build()
)

output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
    if event.type == "output":
        output = event.data

if output:
    print("===== Final Consolidated Output =====")
    print(output)

Exemple de sortie avec l’agrégateur personnalisé

===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.

Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.

Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.

Sorties intermédiaires

Par défaut, seule la sortie de l’agrégateur s’affiche en tant qu’événement de flux de travail output . Définissez intermediate_outputs=True pour afficher également le résultat individuel de chaque participant :

workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal],
    intermediate_outputs=True,
).build()

Vous pouvez gérer ces événements en temps réel en mode streaming :

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Concepts clés

  • Exécution parallèle : tous les agents travaillent simultanément et indépendamment sur la tâche
  • Sortie agentResponse : l’agrégateur par défaut génère un seul AgentResponse message assistant par participant (aucune invite utilisateur incluse)
  • Perspectives variées : chaque agent apporte son expertise unique au même problème
  • Participants flexibles : vous pouvez utiliser des agents directement ou les encapsuler dans des exécuteurs personnalisés
  • Traitement personnalisé : remplacez l’agrégateur par défaut pour synthétiser les résultats de manière spécifique au domaine
  • Sorties intermédiaires : définie intermediate_outputs=True pour exposer la sortie individuelle de chaque participant, en plus de la sortie finale de l’agrégateur

Prochaines étapes