Skip to content

AudioBufferQueue

Bounded FIFO queue that buffers audio chunks between pipeline stages.

Defined in: src/core/pipeline/AudioBufferQueue.ts:216

Bounded FIFO queue that buffers audio chunks between pipeline stages.

Remarks

The queue is the primary mechanism for fixing the race condition where the STT WebSocket handshake completes after audio capture has already started. Audio chunks produced by the input provider are enqueued while the STT connects, then flushed in order once the STT is ready.

The queue supports three overflow strategies to handle situations where the consumer is slow or disconnected for an extended period:

  • 'drop-oldest' — Removes the oldest chunk when full (default). Best for real-time audio where recent frames are more valuable.
  • 'drop-newest' — Discards incoming chunks when full. Preserves the beginning of the stream.
  • 'block' — The enqueue call blocks (via promise) until space is available. Use with caution as it introduces backpressure.

Example

const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 1000,
  overflowStrategy: 'drop-oldest',
});

// Enqueue while waiting for STT
queue.enqueue(chunk1);
queue.enqueue(chunk2);
console.log(queue.size); // 2

// STT connected — drain and switch to pass-through
queue.startDraining((chunk) => stt.sendAudio(chunk.data));
// chunk1 and chunk2 are flushed immediately
// subsequent enqueues pass through directly

// Pause for turn-taking
queue.stopDraining();
// chunks buffer again until next startDraining()

See

Constructors

Constructor

new AudioBufferQueue(config): AudioBufferQueue;

Defined in: src/core/pipeline/AudioBufferQueue.ts:263

Creates a new AudioBufferQueue.

Parameters

ParameterTypeDescription
configAudioBufferQueueConfigQueue configuration specifying name, max size, and overflow strategy.

Returns

AudioBufferQueue

Example

const queue = new AudioBufferQueue({
  name: 'input',
  maxSize: 2000,
  overflowStrategy: 'drop-oldest',
});

See

AudioBufferQueueConfig

Accessors

size

Get Signature

get size(): number;

Defined in: src/core/pipeline/AudioBufferQueue.ts:461

Current number of chunks in the buffer.

Remarks

Always 0 when the queue is in draining mode (pass-through).

Returns

number

Methods

clear()

clear(): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:473

Removes all chunks from the buffer and resets the block resolver.

Returns

void

Remarks

Does not affect the draining state or counters. Chunks removed by clear() are not counted as dropped — they are simply discarded. Use this when stopping the pipeline to release memory.


enqueue()

enqueue(chunk): void | Promise<void>;

Defined in: src/core/pipeline/AudioBufferQueue.ts:285

Enqueues an audio chunk into the buffer.

Parameters

ParameterTypeDescription
chunkAudioChunkThe audio chunk to enqueue.

Returns

void | Promise<void>

void for 'drop-oldest' and 'drop-newest'; a Promise<void> for 'block' when the queue is full (resolves when space is available).

Remarks

Behavior depends on the current mode:

  • Draining mode: The chunk is passed directly to the drain callback (zero-copy fast path) without touching the internal buffer.
  • Buffering mode: The chunk is added to the FIFO buffer. If the buffer is full, the overflow strategy determines what happens:
    • 'drop-oldest': The oldest chunk is removed to make room.
    • 'drop-newest': The incoming chunk is discarded.
    • 'block': Returns a promise that resolves when space becomes available.

getStats()

getStats(): QueueStats;

Defined in: src/core/pipeline/AudioBufferQueue.ts:500

Returns a snapshot of queue statistics for monitoring.

Returns

QueueStats

A QueueStats snapshot.

Remarks

The returned QueueStats object is a snapshot — it does not update as the queue changes. Call this method again for fresh stats.

Example

const stats = queue.getStats();
if (stats.totalDropped > 0) {
  console.warn(`Queue "${stats.name}" dropped ${stats.totalDropped} chunks`);
}

isDraining()

isDraining(): boolean;

Defined in: src/core/pipeline/AudioBufferQueue.ts:520

Whether the queue is currently in draining (pass-through) mode.

Returns

boolean

true if startDraining has been called and stopDraining has not been called since.


onOverflow()

onOverflow(callback): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:547

Registers a callback that is invoked whenever the queue drops chunks due to overflow.

Parameters

ParameterTypeDescription
callbackOverflowCallback | nullThe function to call on overflow, or null to clear.

Returns

void

Remarks

The callback receives the number of chunks dropped in this instance and the current buffer size after the drop. Only one overflow callback is supported; calling this method replaces any previously registered callback.

The orchestrator uses this to bridge overflow events to the typed EventEmitter system.

Example

queue.onOverflow((droppedChunks, currentSize) => {
  console.warn(`Dropped ${droppedChunks} chunks, buffer at ${currentSize}`);
});

See

OverflowCallback


peek()

peek(): AudioChunk | undefined;

Defined in: src/core/pipeline/AudioBufferQueue.ts:451

Returns the first chunk in the buffer without removing it.

Returns

AudioChunk | undefined

The oldest chunk in the buffer, or undefined.

Remarks

Returns undefined if the buffer is empty or the queue is in draining mode (since the buffer is empty in draining mode).


startDraining()

startDraining(callback, options?): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:367

Starts draining the queue: flushes all buffered chunks, then switches to pass-through mode.

Parameters

ParameterTypeDescription
callbackDrainCallbackFunction to call for each chunk (buffered and future).
options?{ batchSize?: number; paced?: boolean; }Optional drain configuration.
options.batchSize?numberNumber of chunks per batch when paced. Defaults to 50 (~150 ms of audio at typical 3 ms chunk intervals).
options.paced?booleanWhen true, drain in batches with event-loop yields between each batch instead of flushing synchronously. Defaults to false.

Returns

void

Remarks

This is the key method for the race condition fix. Call it after the STT WebSocket handshake completes to receive all chunks that were buffered during the connection attempt, followed by real-time pass-through of subsequent chunks.

Immediate mode (default): All buffered chunks are delivered to the callback synchronously in FIFO order before this method returns.

Paced mode (paced: true): Buffered chunks are drained in small batches with event-loop yields between each batch. This prevents overwhelming the downstream WebSocket with a burst of data. During the paced drain, new chunks arriving via enqueue are appended to the buffer (maintaining FIFO order) and will be included in a subsequent batch. Once the buffer is empty the queue switches to pass-through mode automatically.

If the queue is already draining, calling this method replaces the existing callback.

Example

// Immediate flush (original behavior)
queue.startDraining((chunk) => stt.sendAudio(chunk.data));

// Paced catch-up drain
queue.startDraining((chunk) => stt.sendAudio(chunk.data), { paced: true });

See

stopDraining to return to buffering mode


stopDraining()

stopDraining(): void;

Defined in: src/core/pipeline/AudioBufferQueue.ts:436

Stops draining and returns the queue to buffering mode.

Returns

void

Remarks

After calling this method, subsequent enqueue calls will buffer chunks internally instead of passing them to the drain callback. The drain callback is cleared. If a paced drain is in progress, it is cancelled.

This is used during turn-taking: when the agent starts speaking, the orchestrator stops draining the input queue (pauses STT) and resumes draining when the agent finishes.

See

startDraining to resume draining

© 2026 CompositeVoice. All rights reserved.

Font size
Contrast
Motion
Transparency