ACS Call Automation & Media Flows
Three-Thread Voice Processing Architecture
This section covers the architecture for Azure Communication Services (ACS) media handling. It is designed for real-time voice processing with integrated barge-in detection.
Azure Communication Services Integration
Enterprise Voice Processing
The Azure Speech SDK provides continuous speech recognition suited to real-time conversations. This architecture builds barge-in detection on top of it, with a target response time under 10 ms.
Speech Recognition Capabilities
| Feature | Description | Accelerator Focus |
|---|---|---|
| Real-time Processing | Immediate partial and final result processing | Low-latency patterns |
| Barge-in Detection | Advanced voice activity detection for interruptions | Reference implementation |
| Multiple Result Types | Partial results for speed, final results for accuracy | Flexible processing modes |
| Session Management | Automatic session handling with connection recovery | Robust connection patterns |
| Continuous Recognition | Persistent speech-to-text processing | 24/7 operation templates |
Microsoft Learn Resources
- Audio Streaming Quickstart - Server-side audio streaming implementation
- Call Automation SDK - Automated call routing solutions
- Media Access Overview - Real-time media stream processing patterns
- Speech to Text Service - Real-time speech recognition capabilities
- Real-time Speech Recognition - Implementation patterns for continuous STT processing
- Bidirectional Audio Streaming - Two-way media streaming architecture
- WebSocket Audio Processing - Real-time audio stream handling patterns
Three-Thread Processing Architecture
Thread Separation Strategy
The architecture separates concerns across three dedicated threads. As a result, AI processing never blocks speech recognition or WebSocket handling.
Thread Responsibilities & Communication
Core Design Principles
The three-thread architecture follows these key principles:
Speech SDK Thread - Never Blocks
- Continuous audio recognition using the Azure Speech SDK
- Immediate barge-in detection via `on_partial` callbacks
- Cross-thread communication via `run_coroutine_threadsafe`
- Performance: < 10ms response time for barge-in detection
Route Turn Thread - Blocks Only on Queue
- AI processing and response generation through orchestrator
- Queue-based serialization of conversation turns
- Safe cancellation without affecting speech recognition
- Performance: Processes one turn at a time, can be cancelled
Main Event Loop - Never Blocks
- WebSocket handling for real-time media streaming
- Task cancellation for barge-in scenarios
- Non-blocking coordination between threads
- Performance: < 50ms for task cancellation and stop commands
Thread Performance Matrix
| Thread | Primary Role | Blocking Behavior | Barge-in Role | Response Time |
|---|---|---|---|---|
| Speech SDK | Audio recognition | Never blocks | Detection | < 10ms |
| Route Turn | AI processing | Blocks on queue operations only | None | Variable |
| Main Event | WebSocket & coordination | Never blocks | Execution | < 50ms |
Implementation Flow
Barge-in Detection and Handling
1. User speaks during AI response:
   - The `on_partial()` callback fires immediately (< 10ms)
   - `ThreadBridge.schedule_barge_in()` schedules the handler on the main event loop
   - `MainEventLoop.handle_barge_in()` cancels current processing
2. Task cancellation chain:
   - `playback_task.cancel()` stops the current AI response
   - `route_turn_queue` is cleared
   - A StopAudio command is sent to ACS
3. Speech finalization:
   - The `on_final()` callback queues the completed speech via `ThreadBridge.queue_speech_result()`
   - `RouteTurnThread` picks up the speech from the queue
   - A new AI processing task is created to generate the response
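The flow above can be sketched end to end with the standard library alone. This is a minimal sketch under stated assumptions, not the accelerator's code. A plain thread stands in for the Speech SDK callback thread, and `asyncio.sleep` stands in for TTS playback. The names `play_response`, `handle_barge_in`, `handle_final` and `fake_speech_thread` are hypothetical.

```python
import asyncio
import threading

# Hypothetical names throughout; a plain thread stands in for the Speech SDK thread.
events: list = []


async def play_response(text: str) -> None:
    """Stand-in for TTS playback of one AI response."""
    try:
        events.append(f"playing:{text}")
        await asyncio.sleep(10)  # long playback that barge-in interrupts
        events.append(f"finished:{text}")
    except asyncio.CancelledError:
        events.append(f"cancelled:{text}")
        raise


async def main() -> None:
    loop = asyncio.get_running_loop()
    route_turn_queue: asyncio.Queue = asyncio.Queue()
    state = {"playback_task": loop.create_task(play_response("greeting"))}
    await asyncio.sleep(0)  # let the greeting start playing

    async def handle_barge_in() -> None:
        # Runs on the main loop: cancel playback (a real handler also sends StopAudio).
        task = state["playback_task"]
        if not task.done():
            task.cancel()
        events.append("barge_in_handled")

    async def handle_final(text: str) -> None:
        await route_turn_queue.put(text)  # the asyncio.Queue is touched only on its own loop

    def fake_speech_thread() -> None:
        # on_partial: schedule the barge-in handler and return without waiting.
        asyncio.run_coroutine_threadsafe(handle_barge_in(), loop)
        # on_final: hand the recognized text to the main loop.
        asyncio.run_coroutine_threadsafe(handle_final("what's my balance"), loop)

    threading.Thread(target=fake_speech_thread, daemon=True).start()

    # Route-turn side: the only code that waits on the queue.
    final_text = await route_turn_queue.get()
    events.append(f"route_turn:{final_text}")
    state["playback_task"] = loop.create_task(play_response(f"answer to {final_text}"))
    await asyncio.sleep(0)
    # End the demo without waiting 10 s for the new response to "play".
    state["playback_task"].cancel()
    try:
        await state["playback_task"]
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(events)
```

The speech thread only calls `run_coroutine_threadsafe`, which returns at once. Cancelling playback and enqueuing the final text both happen on the event loop.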
Key Components
ThreadBridge
Provides thread-safe communication between the Speech SDK Thread and the Main Event Loop:
- `schedule_barge_in()` - Schedules barge-in handler execution
- `queue_speech_result()` - Queues final speech for processing
- Uses `run_coroutine_threadsafe` and `asyncio.Queue` for safe cross-thread communication
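A `ThreadBridge` along these lines might look as follows. This is a sketch under assumptions. Only the two method names come from this page. The constructor arguments and the fire-and-forget style, which ignores the `concurrent.futures.Future` that `run_coroutine_threadsafe` returns, are illustrative choices.

```python
import asyncio
import threading
from typing import Any, Callable, Coroutine


class ThreadBridge:
    """Sketch of a bridge from the Speech SDK thread to the main event loop.

    Method names follow this page; the constructor arguments are assumptions.
    """

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        speech_queue: asyncio.Queue,
        barge_in_handler: Callable[[], Coroutine[Any, Any, None]],
    ) -> None:
        self._loop = loop
        self._speech_queue = speech_queue
        self._barge_in_handler = barge_in_handler

    def schedule_barge_in(self) -> None:
        # Called on the Speech SDK thread: fire and forget, never wait for the result.
        asyncio.run_coroutine_threadsafe(self._barge_in_handler(), self._loop)

    def queue_speech_result(self, text: str) -> None:
        # asyncio.Queue is not thread-safe, so the put() itself runs on the loop.
        asyncio.run_coroutine_threadsafe(self._speech_queue.put(text), self._loop)


async def demo() -> list:
    log: list = []
    speech_queue: asyncio.Queue = asyncio.Queue()

    async def on_barge_in() -> None:
        log.append("barge-in handled on loop")

    bridge = ThreadBridge(asyncio.get_running_loop(), speech_queue, on_barge_in)

    def speech_thread() -> None:  # stands in for Speech SDK callbacks
        bridge.schedule_barge_in()
        bridge.queue_speech_result("hello")

    threading.Thread(target=speech_thread, daemon=True).start()
    log.append(await speech_queue.get())
    return log


print(asyncio.run(demo()))
```

Both calls are scheduled on the loop in FIFO order. The barge-in handler therefore runs before the queued text reaches the consumer.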
SpeechSDKThread
Manages the Speech SDK in a dedicated background thread:
- Pre-initializes `push_stream` to prevent audio data loss
- Never blocks on AI processing or network operations
- Provides immediate callback execution for barge-in detection
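Push-stream pre-initialization can be illustrated without the Speech SDK. In this hypothetical sketch, `FakePushStream` stands in for the SDK's push audio stream. Its `write`/`close` shape mirrors `PushAudioInputStream`. `_run` stands in for continuous recognition. The sketch shows the ordering: the stream exists before the thread starts, so audio written early is buffered rather than dropped.

```python
import queue
import threading
from typing import Callable, Optional


class FakePushStream:
    """Hypothetical stand-in for a Speech SDK push audio stream (thread-safe buffer)."""

    def __init__(self) -> None:
        self._chunks: "queue.Queue[Optional[bytes]]" = queue.Queue()

    def write(self, chunk: bytes) -> None:
        self._chunks.put(chunk)  # never blocks: the queue is unbounded

    def close(self) -> None:
        self._chunks.put(None)  # sentinel meaning "no more audio"

    def read(self) -> Optional[bytes]:
        return self._chunks.get()  # waits only inside the background thread


class SpeechSDKThread:
    """Sketch: recognition runs in a dedicated daemon thread."""

    def __init__(self, on_partial: Callable[[bytes], None]) -> None:
        # Created before start(), so frames that arrive early are buffered, not lost.
        self.push_stream = FakePushStream()
        self._on_partial = on_partial
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def join(self, timeout: float = 1.0) -> None:
        self._thread.join(timeout)

    def _run(self) -> None:
        # Stand-in for continuous recognition: each chunk triggers the callback,
        # which should only schedule work elsewhere and return quickly.
        while (chunk := self.push_stream.read()) is not None:
            self._on_partial(chunk)


received: list = []
speech = SpeechSDKThread(on_partial=received.append)
speech.push_stream.write(b"early audio")  # arrives before recognition starts
speech.start()
speech.push_stream.write(b"later audio")
speech.push_stream.close()
speech.join()
print(received)
```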
RouteTurnThread
Handles AI processing in an isolated thread:
- Blocks only on `speech_queue.get()` operations
- Processes speech through orchestrator
- Creates and manages TTS playback tasks
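A route-turn worker that waits for input only at `speech_queue.get()` can be sketched as one coroutine. For simplicity, the sketch runs it as an asyncio task on a single loop rather than in its own thread. The `orchestrator` and `play` callables, the `state` dict and the `None` shutdown sentinel are assumptions. Playback runs as its own task that the worker does not await. This lets the main loop cancel playback on barge-in while the worker waits for the next turn.

```python
import asyncio
from typing import Awaitable, Callable


async def route_turn_loop(
    speech_queue: asyncio.Queue,
    orchestrator: Callable[[str], Awaitable[str]],
    play: Callable[[str], Awaitable[None]],
    state: dict,
) -> None:
    """Hypothetical route-turn worker: waits for input only at speech_queue.get()."""
    while True:
        text = await speech_queue.get()
        if text is None:  # shutdown sentinel (an assumption of this sketch)
            return
        reply = await orchestrator(text)  # one turn at a time
        # Playback runs as a separate task so a barge-in can cancel it on its own.
        state["playback_task"] = asyncio.create_task(play(reply))


async def demo() -> list:
    spoken: list = []
    speech_queue: asyncio.Queue = asyncio.Queue()
    state: dict = {"playback_task": None}

    async def orchestrator(text: str) -> str:
        return f"echo: {text}"

    async def play(reply: str) -> None:
        spoken.append(reply)

    worker = asyncio.create_task(route_turn_loop(speech_queue, orchestrator, play, state))
    await speech_queue.put("hi")
    await speech_queue.put(None)
    await worker
    await state["playback_task"]
    return spoken


print(asyncio.run(demo()))
```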
MainEventLoop
Coordinates WebSocket operations and task management:
- Handles incoming media messages and audio data
- Manages barge-in interruption and task cancellation
- Never blocks, to ensure real-time responsiveness
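Incoming media handling on the main loop might look like the following sketch. The inbound frame shape `{"kind": "AudioData", "audioData": {"data": <base64>}}` is an assumption; verify it against the ACS bidirectional audio streaming documentation. `media_receive_loop` and `write_audio` are hypothetical names. Each frame is decoded and handed to a non-blocking writer, which keeps the loop free to run barge-in handlers.

```python
import asyncio
import base64
import json
from typing import AsyncIterator, Callable


async def media_receive_loop(
    messages: AsyncIterator[str],
    write_audio: Callable[[bytes], None],
) -> int:
    """Decode inbound audio frames and hand them to a non-blocking writer.

    Assumed (hypothetical) frame shape:
    {"kind": "AudioData", "audioData": {"data": "<base64 PCM>"}}
    Returns the number of audio frames forwarded.
    """
    forwarded = 0
    async for raw in messages:
        msg = json.loads(raw)
        if msg.get("kind") != "AudioData":
            continue  # metadata and other kinds are ignored in this sketch
        audio = base64.b64decode(msg["audioData"]["data"])
        write_audio(audio)  # e.g. a push-stream write; must not block the loop
        forwarded += 1
    return forwarded


async def fake_websocket() -> AsyncIterator[str]:
    """Stand-in for the WebSocket's incoming text messages (two hypothetical frames)."""
    yield json.dumps({"kind": "AudioMetadata", "audioMetadata": {}})
    payload = base64.b64encode(b"\x00\x01").decode()
    yield json.dumps({"kind": "AudioData", "audioData": {"data": payload}})


chunks: list = []
count = asyncio.run(media_receive_loop(fake_websocket(), chunks.append))
print(count, chunks)
```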
Non-Blocking Thread Communication Sequence
Participants: User, ACS, the Speech SDK thread, the main event loop, and the Route Turn loop.
1. Speech SDK thread: the `on_partial()` callback fires, and nothing blocks.
2. Cross-thread communication: the Speech SDK thread calls `run_coroutine_threadsafe(_handle_barge_in_async)` to run the handler on the main loop. The speech thread continues and is not blocked.
3. The barge-in handler executes on the main loop: `playback_task.cancel()`, clear `route_turn_queue`, send the StopAudio command to ACS.
4. ACS stops audio playback to the user. The previous AI response is cancelled cleanly.
5. The user continues speaking, and the `on_final()` callback fires.
6. Final result communication: the Speech SDK thread calls `run_coroutine_threadsafe(_handle_final_async)`, and the main loop runs `route_turn_queue.put(final_text)`. The speech thread again continues and is not blocked.
7. New AI processing: in the Route Turn loop, `queue.get()` receives `final_text`. This is the only thread that blocks, and it is dedicated to AI processing. It creates a new `playback_task` on the main loop, which sends the new TTS response to ACS, and ACS plays the new AI response to the user.
Result: a complete non-blocking cycle.
Critical Non-Blocking Characteristics
| Event | Thread Source | Target Thread | Blocking? | Communication Method | Response Time |
|---|---|---|---|---|---|
| Barge-in Detection | Speech SDK | Main Event Loop | No | `run_coroutine_threadsafe` | < 10ms |
| Final Speech | Speech SDK | Route Turn Thread | No | `run_coroutine_threadsafe`, then `asyncio.Queue.put()` on the main loop | < 5ms |
| AI Processing | Route Turn | Main Event Loop | No | `asyncio.create_task` | < 1ms |
| Task Cancellation | Main Event Loop | Playback Task | No | `task.cancel()` | < 1ms |
Key Insight: Only the Route Turn Thread blocks (on `queue.get()`). The Speech SDK thread and the Main Event Loop therefore stay responsive for real-time barge-in detection.
Key Implementation Details
This section provides concrete implementation specifics for developers working with the ACS Media Handler threading architecture.
Barge-In Detection
- Trigger: The `on_partial` callback from the Speech Recognizer detects user speech
- Immediate Action: Cancellation of `playback_task` using `asyncio.Task.cancel()`. The call is synchronous and returns immediately; the task receives `CancelledError` at its next `await`
- Stop Signal: Send the `{"Kind": "StopAudio", "StopAudio": {}}` JSON command to ACS via WebSocket
- Logging: Comprehensive logging with emojis for real-time debugging
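These barge-in steps map to a small main-loop handler. In this sketch, `BargeInHandler` and `send_text` are hypothetical; in practice `send_text` would be the WebSocket's async send method. The StopAudio payload is the one given on this page. The handler sends StopAudio even when playback has already finished, on the assumption that audio may still be buffered on the ACS side.

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional

# StopAudio payload as given on this page.
STOP_AUDIO = json.dumps({"Kind": "StopAudio", "StopAudio": {}})


class BargeInHandler:
    """Hypothetical main-loop side of barge-in."""

    def __init__(self, send_text: Callable[[str], Awaitable[None]]) -> None:
        self._send_text = send_text  # assumed: the WebSocket's async send method
        self.playback_task: Optional[asyncio.Task] = None

    async def handle_barge_in(self) -> None:
        task = self.playback_task
        if task is not None and not task.done():
            # cancel() returns immediately; the task sees CancelledError at its next await.
            task.cancel()
        # Always tell ACS to stop, in case audio is still buffered on its side (assumption).
        await self._send_text(STOP_AUDIO)


async def demo() -> tuple:
    sent: list = []

    async def send_text(message: str) -> None:
        sent.append(message)

    handler = BargeInHandler(send_text)
    handler.playback_task = asyncio.create_task(asyncio.sleep(10))  # pretend TTS playback
    await handler.handle_barge_in()
    try:
        await handler.playback_task
    except asyncio.CancelledError:
        pass
    return handler.playback_task.cancelled(), sent


print(asyncio.run(demo()))
```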
Async Background Task Management
- Route Turn Queue: Serializes final speech processing using `asyncio.Queue()`
- Playback Task: Tracks the current AI response generation/playback with `self.playback_task`
- Task Lifecycle: Clean creation, cancellation, and cleanup of background tasks
- Cancellation Safety: Proper `try/except asyncio.CancelledError` handling
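The cancellation-safety pattern above can be sketched like this: catch `CancelledError`, clean up, re-raise, then `await` the task after cancelling it. `playback` and `cancel_and_wait` are hypothetical names, and `asyncio.sleep` stands in for streaming audio frames.

```python
import asyncio
from typing import Optional

log: list = []


async def playback(reply: str) -> None:
    """Hypothetical TTS playback with cancellation-safe cleanup."""
    try:
        for word in reply.split():
            log.append(f"sent {word}")
            await asyncio.sleep(0.01)  # stand-in for streaming one audio frame
    except asyncio.CancelledError:
        log.append("playback cancelled")
        raise  # re-raise so the task ends up in the cancelled state
    finally:
        log.append("playback cleanup")  # runs on success, error, or cancellation


async def cancel_and_wait(task: Optional[asyncio.Task]) -> None:
    """Cancel a task and wait until its cleanup code has run."""
    if task is None or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Simplification: this would also swallow a cancellation aimed at the caller;
        # on Python 3.11+ asyncio.current_task().cancelling() can tell them apart.
        pass


async def demo() -> bool:
    task = asyncio.create_task(playback("one two three four"))
    await asyncio.sleep(0.015)  # let a frame or two go out first
    await cancel_and_wait(task)
    return task.cancelled()


print(asyncio.run(demo()), log)
```

Awaiting the cancelled task guarantees that the `finally` cleanup has finished before the handler moves on. This is what "`await task` after cancellation" buys.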
Stop Audio Signal Protocol
The `{"Kind": "StopAudio", "StopAudio": {}}` JSON message is sent to ACS over the WebSocket to immediately halt any ongoing audio playback.
Error Handling & Resilience
- Event Loop Detection: Graceful handling when no event loop is available
- WebSocket Validation: Connection state checks before sending messages
- Task Cancellation: Proper cleanup with `await task` after cancellation
- Queue Management: Full-queue detection and message-dropping strategies
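Two of these resilience checks, event-loop detection and full-queue handling, can be sketched with the standard library. The names are hypothetical. Drop-oldest is only one possible dropping strategy; dropping the newest item is equally valid, depending on which speech result should win.

```python
import asyncio
import logging
from typing import Coroutine, Optional

logger = logging.getLogger("acs_media_sketch")  # hypothetical logger name


def schedule_safely(loop: Optional[asyncio.AbstractEventLoop], coro: Coroutine) -> bool:
    """Schedule coro on loop from a non-loop thread; drop it if no loop is usable."""
    if loop is None or loop.is_closed() or not loop.is_running():
        logger.warning("No running event loop; dropping %r", coro)
        coro.close()  # prevents a "coroutine was never awaited" warning
        return False
    try:
        asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError:  # the loop closed between the check and the call
        coro.close()
        return False
    return True


def enqueue_or_drop(q: asyncio.Queue, item: str) -> bool:
    """Call on the queue's own loop. When full, drop the oldest item (one possible policy)."""
    try:
        q.put_nowait(item)
        return True
    except asyncio.QueueFull:
        dropped = q.get_nowait()
        logger.warning("Speech queue full; dropped oldest item %r", dropped)
        q.put_nowait(item)
        return False


async def demo() -> list:
    results: list = []

    async def record(text: str) -> None:
        results.append(text)

    loop = asyncio.get_running_loop()
    # Call from a worker thread, as the Speech SDK thread would.
    await asyncio.to_thread(schedule_safely, loop, record("final text"))
    await asyncio.sleep(0.01)
    return results


print(asyncio.run(demo()))
```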
Performance Optimizations
- Immediate Cancellation: Barge-in triggers instant playback stop (< 50ms)
- Background Processing: Non-blocking AI response generation
- Memory Management: Proper task cleanup prevents memory leaks
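- Concurrent Safety: Speech results cross threads via `run_coroutine_threadsafe`, so the `asyncio.Queue` (which is not thread-safe) is only touched from its own event loop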