纯python RTMP服务器
项目描述
pyrtmp:纯 Python RTMP 服务器
- 纯蟒蛇
- 支持 uvloop 的 AsyncIO
- 易于定制
- 支持 RTMP
- 支持 RTMPT
公告
在生产服务器中使用此软件包多年后。它运行完美,没有任何问题。所以我决定从0.2.0 版本开始将开发状态从Beta切换到Production 。另外,我在下面的部署部分分享我的配置。
如果您有任何问题。随意在GitHub 上创建问题。
快速开始
您必须创建自己的 rtmp 控制器来决定当用户连接或接收到流时要做什么。下面的示例显示了使用 RTMP 接收流并将它们写入 flv 文件的过程。
如果您正在寻找 RTMPT,请查看pyrtmp/misc/rtmpt.py
PS 请求请求是受欢迎的!
import asyncio
import logging
import os
import tempfile
from pyrtmp import StreamClosedException, RTMPProtocol
from pyrtmp.messages import SessionManager
from pyrtmp.messages.audio import AudioMessage
from pyrtmp.messages.command import NCConnect, NCCreateStream, NSPublish, NSCloseStream, NSDeleteStream
from pyrtmp.messages.data import MetaDataMessage
from pyrtmp.messages.protocolcontrol import WindowAcknowledgementSize, SetChunkSize, SetPeerBandwidth
from pyrtmp.messages.usercontrol import StreamBegin
from pyrtmp.messages.video import VideoMessage
from pyrtmp.misc.flvdump import FLVFile, FLVMediaType
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
async def simple_controller(reader, writer):
session = SessionManager(reader=reader, writer=writer)
flv = None
try:
logger.debug(f'Client connected {session.peername}')
# do handshake
await session.handshake()
# read chunks
async for chunk in session.read_chunks_from_stream():
message = chunk.as_message()
logger.debug(f"Receiving {str(message)} {message.chunk_id}")
if isinstance(message, NCConnect):
session.write_chunk_to_stream(WindowAcknowledgementSize(ack_window_size=5000000))
session.write_chunk_to_stream(SetPeerBandwidth(ack_window_size=5000000, limit_type=2))
session.write_chunk_to_stream(StreamBegin(stream_id=0))
session.write_chunk_to_stream(SetChunkSize(chunk_size=8192))
session.writer_chunk_size = 8192
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NCConnect")
elif isinstance(message, WindowAcknowledgementSize):
pass
elif isinstance(message, NCCreateStream):
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NCCreateStream")
elif isinstance(message, NSPublish):
# create flv file at temp
flv = FLVFile(os.path.join(tempfile.gettempdir(), message.publishing_name))
session.write_chunk_to_stream(StreamBegin(stream_id=1))
session.write_chunk_to_stream(message.create_response())
await session.drain()
logger.debug("Response to NSPublish")
elif isinstance(message, MetaDataMessage):
# Write meta data to file
flv.write(0, message.to_raw_meta(), FLVMediaType.OBJECT)
elif isinstance(message, SetChunkSize):
session.reader_chunk_size = message.chunk_size
elif isinstance(message, VideoMessage):
# Write video data to file
flv.write(message.timestamp, message.payload, FLVMediaType.VIDEO)
elif isinstance(message, AudioMessage):
# Write data data to file
flv.write(message.timestamp, message.payload, FLVMediaType.AUDIO)
elif isinstance(message, NSCloseStream):
pass
elif isinstance(message, NSDeleteStream):
pass
else:
logger.debug(f"Unknown message {str(message)}")
except StreamClosedException as ex:
logger.debug(f"Client {session.peername} disconnected!")
finally:
if flv:
flv.close()
async def serve_rtmp(use_protocol=True):
loop = asyncio.get_running_loop()
if use_protocol is True:
server = await loop.create_server(lambda: RTMPProtocol(controller=simple_controller, loop=loop), '0.0.0.0', 1935)
else:
server = await asyncio.start_server(simple_controller, '0.0.0.0', 1935)
addr = server.sockets[0].getsockname()
logger.info(f'Serving on {addr}')
async with server:
await server.serve_forever()
def wrapper(port: int):
asyncio.run(serve_rtmp(port=port))
IS_DEBUG=True
NUM_PROCESS=2
if __name__ == "__main__":
if IS_DEBUG is True:
wrapper(1935)
else:
from multiprocessing import Process
import uvloop
uvloop.install()
process = []
for i in range(NUM_PROCESS):
p = Process(target=wrapper, args=(1935 + i + 1,))
p.start()
process.append(p)
for p in process:
p.join()
部署
我在生产环境中推荐了 nginx + uvloop。
示例:您有 2 个 CPU
- 设置 DEBUG=False 和 NUM_PROCESS=2
- 设置 nginx 以在 rtmp 服务器之间进行负载平衡,如下所示:
nginx.conf
stream {
upstream stream_backend {
127.0.0.1:1936;
127.0.0.1:1937;
}
server {
listen 1935;
proxy_pass stream_backend;
}
}
您可以像这样使用简单的 ffmpeg 命令测试您的服务器
ffmpeg -i my_test_file.flv -c:v copy -c:a copy -f flv rtmp://127.0.0.1:1935/test/sample
路线图
- 支持 HTTP2/3
- 支持 AMF3
- 重播
- HLS 播放
- 文档