Skip to main content

纯python RTMP服务器

项目描述

pyrtmp:纯 Python RTMP 服务器

  • 纯 Python 实现
  • 支持 uvloop 的 AsyncIO
  • 易于定制
  • 支持 RTMP
  • 支持 RTMPT

公告

在生产服务器中使用此软件包多年后,它运行完美,没有任何问题。因此我决定从 0.2.0 版本开始,将开发状态从 Beta 切换为 Production。另外,我在下面的“部署”部分分享了我的配置。

如果您有任何问题,欢迎随时在 GitHub 上创建 issue。

快速开始

您必须创建自己的 rtmp 控制器来决定当用户连接或接收到流时要做什么。下面的示例显示了使用 RTMP 接收流并将它们写入 flv 文件的过程。

如果您正在寻找 RTMPT,请查看pyrtmp/misc/rtmpt.py

PS:欢迎提交 Pull Request!

简单的 RTMP 控制器

import asyncio  
import logging  
import os  
import tempfile  

from pyrtmp import StreamClosedException, RTMPProtocol  
from pyrtmp.messages import SessionManager  
from pyrtmp.messages.audio import AudioMessage  
from pyrtmp.messages.command import NCConnect, NCCreateStream, NSPublish, NSCloseStream, NSDeleteStream  
from pyrtmp.messages.data import MetaDataMessage  
from pyrtmp.messages.protocolcontrol import WindowAcknowledgementSize, SetChunkSize, SetPeerBandwidth  
from pyrtmp.messages.usercontrol import StreamBegin  
from pyrtmp.messages.video import VideoMessage  
from pyrtmp.misc.flvdump import FLVFile, FLVMediaType  
  
# Configure the root logger so DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by every function in this example.
logger = logging.getLogger(__name__)
# Explicitly pin this logger to DEBUG as well.
logger.setLevel(logging.DEBUG)
  

async def simple_controller(reader, writer):
    """Handle one RTMP client and dump its published stream to an FLV file.

    Performs the handshake, then reads chunks from the session and answers
    the connect / createStream / publish commands. Metadata, video and audio
    messages are written to an FLV file created in the system temp directory
    and named after the published stream.

    Args:
        reader: stream reader for the client connection.
        writer: stream writer for the client connection.
    """
    session = SessionManager(reader=reader, writer=writer)
    flv = None
    try:
        logger.debug(f'Client connected {session.peername}')

        # do handshake
        await session.handshake()

        # read chunks
        async for chunk in session.read_chunks_from_stream():
            message = chunk.as_message()
            logger.debug("Receiving %s %s", message, message.chunk_id)
            if isinstance(message, NCConnect):
                session.write_chunk_to_stream(WindowAcknowledgementSize(ack_window_size=5000000))
                session.write_chunk_to_stream(SetPeerBandwidth(ack_window_size=5000000, limit_type=2))
                session.write_chunk_to_stream(StreamBegin(stream_id=0))
                session.write_chunk_to_stream(SetChunkSize(chunk_size=8192))
                session.writer_chunk_size = 8192
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NCConnect")
            elif isinstance(message, WindowAcknowledgementSize):
                pass
            elif isinstance(message, NCCreateStream):
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NCCreateStream")
            elif isinstance(message, NSPublish):
                # The publishing name is chosen by the remote client. Keep
                # only its final path component so that names such as
                # "../x" or "/etc/x" cannot escape the temp directory.
                stream_name = os.path.basename(message.publishing_name)
                if stream_name in ("", ".", ".."):
                    logger.warning(
                        "Rejecting publish with invalid name %r",
                        message.publishing_name,
                    )
                    return
                # A repeated publish must not leak the previously opened file.
                if flv is not None:
                    flv.close()
                # create flv file at temp
                flv = FLVFile(os.path.join(tempfile.gettempdir(), stream_name))
                session.write_chunk_to_stream(StreamBegin(stream_id=1))
                session.write_chunk_to_stream(message.create_response())
                await session.drain()
                logger.debug("Response to NSPublish")
            elif isinstance(message, MetaDataMessage):
                if flv is None:
                    # No file exists until the client has published.
                    logger.warning("Ignoring %s received before publish", message)
                else:
                    # Write meta data to file
                    flv.write(0, message.to_raw_meta(), FLVMediaType.OBJECT)
            elif isinstance(message, SetChunkSize):
                session.reader_chunk_size = message.chunk_size
            elif isinstance(message, VideoMessage):
                if flv is None:
                    logger.warning("Ignoring %s received before publish", message)
                else:
                    # Write video data to file
                    flv.write(message.timestamp, message.payload, FLVMediaType.VIDEO)
            elif isinstance(message, AudioMessage):
                if flv is None:
                    logger.warning("Ignoring %s received before publish", message)
                else:
                    # Write audio data to file
                    flv.write(message.timestamp, message.payload, FLVMediaType.AUDIO)
            elif isinstance(message, NSCloseStream):
                pass
            elif isinstance(message, NSDeleteStream):
                pass
            else:
                logger.debug("Unknown message %s", message)

    except StreamClosedException:
        logger.debug(f"Client {session.peername} disconnected!")
    finally:
        # Always release the output file, whichever way the loop ended.
        if flv is not None:
            flv.close()
  
  
async def serve_rtmp(use_protocol=True, port=1935):
    """Start the RTMP server on ``0.0.0.0`` and serve until cancelled.

    Args:
        use_protocol: when exactly ``True``, connections are handled through
            ``RTMPProtocol`` via ``loop.create_server``; otherwise
            ``asyncio.start_server`` is used with ``simple_controller`` as
            the client callback.
        port: TCP port to listen on. Defaults to 1935, the value that was
            previously hard-coded, so existing callers are unaffected.
    """
    loop = asyncio.get_running_loop()
    if use_protocol is True:
        server = await loop.create_server(
            lambda: RTMPProtocol(controller=simple_controller, loop=loop),
            '0.0.0.0',
            port,
        )
    else:
        server = await asyncio.start_server(simple_controller, '0.0.0.0', port)
    addr = server.sockets[0].getsockname()
    logger.info(f'Serving on {addr}')
    async with server:
        await server.serve_forever()
  
def wrapper(port: int):
    """Run ``serve_rtmp`` for *port* to completion via ``asyncio.run``.

    This is a plain synchronous callable, which is what lets it be used both
    directly and as a ``multiprocessing.Process`` target.
    """
    server_coroutine = serve_rtmp(port=port)
    asyncio.run(server_coroutine)

    
# True: run one server in this process on port 1935.
# Anything else: start NUM_PROCESS worker processes with uvloop installed.
IS_DEBUG = True
# Number of worker processes started when IS_DEBUG is not True.
NUM_PROCESS = 2

if __name__ == "__main__":
    if IS_DEBUG is not True:
        from multiprocessing import Process
        import uvloop

        uvloop.install()
        workers = []
        # Workers listen on 1936, 1937, ... so that a front-end balancer
        # can own port 1935.
        for offset in range(1, NUM_PROCESS + 1):
            worker = Process(target=wrapper, args=(1935 + offset,))
            worker.start()
            workers.append(worker)
        # Block until every worker has exited.
        for worker in workers:
            worker.join()
    else:
        wrapper(1935)

部署

我推荐在生产环境中使用 nginx + uvloop。

示例:您有 2 个 CPU

  1. 设置 IS_DEBUG=False 和 NUM_PROCESS=2
  2. 设置 nginx 以在 rtmp 服务器之间进行负载平衡,如下所示:

nginx.conf

stream {

    # Backend pyrtmp worker processes, one per port.
    upstream stream_backend {
        # Each backend must be declared with the "server" directive.
        server 127.0.0.1:1936;
        server 127.0.0.1:1937;
    }

    # Public RTMP port; connections are balanced across the workers above.
    server {
        listen     1935;
        proxy_pass stream_backend;
    }
}

您可以像这样使用简单的 ffmpeg 命令测试您的服务器

ffmpeg -i my_test_file.flv -c:v copy -c:a copy -f flv rtmp://127.0.0.1:1935/test/sample

路线图

  • 支持 HTTP2/3
  • 支持 AMF3
  • 重播
  • HLS 播放
  • 文档

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源码分发包

pyrtmp-0.2.0.tar.gz (16.7 kB,查看哈希)

已上传 source