Appearance
Microsoft Agent Framework Quick Reference
Overview
This quick reference provides code snippets and examples for common patterns when using Microsoft Agent Framework (MAF) in the Azure AI Travel Agents project.
Installation
bash
# Install MAF Python SDK
pip install agent-framework
# Install additional dependencies
pip install azure-ai-inference azure-identity
Basic Agent Creation
python
import os

from agent_framework import Agent
from azure.ai.inference import ChatCompletionsClient
from azure.core.credentials import AzureKeyCredential

# Create LLM client. The endpoint and key are read from environment
# variables, so `os` must be imported for this snippet to run.
llm_client = ChatCompletionsClient(
    endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    credential=AzureKeyCredential(os.getenv("AZURE_OPENAI_API_KEY")),
)

# Create agent
agent = Agent(
    name="MyAgent",
    system_prompt="You are a helpful assistant.",
    tools=[],  # List of tools
    llm_client=llm_client,
    model=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
)
# Use agent
response = await agent.run("Hello, how can you help me?")
print(response)
Tool Definition
Simple Tool
python
from agent_framework import Tool
def calculate_distance(origin: str, destination: str) -> float:
    """Calculate distance between two locations.

    This is a placeholder: ``origin`` and ``destination`` are accepted but
    not used yet, and a fixed value is returned.

    Args:
        origin: Starting location.
        destination: Target location.

    Returns:
        The distance as a float (currently always ``150.5``).
    """
    # Implementation
    return 150.5
# Create tool
distance_tool = Tool(
name="calculate_distance",
description="Calculate the distance between two locations in kilometers",
function=calculate_distance
)
Async Tool with MCP Integration
python
import httpx
from agent_framework import Tool
async def search_destinations(preferences: dict) -> list:
    """Search for destinations via MCP server.

    POSTs a tool call named ``search_destinations`` to
    ``{MCP_SERVER_URL}/mcp/call`` with ``preferences`` as its arguments.

    Args:
        preferences: Arguments forwarded to the MCP tool unchanged.

    Returns:
        The decoded JSON body of the response.
        NOTE(review): annotated as ``list`` -- confirm against the server.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    payload = {
        "name": "search_destinations",
        "arguments": preferences,
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{MCP_SERVER_URL}/mcp/call",
            json=payload,
        )
        # Fail loudly on HTTP errors instead of returning an error body
        # as if it were a list of destinations.
        response.raise_for_status()
        return response.json()
# Create tool
search_tool = Tool(
name="search_destinations",
description="Search for travel destinations based on preferences",
function=search_destinations
)
Tool with Type Annotations
python
from typing import Dict, List
from pydantic import BaseModel
from agent_framework import Tool
class Preferences(BaseModel):
    """Travel preferences accepted by ``recommend_destinations``."""

    # All three fields are required (no defaults are declared).
    budget: float
    duration_days: int
    interests: List[str]
async def recommend_destinations(preferences: Preferences) -> Dict:
    """Recommend destinations based on preferences.

    This is a placeholder: ``preferences`` is not consulted yet and a fixed
    answer is returned.

    Args:
        preferences: The caller's travel preferences.

    Returns:
        A dict with a ``"destinations"`` list and a ``"reasoning"`` string.
    """
    # Implementation
    return {
        "destinations": ["Paris", "Rome", "Barcelona"],
        "reasoning": "Based on budget and interests",
    }
recommend_tool = Tool(
name="recommend_destinations",
description="Get destination recommendations",
function=recommend_destinations
)
Multi-Agent Workflow
Basic Workflow
python
from agent_framework import Workflow
# Create agents
triage_agent = Agent(name="TriageAgent", ...)
specialist_agent = Agent(name="SpecialistAgent", ...)
# Create workflow
workflow = Workflow(
name="TravelWorkflow",
agents=[triage_agent, specialist_agent],
root_agent=triage_agent
)
# Run workflow
result = await workflow.run("I want to plan a trip")
Workflow with State
python
from agent_framework import Workflow, ConversationState
# Create workflow with state
workflow = Workflow(
name="StatefulWorkflow",
agents=[agent1, agent2],
root_agent=agent1,
state=ConversationState()
)
# Run with state preservation
async for event in workflow.run("First message"):
print(event)
# Continue conversation
async for event in workflow.run("Follow-up message"):
print(event)
Agent Handoff
Explicit Handoff
python
from agent_framework import Agent, Workflow
class TriageAgent:
def __init__(self, workflow: Workflow):
self.workflow = workflow
async def process(self, message: str):
# Analyze message
if "destination" in message.lower():
# Handoff to destination agent
return await self.workflow.handoff(
to_agent="DestinationAgent",
context={"query": message}
)
# Handle normally
return await self.agent.run(message)
Conditional Routing
python
async def route_query(workflow: Workflow, query: str):
"""Route query to appropriate agent."""
# Determine intent
if "plan itinerary" in query.lower():
return await workflow.handoff(
to_agent="ItineraryPlanningAgent",
context={"query": query}
)
elif "recommend destination" in query.lower():
return await workflow.handoff(
to_agent="DestinationRecommendationAgent",
context={"query": query}
)
else:
# Default to triage agent
return await workflow.root_agent.run(query)
Parallel Execution
python
import asyncio
from agent_framework import Agent
async def parallel_agent_execution(
    agents: list,
    query: str,
    *,
    return_exceptions: bool = False,
) -> list:
    """Execute multiple agents in parallel.

    Calls ``agent.run(query)`` on every agent and awaits them together.

    Args:
        agents: Objects exposing an awaitable ``run(query)``.
        query: The query passed to each agent.
        return_exceptions: Forwarded to ``asyncio.gather``. With the default
            ``False`` the first failure propagates to the caller. With
            ``True`` exceptions are returned in place of the failed agent's
            result, so one failure does not discard the other answers.

    Returns:
        One result per agent, in the same order as ``agents``.
    """
    tasks = [agent.run(query) for agent in agents]
    return await asyncio.gather(*tasks, return_exceptions=return_exceptions)
# Usage
results = await parallel_agent_execution(
[destination_agent, itinerary_agent],
"Find destinations in Europe"
)
Error Handling
Basic Error Handling
python
from agent_framework import Agent, AgentError
async def safe_agent_call(agent: Agent, message: str):
"""Call agent with error handling."""
try:
return await agent.run(message)
except AgentError as e:
print(f"Agent error: {e}")
return {"error": str(e)}
except Exception as e:
print(f"Unexpected error: {e}")
raise
Retry with Exponential Backoff
python
import asyncio
from typing import Optional
async def call_with_retry(
agent: Agent,
message: str,
max_retries: int = 3,
base_delay: float = 1.0
) -> Optional[dict]:
"""Call agent with retry logic."""
for attempt in range(max_retries):
try:
return await agent.run(message)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
await asyncio.sleep(delay)
return None
Streaming Responses
Basic Streaming
python
from agent_framework import Agent
async def stream_agent_response(agent: Agent, message: str):
"""Stream agent responses."""
async for event in agent.stream(message):
# Process event
if event.type == "token":
print(event.data, end="", flush=True)
elif event.type == "complete":
print("\nDone!")
SSE Streaming with FastAPI
python
from fastapi import FastAPI
from sse_starlette import EventSourceResponse
from agent_framework import Workflow
app = FastAPI()
workflow = Workflow(...)
@app.post("/chat")
async def chat(message: str):
"""Chat endpoint with SSE streaming."""
async def event_generator():
async for event in workflow.run(message):
yield {
"event": "message",
"data": event.model_dump_json()
}
return EventSourceResponse(event_generator())
State Management
Conversation State
python
from agent_framework import ConversationState
# Create state
state = ConversationState()
# Add messages
state.add_message("user", "Hello")
state.add_message("assistant", "Hi! How can I help?")
# Get history
history = state.get_messages()
# Clear state
state.clear()
Custom State
python
from typing import Dict, Any
class TravelState:
"""Custom state for travel planning."""
def __init__(self):
self.preferences: Dict[str, Any] = {}
self.destinations: list = []
self.itinerary: Dict[str, Any] = {}
self.context: Dict[str, Any] = {}
def update_preferences(self, preferences: dict):
"""Update user preferences."""
self.preferences.update(preferences)
def add_destination(self, destination: dict):
"""Add a destination."""
self.destinations.append(destination)
def set_itinerary(self, itinerary: dict):
"""Set the travel itinerary."""
self.itinerary = itinerary
Observability
OpenTelemetry Integration
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Configure tracer
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Add exporter
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(otlp_exporter)
)
# Use in agent
async def traced_agent_call(agent: Agent, message: str):
"""Call agent with tracing."""
with tracer.start_as_current_span("agent_call") as span:
span.set_attribute("agent.name", agent.name)
span.set_attribute("message.length", len(message))
result = await agent.run(message)
span.set_attribute("result.success", True)
return result
Structured Logging
python
import logging
import json
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}'
)
logger = logging.getLogger(__name__)
async def logged_agent_call(agent: Agent, message: str):
"""Call agent with structured logging."""
logger.info(json.dumps({
"event": "agent_call_start",
"agent": agent.name,
"message_preview": message[:50]
}))
try:
result = await agent.run(message)
logger.info(json.dumps({
"event": "agent_call_success",
"agent": agent.name
}))
return result
except Exception as e:
logger.error(json.dumps({
"event": "agent_call_error",
"agent": agent.name,
"error": str(e)
}))
raise
Configuration Management
Environment-based Configuration
python
from pydantic_settings import BaseSettings
from typing import Optional
class AgentConfig(BaseSettings):
    """Agent configuration.

    Fields declared without a default must be supplied; the nested
    ``Config`` points the settings loader at a ``.env`` file.
    """

    # Azure OpenAI
    azure_openai_endpoint: str
    azure_openai_api_key: str
    azure_openai_deployment: str
    azure_openai_api_version: str = "2024-02-15-preview"

    # Agent settings
    agent_temperature: float = 0.7
    agent_max_tokens: int = 1000
    agent_timeout: int = 30  # presumably seconds -- TODO confirm

    # Observability
    enable_tracing: bool = True
    otlp_endpoint: Optional[str] = None

    class Config:
        env_file = ".env"
# Load config
config = AgentConfig()
# Use in agent
llm_client = ChatCompletionsClient(
endpoint=config.azure_openai_endpoint,
credential=AzureKeyCredential(config.azure_openai_api_key)
)
Testing
Unit Test
python
import pytest
from agent_framework import Agent, Tool
@pytest.mark.asyncio
async def test_agent_creation():
    """Test agent creation."""
    # NOTE(review): `mock_llm_client` is not defined in this snippet;
    # presumably the test module supplies it (fixture or stub) -- confirm.
    agent = Agent(
        name="TestAgent",
        system_prompt="Test prompt",
        tools=[],
        llm_client=mock_llm_client,
        model="test-model",
    )
    assert agent.name == "TestAgent"
@pytest.mark.asyncio
async def test_agent_execution():
"""Test agent execution."""
agent = Agent(...)
result = await agent.run("Test message")
assert result is not None
Integration Test
python
import pytest
from agent_framework import Workflow
@pytest.mark.asyncio
async def test_workflow_execution():
"""Test complete workflow."""
workflow = Workflow(
name="TestWorkflow",
agents=[agent1, agent2],
root_agent=agent1
)
events = []
async for event in workflow.run("Test query"):
events.append(event)
assert len(events) > 0
assert events[-1].type == "complete"
Mock Tools
python
from unittest.mock import AsyncMock
from agent_framework import Tool
async def test_agent_with_mock_tool():
"""Test agent with mocked tool."""
# Create mock tool
mock_function = AsyncMock(return_value={"result": "mocked"})
mock_tool = Tool(
name="mock_tool",
description="Mock tool for testing",
function=mock_function
)
# Create agent with mock tool
agent = Agent(
name="TestAgent",
tools=[mock_tool],
...
)
# Test
result = await agent.run("Call the tool")
mock_function.assert_called_once()
Best Practices
1. Agent Design
python
# ✅ Good - Focused agent
customer_query_agent = Agent(
name="CustomerQueryAgent",
system_prompt="Extract customer preferences from travel queries.",
tools=[extract_preferences_tool, analyze_query_tool]
)
# ❌ Bad - Too many responsibilities
super_agent = Agent(
name="SuperAgent",
system_prompt="Do everything",
tools=[all_tools]
)
2. Error Handling
python
# ✅ Good - Specific error handling
try:
result = await agent.run(message)
except AgentError as e:
logger.error(f"Agent error: {e}")
return fallback_response()
except TimeoutError as e:
logger.error(f"Timeout: {e}")
return timeout_response()
except Exception as e:
logger.exception(f"Unexpected error: {e}")
raise
# ❌ Bad - Catch all
try:
result = await agent.run(message)
except:
pass
3. Resource Cleanup
python
# ✅ Good - Proper cleanup
async with httpx.AsyncClient() as client:
response = await client.post(...)
# ❌ Bad - No cleanup
client = httpx.AsyncClient()
response = await client.post(...)
# Client never closed
4. Logging
python
# ✅ Good - Structured logging
logger.info(json.dumps({
"event": "agent_call",
"agent": agent.name,
"duration_ms": duration
}))
# ❌ Bad - Unstructured logging
logger.info(f"Agent {agent.name} took {duration}ms")
Common Patterns
Circuit Breaker
python
from typing import Optional
import time
class CircuitBreaker:
    """Simple circuit breaker implementation.

    States:
        closed:    calls pass through; failures are counted.
        open:      calls are rejected until ``timeout`` seconds have passed
                   since the last failure.
        half_open: the cool-down elapsed; the next call is a trial. Success
                   closes the breaker, failure re-opens it.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the breaker.
            timeout: Seconds to stay open before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        # Reading of time.monotonic() at the most recent failure, so it is
        # only meaningful relative to other monotonic readings.
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half_open

    async def call(self, func, *args, **kwargs):
        """Call function with circuit breaker.

        Args:
            func: Async callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns.

        Raises:
            Exception: With message ``"Circuit breaker is open"`` while the
                breaker is open and the cool-down has not elapsed. Any
                exception raised by ``func`` is re-raised unchanged.
        """
        if self.state == "open":
            # A monotonic clock keeps the cool-down immune to system clock
            # adjustments. A missing timestamp (state forced open by hand)
            # is treated as "cool-down elapsed" rather than a TypeError.
            cooled_down = (
                self.last_failure_time is None
                or time.monotonic() - self.last_failure_time > self.timeout
            )
            if not cooled_down:
                raise Exception("Circuit breaker is open")
            self.state = "half_open"

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            if self.failures >= self.failure_threshold:
                self.state = "open"
            raise

        # Any success resets the failure streak and closes the breaker.
        self.failures = 0
        self.state = "closed"
        return result
# Usage
circuit_breaker = CircuitBreaker()
result = await circuit_breaker.call(agent.run, message)
Caching
python
from functools import lru_cache
import hashlib
class AgentCache:
    """Simple cache for agent responses.

    Entries are keyed by (agent name, message). When the cache is full the
    oldest inserted entry is evicted first.
    """

    def __init__(self, max_size: int = 100):
        """Create an empty cache.

        Args:
            max_size: Maximum number of cached responses. A value of zero
                or less disables caching.
        """
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size

    def _key(self, agent_name: str, message: str) -> str:
        """Generate cache key."""
        # Length-prefix the name so ("a:b", "c") and ("a", "b:c") cannot
        # produce the same key. SHA-256 avoids crafted-collision poisoning
        # when messages are user-controlled.
        content = f"{len(agent_name)}:{agent_name}:{message}"
        return hashlib.sha256(content.encode()).hexdigest()

    async def get_or_call(
        self,
        agent: "Agent",
        message: str
    ) -> Any:
        """Get from cache or call agent.

        Args:
            agent: Object exposing ``name`` and an awaitable ``run(message)``.
            message: The message to send to the agent.

        Returns:
            The cached response if present, otherwise the fresh result of
            ``agent.run(message)``.
        """
        key = self._key(agent.name, message)
        if key in self.cache:
            return self.cache[key]

        result = await agent.run(message)

        if self.max_size <= 0:
            # Caching disabled: nothing to evict and nothing to store.
            return result

        while len(self.cache) >= self.max_size:
            # Simple eviction: dicts keep insertion order, so the first key
            # is the oldest entry.
            del self.cache[next(iter(self.cache))]
        self.cache[key] = result
        return result
# Usage
cache = AgentCache()
result = await cache.get_or_call(agent, "Hello")
Resources
- Microsoft Agent Framework GitHub
- MAF Documentation
- Azure AI Services
- FastAPI Documentation
- Pydantic Documentation
Contributing
To add more examples or improve this reference:
- Test the code in a real environment
- Ensure examples follow best practices
- Add clear comments and documentation
- Submit a pull request
Last updated: 2025-01-02
