Skip to content

Commit

Permalink
Implement rtmp play
Browse files Browse the repository at this point in the history
  • Loading branch information
illuspas committed Dec 3, 2024
1 parent 6dc3c1a commit ead7720
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 29 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ npx node-media-server

## Features
* HTTP/HTTP2-flv Push/Play
* RTMP Push
* RTMP Push/Play

## Roadmap
* RTMP Play
* HTTP-API
* Authentication
* Notification
Expand Down
25 changes: 11 additions & 14 deletions src/protocol/flv.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,19 @@ export default class Flv {
};

/**
* @param {number} type
* @param {number} time
* @param {number} size
* @param {Buffer} data
* @param {AVPacket} avpacket
* @returns {Buffer}
*/
static createMessage = (type, time, size, data) => {
const buffer = Buffer.alloc(11 + size + 4);
buffer[0] = type;
buffer.writeUintBE(size, 1, 3);
buffer[4] = (time >> 16) & 0xFF;
buffer[5] = (time >> 8) & 0xFF;
buffer[6] = time & 0xFF;
buffer[7] = (time >> 24) & 0xFF;
data.copy(buffer, 11, 0, size);
buffer.writeUint32BE(11 + size, 11 + size);
static createMessage = (avpacket) => {
const buffer = Buffer.alloc(11 + avpacket.size + 4);
buffer[0] = avpacket.codec_type;
buffer.writeUintBE(avpacket.size, 1, 3);
buffer[4] = (avpacket.dts >> 16) & 0xFF;
buffer[5] = (avpacket.dts >> 8) & 0xFF;
buffer[6] = avpacket.dts & 0xFF;
buffer[7] = (avpacket.dts >> 24) & 0xFF;
avpacket.data.copy(buffer, 11, 0, avpacket.size);
buffer.writeUint32BE(11 + avpacket.size, 11 + avpacket.size);
return buffer;
};

Expand Down
53 changes: 41 additions & 12 deletions src/protocol/rtmp.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ class RtmpPacket {
stream_id: 0
};
this.clock = 0;
/**@type {Buffer} */
this.payload = Buffer.alloc(0);
this.capacity = 0;
this.bytes = 0;
Expand Down Expand Up @@ -350,8 +349,33 @@ export default class Rtmp {
return null;
};

/**
* @param {AVPacket} avpacket
* @returns {Buffer}
*/
static createMessage = (avpacket) => {
let rtmpPacket = new RtmpPacket();
rtmpPacket.header.fmt = MESSAGE_FORMAT_0;
switch (avpacket.codec_type) {
case 8:
rtmpPacket.header.cid = RTMP_CHANNEL_AUDIO;
break;
case 9:
rtmpPacket.header.cid = RTMP_CHANNEL_VIDEO;
break;
case 18:
rtmpPacket.header.cid = RTMP_CHANNEL_DATA;
break;
}
rtmpPacket.header.length = avpacket.size;
rtmpPacket.header.type = avpacket.codec_type;
rtmpPacket.header.timestamp = avpacket.dts;
rtmpPacket.clock = avpacket.dts;
rtmpPacket.payload = avpacket.data;
return Rtmp.chunksCreate(rtmpPacket);
};

chunkBasicHeaderCreate = (fmt, cid) => {
static chunkBasicHeaderCreate = (fmt, cid) => {
let out;
if (cid >= 64 + 255) {
out = Buffer.alloc(3);
Expand All @@ -369,7 +393,7 @@ export default class Rtmp {
return out;
};

chunkMessageHeaderCreate = (header) => {
static chunkMessageHeaderCreate = (header) => {
let out = Buffer.alloc(rtmpHeaderSize[header.fmt % 4]);
if (header.fmt <= RTMP_CHUNK_TYPE_2) {
out.writeUIntBE(header.timestamp >= 0xffffff ? 0xffffff : header.timestamp, 0, 3);
Expand All @@ -386,16 +410,21 @@ export default class Rtmp {
return out;
};

chunksCreate = (packet) => {
/**
*
* @param {RtmpPacket} packet
* @returns {Buffer}
*/
static chunksCreate = (packet) => {
let header = packet.header;
let payload = packet.payload;
let payloadSize = header.length;
let chunkSize = this.outChunkSize;
let chunkSize = RTMP_MAX_CHUNK_SIZE;
let chunksOffset = 0;
let payloadOffset = 0;
let chunkBasicHeader = this.chunkBasicHeaderCreate(header.fmt, header.cid);
let chunkBasicHeader3 = this.chunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid);
let chunkMessageHeader = this.chunkMessageHeaderCreate(header);
let chunkBasicHeader = Rtmp.chunkBasicHeaderCreate(header.fmt, header.cid);
let chunkBasicHeader3 = Rtmp.chunkBasicHeaderCreate(RTMP_CHUNK_TYPE_3, header.cid);
let chunkMessageHeader = Rtmp.chunkMessageHeaderCreate(header);
let useExtendedTimestamp = header.timestamp >= 0xffffff;
let headerSize = chunkBasicHeader.length + chunkMessageHeader.length + (useExtendedTimestamp ? 4 : 0);
let n = headerSize + payloadSize + Math.floor(payloadSize / chunkSize);
Expand Down Expand Up @@ -686,18 +715,18 @@ export default class Rtmp {
}
this.streamName = invokeMessage.streamName.split("?")[0];
this.streamId = this.parserPacket.header.stream_id;
this.respondPublish();
this.onConnectCallback(this.streamApp, this.streamName);
this.onPushCallback();
this.respondPublish();
};

onPlay = (invokeMessage) => {
this.streamName = invokeMessage.streamName.split("?")[0];
this.streamPath = "/" + this.streamApp + "/" + this.streamName;
this.streamId = this.parserPacket.header.stream_id;
this.respondPlay();
this.onConnectCallback(this.streamApp, this.streamName);
this.onPlayCallback();
this.respondPlay();
};

onDeleteStream = (invokeMessage) => {
Expand Down Expand Up @@ -744,7 +773,7 @@ export default class Rtmp {
packet.header.stream_id = sid;
packet.payload = AMF.encodeAmf0Cmd(opt);
packet.header.length = packet.payload.length;
let chunks = this.chunksCreate(packet);
let chunks = Rtmp.chunksCreate(packet);
this.onOutputCallback(chunks);
};

Expand All @@ -756,7 +785,7 @@ export default class Rtmp {
packet.payload = AMF.encodeAmf0Data(opt);
packet.header.length = packet.payload.length;
packet.header.stream_id = sid;
let chunks = this.chunksCreate(packet);
let chunks = Rtmp.chunksCreate(packet);
this.onOutputCallback(chunks);
}

Expand Down
28 changes: 27 additions & 1 deletion src/server/broadcast_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import AVPacket from "../core/avpacket.js";
import Flv from "../protocol/flv.js";
import Rtmp from "../protocol/rtmp.js";
import BaseSession from "../session/base_session.js";

export default class BroadcastServer {
Expand All @@ -28,6 +29,15 @@ export default class BroadcastServer {

/** @type {Buffer | null} */
this.flvVideoHeader = null;

/** @type {Buffer | null} */
this.rtmpMetaData = null;

/** @type {Buffer | null} */
this.rtmpAudioHeader = null;

/** @type {Buffer | null} */
this.rtmpVideoHeader = null;
}

/**
Expand All @@ -47,6 +57,16 @@ export default class BroadcastServer {
session.sendBuffer(this.flvVideoHeader);
}
break;
case "rtmp":
if (this.rtmpMetaData != null) {
session.sendBuffer(this.rtmpMetaData);
}
if (this.rtmpAudioHeader != null) {
session.sendBuffer(this.rtmpAudioHeader);
}
if (this.rtmpVideoHeader != null) {
session.sendBuffer(this.rtmpVideoHeader);
}
}

this.subscribers.set(session.id, session);
Expand Down Expand Up @@ -88,16 +108,20 @@ export default class BroadcastServer {
* @param {AVPacket} packet
*/
broadcastMessage = (packet) => {
const flvMessage = Flv.createMessage(packet.codec_type, packet.dts, packet.size, packet.data);
const flvMessage = Flv.createMessage(packet);
const rtmpMessage = Rtmp.createMessage(packet);
switch (packet.flags) {
case 0:
this.flvAudioHeader = Buffer.from(flvMessage);
this.rtmpAudioHeader = Buffer.from(rtmpMessage);
break;
case 2:
this.flvVideoHeader = Buffer.from(flvMessage);
this.rtmpVideoHeader = Buffer.from(rtmpMessage);
break;
case 5:
this.flvMetaData = Buffer.from(flvMessage);
this.rtmpMetaData = Buffer.from(rtmpMessage);
break;
}

Expand All @@ -106,6 +130,8 @@ export default class BroadcastServer {
case "flv":
v.sendBuffer(flvMessage);
break;
case "rtmp":
v.sendBuffer(rtmpMessage);
}
});
};
Expand Down

0 comments on commit ead7720

Please sign in to comment.