エージェント駆動型アプリケーションパターン

AI を使用してエージェント アプリケーションを構築するには、次の 2 つの一般的な方法があります。

  • 決定論的ワークフロー - コードによって制御フローが定義されます。 標準のプログラミングコンストラクトを使用して、一連のステップ、分岐、並列処理、およびエラー処理を記述します。 LLM は各ステップ内で作業を実行しますが、全体的なフローは制御しません。
  • エージェント向けワークフロー (エージェント ループ) — LLM によって制御フローが駆動されます。 エージェントは、呼び出すツールを、どの順序で、いつタスクが完了するかを決定します。 ツールと手順は指定しますが、実行時にエージェントによって実行パスが決定されます。

どちらの方法も 永続的な実行 の利点があり、 Durable Task プログラミング モデルを使用して実装できます。 この記事では、C#、Python、JavaScript/TypeScript、および Java ですぐに使用できるコード サンプルを使用して各パターンを構築する方法について説明します。

ヒント

Prerequisites: これらのパターンを使用する前に、Durable Task SDK または Durable Functionsを設定します。 開始するには、SDK の概要 または Durable Functions の概要を参照してください。

このページでは、次の操作を行います。

ヒント

これらのパターンは、Anthropic の 効果的なエージェントの構築に関する記事で説明されているエージェントワークフロー設計と一致します。 Durable Task プログラミング モデルは、次のパターンに自然にマップされます。 オーケストレーション はワークフロー制御フローを定義し、自動的にチェックポイント処理されますが、 アクティビティ は LLM 呼び出し、ツール呼び出し、API 要求などの非決定的な操作をラップします。

アプローチを選択する

次の表は、各アプローチを使用するタイミングを決定するのに役立ちます。

決定論的ワークフローを使用する場合... エージェント ループを使用する場合...
一連の手順は、事前にわかっています。 タスクは未終了であり、ステップを予測することはできません。
エージェントの動作に対する明示的なガードレールが必要です。 LLM で使用するツールとタイミングを決定する必要があります。
コンプライアンスまたは監査可能性には、確認可能な制御フローが必要です。 エージェントは、中間結果に基づいてアプローチを調整する必要があります。
1 つのワークフローで複数の AI フレームワークを組み合わせる必要があります。 ツール呼び出し機能を備えた会話型エージェントを構築しています。

どちらの方法でも、永続的な実行による自動チェックポイント、再試行ポリシー、分散スケーリング、および人間のループ内サポートが提供されます。

確定的なワークフロー パターン

決定論的ワークフローでは、コードによって実行パスが制御されます。 LLM はワークフロー内のステップとして呼び出されますが、次に何が起こるかは決めていません。 Durable Task プログラミング モデルは、このアプローチに自然にマップされます。

  • オーケストレーションは 、ワークフロー制御フロー (シーケンス、分岐、並列処理、エラー処理) を定義し、自動的にチェックポイント処理されます。
  • アクティビティ は、LLM 呼び出し、ツール呼び出し、API 要求などの非決定的な操作をラップします。 アクティビティは、使用可能な任意のコンピューティング インスタンスで実行できます。

次の例では、Durable Functions を使用します。これは、サーバーレス ホスティングを使用してAzure Functionsで実行されます。

次の例では、ポータブル Durable Task SDK を使用します。これは、Azure Container Apps、Kubernetes、仮想マシン、ローカルなど、任意のホスト コンピューティングで実行されます。

プロンプト チェーン パターン

プロンプト チェーンは最もシンプルなエージェント パターンです。 複雑なタスクを一連の連続した LLM 操作に分割し、各ステップの出力が次のステップの入力にフィードされます。 各アクティビティ呼び出しは自動的にチェックポイント処理されるため、パイプラインの途中でクラッシュしても、最初から再起動して高価な LLM トークンを再利用する必要はありません。最後に完了した手順から実行が再開されます。

ステップ間にプログラムによる検証ゲートを挿入することもできます。 たとえば、アウトラインを生成した後、下書き手順に渡す前に、アウトラインが長さまたはトピックの制約を満たしていることを確認できます。

このパターンは、Durable Task プログラミング モデルの 関数チェーン パターンに直接マップされます。

使用するタイミング: コンテンツ生成パイプライン、マルチステップ ドキュメント処理、シーケンシャル データ エンリッチメント、中間検証ゲートを必要とするワークフロー。

次のすべての例では、オーケストレーションの状態は、各 await/yield ステートメントで自動的にチェックポイント処理されます。 ホスト プロセスがクラッシュした場合、または VM がリサイクルされた場合、オーケストレーションは最初からやり直すのではなく、最後に完了した手順から再開されます。

[Function(nameof(PromptChainingOrchestration))]
public async Task<string> PromptChainingOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var topic = context.GetInput<string>();

    // Step 1: Generate research outline
    string outline = await context.CallActivityAsync<string>(
        nameof(GenerateOutlineAgent), topic);

    // Step 2: Write first draft from outline
    string draft = await context.CallActivityAsync<string>(
        nameof(WriteDraftAgent), outline);

    // Step 3: Refine and polish the draft
    string finalContent = await context.CallActivityAsync<string>(
        nameof(RefineDraftAgent), draft);

    return finalContent;
}
[DurableTask]
public class PromptChainingOrchestration : TaskOrchestrator<string, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, string topic)
    {
        // Step 1: Generate research outline
        string outline = await context.CallActivityAsync<string>(
            nameof(GenerateOutlineAgent), topic);

        // Step 2: Write first draft from outline
        string draft = await context.CallActivityAsync<string>(
            nameof(WriteDraftAgent), outline);

        // Step 3: Refine and polish the draft
        string finalContent = await context.CallActivityAsync<string>(
            nameof(RefineDraftAgent), draft);

        return finalContent;
    }
}

経路選択

ルーティングでは、分類手順を使用して、要求を処理するダウンストリーム エージェントまたはモデルを決定します。 オーケストレーションは最初に分類子アクティビティを呼び出し、次に結果に基づいて適切なハンドラーに分岐します。 このアプローチを使用すると、各ハンドラーのプロンプト、モデル、ツールセットを個別に調整できます。たとえば、課金の質問を支払い API にアクセスできる特殊なエージェントに誘導しながら、一般的な質問を軽量モデルに送信できます。

使用するタイミング: カスタマー サポートのトリアージ、特殊なエージェントへの意図の分類、タスクの複雑さに基づく動的モデルの選択。

[Function(nameof(RoutingOrchestration))]
public async Task<string> RoutingOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<SupportRequest>();

    // Classify the request type
    string category = await context.CallActivityAsync<string>(
        nameof(ClassifyRequestAgent), request.Message);

    // Route to the appropriate specialized agent
    return category switch
    {
        "billing" => await context.CallActivityAsync<string>(
            nameof(BillingAgent), request),
        "technical" => await context.CallActivityAsync<string>(
            nameof(TechnicalSupportAgent), request),
        "general" => await context.CallActivityAsync<string>(
            nameof(GeneralInquiryAgent), request),
        _ => await context.CallActivityAsync<string>(
            nameof(GeneralInquiryAgent), request),
    };
}
[DurableTask]
public class RoutingOrchestration : TaskOrchestrator<SupportRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, SupportRequest request)
    {
        // Classify the request type
        string category = await context.CallActivityAsync<string>(
            nameof(ClassifyRequestAgent), request.Message);

        // Route to the appropriate specialized agent
        return category switch
        {
            "billing" => await context.CallActivityAsync<string>(
                nameof(BillingAgent), request),
            "technical" => await context.CallActivityAsync<string>(
                nameof(TechnicalSupportAgent), request),
            _ => await context.CallActivityAsync<string>(
                nameof(GeneralInquiryAgent), request),
        };
    }
}

並列化

複数の独立したサブタスクがある場合は、並列アクティビティ呼び出しとしてディスパッチし、すべての結果を待ってから続行できます。 Durable Task Scheduler は、使用可能なすべてのコンピューティング インスタンスにこれらのアクティビティを自動的に分散します。つまり、ワーカーを追加すると、実時間の合計が直接短縮されます。

一般的なバリエーションは、複数モデルの投票です。複数のモデル (または異なる温度の同じモデル) に同じプロンプトを並列で送信し、応答から集計または選択します。 各並列分岐は個別にチェックポイント処理されるため、1 つの分岐の一時的な障害は他の分岐には影響しません。

このパターンは、Durable Task の ファンアウト/ファンイン パターンに直接マップされます。

使用するタイミング: ドキュメントのバッチ分析、並列ツール呼び出し、マルチモデル評価、複数のレビュー担当者によるコンテンツ モデレーション。

[Function(nameof(ParallelResearchOrchestration))]
public async Task<string> ParallelResearchOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ResearchRequest>();

    // Fan-out: research multiple subtopics in parallel
    var researchTasks = request.Subtopics
        .Select(subtopic => context.CallActivityAsync<string>(
            nameof(ResearchSubtopicAgent), subtopic))
        .ToList();
    string[] researchResults = await Task.WhenAll(researchTasks);

    // Aggregate: synthesize all research into a single summary
    string summary = await context.CallActivityAsync<string>(
        nameof(SynthesizeAgent),
        new { request.Topic, Research = researchResults });

    return summary;
}
[DurableTask]
public class ParallelResearchOrchestration : TaskOrchestrator<ResearchRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ResearchRequest request)
    {
        // Fan-out: research multiple subtopics in parallel
        var researchTasks = request.Subtopics
            .Select(subtopic => context.CallActivityAsync<string>(
                nameof(ResearchSubtopicAgent), subtopic))
            .ToList();
        string[] researchResults = await Task.WhenAll(researchTasks);

        // Aggregate: synthesize all research into a single summary
        string summary = await context.CallActivityAsync<string>(
            nameof(SynthesizeAgent),
            new { request.Topic, Research = researchResults });

        return summary;
    }
}

Orchestrator-workers パターン

このパターンでは、中央オーケストレーターが最初に LLM を (アクティビティを介して) 呼び出して作業を計画します。 次に、LLM の出力に基づいて、オーケストレーターは必要なサブタスクを決定します。 オーケストレーターは、これらのサブタスクを特殊なワーカー オーケストレーションにディスパッチします。 並列化との主な違いは、サブタスクのセットがデザイン時に固定されていないことです。オーケストレーターによって実行時に動的に決定されます。

このパターンでは、独立してチェックポイント処理された子ワークフローである サブオーケストレーションが使用されます。 各ワーカー オーケストレーションには、複数のステップ、再試行、入れ子になった並列処理を含めることができます。

使用するタイミング: 詳細な調査パイプライン、複数のファイルを変更するコーディング エージェント ワークフロー、各エージェントが異なる役割を持つマルチエージェント コラボレーション。

[Function(nameof(OrchestratorWorkersOrchestration))]
public async Task<string> OrchestratorWorkersOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ResearchRequest>();

    // Central orchestrator: determine what research is needed
    string[] subtasks = await context.CallActivityAsync<string[]>(
        nameof(PlanResearchAgent), request.Topic);

    // Delegate to worker orchestrations in parallel
    var workerTasks = subtasks
        .Select(subtask => context.CallSubOrchestratorAsync<string>(
            nameof(ResearchWorkerOrchestration), subtask))
        .ToList();
    string[] results = await Task.WhenAll(workerTasks);

    // Synthesize results
    string finalReport = await context.CallActivityAsync<string>(
        nameof(SynthesizeAgent),
        new { request.Topic, Research = results });

    return finalReport;
}
[DurableTask]
public class OrchestratorWorkersOrchestration : TaskOrchestrator<ResearchRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ResearchRequest request)
    {
        // Central orchestrator: determine what research is needed
        string[] subtasks = await context.CallActivityAsync<string[]>(
            nameof(PlanResearchAgent), request.Topic);

        // Delegate to worker orchestrations in parallel
        var workerTasks = subtasks
            .Select(subtask => context.CallSubOrchestratorAsync<string>(
                nameof(ResearchWorkerOrchestration), subtask))
            .ToList();
        string[] results = await Task.WhenAll(workerTasks);

        // Synthesize results
        string finalReport = await context.CallActivityAsync<string>(
            nameof(SynthesizeAgent),
            new { request.Topic, Research = results });

        return finalReport;
    }
}

エバリュエーター オプティマイザー パターン

エバリュエーター オプティマイザー パターンは、ジェネレーター エージェントと、絞り込みループ内のエバリュエーター エージェントをペアにしています。 ジェネレーターは出力を生成し、エバリュエーターはそれを品質基準に対してスコア付けしてフィードバックを提供し、出力が成功するか、最大反復回数に達するまでループが繰り返されます。 各ループイテレーションはチェックポイントに保存されるため、3 回の洗練された反復が成功した後にクラッシュしても、その進捗は失われません。

このパターンは、品質をプログラムで測定できる場合 (たとえば、生成されたコードのコンパイルの検証や、名前付きエンティティを翻訳で保持する場合など) に特に役立ちます。

使用するタイミング: 自動レビュー、文学翻訳、反復的なコンテンツ絞り込み、複数の分析を必要とする複雑な検索タスクを使用したコード生成。

[Function(nameof(EvaluatorOptimizerOrchestration))]
public async Task<string> EvaluatorOptimizerOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    var request = context.GetInput<ContentRequest>();
    int maxIterations = 5;
    string content = "";
    string feedback = "";

    for (int i = 0; i < maxIterations; i++)
    {
        // Generate or refine content
        content = await context.CallActivityAsync<string>(
            nameof(GenerateContentAgent),
            new { request.Prompt, PreviousContent = content, Feedback = feedback });

        // Evaluate quality
        var evaluation = await context.CallActivityAsync<EvaluationResult>(
            nameof(EvaluateContentAgent), content);

        if (evaluation.MeetsQualityBar)
            return content;

        feedback = evaluation.Feedback;
    }

    return content; // Return best effort after max iterations
}
[DurableTask]
public class EvaluatorOptimizerOrchestration : TaskOrchestrator<ContentRequest, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, ContentRequest request)
    {
        int maxIterations = 5;
        string content = "";
        string feedback = "";

        for (int i = 0; i < maxIterations; i++)
        {
            // Generate or refine content
            content = await context.CallActivityAsync<string>(
                nameof(GenerateContentAgent),
                new { request.Prompt, PreviousContent = content, Feedback = feedback });

            // Evaluate quality
            var evaluation = await context.CallActivityAsync<EvaluationResult>(
                nameof(EvaluateContentAgent), content);

            if (evaluation.MeetsQualityBar)
                return content;

            feedback = evaluation.Feedback;
        }

        return content; // Return best effort after max iterations
    }
}

エージェント ループ

一般的な AI エージェントの実装では、LLM がループ内で呼び出され、ツールが呼び出され、タスクが完了するか停止条件に達するまで決定が行われます。 決定論的ワークフローとは異なり、実行パスは定義済みではありません。 エージェントは、前の手順の結果に基づいて、各ステップでの処理を決定します。

エージェント ループは、ステップの数または順序を予測できないタスクに適しています。 一般的な例としては、オープン エンドのコーディング エージェント、自律的な調査、ツール呼び出し機能を備えた会話ボットなどがあります。

Durable Task プログラミング モデルを使用してエージェント ループを実装するには、次の 2 つの推奨される方法があります。

方法 説明 いつ使用するか
オーケストレーションベース エージェント ループは永続的なオーケストレーションです。 ツール呼び出しはアクティビティです。人間の入力は外部イベントを使用します。 きめ細かな制御、ツールごとの再試行ポリシー、分散ツールの実行、IDE デバッグ。
エンティティに基づく 各エージェント インスタンスは永続的エンティティです。 エージェント フレームワークはループを管理します。エンティティはセッションの永続化を提供します。 既にエージェント ループがあり、最小限の変更で持続性を追加するフレームワーク (Microsoft Agent Framework など) を使用しています。

オーケストレーション ベースのエージェント ループ

オーケストレーション ベースのエージェント ループには、 永続的なオーケストレーション (新規として続行) を組み合わせてメモリの境界を維持する機能、並列ツール実行用の ファンアウト/ファンイン 機能、および人間とループ内の対話のための 外部イベント という、いくつかの Durable Task 機能が組み合わされています。 ループの各イテレーション:

  1. アクティビティまたは ステートフル エンティティを介して、現在の会話コンテキストを LLM に送信します。
  2. ツール呼び出しを含む LLM の応答を受信します。
  3. 任意のツール呼び出しをアクティビティとして実行します (使用可能なコンピューティング全体に分散されます)。
  4. 必要に応じて、外部イベントを使用して人間の入力を待機します。
  5. 更新された状態でループを続行するか、エージェントが完了を通知すると完了します。
[Function(nameof(AgentLoopOrchestration))]
public async Task<string> AgentLoopOrchestration(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    // Get state from input (supports continue-as-new)
    var state = context.GetInput<AgentState>() ?? new AgentState();

    int maxIterations = 100;
    while (state.Iteration < maxIterations)
    {
        // Send conversation history to the LLM
        var llmResponse = await context.CallActivityAsync<LlmResponse>(
            nameof(CallLlmAgent), state.Messages);

        state.Messages.Add(llmResponse.Message);

        // If the LLM returned tool calls, execute them in parallel
        if (llmResponse.ToolCalls is { Count: > 0 })
        {
            var toolTasks = llmResponse.ToolCalls
                .Select(tc => context.CallActivityAsync<ToolResult>(
                    nameof(ExecuteTool), tc))
                .ToList();
            ToolResult[] toolResults = await Task.WhenAll(toolTasks);

            foreach (var result in toolResults)
                state.Messages.Add(result.ToMessage());
        }
        // If the LLM needs human input, wait for it
        else if (llmResponse.NeedsHumanInput)
        {
            string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
            state.Messages.Add(new Message("user", humanInput));
        }
        // LLM is done
        else
        {
            return llmResponse.FinalAnswer;
        }

        state.Iteration++;

        // Periodically continue-as-new to keep the history bounded
        if (state.Iteration % 10 == 0)
        {
            context.ContinueAsNew(state);
            return null!; // Orchestration will restart with updated state
        }
    }

    return "Max iterations reached.";
}
[DurableTask]
public class AgentLoopOrchestration : TaskOrchestrator<AgentState, string>
{
    public override async Task<string> RunAsync(
        TaskOrchestrationContext context, AgentState? state)
    {
        state ??= new AgentState();

        int maxIterations = 100;
        while (state.Iteration < maxIterations)
        {
            // Send conversation history to the LLM
            var llmResponse = await context.CallActivityAsync<LlmResponse>(
                nameof(CallLlmAgent), state.Messages);

            state.Messages.Add(llmResponse.Message);

            // If the LLM returned tool calls, execute them
            if (llmResponse.ToolCalls is { Count: > 0 })
            {
                var toolTasks = llmResponse.ToolCalls
                    .Select(tc => context.CallActivityAsync<ToolResult>(
                        nameof(ExecuteTool), tc))
                    .ToList();
                ToolResult[] toolResults = await Task.WhenAll(toolTasks);

                foreach (var result in toolResults)
                    state.Messages.Add(result.ToMessage());
            }
            // If the LLM needs human input, wait for it
            else if (llmResponse.NeedsHumanInput)
            {
                string humanInput = await context.WaitForExternalEvent<string>("HumanInput");
                state.Messages.Add(new Message("user", humanInput));
            }
            // LLM is done
            else
            {
                return llmResponse.FinalAnswer;
            }

            state.Iteration++;

            // Periodically continue-as-new to keep the history bounded
            if (state.Iteration % 10 == 0)
            {
                context.ContinueAsNew(state);
                return null!;
            }
        }

        return "Max iterations reached.";
    }
}

エンティティ ベースのエージェント ループ

独自のエージェント ループを既に実装しているエージェント フレームワークを使用している場合は、ループ ロジックを書き換えずに持続性を追加するために 永続的エンティティ でラップできます。 各エンティティ インスタンスは、1 つのエージェント セッションを表します。 エンティティはメッセージを受信し、エージェント フレームワークに内部的に委任し、対話間で会話状態を保持します。

このアプローチの主な利点は、シンプルさです。エージェントの制御フローを再設計するのではなく、優先フレームワークを使用してエージェントを記述し、ホスティングの懸念事項として持続性を追加します。 エンティティは永続的ラッパーとして機能し、セッションの永続化と回復を自動的に処理します。

次の例では、既存のエージェント SDK を永続的エンティティとしてラップする方法を示します。 エンティティは、クライアントがユーザー入力を送信するために呼び出す message 操作を公開します。 内部的には、エンティティは、独自のツール呼び出しループを管理するエージェント フレームワークに委任します。

// Define the entity that wraps an existing agent SDK
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
    private readonly IChatClient _chatClient;

    public ChatAgentEntity(IChatClient chatClient)
    {
        _chatClient = chatClient;
    }

    // Called by clients to send a message to the agent
    public async Task<string> Message(string userMessage)
    {
        // Add the user message to the conversation history
        State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));

        // Delegate to the agent SDK for the LLM call (with tool loop)
        ChatResponse response = await _chatClient.GetResponseAsync(
            State.Messages, State.Options);

        // Persist the response in the entity state
        State.Messages.AddRange(response.Messages);

        return response.Text;
    }

    // Azure Functions entry point for the entity
    [Function(nameof(ChatAgentEntity))]
    public Task RunEntityAsync([EntityTrigger] TaskEntityDispatcher dispatcher)
    {
        return dispatcher.DispatchAsync<ChatAgentEntity>();
    }
}
// Define the entity that wraps an existing agent SDK
[DurableTask(Name = "ChatAgent")]
public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
    private readonly IChatClient _chatClient;

    public ChatAgentEntity(IChatClient chatClient)
    {
        _chatClient = chatClient;
    }

    // Called by clients to send a message to the agent
    public async Task<string> Message(string userMessage)
    {
        // Add the user message to the conversation history
        State.Messages.Add(new ChatMessage(ChatRole.User, userMessage));

        // Delegate to the agent SDK for the LLM call (with tool loop)
        ChatResponse response = await _chatClient.GetResponseAsync(
            State.Messages, State.Options);

        // Persist the response in the entity state
        State.Messages.AddRange(response.Messages);

        return response.Text;
    }
}

Microsoft Agent FrameworkDurable Task 拡張機能では>このアプローチが使用されます。 Microsoft Agent Framework エージェントが永続的エンティティとしてラップされ、永続的なセッション、自動チェックポイント処理、および組み込みの API エンドポイントが 1 行の構成で提供されます。

次のステップ