Skip to content

Commit

Permalink
for ossrs#250, decode the PAT of PSI ts packet.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 27, 2015
1 parent 51aecb8 commit 52b6291
Showing 5 changed files with 394 additions and 9 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -486,6 +486,8 @@ Supported operating systems and hardware:
).
1. Support HLS(h.264+mp3) streaming, read
[#301](https://github.com/winlinvip/simple-rtmp-server/issues/301).
1. [dev] Support push MPEG-TS over UDP to SRS, read
[#250](https://github.com/winlinvip/simple-rtmp-server/issues/250).
1. [no-plan] Support <500ms latency, FRSC(Fast RTMP-compatible Stream Channel tech).
1. [no-plan] Support RTMP 302 redirect [#92](https://github.com/winlinvip/simple-rtmp-server/issues/92).
1. [no-plan] Support multiple processes, for both origin and edge
8 changes: 3 additions & 5 deletions trunk/src/app/srs_app_mpegts_udp.cpp
Original file line number Diff line number Diff line change
@@ -35,19 +35,20 @@ using namespace std;
#include <srs_kernel_ts.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_kernel_ts.hpp>
#include <srs_core_autofree.hpp>

#ifdef SRS_AUTO_STREAM_CASTER

SrsMpegtsOverUdp::SrsMpegtsOverUdp(SrsConfDirective* c)
{
stream = new SrsStream();
context = new SrsTsContext();
output = _srs_config->get_stream_caster_output(c);
}

SrsMpegtsOverUdp::~SrsMpegtsOverUdp()
{
srs_freep(stream);
srs_freep(context);
}

int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
@@ -85,10 +86,7 @@ int SrsMpegtsOverUdp::on_ts_packet(SrsStream* stream)
{
int ret = ERROR_SUCCESS;

SrsTsPacket* packet = new SrsTsPacket();
SrsAutoFree(SrsTsPacket, packet);

if ((ret = packet->decode(stream)) != ERROR_SUCCESS) {
if ((ret = context->decode(stream)) != ERROR_SUCCESS) {
srs_error("mpegts: decode ts packet failed. ret=%d", ret);
return ret;
}
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_mpegts_udp.hpp
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ class sockaddr_in;
#include <string>

class SrsStream;
class SrsTsContext;
class SrsConfDirective;

#ifdef SRS_AUTO_STREAM_CASTER
@@ -45,6 +46,7 @@ class SrsMpegtsOverUdp
{
private:
SrsStream* stream;
SrsTsContext* context;
std::string output;
public:
SrsMpegtsOverUdp(SrsConfDirective* c);
186 changes: 186 additions & 0 deletions trunk/src/kernel/srs_kernel_ts.cpp
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ using namespace std;
#include <srs_kernel_buffer.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_stream.hpp>
#include <srs_core_autofree.hpp>

// in ms, for HLS aac sync time.
#define SRS_CONF_DEFAULT_AAC_SYNC 100
@@ -401,6 +402,33 @@ SrsMpegtsFrame::SrsMpegtsFrame()
key = false;
}

SrsTsContext::SrsTsContext()
{
}

SrsTsContext::~SrsTsContext()
{
}

int SrsTsContext::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;

// parse util EOF of stream.
// for example, parse multiple times for the PES_packet_length(0) packet.
while (!stream->empty()) {
SrsTsPacket* packet = new SrsTsPacket();
SrsAutoFree(SrsTsPacket, packet);

if ((ret = packet->decode(stream)) != ERROR_SUCCESS) {
srs_error("mpegts: decode ts packet failed. ret=%d", ret);
return ret;
}
}

return ret;
}

SrsTsPacket::SrsTsPacket()
{
sync_byte = 0;
@@ -412,11 +440,13 @@ SrsTsPacket::SrsTsPacket()
adaption_field_control = SrsTsAdaptationFieldTypeReserved;
continuity_counter = 0;
adaptation_field = NULL;
payload = NULL;
}

SrsTsPacket::~SrsTsPacket()
{
srs_freep(adaptation_field);
srs_freep(payload);
}

int SrsTsPacket::decode(SrsStream* stream)
@@ -471,6 +501,23 @@ int SrsTsPacket::decode(SrsStream* stream)
// calc the user defined data size for payload.
int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos);

// optional: payload.
if (adaption_field_control == SrsTsAdaptationFieldTypePayloadOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) {
if (pid == SrsTsPidPAT) {
// 2.4.4.3 Program association Table
srs_freep(payload);
payload = new SrsTsPayloadPAT(this);
} else {
// left bytes as reserved.
stream->skip(nb_payload);
}

if (payload && (ret = payload->decode(stream)) != ERROR_SUCCESS) {
srs_error("ts: demux payload failed. ret=%d", ret);
return ret;
}
}

return ret;
}

@@ -713,6 +760,145 @@ int SrsTsAdaptationField::decode(SrsStream* stream)
return ret;
}

SrsTsPayloadPATProgram::SrsTsPayloadPATProgram()
{
number = 0;
pid = 0;
}

SrsTsPayloadPATProgram::~SrsTsPayloadPATProgram()
{
}

SrsTsPayload::SrsTsPayload(SrsTsPacket* p)
{
packet = p;
}

SrsTsPayload::~SrsTsPayload()
{
}

SrsTsPayloadPSI::SrsTsPayloadPSI(SrsTsPacket* p) : SrsTsPayload(p)
{
pointer_field = 0;
}

SrsTsPayloadPSI::~SrsTsPayloadPSI()
{
}

int SrsTsPayloadPSI::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;

/**
* When the payload of the Transport Stream packet contains PSI data, the payload_unit_start_indicator has the following
* significance: if the Transport Stream packet carries the first byte of a PSI section, the payload_unit_start_indicator value
* shall be '1', indicating that the first byte of the payload of this Transport Stream packet carries the pointer_field. If the
* Transport Stream packet does not carry the first byte of a PSI section, the payload_unit_start_indicator value shall be '0',
* indicating that there is no pointer_field in the payload. Refer to 2.4.4.1 and 2.4.4.2. This also applies to private streams of
* stream_type 5 (refer to Table 2-29).
*/
if (packet->payload_unit_start_indicator) {
if (!stream->require(1)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: demux PSI failed. ret=%d", ret);
return ret;
}
pointer_field = stream->read_1bytes();
}

return ret;
}

SrsTsPayloadPAT::SrsTsPayloadPAT(SrsTsPacket* p) : SrsTsPayloadPSI(p)
{
nb_programs = 0;
programs = NULL;
}

SrsTsPayloadPAT::~SrsTsPayloadPAT()
{
srs_freep(programs);
}

int SrsTsPayloadPAT::decode(SrsStream* stream)
{
int ret = ERROR_SUCCESS;

if ((ret = SrsTsPayloadPSI::decode(stream)) != ERROR_SUCCESS) {
return ret;
}

// atleast 8B without programs and crc32
if (!stream->require(8)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: demux PAT failed. ret=%d", ret);
return ret;
}
// 1B
table_id = (SrsTsPsiId)stream->read_1bytes();

// 2B
section_length = stream->read_2bytes();

section_syntax_indicator = (section_length >> 15) & 0x01;
const0_value = (section_length >> 14) & 0x01;
section_length &= 0x0FFF;

if (!stream->require(section_length)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: demux PAT section failed. ret=%d", ret);
return ret;
}
int pos = stream->pos();

// 2B
transport_stream_id = stream->read_2bytes();

// 1B
current_next_indicator = stream->read_1bytes();

version_number = (current_next_indicator >> 1) & 0x1F;
current_next_indicator &= 0x01;

// TODO: FIXME: check the indicator.

// 1B
section_number = stream->read_1bytes();
// 1B
last_section_number = stream->read_1bytes();

// multiple 4B program data.
int program_bytes = section_length - 4 - (stream->pos() - pos);
nb_programs = program_bytes / 4;
if (nb_programs > 0) {
srs_freep(programs);
programs = new SrsTsPayloadPATProgram[nb_programs];

for (int i = 0; i < nb_programs; i++) {
SrsTsPayloadPATProgram* program = programs + i;

int tmpv = stream->read_4bytes();
program->number = (int16_t)((tmpv >> 16) & 0xFFFF);
program->pid = (int16_t)(tmpv & 0x1FFF);
}
}

// 4B
if (!stream->require(4)) {
ret = ERROR_STREAM_CASTER_TS_AF;
srs_error("ts: demux PAT crc32 failed. ret=%d", ret);
return ret;
}
CRC_32 = stream->read_4bytes();

// TODO: FIXME: verfy crc32.

return ret;
}

SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w)
{
writer = w;
Loading

0 comments on commit 52b6291

Please sign in to comment.