AudioBufferQueue
Bounded FIFO queue that buffers audio chunks between pipeline stages.
Defined in: src/core/pipeline/AudioBufferQueue.ts:216
Remarks
The queue is the primary mechanism for fixing the race condition where the STT WebSocket handshake completes after audio capture has already started. Audio chunks produced by the input provider are enqueued while the STT connects, then flushed in order once the STT is ready.
The queue supports three overflow strategies to handle situations where the consumer is slow or disconnected for an extended period:
- `'drop-oldest'` — Removes the oldest chunk when full (default). Best for real-time audio, where recent frames are more valuable.
- `'drop-newest'` — Discards incoming chunks when full. Preserves the beginning of the stream.
- `'block'` — The enqueue call blocks (via a promise) until space is available. Use with caution, as it introduces backpressure.
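As an illustration, the three strategies can be sketched with a minimal standalone bounded queue. This is not the library's implementation (`BoundedQueue` is a hypothetical name, and the real class adds draining, stats, and callbacks); it only shows the overflow mechanics described above.

```typescript
type Strategy = 'drop-oldest' | 'drop-newest' | 'block';

// Hypothetical minimal sketch of the overflow strategies; not the
// actual AudioBufferQueue implementation.
class BoundedQueue<T> {
  private buffer: T[] = [];
  private waiters: (() => void)[] = [];

  constructor(private maxSize: number, private strategy: Strategy) {}

  enqueue(item: T): void | Promise<void> {
    if (this.buffer.length < this.maxSize) {
      this.buffer.push(item);
      return;
    }
    switch (this.strategy) {
      case 'drop-oldest':
        this.buffer.shift(); // discard the oldest chunk, keep the new one
        this.buffer.push(item);
        return;
      case 'drop-newest':
        return; // discard the incoming chunk
      case 'block':
        // resolve once a dequeue frees a slot
        return new Promise((resolve) => {
          this.waiters.push(() => {
            this.buffer.push(item);
            resolve();
          });
        });
    }
  }

  dequeue(): T | undefined {
    const item = this.buffer.shift();
    this.waiters.shift()?.(); // admit one blocked producer, if any
    return item;
  }

  get size(): number {
    return this.buffer.length;
  }
}
```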
Example
```typescript
const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 1000,
  overflowStrategy: 'drop-oldest',
});

// Enqueue while waiting for STT
queue.enqueue(chunk1);
queue.enqueue(chunk2);
console.log(queue.size); // 2

// STT connected — drain and switch to pass-through
queue.startDraining((chunk) => stt.sendAudio(chunk.data));
// chunk1 and chunk2 are flushed immediately
// subsequent enqueues pass through directly

// Pause for turn-taking
queue.stopDraining();
// chunks buffer again until next startDraining()
```
See
- AudioBufferQueueConfig for configuration
- QueueStats for monitoring
- AudioHeaderCache for companion header caching
Constructors
Constructor
```typescript
new AudioBufferQueue(config): AudioBufferQueue;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:263
Creates a new AudioBufferQueue.
Parameters
| Parameter | Type | Description |
|---|---|---|
| config | AudioBufferQueueConfig | Queue configuration specifying name, max size, and overflow strategy. |
Returns
AudioBufferQueue
Example
```typescript
const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 2000,
  overflowStrategy: 'drop-oldest',
});
```
Accessors
size
Get Signature
```typescript
get size(): number;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:461
Current number of chunks in the buffer.
Remarks
Always 0 when the queue is in draining mode (pass-through).
Returns
number
Methods
clear()
```typescript
clear(): void;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:473
Removes all chunks from the buffer and resets the block resolver.
Returns
void
Remarks
Does not affect the draining state or counters. Chunks removed by clear() are not counted as dropped — they are simply discarded. Use this when stopping the pipeline to release memory.
enqueue()
```typescript
enqueue(chunk): void | Promise<void>;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:285
Enqueues an audio chunk into the buffer.
Parameters
| Parameter | Type | Description |
|---|---|---|
| chunk | AudioChunk | The audio chunk to enqueue. |
Returns
void | Promise<void>
void for 'drop-oldest' and 'drop-newest'; a Promise<void> for 'block' when the queue is full (resolves when space is available).
Remarks
Behavior depends on the current mode:
- Draining mode: The chunk is passed directly to the drain callback (zero-copy fast path) without touching the internal buffer.
- Buffering mode: The chunk is added to the FIFO buffer. If the buffer is full, the overflow strategy determines what happens:
  - `'drop-oldest'`: The oldest chunk is removed to make room.
  - `'drop-newest'`: The incoming chunk is discarded.
  - `'block'`: Returns a promise that resolves when space becomes available.
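Because the return type is a union, a caller that might run under the 'block' strategy can await conditionally. A hedged sketch of one caller-side pattern (the `feed` helper and its structural queue type are hypothetical, not part of this API):

```typescript
// Hypothetical helper: handles both the synchronous (drop-*) and
// promise-returning ('block') forms of enqueue.
async function feed(
  queue: { enqueue(c: unknown): void | Promise<void> },
  chunk: unknown,
): Promise<void> {
  const result = queue.enqueue(chunk);
  if (result instanceof Promise) {
    await result; // backpressure: wait until space is available
  }
}
```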
getStats()
```typescript
getStats(): QueueStats;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:500
Returns a snapshot of queue statistics for monitoring.
Returns
A QueueStats snapshot.
Remarks
The returned QueueStats object is a snapshot — it does not update as the queue changes. Call this method again for fresh stats.
Example
```typescript
const stats = queue.getStats();
if (stats.totalDropped > 0) {
  console.warn(`Queue "${stats.name}" dropped ${stats.totalDropped} chunks`);
}
```
isDraining()
```typescript
isDraining(): boolean;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:520
Whether the queue is currently in draining (pass-through) mode.
Returns
boolean
true if startDraining has been called and stopDraining has not been called since.
onOverflow()
```typescript
onOverflow(callback): void;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:547
Registers a callback that is invoked whenever the queue drops chunks due to overflow.
Parameters
| Parameter | Type | Description |
|---|---|---|
| callback | OverflowCallback \| null | The function to call on overflow, or null to clear. |
Returns
void
Remarks
The callback receives the number of chunks dropped in this overflow event and the current buffer size after the drop. Only one overflow callback is supported; calling this method replaces any previously registered callback.
The orchestrator uses this to bridge overflow events to the typed EventEmitter system.
Example
```typescript
queue.onOverflow((droppedChunks, currentSize) => {
  console.warn(`Dropped ${droppedChunks} chunks, buffer at ${currentSize}`);
});
```
peek()
```typescript
peek(): AudioChunk | undefined;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:451
Returns the first chunk in the buffer without removing it.
Returns
AudioChunk | undefined
The oldest chunk in the buffer, or undefined.
Remarks
Returns undefined if the buffer is empty; since the buffer is always empty in draining mode, peek() always returns undefined while draining.
startDraining()
```typescript
startDraining(callback, options?): void;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:367
Starts draining the queue: flushes all buffered chunks, then switches to pass-through mode.
Parameters
| Parameter | Type | Description |
|---|---|---|
| callback | DrainCallback | Function to call for each chunk (buffered and future). |
| options? | { batchSize?: number; paced?: boolean; } | Optional drain configuration. |
| options.batchSize? | number | Number of chunks per batch when paced. Defaults to 50 (~150 ms of audio at typical 3 ms chunk intervals). |
| options.paced? | boolean | When true, drain in batches with event-loop yields between each batch instead of flushing synchronously. Defaults to false. |
Returns
void
Remarks
This is the key method for the race condition fix. Call it after the STT WebSocket handshake completes to receive all chunks that were buffered during the connection attempt, followed by real-time pass-through of subsequent chunks.
Immediate mode (default): All buffered chunks are delivered to the callback synchronously in FIFO order before this method returns.
Paced mode (paced: true): Buffered chunks are drained in small batches with event-loop yields between each batch. This prevents overwhelming the downstream WebSocket with a burst of data. During the paced drain, new chunks arriving via enqueue are appended to the buffer (maintaining FIFO order) and will be included in a subsequent batch. Once the buffer is empty the queue switches to pass-through mode automatically.
If the queue is already draining, calling this method replaces the existing callback.
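A minimal sketch of what a paced drain loop can look like, assuming batch flushes with setTimeout-based yields between batches (the `pacedDrain` function is hypothetical; the actual implementation may pace differently):

```typescript
// Hypothetical paced drain: flush in batches, yielding to the event
// loop between batches so newly enqueued chunks and I/O can interleave.
async function pacedDrain<T>(
  buffer: T[],
  sink: (item: T) => void,
  batchSize = 50,
): Promise<void> {
  while (buffer.length > 0) {
    // splice preserves FIFO order; items enqueued during a yield are
    // picked up by a later iteration
    for (const item of buffer.splice(0, batchSize)) sink(item);
    await new Promise((resolve) => setTimeout(resolve, 0));
  }
}
```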
Example
```typescript
// Immediate flush (original behavior)
queue.startDraining((chunk) => stt.sendAudio(chunk.data));

// Paced catch-up drain
queue.startDraining((chunk) => stt.sendAudio(chunk.data), { paced: true });
```
See
stopDraining to return to buffering mode
stopDraining()
```typescript
stopDraining(): void;
```
Defined in: src/core/pipeline/AudioBufferQueue.ts:436
Stops draining and returns the queue to buffering mode.
Returns
void
Remarks
After calling this method, subsequent enqueue calls will buffer chunks internally instead of passing them to the drain callback. The drain callback is cleared. If a paced drain is in progress, it is cancelled.
This is used during turn-taking: when the agent starts speaking, the orchestrator stops draining the input queue (pauses STT) and resumes draining when the agent finishes.
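The buffering/pass-through toggle described above can be sketched as a standalone class (hypothetical names; the real AudioBufferQueue adds overflow handling, pacing, and stats):

```typescript
// Hypothetical sketch of the buffering <-> pass-through toggle.
class PassThroughBuffer<T> {
  private buffer: T[] = [];
  private sink: ((item: T) => void) | null = null;

  enqueue(item: T): void {
    if (this.sink) this.sink(item); // draining: pass-through fast path
    else this.buffer.push(item);    // buffering mode
  }

  startDraining(sink: (item: T) => void): void {
    this.sink = sink;
    // flush everything buffered so far, in FIFO order
    for (const item of this.buffer.splice(0)) sink(item);
  }

  stopDraining(): void {
    this.sink = null; // subsequent enqueues buffer again
  }
}
```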
See
startDraining to resume draining