ACS Call Automation & Media Flows

Three-Thread Voice Processing Architecture

Comprehensive architecture for Azure Communication Services (ACS) media handling, specifically designed for real-time voice processing with integrated barge-in detection capabilities.

Azure Communication Services Integration

Enterprise Voice Processing

Azure Speech SDK provides continuous speech recognition optimized for real-time conversations with sub-10ms barge-in detection.

Speech Recognition Capabilities

| Feature | Description | Accelerator Focus |
|---|---|---|
| Real-time Processing | Immediate partial and final result processing | Low-latency patterns |
| Barge-in Detection | Advanced voice activity detection for interruptions | Reference implementation |
| Multiple Result Types | Partial results for speed, final results for accuracy | Flexible processing modes |
| Session Management | Automatic session handling with connection recovery | Robust connection patterns |
| Continuous Recognition | Persistent speech-to-text processing | 24/7 operation templates |

Three-Thread Processing Architecture

Thread Separation Strategy

The architecture separates concerns across three dedicated threads for optimal performance and reliability.

graph TB
    subgraph SpeechSDK["🎤 Speech SDK Thread"]
        A1["Continuous Audio Recognition"]
        A2["on_partial → Barge-in Detection"]
        A3["on_final → Queue Speech Result"]
        A1 --> A2
        A1 --> A3
    end

    subgraph RouteLoop["🔄 Route Turn Thread"]
        B1["await speech_queue.get()"]
        B2["Orchestrator Processing"]
        B3["TTS Generation & Playback"]
        B1 --> B2 --> B3
    end

    subgraph MainLoop["🌐 Main Event Loop"]
        C1["WebSocket Media Handler"]
        C2["Barge-in Response"]
        C3["Task Cancellation"]
        C1 --> C2 --> C3
    end

    %% Cross-thread communication
    A2 -.->|"run_coroutine_threadsafe"| C2
    A3 -.->|"queue.put_nowait"| B1
    B3 -.->|"Task Reference"| C1
    C2 -.->|"cancel()"| B2

    classDef speechStyle fill:#9B59B6,stroke:#6B3E99,stroke-width:2px,color:#FFFFFF
    classDef routeStyle fill:#FF6B35,stroke:#E55100,stroke-width:2px,color:#FFFFFF
    classDef mainStyle fill:#4A90E2,stroke:#2E5C8A,stroke-width:2px,color:#FFFFFF

    class A1,A2,A3 speechStyle
    class B1,B2,B3 routeStyle
    class C1,C2,C3 mainStyle

Thread Responsibilities & Communication

Core Design Principles

The three-thread architecture follows these key principles:

🎤 Speech SDK Thread - Never Blocks
  • Continuous audio recognition using Azure Speech SDK
  • Immediate barge-in detection via on_partial callbacks
  • Cross-thread communication via run_coroutine_threadsafe
  • Performance: < 10ms response time for barge-in detection

🔄 Route Turn Thread - Blocks Only on Queue
  • AI processing and response generation through orchestrator
  • Queue-based serialization of conversation turns
  • Safe cancellation without affecting speech recognition
  • Performance: Processes one turn at a time, can be cancelled

🌐 Main Event Loop - Never Blocks
  • WebSocket handling for real-time media streaming
  • Task cancellation for barge-in scenarios
  • Non-blocking coordination between threads
  • Performance: < 50ms for task cancellation and stop commands

Thread Performance Matrix

| Thread | Primary Role | Blocking Behavior | Barge-in Role | Response Time |
|---|---|---|---|---|
| Speech SDK | Audio recognition | ❌ Never blocks | ✅ Detection | < 10ms |
| Route Turn | AI processing | ✅ Queue operations only | ❌ None | Variable |
| Main Event | WebSocket & coordination | ❌ Never blocks | ✅ Execution | < 50ms |

Implementation Flow

Barge-in Detection and Handling

  1. User speaks during AI response:
       • on_partial() callback fires immediately (< 10ms)
       • ThreadBridge.schedule_barge_in() schedules handler on main event loop
       • MainEventLoop.handle_barge_in() cancels current processing

  2. Task cancellation chain:

    on_partial() → schedule_barge_in() → cancel_current_processing() → send_stop_audio()

  3. Speech finalization:
       • on_final() callback queues completed speech via ThreadBridge.queue_speech_result()
       • RouteTurnThread picks up speech from queue
       • New AI processing task created for response generation
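The main-loop end of the cancellation chain above can be sketched as follows. This is a minimal illustration, not the accelerator's actual handler: `play_response`, the `send_stop_audio` callable, and the `demo` driver are hypothetical stand-ins, and clearing the queue follows the "Clear route_turn_queue" step shown in the sequence diagram.

```python
import asyncio


async def play_response(chunks: int) -> None:
    """Hypothetical playback coroutine that pretends to stream TTS audio."""
    for _ in range(chunks):
        await asyncio.sleep(0.01)


async def handle_barge_in(playback_task, route_turn_queue, send_stop_audio) -> None:
    """Runs on the main event loop: cancel, drain stale turns, then stop audio."""
    # Request cancellation of the in-flight response, if there is one.
    if playback_task is not None and not playback_task.done():
        playback_task.cancel()
    # Drop turns that were queued before the user interrupted.
    while not route_turn_queue.empty():
        route_turn_queue.get_nowait()
    # Tell ACS to halt playback (the callable is an assumption of this sketch).
    await send_stop_audio()


async def demo():
    stop_calls = []

    async def send_stop_audio():
        stop_calls.append("StopAudio")

    queue = asyncio.Queue()
    queue.put_nowait("stale turn")
    task = asyncio.create_task(play_response(chunks=100))
    await asyncio.sleep(0.02)  # let playback start before interrupting it
    await handle_barge_in(task, queue, send_stop_audio)
    # Wait for the cancelled task to finish unwinding.
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled(), queue.qsize(), stop_calls
```

Running `asyncio.run(demo())` exercises the handler against a fake playback task; in a real handler the stop callable would write to the media WebSocket.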

Key Components

ThreadBridge

Provides thread-safe communication between Speech SDK Thread and Main Event Loop:
  • schedule_barge_in() - Schedules barge-in handler execution
  • queue_speech_result() - Queues final speech for processing
  • Uses run_coroutine_threadsafe and asyncio.Queue for safe cross-thread communication
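One way such a bridge could look is sketched below. The method names come from the list above; the constructor arguments and the `demo` driver are assumptions. Because `asyncio.Queue` is not thread-safe, this sketch hands the enqueue to the loop with `loop.call_soon_threadsafe` rather than calling `put_nowait` directly from the worker thread. That is a choice made for this example, not necessarily what the accelerator does.

```python
import asyncio
import threading


class ThreadBridge:
    """Sketch of a bridge from a worker thread into an asyncio event loop."""

    def __init__(self, loop, speech_queue, barge_in_handler):
        self._loop = loop
        self._speech_queue = speech_queue
        self._barge_in_handler = barge_in_handler  # async callable (assumed)

    def schedule_barge_in(self):
        # Safe to call from any thread; returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(self._barge_in_handler(), self._loop)

    def queue_speech_result(self, text):
        # Hop onto the loop thread before touching the asyncio.Queue.
        self._loop.call_soon_threadsafe(self._speech_queue.put_nowait, text)


async def demo():
    loop = asyncio.get_running_loop()
    speech_queue = asyncio.Queue()
    barge_in_seen = asyncio.Event()

    async def on_barge_in():
        barge_in_seen.set()

    bridge = ThreadBridge(loop, speech_queue, on_barge_in)

    def fake_speech_callbacks():
        bridge.schedule_barge_in()           # what on_partial would trigger
        bridge.queue_speech_result("hello")  # what on_final would trigger

    worker = threading.Thread(target=fake_speech_callbacks)
    worker.start()
    await asyncio.wait_for(barge_in_seen.wait(), timeout=1)
    text = await asyncio.wait_for(speech_queue.get(), timeout=1)
    worker.join()
    return barge_in_seen.is_set(), text
```

Neither bridge method waits on the loop, so the calling thread returns to audio recognition immediately.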

SpeechSDKThread

Manages Speech SDK in dedicated background thread:
  • Pre-initializes push_stream to prevent audio data loss
  • Never blocks on AI processing or network operations
  • Provides immediate callback execution for barge-in detection

RouteTurnThread

Handles AI processing in isolated thread:
  • Blocks only on speech_queue.get() operations
  • Processes speech through orchestrator
  • Creates and manages TTS playback tasks
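The turn-serializing loop can be sketched as a plain coroutine. The `orchestrate` and `play` callables and the `None` shutdown sentinel are hypothetical, and for brevity the sketch awaits playback inline instead of tracking a separate playback task.

```python
import asyncio


async def route_turn_loop(speech_queue, orchestrate, play):
    """Consume finalized speech one turn at a time until a None sentinel arrives."""
    while True:
        text = await speech_queue.get()  # the only point where this loop waits idle
        if text is None:  # hypothetical shutdown sentinel
            break
        reply = await orchestrate(text)  # AI processing for this turn
        await play(reply)                # TTS generation and playback


async def demo():
    played = []

    async def orchestrate(text):
        await asyncio.sleep(0)  # stand-in for real orchestrator latency
        return f"reply to: {text}"

    async def play(reply):
        played.append(reply)

    speech_queue = asyncio.Queue()
    for item in ("first turn", "second turn", None):
        speech_queue.put_nowait(item)
    await route_turn_loop(speech_queue, orchestrate, play)
    return played
```

Because each turn is fully handled before the next `get()`, responses cannot interleave even when speech results arrive in bursts.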

MainEventLoop

Coordinates WebSocket operations and task management:
  • Handles incoming media messages and audio data
  • Manages barge-in interruption and task cancellation
  • Never blocks to ensure real-time responsiveness

🔄 Non-Blocking Thread Communication Sequence

sequenceDiagram
    participant SpeechSDK as 🧵 Speech SDK Thread
    participant MainLoop as 🧵 Main Event Loop
    participant RouteLoop as 🧵 Route Turn Thread
    participant ACS as 🔊 Azure Communication Services
    participant User as 👤 User

    Note over SpeechSDK,User: 🎵 AI Currently Playing Audio
    MainLoop->>ACS: 🔊 Streaming TTS Audio Response
    ACS->>User: 🎵 Audio Playback Active

    rect rgba(255, 149, 0, 0.15)
    Note over SpeechSDK,User: 🚨 USER SPEAKS (BARGE-IN EVENT)
    User->>SpeechSDK: 🗣️ Audio Input (Partial Recognition)
    Note right of SpeechSDK: ⚡ IMMEDIATE ACTION<br/>🚫 NO BLOCKING
    SpeechSDK->>SpeechSDK: 🔍 on_partial() callback triggered
    end

    rect rgba(255, 59, 48, 0.2)
    Note over SpeechSDK,MainLoop: 🔗 CROSS-THREAD COMMUNICATION
    SpeechSDK-->>MainLoop: 🚀 run_coroutine_threadsafe(_handle_barge_in_async)
    Note right of SpeechSDK: ✅ Speech thread continues<br/>NOT BLOCKED
    Note over MainLoop: 🛑 BARGE-IN HANDLER EXECUTES
    MainLoop->>MainLoop: ❌ playback_task.cancel()
    MainLoop->>MainLoop: 🧹 Clear route_turn_queue
    MainLoop->>ACS: 🛑 Send StopAudio command
    end

    rect rgba(52, 199, 89, 0.15)
    ACS-->>User: 🔇 Audio Playback STOPPED
    Note right of MainLoop: ✅ Previous AI response<br/>cancelled cleanly
    end

    rect rgba(0, 122, 255, 0.1)
    Note over SpeechSDK,RouteLoop: 📝 USER CONTINUES SPEAKING
    User->>SpeechSDK: 🗣️ Continues Speaking
    SpeechSDK->>SpeechSDK: on_final() callback triggered
    Note over SpeechSDK,MainLoop: 🔗 FINAL RESULT COMMUNICATION
    SpeechSDK-->>MainLoop: run_coroutine_threadsafe(_handle_final_async)
    MainLoop->>MainLoop: route_turn_queue.put(final_text)
    Note right of SpeechSDK: ✅ Speech thread continues<br/>🚫 NOT BLOCKED
    end

    rect rgba(102, 51, 153, 0.1)
    Note over RouteLoop,ACS: 🤖 NEW AI PROCESSING
    RouteLoop->>RouteLoop: 📥 queue.get() receives final_text
    Note right of RouteLoop: ⏳ ONLY thread that blocks<br/>🎯 Dedicated AI processing
    RouteLoop->>MainLoop: 🎵 Create new playback_task
    MainLoop->>ACS: 🔊 Send New TTS Response
    ACS->>User: 🎵 Play New AI Response
    end

    Note over SpeechSDK,User: ✅ COMPLETE NON-BLOCKING CYCLE

🚀 Critical Non-Blocking Characteristics

| Event | Thread Source | Target Thread | Blocking? | Communication Method | Response Time |
|---|---|---|---|---|---|
| 🚨 Barge-in Detection | Speech SDK | Main Event Loop | ❌ NO | run_coroutine_threadsafe | < 10ms |
| 📋 Final Speech | Speech SDK | Route Turn Thread | ❌ NO | asyncio.Queue.put() | < 5ms |
| 🎵 AI Processing | Route Turn | Main Event Loop | ❌ NO | asyncio.create_task | < 1ms |
| 🛑 Task Cancellation | Main Event Loop | Playback Task | ❌ NO | task.cancel() | < 1ms |

🎯 Key Insight: Only the Route Turn Thread blocks (on queue.get()), ensuring Speech SDK and Main Event Loop remain responsive for real-time barge-in detection.


Key Implementation Details

This section provides concrete implementation specifics for developers working with the ACS Media Handler threading architecture.

🚨 Barge-In Detection¢

  • Trigger: on_partial callback from Speech Recognizer detects user speech
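  • Immediate Action: The main event loop requests cancellation of playback_task with asyncio.Task.cancel(); the call returns at once and the task receives CancelledError at its next await
  • Stop Signal: Send {"Kind": "StopAudio", "AudioData": null, "StopAudio": {}} JSON command to ACS via WebSocket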
  • Logging: Comprehensive logging with emojis for real-time debugging

🔄 Async Background Task Management

  • Route Turn Queue: Serializes final speech processing using asyncio.Queue()
  • Playback Task: Tracks current AI response generation/playback with self.playback_task
  • Task Lifecycle: Clean creation, cancellation, and cleanup of background tasks
  • Cancellation Safety: Proper try/except asyncio.CancelledError handling
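The cancellation-safety point can be illustrated from both sides: the playback coroutine catches `asyncio.CancelledError` to clean up and then re-raises it, and the canceller awaits the task so cleanup finishes before moving on. The `playback` and `cancel_and_wait` names are hypothetical, and the lists stand in for real audio output and logging.

```python
import asyncio


async def playback(chunks, sent, cleanup_log):
    """Hypothetical playback coroutine that cleans up when cancelled."""
    try:
        for chunk in chunks:
            sent.append(chunk)         # stand-in for sending one audio chunk
            await asyncio.sleep(0.01)  # cancellation is delivered at an await
    except asyncio.CancelledError:
        cleanup_log.append("playback cancelled")
        raise  # re-raise so the task is marked as cancelled


async def cancel_and_wait(task):
    """Request cancellation, then wait until the task has finished unwinding."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task acknowledged the cancellation


async def demo():
    sent, cleanup_log = [], []
    task = asyncio.create_task(playback(range(100), sent, cleanup_log))
    await asyncio.sleep(0.03)  # let a few chunks go out first
    await cancel_and_wait(task)
    return task.cancelled(), cleanup_log, len(sent) < 100
```

Swallowing `CancelledError` inside `playback` instead of re-raising it would leave the task looking as if it had completed normally, which makes lifecycle tracking through `self.playback_task` harder to reason about.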

🛑 Stop Audio Signal Protocol

{
  "Kind": "StopAudio",
  "AudioData": null,
  "StopAudio": {}
}
This JSON message is sent to ACS to immediately halt any ongoing audio playback.
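A small sketch of building and sending this message from Python, where `None` serializes to JSON `null`. The `FakeWebSocket` class, its `connected` flag, and the `send_text` method are assumptions made for illustration; a real handler would use whatever WebSocket object and state check its framework provides.

```python
import asyncio
import json


def build_stop_audio_message() -> str:
    """Serialize the StopAudio payload; Python None becomes JSON null."""
    return json.dumps({"Kind": "StopAudio", "AudioData": None, "StopAudio": {}})


class FakeWebSocket:
    """Hypothetical stand-in for the media WebSocket, used only in this sketch."""

    def __init__(self, connected: bool = True):
        self.connected = connected
        self.sent = []

    async def send_text(self, data: str) -> None:
        self.sent.append(data)


async def send_stop_audio(websocket) -> bool:
    """Send StopAudio if the socket looks open; report whether it was sent."""
    if not websocket.connected:
        return False  # skip quietly instead of raising on a closed socket
    await websocket.send_text(build_stop_audio_message())
    return True
```

Returning a boolean lets the barge-in handler log a skipped stop command without treating a closed connection as an error.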

⚑ Error Handling & Resilience¢

  • Event Loop Detection: Graceful handling when no event loop is available
  • WebSocket Validation: Connection state checks before sending messages
  • Task Cancellation: Proper cleanup with await task after cancellation
  • Queue Management: Full queue detection and message dropping strategies
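Two of these checks are easy to sketch with the standard library: detecting whether an event loop is running, and dropping a message when a bounded queue is full. The helper names are hypothetical, and dropping the newest item is only one possible policy; the source does not say which strategy the accelerator uses.

```python
import asyncio


def get_running_loop_or_none():
    """Return the running event loop, or None when called outside one."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None


def enqueue_or_drop(queue, item, dropped):
    """Try a non-blocking put; record and drop the item if the queue is full."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        dropped.append(item)  # assumed policy: drop the newest message
        return False


async def demo():
    queue = asyncio.Queue(maxsize=2)
    dropped = []
    results = [enqueue_or_drop(queue, text, dropped) for text in ("a", "b", "c")]
    return results, dropped, get_running_loop_or_none() is not None
```

Called from synchronous code, `get_running_loop_or_none()` returns `None`, which a caller can use to skip scheduling rather than raise.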

📊 Performance Optimizations

  • Immediate Cancellation: Barge-in triggers instant playback stop (< 50ms)
  • Background Processing: Non-blocking AI response generation
  • Memory Management: Proper task cleanup prevents memory leaks
  • Concurrent Safety: Thread-safe queue operations for speech processing