Orchestrazioni dei flussi di lavoro di Microsoft Agent Framework - Sequenziale

Nell'orchestrazione sequenziale gli agenti sono organizzati in una pipeline. Ogni agente elabora l'attività a sua volta, passando l'output all'agente successivo nella sequenza. Questo è ideale per i flussi di lavoro in cui ogni passaggio si basa su quello precedente, ad esempio la revisione dei documenti, le pipeline di elaborazione dei dati o il ragionamento a più fasi.

Orchestrazione sequenziale

Importante

Per impostazione predefinita, ogni agente nella sequenza utilizza la conversazione completa dell'agente precedente, sia i messaggi di input forniti all'agente precedente che i relativi messaggi di risposta. È possibile configurare gli agenti in modo da utilizzare solo i messaggi di risposta dell'agente precedente. Per informazioni dettagliate, vedere Controllo del contesto tra gli agenti .

Cosa Imparerai

  • Come creare una pipeline sequenziale di agenti
  • Come concatenare gli agenti in cui ognuno si basa sull'output precedente
  • Come aggiungere l'approvazione umana per le chiamate a strumenti sensibili
  • Come combinare agenti con executor personalizzati per attività specializzate
  • Come tenere traccia del flusso della conversazione attraverso la pipeline

Definire gli agenti

Nell'orchestrazione sequenziale, gli agenti sono organizzati in una pipeline in cui ogni agente elabora l'attività a sua volta, passando l'output all'agente successivo nella sequenza.

Configurare il client OpenAI di Azure

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Avviso

DefaultAzureCredential è utile per lo sviluppo, ma richiede un'attenta considerazione nell'ambiente di produzione. Nell'ambiente di produzione prendere in considerazione l'uso di credenziali specifiche ,ad esempio ManagedIdentityCredential, per evitare problemi di latenza, probe di credenziali indesiderate e potenziali rischi per la sicurezza dai meccanismi di fallback.

Creare agenti specializzati che funzioneranno in sequenza:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurare l'orchestrazione sequenziale

Compilare il flusso di lavoro usando AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Eseguire il flusso di lavoro sequenziale

Eseguire il flusso di lavoro ed elaborare gli eventi:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Output di esempio

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Orchestrazione sequenziale con human-in-the-loop

Le orchestrazioni sequenziali supportano le interazioni umane nel ciclo tramite l'approvazione degli strumenti. Quando gli agenti usano strumenti avvolti con ApprovalRequiredAIFunction, il flusso di lavoro si sospende e genera un RequestInfoEvent contenente ToolApprovalRequestContent. I sistemi esterni (ad esempio un operatore umano) possono esaminare la chiamata dello strumento, approvarla o rifiutarla e il flusso di lavoro riprende di conseguenza.

Orchestrazione sequenziale con intervento umano

Suggerimento

Per altri dettagli sul modello di richiesta e risposta, vedere Human-in-the-Loop.

Definire gli agenti con strumenti che richiedono approvazione

Creare agenti in cui gli strumenti sensibili vengono incapsulati con ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Compilare ed eseguire con la gestione delle approvazioni

Creare il flusso di lavoro sequenziale come di consueto. Il flusso di approvazione viene gestito tramite il flusso di eventi:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Annotazioni

AgentWorkflowBuilder.BuildSequential() supporta l'approvazione degli strumenti di default — non è necessaria alcuna configurazione aggiuntiva. Quando un agente chiama uno strumento wrappato con ApprovalRequiredAIFunction, il flusso di lavoro si sospende automaticamente e genera un RequestInfoEvent.

Suggerimento

Per un esempio completo eseguibile di questo flusso di approvazione, vedere l'esempioGroupChatToolApproval. Lo stesso RequestInfoEvent modello di gestione si applica ad altre orchestrazioni.

Concetti chiave

  • Elaborazione sequenziale: ogni agente elabora l'output dell'agente precedente nell'ordine
  • AgentWorkflowBuilder.BuildSequential(): crea un flusso di lavoro della pipeline da una raccolta di agenti
  • ChatClientAgent: rappresenta un agente supportato da un client di chat con istruzioni specifiche
  • InProcessExecution.RunStreamingAsync(): esegue il flusso di lavoro e restituisce un StreamingRun per lo streaming di eventi in tempo reale
  • Gestione eventi: monitorare l'avanzamento attraverso AgentResponseUpdateEvent e il completamento attraverso WorkflowOutputEvent
  • Approvazione degli strumenti: Avvolgere gli strumenti sensibili con per richiedere l'approvazione umana prima dell'esecuzione
  • RequestInfoEvent: generato quando uno strumento richiede l'approvazione; contiene ToolApprovalRequestContent con i dettagli della chiamata dello strumento

Nell'orchestrazione sequenziale, ogni agente elabora l'attività a sua volta, con il flusso dell'output che passa da uno a quello successivo. Per iniziare, definire gli agenti per un processo in due fasi:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Configurare l'orchestrazione sequenziale

La SequentialBuilder classe crea una pipeline in cui gli agenti elaborano le attività in ordine. Ogni agente vede la cronologia completa della conversazione e aggiunge la risposta:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Eseguire il flusso di lavoro sequenziale

Eseguire il flusso di lavoro e raccogliere l'output finale. L'output del terminale è un oggetto AgentResponse contenente i messaggi di risposta dell'ultimo agente:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Output di esempio

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Avanzato: integrazione di agenti con executor personalizzati

L'orchestrazione sequenziale supporta la combinazione di agenti con executor personalizzati per l'elaborazione specializzata. Ciò è utile quando è necessaria una logica personalizzata che non richiede un LLM:

Definire un executor personalizzato

Annotazioni

Quando un executor personalizzato segue un agente nella sequenza, il relativo gestore riceve un oggetto AgentExecutorResponse (perché gli agenti sono internamente avvolti da AgentExecutor). Usare agent_response.full_conversation per accedere alla cronologia completa della conversazione. Un executor personalizzato usato come ultimo partecipante (terminatore) deve chiamare ctx.yield_output(AgentResponse(...)) affinché il suo output diventi l'output finale del flusso di lavoro.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Creare un flusso di lavoro sequenziale misto

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Output di esempio con executor personalizzato

===== Final Summary =====
Summary -> users:1 assistants:1

Controllare il contesto tra agenti

Per impostazione predefinita, ogni agente in un SequentialBuilder flusso di lavoro utilizza la conversazione completa dell'agente precedente (input e messaggi di risposta). L'impostazione chain_only_agent_responses=True configura tutti gli agenti nella sequenza in modo da utilizzare solo i messaggi di risposta dell'agente precedente:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Ciò è utile per le pipeline di traduzione, il perfezionamento progressivo e altri scenari in cui ogni agente deve concentrarsi esclusivamente sulla trasformazione dell'output dell'agente precedente senza essere influenzato dai turni di conversazione precedenti.

Per un esempio completo, vedere sequential_chain_only_agent_responses.py nel repository di Agent Framework.

Suggerimento

Per un controllo più granulare sul flusso di contesto, incluse le funzioni di filtro personalizzate, vedere Modalità di contesto nel riferimento all'executor dell'agente.

Output intermediari

Per impostazione predefinita, solo l'output dell'ultimo partecipante viene visualizzato come evento del flusso di lavoro output . Impostare intermediate_outputs=True per visualizzare l'output di ogni partecipante, oltre all'output finale:

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

È possibile gestire questi eventi in tempo reale in modalità di streaming:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Orchestrazione sequenziale con human-in-the-loop

Le orchestrazioni sequenziali supportano le interazioni umane nel ciclo in due modi: l'approvazione dello strumento per controllare le chiamate agli strumenti sensibili e richiedere informazioni per la sospensione dopo ogni risposta dell'agente per raccogliere feedback.

Orchestrazione sequenziale con intervento umano

Suggerimento

Per altri dettagli sul modello di richiesta e risposta, vedere Human-in-the-Loop.

Approvazione degli strumenti nei flussi di lavoro sequenziali

Usare @tool(approval_mode="always_require") per contrassegnare gli strumenti che richiedono l'approvazione umana prima dell'esecuzione. Il flusso di lavoro sospende e genera un request_info evento quando l'agente tenta di chiamare lo strumento.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Elaborare il flusso di eventi e gestire le richieste di approvazione:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Suggerimento

Per un esempio eseguibile completo, vedere sequential_builder_tool_approval.py. L'approvazione degli strumenti funziona con SequentialBuilder senza alcuna configurazione aggiuntiva del builder.

Richiedi informazioni per commenti e suggerimenti dell'agente

Usare .with_request_info() per sospendere dopo la risposta di agenti specifici, consentendo l'input esterno (ad esempio la revisione umana) prima che inizi l'agente successivo:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Concetti chiave

  • Contesto condiviso: per impostazione predefinita, ogni agente utilizza la conversazione completa dell'agente precedente, inclusi i messaggi di input e risposta
  • Controllo del contesto: usare chain_only_agent_responses=True per configurare gli agenti per utilizzare solo i messaggi di risposta dell'agente precedente
  • Output di AgentResponse: l'output del terminale del flusso di lavoro è un oggetto AgentResponse contenente la risposta dell'ultimo agente (non la conversazione completa)
  • L'ordine è importante: gli agenti vengono eseguiti rigorosamente nell'ordine specificato nell'elenco participants
  • Partecipanti flessibili: è possibile combinare agenti ed esecutori personalizzati in qualsiasi ordine
  • Contratto terminatore personalizzato: un executor personalizzato usato come ultimo partecipante deve chiamare ctx.yield_output(AgentResponse(...)) per produrre l'output del terminale
  • Output intermedi: impostare intermediate_outputs=True per rendere visibile l'output di ogni singolo partecipante come evento del flusso di lavoro output, non solo l'ultimo partecipante.
  • Approvazione degli strumenti: usare @tool(approval_mode="always_require") per operazioni sensibili che richiedono una revisione umana
  • Richiedi informazioni: usare .with_request_info(agents=[...]) per sospendere dopo agenti specifici per commenti e suggerimenti esterni

Passaggi successivi