# Speech Recognition API
The Real-Time Voice Agent integrates Azure AI Speech (Cognitive Services) through multiple API endpoints, each optimized for a different interaction pattern and streaming mode.
## API Integration Points

### WebSocket Endpoints with STT Integration

#### `/api/v1/media/stream` - ACS Media Streaming
Real-time speech recognition for Azure Communication Services calls:
- Handler: `ACSMediaHandler` or `VoiceLiveHandler`, selected by `ACS_STREAMING_MODE`
- STT Integration: Pooled `StreamingSpeechRecognizerFromBytes` with a three-thread architecture
- Features: Immediate barge-in detection, conversation memory, Azure OpenAI orchestration
- Use Case: Phone calls through Azure Communication Services
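Which handler serves a given connection is decided when the WebSocket is accepted. A minimal sketch of that dispatch, assuming `ACS_STREAMING_MODE` is read from the environment (the import path and helper function are illustrative, not the app's confirmed layout):

```python
import os

# Illustrative import path - the real module layout may differ
from src.handlers import ACSMediaHandler, VoiceLiveHandler

def select_media_handler():
    """Pick the media-stream handler class from ACS_STREAMING_MODE."""
    mode = os.getenv("ACS_STREAMING_MODE", "MEDIA").upper()
    if mode == "VOICE_LIVE":
        return VoiceLiveHandler
    return ACSMediaHandler  # handles both MEDIA and TRANSCRIPTION modes
```

A client then connects and streams audio over the WebSocket: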
```javascript
// Connect to ACS media streaming with speech recognition
const ws = new WebSocket(
  `wss://api.domain.com/api/v1/media/stream?call_connection_id=${callId}`
);

// Send audio frames for recognition
ws.send(base64AudioData);

// Receive transcripts and AI responses
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'transcript') {
    console.log('Recognized:', data.text);
  }
};
```
#### `/api/v1/realtime/conversation` - Browser Voice Conversations
Speech recognition for web-based voice interactions:
- Handler: Dedicated orchestrator with STT/TTS pooling
- STT Integration: Per-connection speech recognizer with partial/final callbacks
- Features: Session persistence, dashboard broadcasting, connection queuing
- Use Case: Browser-based voice conversations and testing
```javascript
// Connect for browser-based speech recognition
const ws = new WebSocket(
  `wss://api.domain.com/api/v1/realtime/conversation?session_id=${sessionId}`
);

// Send audio bytes for real-time recognition
ws.send(audioBuffer);
```
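For testing outside the browser, the same endpoint can be driven from a script. A sketch using the `websockets` package; the `transcript` message shape mirrors the ACS example above and is an assumption for this endpoint:

```python
import asyncio
import json

import websockets  # pip install websockets

async def stream_audio(session_id: str, pcm_chunks: list[bytes]):
    """Send 16 kHz mono PCM chunks and print recognized transcripts."""
    url = f"wss://api.domain.com/api/v1/realtime/conversation?session_id={session_id}"
    async with websockets.connect(url) as ws:
        for chunk in pcm_chunks:
            await ws.send(chunk)  # raw audio bytes, as in the example above
        async for message in ws:
            data = json.loads(message)
            if data.get("type") == "transcript":  # assumed message shape
                print("Recognized:", data["text"])
```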
## Core Speech Recognition Class

All endpoints use the `StreamingSpeechRecognizerFromBytes` class for consistent speech processing:
```python
import os

from src.speech.speech_recognizer import StreamingSpeechRecognizerFromBytes

# Initialized automatically by handlers based on the endpoint
recognizer = StreamingSpeechRecognizerFromBytes(
    speech_key=os.getenv("AZURE_SPEECH_KEY"),  # or omit and use DefaultAzureCredential
    speech_region="eastus",
    languages=["en-US", "es-ES"],
    enable_diarization=True,
)

# Callbacks are set by handlers for integration
async def handle_partial_result(text):
    # Immediate barge-in detection for ACS calls
    print("Partial (barge-in):", text)

async def handle_final_result(text):
    # Complete utterance for orchestrator processing
    print("Final transcript:", text)

recognizer.on_partial_result = handle_partial_result
recognizer.on_final_result = handle_final_result
```
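Driving the recognizer directly means pushing audio bytes as they arrive. The `start()`, `write_bytes()`, and `stop()` calls below are assumed method names for illustration, not the class's confirmed API:

```python
def recognize_stream(recognizer, audio_chunks):
    """Feed raw PCM chunks into the recognizer as they arrive (hypothetical API)."""
    recognizer.start()                 # assumed: begin a continuous-recognition session
    for chunk in audio_chunks:         # 16 kHz, mono, 16-bit PCM frames
        recognizer.write_bytes(chunk)  # assumed: push bytes into the SDK's push stream
    recognizer.stop()                  # assumed: flush and end the session
```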
## Handler-Specific Speech Recognition

### ACS Media Handler (`ACSMediaHandler`)

- Streaming Mode: `MEDIA` or `TRANSCRIPTION`
- Endpoint: `/api/v1/media/stream`

Implements a three-thread architecture for sub-50 ms barge-in detection:
```python
import asyncio

# Thread 1: Speech SDK thread (never blocks)
def on_partial_callback(text: str, lang: str, speaker_id: str):
    """Immediate barge-in detection - called from the Speech SDK thread."""
    # Hop to the main event loop to run the barge-in coroutine
    asyncio.run_coroutine_threadsafe(schedule_barge_in(text), main_loop)

def on_final_callback(text: str, lang: str):
    """Queue final speech for processing - called from the Speech SDK thread."""
    # asyncio.Queue is not thread-safe, so enqueue via the event loop
    main_loop.call_soon_threadsafe(speech_queue.put_nowait, (text, lang))

# Thread 2: Route-turn worker (blocks on the queue only)
async def route_turn_worker():
    while True:
        final_text, lang = await speech_queue.get()
        # Process through the orchestrator (may take seconds)
        await route_turn(memory_manager, final_text, websocket)

# Thread 3: Main event loop (never blocks)
async def schedule_barge_in(partial_text: str):
    """Cancel the current TTS playback immediately (< 50 ms)."""
    if playback_task and not playback_task.done():
        playback_task.cancel()
        await send_stop_audio_to_acs()
```
Key Features:
- Immediate barge-in: Partial results trigger instant TTS cancellation
- Non-blocking recognition: Speech SDK runs in dedicated thread
- Queue-based processing: Final results processed sequentially
- Resource pooling: Shared STT clients across ACS calls
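At call setup, the queue, event-loop handle, and worker task from the snippet above have to be created and attached. A minimal wiring sketch; `start_call_pipeline` is illustrative, not the handler's actual method:

```python
import asyncio

async def start_call_pipeline(recognizer):
    """One-time wiring when an ACS call connects (illustrative)."""
    global main_loop, speech_queue, playback_task
    main_loop = asyncio.get_running_loop()  # captured for the SDK-thread callbacks
    speech_queue = asyncio.Queue()          # hands final results to the route-turn worker
    playback_task = None                    # current TTS playback, if any

    # Attach the Speech SDK thread callbacks shown above
    recognizer.on_partial_result = on_partial_callback
    recognizer.on_final_result = on_final_callback

    # Run Thread 2's consumer as a background task on the event loop
    asyncio.create_task(route_turn_worker())
```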
### Voice Live Handler (`VoiceLiveHandler`)

- Streaming Mode: `VOICE_LIVE`
- Endpoint: `/api/v1/media/stream`

Integrates with the Azure Voice Live API for advanced conversation handling:
```python
# Voice Live integration handles STT internally
voice_live_agent = build_lva_from_yaml(agent_config)
await voice_live_agent.connect()

async def handle_audio_data(audio_base64: str):
    """Send audio to the Voice Live API."""
    await voice_live_agent.send_audio(audio_base64)

# Responses come back through the Voice Live websocket
async def on_voice_live_response(response):
    """Handle an AI response from Voice Live."""
    await websocket.send_json({
        "type": "assistant_message",
        "content": response.text,
        "audio": response.audio_data,
    })
```
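The snippet above defines the response callback but does not show its registration, which depends on the agent implementation. A plausible hookup, with the attribute name assumed:

```python
# Hypothetical registration - the real attribute or method name may differ
voice_live_agent.on_response = on_voice_live_response
```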
Key Features:
- Azure Voice Live Integration: Direct API connection to advanced conversational AI
- Semantic Voice Activity: Advanced voice activity detection beyond traditional VAD
- Natural Conversations: Maintains conversation context and flow
- Emotion Detection: Can detect and respond to emotional cues
### Realtime Conversation Handler

- Endpoint: `/api/v1/realtime/conversation`

Browser-based speech recognition with session persistence:
```python
# Per-connection STT client with callback registration
stt_client = await stt_pool.acquire()

def on_partial(text: str, lang: str, speaker_id: str):
    """Handle partial results for barge-in."""
    if websocket.state.is_synthesizing:
        # Stop the current TTS synthesis
        websocket.state.tts_client.stop_speaking()
        websocket.state.is_synthesizing = False

def on_final(text: str, lang: str):
    """Queue final text for orchestrator processing."""
    websocket.state.user_buffer += text.strip() + "\n"

stt_client.set_partial_result_callback(on_partial)
stt_client.set_final_result_callback(on_final)

# Process the accumulated text through the orchestrator
user_buffer = websocket.state.user_buffer
if user_buffer.strip():
    await route_turn(memory_manager, user_buffer, websocket, is_acs=False)
```
Key Features:

- Session Management: Persistent conversation state across reconnections
- Dashboard Integration: Real-time updates to connected dashboard clients
- Resource Pooling: Dedicated STT/TTS clients per browser connection
- Parallel Processing: Background orchestration tasks for non-blocking responses
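Dashboard broadcasting amounts to fanning each event out to every watching socket. A sketch, assuming a registry of connected dashboard WebSockets is maintained elsewhere:

```python
import json

dashboard_clients: set = set()  # assumed registry of connected dashboard sockets

async def broadcast_to_dashboards(event_type: str, payload: dict):
    """Fan a transcript or status event out to all dashboard clients."""
    message = json.dumps({"type": event_type, **payload})
    for client in list(dashboard_clients):
        try:
            await client.send_text(message)    # Starlette/FastAPI WebSocket API
        except Exception:
            dashboard_clients.discard(client)  # drop dead connections
```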
## Configuration and Best Practices

### Endpoint Selection

Use `/api/v1/media/stream` when:
- You are processing phone calls through Azure Communication Services
- You need sub-50 ms barge-in detection for natural conversations
- You are working with ACS call automation and media streaming
- You require the three-thread architecture for production call centers

Use `/api/v1/realtime/conversation` when:

- You are building browser-based voice applications
- You need session persistence across page reloads
- You want dashboard integration and monitoring
- You are developing voice-enabled web experiences
### Authentication Options
```python
import os

# Option 1: Microsoft Entra ID (recommended for production)
recognizer = StreamingSpeechRecognizerFromBytes(
    speech_region="eastus",
    use_default_credential=True,  # uses DefaultAzureCredential
    enable_tracing=True,
)

# Option 2: API key (development/testing)
recognizer = StreamingSpeechRecognizerFromBytes(
    speech_key=os.getenv("AZURE_SPEECH_KEY"),
    speech_region="eastus",
    enable_tracing=True,
)
```
### Audio Format Requirements

All endpoints expect 16 kHz, mono, 16-bit PCM audio:
```python
import base64

# Audio preprocessing for optimal recognition
SAMPLE_RATE = 16000
CHANNELS = 1
SAMPLE_WIDTH = 2  # 16-bit PCM

# WebSocket audio streaming
audio_data = resample_audio(raw_audio, target_rate=16000)
base64_audio = base64.b64encode(audio_data).decode("utf-8")
websocket.send_text(base64_audio)
```
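The `resample_audio` helper above is not shown in this document; a minimal stand-in using linear interpolation over 16-bit mono PCM (note the extra `source_rate` parameter, and numpy is assumed available):

```python
import numpy as np

def resample_audio(raw_audio: bytes, source_rate: int, target_rate: int = 16000) -> bytes:
    """Linearly resample 16-bit mono PCM to the target rate (simple stand-in)."""
    samples = np.frombuffer(raw_audio, dtype=np.int16)
    if source_rate == target_rate:
        return raw_audio
    duration = samples.shape[0] / source_rate
    target_len = int(duration * target_rate)
    # Evaluate the waveform at the new rate's sample times
    src_t = np.linspace(0.0, duration, num=samples.shape[0], endpoint=False)
    dst_t = np.linspace(0.0, duration, num=target_len, endpoint=False)
    resampled = np.interp(dst_t, src_t, samples.astype(np.float32))
    return resampled.astype(np.int16).tobytes()
```

For production audio paths, a polyphase resampler (e.g. `scipy.signal.resample_poly`) gives better fidelity than linear interpolation.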
### Language and Feature Configuration
```python
# Multi-language auto-detection
recognizer = StreamingSpeechRecognizerFromBytes(
    speech_region="eastus",
    languages=["en-US", "es-ES", "fr-FR"],  # BCP-47 language codes
    enable_diarization=True,       # speaker identification
    enable_profanity_filter=True,  # content filtering
    enable_detailed_results=True,  # word-level timing
)
```
### Resource Pool Management

The API uses connection pooling for optimal performance:
```python
# STT pool configuration (managed by the application)
STT_POOL_SIZE = 4  # concurrent speech recognizers
TTS_POOL_SIZE = 4  # concurrent synthesizers

# Handlers automatically acquire and release pool resources;
# no manual pool management is required in client code.
```
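Internally, a pool like this can be modeled as an `asyncio.Queue` of pre-built clients. A minimal sketch of the acquire/release pattern; the class is illustrative, not the application's actual implementation:

```python
import asyncio

class SpeechClientPool:
    """Minimal async pool: pre-create N clients and hand them out one at a time."""

    def __init__(self, factory, size: int = 4):
        self._queue: asyncio.Queue = asyncio.Queue()
        for _ in range(size):
            self._queue.put_nowait(factory())

    async def acquire(self):
        return await self._queue.get()  # waits if every client is in use

    def release(self, client):
        self._queue.put_nowait(client)  # return the client for reuse
```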
## Integration with State Management

Speech recognition integrates with conversation memory:
```python
# Automatic session persistence via MemoManager
memory_manager = MemoManager.from_redis(session_id, redis_mgr)

# Speech recognition handlers automatically:
# 1. load conversation history from Redis
# 2. add recognized text to the conversation context
# 3. pass it to the orchestrator for response generation
# 4. persist the updated conversation state

# Access conversation history
history = memory_manager.get_chat_history()
for entry in history:
    print(f"{entry.role}: {entry.content}")
```
## Observability and Monitoring

Speech recognition includes comprehensive tracing:
```python
# OpenTelemetry spans are created automatically for:
# - the speech recognition session lifecycle
# - audio frame processing
# - partial/final result callbacks
# - handler routing and processing

# Correlation with call connection IDs
recognizer.enable_tracing = True
recognizer.call_connection_id = "acs-call-123"  # for ACS correlation

# Custom span attributes include:
# - Speech SDK session IDs
# - language detection results
# - processing latencies
# - error conditions and recovery
```
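Custom spans compose with the built-in tracing through the standard OpenTelemetry API. A sketch; the span and attribute names below mirror this page and are otherwise assumptions:

```python
from opentelemetry import trace

tracer = trace.get_tracer("voice-agent.speech")

async def process_utterance(call_connection_id: str, text: str):
    # Correlate the span with the ACS call, matching the attributes above
    with tracer.start_as_current_span("speech.route_turn") as span:
        span.set_attribute("acs.call_connection_id", call_connection_id)
        span.set_attribute("speech.text_length", len(text))
        ...  # orchestrator processing goes here
```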
See Streaming Modes Documentation for detailed configuration options and Speech Synthesis for TTS integration patterns.