Real-time voice agent (Pipecat + LiveKit)

Production voice agent: Deepgram ASR → Claude → ElevenLabs TTS, all routed through a LiveKit WebRTC room.

The cascaded ASR → LLM → TTS pipeline is the workhorse of production voice AI. Below is the minimum Pipecat program that joins a LiveKit room and runs a fully-streaming agent.

Install

pip install "pipecat-ai[livekit,deepgram,anthropic,elevenlabs,silero]"

Agent

import asyncio
import os

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.elevenlabs import ElevenLabsTTSService
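# Transports live under pipecat.transports, not pipecat.services. Module paths
# have moved between Pipecat releases, so check them against the installed version.
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.transports.services.livekit import LiveKitParams, LiveKitTransport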


async def main(room: str, token: str):
    transport = LiveKitTransport(
        url=os.environ["LIVEKIT_URL"],
        token=token,
        room_name=room,
        params=LiveKitParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    llm = AnthropicLLMService(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        model="claude-opus-4-7",
    )
    tts = ElevenLabsTTSService(
        api_key=os.environ["ELEVENLABS_API_KEY"],
        voice_id=os.environ["ELEVEN_VOICE_ID"],
    )

    context = OpenAILLMContext(
        messages=[
            {
                "role": "system",
                "content": "You are a helpful concierge. Keep replies under 2 sentences.",
            }
        ]
    )

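    # Create the aggregator pair once so user() and assistant() share one context.
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline([
        transport.input(),               # audio frames from the LiveKit room
        stt,                             # speech -> transcript
        context_aggregator.user(),       # append the user turn to the context
        llm,                             # streamed completion
        tts,                             # text -> audio
        transport.output(),              # audio back to the room
        context_aggregator.assistant(),  # record what the agent said
    ])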

    task = PipelineTask(pipeline)

    @transport.event_handler("on_first_participant_joined")
    async def _on_join(_t, participant):
        await task.queue_frames([LLMMessagesFrame(context.messages)])

    @transport.event_handler("on_participant_left")
    async def _on_leave(_transport, *_args):
        # The arguments passed after the transport vary by transport, so accept any.
        await task.queue_frame(EndFrame())

    await PipelineRunner().run(task)


if __name__ == "__main__":
    asyncio.run(main(os.environ["ROOM"], os.environ["TOKEN"]))
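
The agent reads seven environment variables and fails with a bare `KeyError` on the first one that is missing. A minimal preflight sketch, assuming you want one readable error up front; the variable names come from the code above, while `missing_env` and `preflight` are hypothetical helpers, not part of Pipecat:

```python
import os

# Variable names taken from the agent above.
REQUIRED_ENV = (
    "LIVEKIT_URL",
    "DEEPGRAM_API_KEY",
    "ANTHROPIC_API_KEY",
    "ELEVENLABS_API_KEY",
    "ELEVEN_VOICE_ID",
    "ROOM",
    "TOKEN",
)


def missing_env(env=None):
    """Return the required variables that are unset or empty, in order."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV if not env.get(name)]


def preflight(env=None):
    """Exit with one readable message instead of a KeyError inside main()."""
    missing = missing_env(env)
    if missing:
        raise SystemExit("Missing environment variables: " + ", ".join(missing))
```

Calling `preflight()` as the first statement under `if __name__ == "__main__":` would be one place to use it.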

Latency budget that worked at 40k locations

| Stage | Target |
|---|---|
| ASR partial → final | < 250 ms |
| LLM TTFT | < 350 ms |
| TTS TTFB | < 250 ms |
| End-to-end | < 1.2 s |
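
The three stage targets sum to 850 ms, which leaves 350 ms of the 1.2 s end-to-end target for everything the table does not list. The sketch below makes that arithmetic explicit and flags stages that overshoot. The dictionary keys and both function names are illustrative assumptions; only the numbers come from the table.

```python
# Stage targets from the table above, in milliseconds.
STAGE_TARGETS_MS = {
    "asr_partial_to_final": 250,
    "llm_ttft": 350,
    "tts_ttfb": 250,
}
END_TO_END_TARGET_MS = 1200


def headroom_ms(stage_targets=STAGE_TARGETS_MS, e2e_target=END_TO_END_TARGET_MS):
    """Time left in the end-to-end target after the listed stages."""
    return e2e_target - sum(stage_targets.values())


def over_budget(measured_ms, stage_targets=STAGE_TARGETS_MS):
    """Return {stage: overshoot_ms} for stages that exceed their target."""
    return {
        stage: measured_ms[stage] - target
        for stage, target in stage_targets.items()
        if measured_ms.get(stage, 0) > target
    }


print(headroom_ms())  # 350
```

Feeding `over_budget` per-turn measurements gives a quick view of which stage is consuming the headroom.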

Five optimizations cut measured end-to-end latency by 41.8% in production:

  1. Streaming-first TTS chunking (start synthesizing on the first sentence boundary, not on full reply).
  2. Concurrent intent detection and synthesis — biggest single win.
  3. Prompt compression on the LLM call (cache the long system prompt; trim turn history).
  4. Session-state caching to avoid re-priming Claude every turn.
  5. Adaptive VAD endpointing — Silero sensitivity tuned per environment noise floor.
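
Items 1 and 3 can be sketched in plain Python, independent of any framework. This is an illustration of the idea under stated assumptions, not the production implementation: `sentences_from_tokens` and `trim_history` are hypothetical names, the sentence rule is deliberately naive (it splits after abbreviations such as "Dr. "), and the default of six retained messages is arbitrary.

```python
import re

# Sentence end: ., !, or ? followed by whitespace. Requiring the whitespace
# keeps decimals like "3.5" intact, at the cost of waiting one more token.
_SENTENCE_END = re.compile(r"[.!?]\s")


def sentences_from_tokens(tokens):
    """Yield complete sentences as soon as they appear in a token stream."""
    buffer = ""
    for token in tokens:
        buffer += token
        while True:
            match = _SENTENCE_END.search(buffer)
            if match is None:
                break
            cut = match.start() + 1  # keep the punctuation, drop the space
            yield buffer[:cut].strip()
            buffer = buffer[match.end():]
    tail = buffer.strip()
    if tail:
        yield tail  # flush the remainder when the stream ends


def trim_history(messages, max_turn_messages=6):
    """Keep system messages plus the most recent non-system messages."""
    system = [m for m in messages if m["role"] == "system"]
    turns = [m for m in messages if m["role"] != "system"]
    recent = turns[-max_turn_messages:] if max_turn_messages > 0 else []
    # Start on a user message so the trimmed history still alternates cleanly.
    while recent and recent[0]["role"] != "user":
        recent = recent[1:]
    return system + recent


stream = ["Hello", " there.", " How", " are", " you?", " Fine"]
print(list(sentences_from_tokens(stream)))
# ['Hello there.', 'How are you?', 'Fine']
```

Each yielded sentence can go to the synthesizer while the model is still generating the next one, which is where the saving in item 1 comes from.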

Source: my paper Latency Optimization in Production Voice AI Pipelines (Rodrigues, 2026).