Complete API reference and examples for the rtms Python package.
Requirements: Python >= 3.10
For product-specific webhook events and payload fields, see the product guides.
```shell
pip install rtms
```

The `@rtms.on_webhook_event` decorator sets up an HTTP server that receives Zoom webhook deliveries. The SDK starts polling for RTMS events automatically after `join()` succeeds.
```python
import rtms

@rtms.on_webhook_event
def handle_webhook(payload):
    if 'rtms_started' not in payload.get('event', ''):
        return
    client = rtms.Client()

    @client.on_transcript_data
    def on_transcript(data, size, timestamp, metadata):
        print(f'[{timestamp}] {metadata.userName}: {data.decode()}')

    client.join(payload['payload'])

rtms.run()  # blocks; Ctrl-C to stop
```

For the specific event names for your product, see the product guides.
⚠️ Required for production. The example above processes all incoming requests without verification. In production, Zoom cryptographically signs every webhook — you must validate the signature to reject forged requests.
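The scheme is HMAC-SHA256 over the string `v0:{timestamp}:{body}` keyed with your webhook secret, with the digest carried in the `x-zm-signature` header as `v0=<hex>`. A standalone sketch with hypothetical secret and body values shows a signature being computed and a tampered body failing the comparison:

```python
import hashlib
import hmac

# Hypothetical values; the real secret comes from your Zoom app's webhook
# configuration, and body/timestamp arrive with the request.
secret = b'demo_webhook_secret'
timestamp = '1700000000'
body = '{"event":"meeting.rtms_started"}'

message = f'v0:{timestamp}:{body}'
signature = 'v0=' + hmac.new(secret, message.encode(), hashlib.sha256).hexdigest()
print(signature)  # 'v0=' followed by a 64-char hex digest

# A tampered body yields a different digest, so verification fails.
tampered = body.replace('rtms_started', 'rtms_stopped')
forged = 'v0=' + hmac.new(secret, f'v0:{timestamp}:{tampered}'.encode(), hashlib.sha256).hexdigest()
print(hmac.compare_digest(signature, forged))  # False
```

The full webhook handler below applies the same computation to live requests.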
```python
import hashlib
import hmac
import os

import rtms

def verify_signature(body: str, timestamp: str, signature: str) -> bool:
    message = f"v0:{timestamp}:{body}"
    expected = "v0=" + hmac.new(
        os.environ["ZM_RTMS_WEBHOOK_SECRET"].encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

@rtms.on_webhook_event
def handle_webhook(payload, request, response):
    signature = request.headers.get('x-zm-signature', '')
    timestamp = request.headers.get('x-zm-request-timestamp', '')

    # Zoom endpoint validation challenge
    validator = request.headers.get('x-zm-webhook-validator')
    if validator:
        response.set_status(200)
        response.send({'plainToken': validator})
        return

    # Note: Zoom signs the raw request body, so the string passed here must
    # match that body byte-for-byte for verification to succeed.
    if not verify_signature(str(payload), timestamp, signature):
        response.set_status(401)
        response.send({'error': 'Unauthorized'})
        return

    response.send({'status': 'ok'})
    if 'rtms_started' in payload.get('event', ''):
        client = rtms.Client()
        client.on_transcript_data(
            lambda data, size, ts, meta: print(f'{meta.userName}: {data.decode()}')
        )
        client.join(payload['payload'])

rtms.run()
```

Use `with rtms.Client() as client:` to ensure `leave()` is always called, even if an exception occurs:
```python
import rtms

with rtms.Client() as client:
    client.on_audio_data(lambda data, size, ts, meta: process(data))
    client.join(
        meeting_uuid=...,
        rtms_stream_id=...,
        server_urls=...
    )
    rtms.run(stop_on_empty=True)
# client.leave() called automatically on exit
```

All callbacks receive a metadata object with `userId` and `userName`:
```python
# Transcript — text data with speaker info
@client.on_transcript_data
def on_transcript(data, size, timestamp, metadata):
    print(f'[{timestamp}] {metadata.userName}: {data.decode()}')

# Audio — raw PCM / Opus frames
@client.on_audio_data
def on_audio(data, size, timestamp, metadata):
    print(f'Audio: {len(data)}B from {metadata.userName}')

# Video — H.264 / raw frames
@client.on_video_data
def on_video(data, size, timestamp, metadata):
    print(f'Video: {size}B from {metadata.userName}')

# Desktop share
@client.on_deskshare_data
def on_deskshare(data, size, timestamp, metadata):
    print(f'Deskshare: {size}B from {metadata.userName}')
```

Speaker identification with mixed audio: when using `AUDIO_MIXED_STREAM`, audio metadata does not identify the current speaker. Use `on_active_speaker_event` to track who is speaking:

```python
@client.on_active_speaker_event
def on_speaker(timestamp, user_id, user_name):
    print(f'Active speaker: {user_name} ({user_id})')
```
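The audio callback hands you raw frame bytes. If you have configured uncompressed 16-bit PCM (an assumption for illustration; the default codec is Opus, which would need a decoder first, and the exact byte layout depends on your audio parameters), decoding a frame into samples takes only the stdlib:

```python
import struct

# Hypothetical frame: four 16-bit little-endian mono PCM samples,
# standing in for the `data` bytes an on_audio_data callback receives.
frame = struct.pack('<4h', 0, 1000, -1000, 32767)

n_samples = len(frame) // 2
samples = struct.unpack(f'<{n_samples}h', frame)
peak = max(abs(s) for s in samples)
print(f'{n_samples} samples, peak amplitude {peak}')  # 4 samples, peak amplitude 32767
```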
By default each stream type uses sensible settings (OPUS audio at 48 kHz, H.264 video at HD/30 fps). Call the relevant set_*_params method before join() to override any field — unspecified fields keep their defaults.
```python
# Switch from the default composite active-speaker stream to per-participant streams
params = rtms.VideoParams()
params.data_opt = rtms.DataOption.VIDEO_SINGLE_INDIVIDUAL_STREAM
client.set_video_params(params)

# Full control — set only the fields you want to change
params = rtms.VideoParams()
params.codec = rtms.VideoCodec.H264
params.resolution = rtms.VideoResolution.HD
params.fps = 30
params.data_opt = rtms.DataOption.VIDEO_SINGLE_ACTIVE_STREAM
client.set_video_params(params)
```

`VideoCodec` constants: `H264`, `JPG`, `PNG`. `VideoResolution` constants: `SD`, `HD`, `FHD`, `QHD`. `DataOption` video constants: `VIDEO_SINGLE_ACTIVE_STREAM` (default composite), `VIDEO_SINGLE_INDIVIDUAL_STREAM` (per-participant), `VIDEO_MIXED_GALLERY_VIEW`.
```python
# Receive a single mixed stream instead of the default per-participant streams
params = rtms.AudioParams()
params.data_opt = rtms.DataOption.AUDIO_MIXED_STREAM
client.set_audio_params(params)
```

`AudioSampleRate` constants: `SR_8K`, `SR_16K`, `SR_32K`, `SR_48K` (default). `AudioChannel` constants: `MONO`, `STEREO` (default). `DataOption` audio constants: `AUDIO_MULTI_STREAMS` (default, per-participant), `AUDIO_MIXED_STREAM`.
```python
params = rtms.DeskshareParams()
params.codec = rtms.VideoCodec.H264
params.resolution = rtms.VideoResolution.FHD
params.fps = 5
client.set_deskshare_params(params)
```

Uses the same `codec`, `resolution`, `fps`, and `data_opt` fields as video.
By default the SDK auto-detects the spoken language before enabling transcription (~30 seconds). Providing a language hint lets transcription begin immediately:
```python
# Hint the source language — skips auto-detect, transcription starts immediately
params = rtms.TranscriptParams()
params.src_language = rtms.TranscriptLanguage.ENGLISH
client.set_transcript_params(params)
```

`TranscriptLanguage` constants: `ENGLISH`, `SPANISH`, `JAPANESE`, `CHINESE_SIMPLIFIED`, and many more. To use auto-detection, omit `set_transcript_params` or set `src_language = rtms.TranscriptLanguage.NONE`.
By default you receive a single composite stream of the active speaker. To receive per-participant video, first configure VIDEO_SINGLE_INDIVIDUAL_STREAM, then subscribe per participant as they join:
```python
# Must be called before join() — switches from composite to per-participant streams
params = rtms.VideoParams()
params.data_opt = rtms.DataOption.VIDEO_SINGLE_INDIVIDUAL_STREAM
client.set_video_params(params)

# Subscribe when a participant joins, unsubscribe when they leave
@client.on_user_update
def on_user(op, participant):
    if op == rtms.USER_JOIN and participant.id:
        client.subscribe_video(participant.id, True)
    if op == rtms.USER_LEAVE and participant.id:
        client.subscribe_video(participant.id, False)

# Fires when a participant's video turns on or off
@client.on_participant_video
def on_participant_video(user_ids, is_on):
    print(f'Video {"on" if is_on else "off"} for users: {user_ids}')

# Fires with the subscription result for each subscribe_video() call
@client.on_video_subscribed
def on_video_subscribed(user_id, status, error):
    print(f'subscribe_video({user_id}): status={status}' + (f' error={error}' if error else ''))

@client.on_video_data
def on_video(data, size, timestamp, metadata):
    print(f'Video: {size}B from {metadata.userName}')
```

`run_async()` is a drop-in replacement for `run()` that awaits `asyncio.sleep()` between polls, so it composes naturally with aiohttp, FastAPI, asyncpg, and any other async framework on a shared event loop:
```python
import asyncio

import rtms
from aiohttp import web

routes = web.RouteTableDef()

@routes.post('/webhook')
async def webhook(request):
    payload = await request.json()
    if 'rtms_started' in payload.get('event', ''):
        client = rtms.Client()
        client.on_transcript_data(
            lambda d, s, t, m: print(m.userName, d.decode())
        )
        client.join(payload['payload'])
    return web.Response(text='ok')

async def main():
    app = web.Application()
    app.add_routes(routes)
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, port=8080).start()
    await rtms.run_async()  # yields control between polls — never blocks

asyncio.run(main())
```

Async callbacks are detected automatically and scheduled on the running event loop:
```python
client = rtms.Client()

async def save_audio(data, size, timestamp, metadata):
    await db.insert('audio', data)  # fully non-blocking

client.on_audio_data(save_audio)  # coroutine detected, dispatched via loop
```

For CPU-bound or I/O-heavy callbacks that should not block the poll loop, pass a `concurrent.futures.Executor`:
```python
from concurrent.futures import ThreadPoolExecutor

import rtms

# Per-client executor
client = rtms.Client(executor=ThreadPoolExecutor(max_workers=8))
client.on_audio_data(run_asr_model)           # dispatched to thread pool
client.on_transcript_data(write_to_database)  # dispatched to thread pool

# Global default for all clients in the event loop
rtms.run(executor=ThreadPoolExecutor(max_workers=32))
```

Callbacks without an executor continue to run inline — identical to v1.0 behavior.
The Python SDK provides a layered set of primitives for scaling from a single meeting to hundreds of concurrent streams.
Appropriate for < ~20 concurrent meetings with lightweight callbacks (logging, forwarding, simple storage).
```python
import rtms

@rtms.on_webhook_event
def handle(payload):
    if payload.get('event') != 'meeting.rtms_started':
        return
    client = rtms.Client()
    client.on_transcript_data(lambda d, s, t, m: print(m.userName, d.decode()))
    client.join(payload['payload'])

rtms.run()
```

- Event loop: `rtms.run()` — synchronous, single-threaded
- Callbacks: inline, no dispatch overhead
- GIL: released during `poll()`, so other Python threads (e.g. the webhook HTTP server) stay responsive
Appropriate for CPU-bound callbacks (ML inference, audio processing) or I/O callbacks (database writes, S3 uploads) that would otherwise block the poll loop.
```python
from concurrent.futures import ThreadPoolExecutor

import rtms

executor = ThreadPoolExecutor(max_workers=32)

@rtms.on_webhook_event
def handle(payload):
    if payload.get('event') != 'meeting.rtms_started':
        return
    client = rtms.Client(executor=executor)
    client.on_audio_data(run_asr_model)           # dispatched to thread pool
    client.on_transcript_data(write_to_database)  # dispatched to thread pool
    client.join(payload['payload'])

rtms.run()
```

- Event loop: `rtms.run()` — poll loop stays fast; callbacks run in the pool
- Thread pool size: tune `max_workers` to match your callback latency profile
- Back-pressure: executor queue depth limits unbounded growth
Appropriate when the rest of your stack is already async (aiohttp, FastAPI, asyncpg). Callbacks declared as async def are auto-scheduled on the running event loop.
```python
import asyncio

import rtms

async def on_transcript(data, size, timestamp, metadata):
    await db.execute('INSERT INTO transcripts VALUES ($1, $2)', metadata.userId, data)

@rtms.on_webhook_event
def handle(payload):
    if payload.get('event') != 'meeting.rtms_started':
        return
    client = rtms.Client()
    client.on_transcript_data(on_transcript)  # coroutine auto-detected
    client.join(payload['payload'])

async def main():
    await asyncio.gather(
        rtms.run_async(),
        start_web_server(),
    )

asyncio.run(main())
```

- Event loop: `rtms.run_async()` — `await asyncio.sleep()` between polls, never blocks
- Coroutine dispatch: `asyncio.run_coroutine_threadsafe` bridges SDK callbacks to the loop
- Composable: runs alongside any other async service on the same loop
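The detect-and-bridge step can be sketched with stdlib primitives. This is an illustration of the mechanism, not the SDK's actual code: `inspect.iscoroutinefunction` decides whether a callback is async, and `asyncio.run_coroutine_threadsafe` hands coroutines from a non-loop thread (such as a poller) to the running loop:

```python
import asyncio
import inspect
import threading

def dispatch(callback, *args, loop):
    """Illustrative dispatch: sync callbacks run inline, coroutine
    callbacks are bridged onto the running event loop."""
    if inspect.iscoroutinefunction(callback):
        # Called from a non-loop thread: hand the coroutine to the
        # loop and get a concurrent.futures.Future back.
        return asyncio.run_coroutine_threadsafe(callback(*args), loop)
    return callback(*args)

# Demo: an event loop running in a background thread.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def async_cb(text):
    return text.upper()

future = dispatch(async_cb, 'transcript', loop=loop)
print(future.result(timeout=5))                # TRANSCRIPT
print(dispatch(len, 'transcript', loop=loop))  # 10 (ran inline)

loop.call_soon_threadsafe(loop.stop)
```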
Appropriate for 50+ concurrent meetings. Each process runs its own rtms.run() loop; a load balancer routes webhook events to available workers.
```
Zoom → Load Balancer (nginx/HAProxy)
 ├── Worker 0 (rtms.run(), meetings 0..N)
 ├── Worker 1 (rtms.run(), meetings N..2N)
 └── Worker 2 (rtms.run(), meetings 2N..3N)
```
Use a message queue (Redis, RabbitMQ, Kafka) to distribute join requests so that exactly one worker handles each meeting:

```python
# worker.py — one process per worker
import rtms, redis, json, threading

r = redis.Redis()

@rtms.on_webhook_event
def handle(payload):
    # Push onto a shared list so exactly one worker pops each job
    # (Redis pub/sub would broadcast the event to every worker instead).
    r.lpush('rtms:join', json.dumps(payload))

def consume():
    while True:
        _, raw = r.blpop('rtms:join')
        payload = json.loads(raw)
        client = rtms.Client()
        client.on_transcript_data(lambda d, s, t, m: print(m.userName, d.decode()))
        client.join(payload['payload'])

threading.Thread(target=consume, daemon=True).start()
rtms.run()
```

| Workload | Recommended Layer |
|---|---|
| Prototyping / light callbacks | Layer 1 — inline |
| CPU-bound (ASR, video) | Layer 2 — executor |
| Async framework (FastAPI, aiohttp) | Layer 3 — run_async |
| 50+ concurrent meetings | Layer 4 — multi-process |
All layers use the same Client API. You can mix layers — e.g. Layer 3 for the web server plus Layer 2 executors for heavy callbacks — without any API changes.
```shell
python3 -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install python-dotenv rtms
```

Create a `.env` file:
```
# Required
ZM_RTMS_CLIENT=your_client_id
ZM_RTMS_SECRET=your_client_secret

# Optional
ZM_RTMS_PORT=8080
ZM_RTMS_PATH=/webhook
ZM_RTMS_LOG_LEVEL=info          # error, warn, info, debug, trace
ZM_RTMS_LOG_FORMAT=progressive  # progressive or json
ZM_RTMS_LOG_ENABLED=true
```
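The same settings can also be applied in code, which is convenient in tests or containers that inject secrets another way. A sketch, assuming the SDK reads these variables from the environment at startup (hypothetical credential values):

```python
import os

# Set before the SDK starts, e.g. before calling rtms.run().
os.environ['ZM_RTMS_CLIENT'] = 'your_client_id'
os.environ['ZM_RTMS_SECRET'] = 'your_client_secret'
os.environ['ZM_RTMS_PORT'] = '8080'
os.environ['ZM_RTMS_LOG_LEVEL'] = 'debug'
```

If you use the `.env` file instead, call python-dotenv's `load_dotenv()` early in your entrypoint so the values are in `os.environ` before the SDK starts.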