# Microsoft Agent Framework Implementation Guide
## Overview
This guide provides step-by-step instructions for implementing the orchestration layer using Microsoft Agent Framework (MAF) in Python.
## Prerequisites
- Python 3.12+
- Azure OpenAI access
- MCP tool servers running
- Understanding of async Python programming
- Familiarity with FastAPI
## Installation
### 1. Install Microsoft Agent Framework
```bash
pip install agent-framework
```

The MAF Python SDK provides:
- Agent creation and orchestration
- Workflow management
- LLM integration (Azure OpenAI, OpenAI, etc.)
- Tool calling capabilities
- State management
### 2. Additional Dependencies
```bash
pip install fastapi uvicorn httpx sse-starlette pydantic pydantic-settings python-dotenv
pip install opentelemetry-api opentelemetry-sdk
pip install azure-identity azure-ai-inference azure-ai-projects
```

## Core Concepts
### 1. Agents in MAF
Agents are the primary building blocks in MAF. Each agent has:
- Name: Unique identifier
- System Prompt: Instructions for the agent's behavior
- Tools: Functions the agent can call
- LLM: Language model for decision making
- State: Conversation context and history
Example agent creation:
```python
from agent_framework import Agent
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential

# Create LLM client
llm_client = ChatCompletionsClient(
    endpoint=AZURE_OPENAI_ENDPOINT,
    credential=AzureKeyCredential(AZURE_OPENAI_API_KEY)
)

# Create agent
customer_query_agent = Agent(
    name="CustomerQueryAgent",
    system_prompt="Extract customer preferences from travel queries.",
    tools=[analyze_query_tool, extract_preferences_tool],
    llm_client=llm_client,
    model=AZURE_OPENAI_DEPLOYMENT
)
```

### 2. Tools in MAF
Tools are functions that agents can call. For MCP integration, tools wrap MCP client calls:
```python
from agent_framework import Tool

async def call_customer_query_mcp(query: str) -> dict:
    """Call the customer query MCP server."""
    # MCPClient: see the HTTP implementation in Step 3
    async with MCPClient(MCP_CUSTOMER_QUERY_URL) as client:
        result = await client.call_tool("analyze_query", {"query": query})
        return result

# Create tool from function
analyze_query_tool = Tool(
    name="analyze_customer_query",
    description="Analyze customer travel query and extract preferences",
    function=call_customer_query_mcp
)
```

### 3. Workflows in MAF
Workflows orchestrate multiple agents:
```python
from agent_framework import Workflow

# Create workflow
travel_workflow = Workflow(
    name="TravelPlanningWorkflow",
    agents=[
        triage_agent,
        customer_query_agent,
        destination_agent,
        itinerary_agent
    ],
    root_agent=triage_agent
)

# Execute workflow
result = await travel_workflow.run(user_message)
```

### 4. Agent Handoff
MAF supports agent-to-agent handoff:
```python
# In the triage agent's logic
if needs_destination_recommendation:
    # Hand off to the destination agent
    result = await workflow.handoff(
        to_agent="DestinationRecommendationAgent",
        context={
            "preferences": extracted_preferences,
            "budget": budget_info
        }
    )
```

## Implementation Steps
### Step 1: Project Setup
Create the Python project structure:
```bash
cd src
mkdir -p api-python/src/{orchestrator/agents,orchestrator/tools,api,utils}
cd api-python
```

Create `pyproject.toml`:
```toml
[project]
name = "azure-ai-travel-agents-api-python"
version = "1.0.0"
requires-python = ">=3.12"
dependencies = [
    "agent-framework>=0.1.0",
    "fastapi>=0.115.0",
    "uvicorn>=0.34.0",
    "httpx>=0.28.1",
    "sse-starlette>=2.2.1",
    "pydantic>=2.10.6",
    "pydantic-settings>=2.7.0",
    "python-dotenv>=1.0.1",
    "opentelemetry-api>=1.30.0",
    "opentelemetry-sdk>=1.30.0",
    "azure-identity>=1.20.0",
    "azure-ai-inference>=1.0.0b1",
    "azure-ai-projects>=1.0.0b5"
]

[tool.ruff]
line-length = 120
target-version = "py312"

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
```

### Step 2: Configuration

Create `src/config.py`:
```python
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    # Azure OpenAI
    azure_openai_endpoint: str
    azure_openai_api_key: str
    azure_openai_deployment: str
    azure_openai_api_version: str = "2024-02-15-preview"

    # MCP Servers
    mcp_customer_query_url: str
    mcp_destination_recommendation_url: str
    mcp_itinerary_planning_url: str
    mcp_echo_ping_url: str
    mcp_echo_ping_access_token: Optional[str] = None

    # Server
    port: int = 4000
    log_level: str = "INFO"

    # Telemetry
    otlp_endpoint: Optional[str] = None
    otel_service_name: str = "api-python"

settings = Settings()
```
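For local development, pydantic-settings reads these values from a `.env` file next to `pyproject.toml`. A minimal sketch; every value below is a placeholder, and the variable names map one-to-one onto the `Settings` fields above:

```bash
AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com
AZURE_OPENAI_API_KEY=<your-key>
AZURE_OPENAI_DEPLOYMENT=gpt-4o
MCP_CUSTOMER_QUERY_URL=http://localhost:5001
MCP_DESTINATION_RECOMMENDATION_URL=http://localhost:5002
MCP_ITINERARY_PLANNING_URL=http://localhost:5003
MCP_ECHO_PING_URL=http://localhost:5004
```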
### Step 3: MCP Client Implementation

Create `src/orchestrator/tools/mcp_client.py`:
```python
import httpx
from typing import Any, Dict, Optional
from abc import ABC, abstractmethod

class MCPClient(ABC):
    """Base MCP client interface."""

    @abstractmethod
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Call a tool on the MCP server."""
        pass

    @abstractmethod
    async def list_tools(self) -> list:
        """List available tools."""
        pass

class HTTPMCPClient(MCPClient):
    """HTTP-based MCP client."""

    def __init__(self, base_url: str, access_token: Optional[str] = None):
        self.base_url = base_url
        self.access_token = access_token
        self.client = httpx.AsyncClient(timeout=30.0)

    async def __aenter__(self) -> "HTTPMCPClient":
        """Support `async with HTTPMCPClient(...) as client:` usage."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Call a tool via HTTP."""
        headers = {}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        response = await self.client.post(
            f"{self.base_url}/call",
            json={"name": tool_name, "arguments": arguments},
            headers=headers
        )
        response.raise_for_status()
        return response.json()

    async def list_tools(self) -> list:
        """List available tools."""
        response = await self.client.get(f"{self.base_url}")
        response.raise_for_status()
        return response.json()

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
```
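With the context-manager support above, a one-off call looks like this. A sketch: the URL and import path are assumptions (adjust to your layout), and `analyze_query` matches the tool used in the Core Concepts example:

```python
import asyncio

from orchestrator.tools.mcp_client import HTTPMCPClient

async def main() -> None:
    # The context manager closes the underlying httpx client on exit
    async with HTTPMCPClient("http://localhost:5001") as client:
        result = await client.call_tool("analyze_query", {"query": "Beach trip in June"})
        print(result)

asyncio.run(main())
```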
### Step 4: Tool Registry

Create `src/orchestrator/tools/tool_registry.py`:
```python
from typing import Any, Dict
from .mcp_client import HTTPMCPClient
from ...config import settings

class ToolRegistry:
    """Registry for MCP tools."""

    def __init__(self):
        self.clients: Dict[str, HTTPMCPClient] = {}
        self._initialize_clients()

    def _initialize_clients(self):
        """Initialize MCP clients."""
        # Customer Query
        self.clients["customer-query"] = HTTPMCPClient(
            settings.mcp_customer_query_url
        )
        # Destination Recommendation
        self.clients["destination-recommendation"] = HTTPMCPClient(
            settings.mcp_destination_recommendation_url
        )
        # Itinerary Planning
        self.clients["itinerary-planning"] = HTTPMCPClient(
            settings.mcp_itinerary_planning_url
        )
        # Echo Ping
        self.clients["echo-ping"] = HTTPMCPClient(
            settings.mcp_echo_ping_url,
            access_token=settings.mcp_echo_ping_access_token
        )

    async def call_tool(self, server: str, tool_name: str, arguments: dict) -> Any:
        """Call a tool on a specific MCP server."""
        if server not in self.clients:
            raise ValueError(f"Unknown MCP server: {server}")
        return await self.clients[server].call_tool(tool_name, arguments)

    async def close_all(self):
        """Close all MCP clients."""
        for client in self.clients.values():
            await client.close()

# Global instance
tool_registry = ToolRegistry()
```
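Callers then route every MCP call through the registry rather than holding client instances themselves. A usage sketch (the import path is an assumption):

```python
from orchestrator.tools.tool_registry import tool_registry

async def analyze(query: str) -> dict:
    # "customer-query" is the registry key; "analyze_query" is the MCP tool name
    return await tool_registry.call_tool(
        "customer-query", "analyze_query", {"query": query}
    )
```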
### Step 5: Agent Implementation

Create `src/orchestrator/agents/base.py`:
```python
from agent_framework import Agent, Tool
from typing import List, Optional

class BaseAgent:
    """Base class for all agents."""

    def __init__(
        self,
        name: str,
        system_prompt: str,
        tools: List[Tool],
        llm_client,
        model: str
    ):
        self.agent = Agent(
            name=name,
            system_prompt=system_prompt,
            tools=tools,
            llm_client=llm_client,
            model=model
        )

    async def process(self, message: str, context: Optional[dict] = None):
        """Process a message."""
        return await self.agent.run(message, context=context)

    @property
    def name(self) -> str:
        """Get agent name."""
        return self.agent.name
```

Create `src/orchestrator/agents/triage_agent.py`:
```python
from .base import BaseAgent

class TriageAgent(BaseAgent):
    """Triage agent that routes queries to specialized agents."""

    def __init__(self, llm_client, model: str, all_tools: list):
        system_prompt = """
        You are a triage agent for a travel agency AI system.
        Your role is to analyze customer queries and determine which
        specialized agents should handle them.

        You can delegate to these agents:
        - CustomerQueryAgent: For understanding customer preferences
        - DestinationRecommendationAgent: For suggesting destinations
        - ItineraryPlanningAgent: For creating travel itineraries

        Always choose the most appropriate agent(s) for the query.
        You may coordinate multiple agents for complex queries.
        """
        super().__init__(
            name="TriageAgent",
            system_prompt=system_prompt,
            tools=all_tools,
            llm_client=llm_client,
            model=model
        )
```

### Step 6: Workflow Orchestration

Create `src/orchestrator/workflow.py`:
```python
from typing import Optional

from agent_framework import Workflow
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential

from .agents.triage_agent import TriageAgent
from .agents.customer_query_agent import CustomerQueryAgent
# ... import other agents
from .tools.tool_registry import tool_registry
from ..config import settings

class TravelWorkflow:
    """Main workflow for travel agent system."""

    def __init__(self):
        # Initialize LLM client
        self.llm_client = ChatCompletionsClient(
            endpoint=settings.azure_openai_endpoint,
            credential=AzureKeyCredential(settings.azure_openai_api_key)
        )
        # Initialize agents
        self.agents = self._initialize_agents()
        # Create workflow
        self.workflow = Workflow(
            name="TravelPlanningWorkflow",
            agents=list(self.agents.values()),
            root_agent=self.agents["triage"]
        )

    def _initialize_agents(self) -> dict:
        """Initialize all agents."""
        agents = {}
        # Create tools and agents
        # ... (implementation details)
        return agents

    async def run(self, message: str, selected_tools: Optional[list] = None):
        """Execute the workflow."""
        # Filter tools based on selection, then stream workflow events
        async for event in self.workflow.run(message):
            yield event
```
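The body of `_initialize_agents` is elided above. One plausible shape, offered strictly as a sketch (`make_mcp_tool` is a hypothetical helper, not part of MAF), wraps registry calls as `Tool` objects and hands them to each agent:

```python
from agent_framework import Tool

from .tools.tool_registry import tool_registry

def make_mcp_tool(server: str, tool_name: str, description: str) -> Tool:
    """Hypothetical helper: expose one MCP tool as an agent Tool."""
    async def _call(**arguments) -> dict:
        # Route the call through the shared registry
        return await tool_registry.call_tool(server, tool_name, arguments)
    return Tool(name=tool_name, description=description, function=_call)

# Inside _initialize_agents, for example:
#   query_tool = make_mcp_tool("customer-query", "analyze_query",
#                              "Analyze a customer travel query")
#   agents["triage"] = TriageAgent(self.llm_client,
#                                  settings.azure_openai_deployment,
#                                  [query_tool])
```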
### Step 7: FastAPI Integration

Create `src/main.py`:
```python
import logging

from fastapi import FastAPI, HTTPException
from sse_starlette.sse import EventSourceResponse

from .orchestrator.workflow import TravelWorkflow
from .config import settings

app = FastAPI(title="Azure AI Travel Agents API (Python)")
workflow = TravelWorkflow()

logging.basicConfig(level=settings.log_level)
logger = logging.getLogger(__name__)

@app.get("/api/health")
async def health():
    """Health check endpoint."""
    return {"status": "OK"}

@app.post("/api/chat")
async def chat(request: dict):
    """Chat endpoint with streaming responses."""
    message = request.get("message")
    tools = request.get("tools", [])
    if not message:
        raise HTTPException(status_code=400, detail="Message is required")

    async def event_generator():
        """Generate SSE events from workflow."""
        async for event in workflow.run(message, selected_tools=tools):
            yield {
                "event": "message",
                "data": event.model_dump_json()
            }

    return EventSourceResponse(event_generator())

@app.get("/api/tools")
async def list_tools():
    """List available tools."""
    # Implementation
    pass

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=settings.port)
```
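To smoke-test the streaming endpoint, a small httpx client can consume the SSE stream. This assumes the server is running locally on port 4000:

```python
import asyncio

import httpx

async def main() -> None:
    payload = {"message": "Plan a 7-day trip to Japan", "tools": []}
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the response and print raw SSE lines as they arrive
        async with client.stream(
            "POST", "http://localhost:4000/api/chat", json=payload
        ) as response:
            async for line in response.aiter_lines():
                if line:
                    print(line)

asyncio.run(main())
```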
## Best Practices

### 1. Error Handling
```python
from agent_framework import AgentError

async def safe_agent_call(agent, message, context=None):
    """Safely call an agent with error handling."""
    try:
        return await agent.process(message, context)
    except AgentError as e:
        logger.error(f"Agent error: {e}")
        # Implement fallback logic
        return {"error": str(e), "fallback": True}
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise
```

### 2. Observability
```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def traced_agent_call(agent, message):
    """Call agent with tracing."""
    with tracer.start_as_current_span(f"agent_{agent.name}") as span:
        span.set_attribute("agent.name", agent.name)
        span.set_attribute("message.length", len(message))
        result = await agent.process(message)
        span.set_attribute("result.success", True)
        return result
```

### 3. State Management
```python
from agent_framework import ConversationState

class StatefulWorkflow:
    """Workflow with state management."""

    def __init__(self, workflow):
        # Wrap an existing workflow and track conversation state alongside it
        self.workflow = workflow
        self.state = ConversationState()

    async def run(self, message: str):
        """Run workflow with state."""
        # Add message to state
        self.state.add_message("user", message)
        # Execute workflow with state
        result = await self.workflow.run(message, state=self.state)
        # Update state with result
        self.state.add_message("assistant", result)
        return result
```

## Testing

### Unit Tests
```python
import pytest  # requires the pytest-asyncio plugin for async tests
from orchestrator.agents.triage_agent import TriageAgent

@pytest.mark.asyncio
async def test_triage_agent(llm_client, model, tools):
    """Test triage agent (fixtures sketched below)."""
    agent = TriageAgent(llm_client, model, tools)
    result = await agent.process("I want to plan a trip to Japan")
    assert result is not None
```
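The `llm_client`, `model`, and `tools` fixtures are not defined in this guide. A minimal sketch using mocks, so unit tests never hit Azure OpenAI (all fixture values are placeholders):

```python
from unittest.mock import AsyncMock, MagicMock

import pytest

@pytest.fixture
def llm_client():
    # Stand-in LLM client; unit tests should not call a real model
    return MagicMock()

@pytest.fixture
def model():
    return "gpt-4o"

@pytest.fixture
def tools():
    # Tools can be mocked as async callables
    return [AsyncMock(return_value={"ok": True})]
```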
### Integration Tests

```python
@pytest.mark.asyncio
async def test_workflow_execution():
    """Test complete workflow."""
    workflow = TravelWorkflow()
    events = []
    async for event in workflow.run("Plan a 7-day trip to Japan"):
        events.append(event)
    assert len(events) > 0
```

## Deployment
### Docker

Create a `Dockerfile`:
```dockerfile
FROM python:3.12-slim
WORKDIR /app
# Copy the project metadata and source before installing,
# so the package itself is included in the install
COPY pyproject.toml .
COPY src/ ./src/
RUN pip install --no-cache-dir .
EXPOSE 4000
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "4000"]
```
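Build and run (the image tag is arbitrary, and `--env-file .env` assumes the file from Step 2):

```bash
docker build -t travel-agents-api-python .
docker run --rm -p 4000:4000 --env-file .env travel-agents-api-python
```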
### Environment Variables

Ensure all required environment variables are set in production (a startup check is sketched after this list):
- Azure OpenAI credentials
- MCP server URLs
- Telemetry endpoints
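With the pydantic `Settings` class from Step 2, missing required variables already fail at startup with a validation error; for a clearer operator-facing message, a small fail-fast check is one option (the variable names mirror the `.env` sketch above):

```python
import os
import sys

REQUIRED_VARS = [
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_DEPLOYMENT",
    "MCP_CUSTOMER_QUERY_URL",
    "MCP_DESTINATION_RECOMMENDATION_URL",
    "MCP_ITINERARY_PLANNING_URL",
]

missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    print(f"Missing required environment variables: {', '.join(missing)}", file=sys.stderr)
    sys.exit(1)
```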
## Migration from LlamaIndex.TS

### Key Differences
- Language: Python vs TypeScript
- Framework: MAF vs LlamaIndex.TS
- Agent Pattern: Similar multi-agent pattern
- Tool Integration: MCP clients remain the same
### Migration Steps

1. Deploy the Python API alongside the TypeScript API
2. Test with a subset of traffic
3. Validate all workflows
4. Gradually shift traffic
5. Deprecate the old API when ready
## Troubleshooting

### Common Issues
- MCP Connection Errors: Check MCP server URLs and network connectivity
- Azure OpenAI Errors: Verify credentials and deployment name
- Agent Timeout: Increase the timeout in the MCP client configuration (see the sketch after this list)
- Memory Issues: Monitor workflow state size
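For the timeout case, a sketch under one assumption: the `HTTPMCPClient` above hardcodes a 30-second timeout, so first make it a constructor parameter, then give slow servers more headroom:

```python
import httpx
from typing import Optional

class HTTPMCPClient:
    # Amended constructor only; the rest of the class is unchanged
    def __init__(self, base_url: str, access_token: Optional[str] = None,
                 timeout: float = 30.0):
        self.base_url = base_url
        self.access_token = access_token
        # Per-request budget for slow MCP servers
        self.client = httpx.AsyncClient(timeout=timeout)

# e.g. give the itinerary planner two minutes:
# self.clients["itinerary-planning"] = HTTPMCPClient(
#     settings.mcp_itinerary_planning_url, timeout=120.0
# )
```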
## Conclusion
This implementation guide provides the foundation for building the orchestration layer using Microsoft Agent Framework. Follow the steps sequentially and test thoroughly at each stage.
## Next Steps

- Complete all agent implementations
- Add comprehensive error handling
- Implement full test coverage
- Set up a CI/CD pipeline
- Optimize performance
- Deploy to production
