PyTrickle is the Python framework for building BYOC containers on Livepeer. Covers FrameProcessor API, StreamServer, TrickleClient, and VideoFrame/AudioFrame interfaces.
PyTrickle is a Python framework for real-time video and audio streaming over the trickle protocol. It is the canonical way to implement BYOC containers on Livepeer. PyTrickle reached production use in Phase 4 (January 2026) and is maintained at https://github.com/livepeer/pytrickle.
PyTrickle is early-stage software (3 stars, 20 open issues as of April 2026). The API is stable enough for production use (Embody SPE and Streamplace use it) but the project is under active development. Check the GitHub repository for the latest API changes before building against it.
FrameProcessor is the base class you subclass to implement your AI model. Override the async methods for the workload types your container handles.
```python
from pytrickle import FrameProcessor
from pytrickle.frames import VideoFrame, AudioFrame
from typing import Optional, List
import torch

class MyProcessor(FrameProcessor):
    async def initialize(self):
        """Called once on startup. Load your model here."""
        self.model = load_model()  # your model loading logic

    async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]:
        """
        Called once per video frame.

        Args:
            frame: VideoFrame containing a PyTorch tensor (H, W, C) and metadata

        Returns:
            Processed VideoFrame, or None to drop the frame
        """
        tensor = frame.tensor  # torch.Tensor, shape (H, W, C), dtype uint8
        with torch.no_grad():
            processed = self.model(tensor)
        return frame.replace_tensor(processed)

    async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
        """
        Called once per audio frame.

        Returns:
            List of AudioFrames to output, or None to drop
        """
        return [frame]  # pass through

    def update_params(self, params: dict):
        """
        Called when the gateway or client sends updated parameters mid-stream.
        Implement to support dynamic model configuration.
        """
        pass
```
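A common pattern for `update_params` is to validate the incoming dict and stash the values for the next frame, since it can arrive mid-stream on a different task than the frame loop. A minimal standalone sketch of that pattern (it does not import pytrickle; the `strength` parameter name and its range are illustrative assumptions, not part of the PyTrickle API):

```python
class ParamState:
    """Holds mutable, validated runtime parameters for a processor."""

    def __init__(self):
        self.strength = 0.5  # illustrative default

    def update(self, params: dict):
        # Clamp out-of-range values and ignore keys we don't recognize,
        # so a malformed mid-stream update can't crash the frame loop.
        if "strength" in params:
            self.strength = min(max(float(params["strength"]), 0.0), 1.0)


state = ParamState()
state.update({"strength": 2.0, "unknown": "ignored"})
print(state.strength)  # clamped to 1.0
```

Inside a `FrameProcessor`, `update_params` would delegate to something like `state.update(params)` and `process_video_async` would read `state.strength` on the next frame.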
StreamServer wraps your FrameProcessor with the REST API contract required by the Livepeer gateway. You do not implement the endpoints manually.
```python
from pytrickle import StreamServer

server = StreamServer(
    frame_processor=MyProcessor(),
    port=8000,
    capability_name='live-video-to-video',  # pipeline type identifier
    host='0.0.0.0',
    target_fps=24,       # output frame rate (1-60)
    max_queue_size=30,   # frames to buffer before dropping
)

# Run the server (blocks until stopped)
import asyncio
asyncio.run(server.run_forever())
```
StreamServer automatically exposes four endpoints on the configured port:
| Endpoint | Method | Description |
|---|---|---|
| /api/stream/start | POST | Start a session; receives subscribe_url, publish_url, params |
For direct trickle protocol connections without the REST API layer:
```python
from pytrickle import TrickleClient

async def stream_frames():
    async with TrickleClient(
        subscribe_url='http://trickle-server/input-stream',
        publish_url='http://trickle-server/output-stream',
    ) as client:
        async for frame in client.video_frames():
            processed = await my_model(frame)
            await client.publish_video_frame(processed)
```