
PyTrickle is a Python framework for real-time video and audio streaming over the trickle protocol. It is the canonical way to implement BYOC containers on Livepeer. PyTrickle reached production use in Phase 4 (January 2026) and is maintained at https://github.com/livepeer/pytrickle.
PyTrickle is early-stage software (3 stars, 20 open issues as of April 2026). The API is stable enough for production use (Embody SPE and Streamplace use it) but the project is under active development. Check the GitHub repository for the latest API changes before building against it.

Installation

pip install git+https://github.com/livepeer/pytrickle.git
Requirements:
  • Python 3.8 or later
  • PyTorch (for GPU tensor support)
  • FFmpeg (for encoding/decoding)
  • NVIDIA GPU recommended for inference workloads

FrameProcessor

FrameProcessor is the base class you subclass to implement your AI model. Override the async methods for the workload types your container handles.
from pytrickle import FrameProcessor
from pytrickle.frames import VideoFrame, AudioFrame
from typing import Optional, List
import torch

class MyProcessor(FrameProcessor):

    async def initialize(self):
        """Called once on startup. Load your model here."""
        self.model = load_model()  # your model loading logic

    async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]:
        """
        Called once per video frame.

        Args:
            frame: VideoFrame containing a PyTorch tensor (H, W, C) and metadata

        Returns:
            Processed VideoFrame, or None to drop the frame
        """
        tensor = frame.tensor  # torch.Tensor, shape (H, W, C), dtype uint8

        with torch.no_grad():
            processed = self.model(tensor)

        return frame.replace_tensor(processed)

    async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
        """
        Called once per audio frame.

        Returns:
            List of AudioFrames to output, or None to drop
        """
        return [frame]  # pass through

    def update_params(self, params: dict):
        """
        Called when the gateway or client sends updated parameters mid-stream.
        Implement to support dynamic model configuration.
        """
        pass
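To support mid-stream configuration, `update_params` typically validates the incoming dict and stores the values your processing methods read. A minimal sketch of that pattern, using a stand-in class so it runs without PyTrickle — the `strength` parameter and its bounds are hypothetical; only the `update_params(params: dict)` hook itself comes from the API above:

```python
# Illustrative sketch of an update_params-style handler with validation.
# "strength" is a hypothetical model parameter, not part of PyTrickle.

class ParamState:
    """Holds mutable per-stream parameters with simple validation."""

    def __init__(self):
        self.strength = 0.5  # hypothetical model parameter, default value

    def update(self, params: dict):
        if "strength" in params:
            value = float(params["strength"])
            if not 0.0 <= value <= 1.0:
                raise ValueError(f"strength out of range: {value}")
            self.strength = value


state = ParamState()
state.update({"strength": 0.8})
print(state.strength)  # 0.8
```

Rejecting bad values here (rather than inside the frame loop) keeps a malformed params update from corrupting per-frame processing.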

VideoFrame

VideoFrame wraps a decoded video frame as a PyTorch tensor with metadata. Key methods:
# Replace the tensor while keeping metadata
new_frame = frame.replace_tensor(processed_tensor)

# Move tensor to GPU
frame_gpu = frame.to('cuda')

# Move tensor to CPU
frame_cpu = frame.to('cpu')
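The point of `replace_tensor` is that the returned frame carries new pixel data while keeping the original timing metadata, so A/V sync is preserved. A minimal stand-in illustrating that contract — the field names (`data`, `timestamp`) are illustrative, not PyTrickle's actual attributes:

```python
# Stand-in showing replace_tensor semantics: new pixel data, same metadata.
# Field names are illustrative, not PyTrickle's actual VideoFrame layout.
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Frame:
    data: bytes        # stand-in for the (H, W, C) tensor
    timestamp: float   # presentation timestamp, preserved on replace

    def replace_tensor(self, new_data: bytes) -> "Frame":
        return replace(self, data=new_data)


frame = Frame(data=b"\x00" * 12, timestamp=1.25)
new_frame = frame.replace_tensor(b"\xff" * 12)
print(new_frame.timestamp)  # 1.25 -- metadata carried over
```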

AudioFrame

StreamServer

StreamServer wraps your FrameProcessor with the REST API contract required by the Livepeer gateway. You do not implement the endpoints manually.
from pytrickle import StreamServer

server = StreamServer(
    frame_processor=MyProcessor(),
    port=8000,
    capability_name='live-video-to-video',  # pipeline type identifier
    host='0.0.0.0',
    target_fps=24,        # output frame rate (1-60)
    max_queue_size=30,    # frames to buffer before dropping
)

# Run the server (blocks until stopped)
import asyncio
asyncio.run(server.run_forever())
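The `max_queue_size` setting bounds how many frames buffer before PyTrickle starts dropping. The sketch below illustrates drop-oldest behavior with a bounded deque; it mirrors the configured behavior conceptually and is not PyTrickle's actual internals:

```python
# Illustrative sketch of bounded frame buffering: when the queue is full,
# the oldest frame is evicted rather than blocking the decoder.
# Conceptual only -- not PyTrickle's actual implementation.
from collections import deque

queue = deque(maxlen=3)  # small maxlen for demonstration; cf. max_queue_size=30
dropped = 0

for frame_id in range(5):
    if len(queue) == queue.maxlen:
        dropped += 1  # oldest frame will be evicted by the append below
    queue.append(frame_id)

print(list(queue), dropped)  # [2, 3, 4] 2
```

Dropping old frames keeps latency bounded when the model falls behind real time, at the cost of the `frames_dropped` metric ticking up.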
StreamServer automatically exposes four endpoints on the configured port:
Endpoint            Method  Description
/api/stream/start   POST    Start a session; receives subscribe_url, publish_url, params
/api/stream/params  POST    Update parameters mid-stream
/api/stream/status  GET     Returns current session status
/api/stream/stop    POST    Stop the current session
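For example, the gateway (or a local test) starts a session by POSTing JSON to `/api/stream/start`. A hedged sketch of that request — the body fields come from the table above, but the trickle URLs and the `params` contents are placeholders for your deployment:

```python
# Sketch of starting a session against a running StreamServer.
# subscribe_url/publish_url/params come from the endpoint table above;
# the URL values and params contents are placeholders.
import json
import urllib.request

body = json.dumps({
    "subscribe_url": "http://trickle-server/input-stream",
    "publish_url": "http://trickle-server/output-stream",
    "params": {"strength": 0.8},  # hypothetical model parameter
}).encode()

req = urllib.request.Request(
    "http://localhost:8000/api/stream/start",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
# Uncomment with a server running:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```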

TrickleClient

For direct trickle protocol connections without the REST API layer:
from pytrickle import TrickleClient

async def stream_frames():
    async with TrickleClient(
        subscribe_url='http://trickle-server/input-stream',
        publish_url='http://trickle-server/output-stream',
    ) as client:
        async for frame in client.video_frames():
            processed = await my_model(frame)
            await client.publish_video_frame(processed)
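The async-for pattern above can be exercised without a trickle server by swapping in a stand-in frame source; the real `client.video_frames()` iterator yields decoded frames the same way. Everything below except the iteration pattern itself is a stand-in:

```python
# The async-for consumption pattern, with a stand-in frame source so it
# runs without a trickle server. Frame ids substitute for real frames.
import asyncio


async def video_frames():
    """Stand-in for client.video_frames(): yields fake frame ids."""
    for frame_id in range(3):
        await asyncio.sleep(0)  # yield control, as a network read would
        yield frame_id


async def main():
    processed = []
    async for frame in video_frames():
        processed.append(frame * 2)  # stand-in for model inference
    return processed


print(asyncio.run(main()))  # [0, 2, 4]
```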

Built-in monitoring

FrameProcessor exposes metrics via get_metrics():
metrics = processor.get_metrics()
# {
#   'fps': 24.1,
#   'latency_ms': 42.3,
#   'gpu_memory_mb': 4096,
#   'frames_processed': 10234,
#   'frames_dropped': 3,
#   'error_count': 0,
# }
These metrics are also exposed on the /api/stream/status endpoint when running via StreamServer.
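A quick health check you can derive from that dict is the drop rate; the computation below is illustrative, using the sample values shown above:

```python
# Derive a drop rate from the sample get_metrics() dict shown above.
# The dict values are the documented example; the derived stat is ours.
metrics = {
    "fps": 24.1,
    "latency_ms": 42.3,
    "gpu_memory_mb": 4096,
    "frames_processed": 10234,
    "frames_dropped": 3,
    "error_count": 0,
}

total = metrics["frames_processed"] + metrics["frames_dropped"]
drop_rate = metrics["frames_dropped"] / total if total else 0.0
print(f"drop rate: {drop_rate:.4%}")  # drop rate: 0.0293%
```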

BYOC Guide

Full BYOC walkthrough: implementing FrameProcessor, building a container, and deploying to the network.

Build with ComfyStream

ComfyStream uses PyTrickle internally — use it if your model is a ComfyUI workflow.
Last modified on April 7, 2026