Skip to content

Commit

Permalink
Refactor: Internal PID filtering addresses #174
Browse files Browse the repository at this point in the history
  • Loading branch information
Barracuda09 committed Nov 24, 2022
1 parent 5622022 commit 4ffe2c9
Show file tree
Hide file tree
Showing 21 changed files with 133 additions and 120 deletions.
2 changes: 1 addition & 1 deletion src/decrypt/dvbapi/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ namespace decrypt::dvbapi {
if (_connected && _enabled) {
const input::dvb::SpFrontendDecryptInterface frontend = _streamManager.getFrontendDecryptInterface(index);
const int maxBatchSize = frontend->getMaximumBatchSize();
static constexpr std::size_t size = buffer.getNumberOfTSPackets();
const std::size_t size = buffer.getNumberOfCompletedPackets();
for (std::size_t i = 0; i < size; ++i) {
// Get TS packet from the buffer
unsigned char *data = buffer.getTSPacketPtr(i);
Expand Down
5 changes: 3 additions & 2 deletions src/input/Device.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ class Device :
virtual bool isDataAvailable() = 0;

/// Read the available data from this device
/// @param buffer
virtual bool readFullTSPacket(mpegts::PacketBuffer &buffer) = 0;
/// @param buffer this is the buffer were to wirite to
/// @param finalCall this should be the last try and should return as soon as possible
virtual bool readTSPackets(mpegts::PacketBuffer &buffer, bool finalCall) = 0;

/// Check the capability of this device
/// @param system
Expand Down
24 changes: 10 additions & 14 deletions src/input/childpipe/TSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,24 @@ bool TSReader::isDataAvailable() {
return true;
}

bool TSReader::readFullTSPacket(mpegts::PacketBuffer &buffer) {
bool TSReader::readTSPackets(mpegts::PacketBuffer &buffer, const bool finalCall) {
if (!_exec.isOpen()) {
return false;
}
int readSize = 1;
for (int i = 7; i > 0 && !buffer.full() && readSize > 0; --i) {
int start = !_deviceData.isInternalPidFilteringEnabled() ? -1 : buffer.getNumberOfCompletePackets();
readSize = _exec.read(buffer.getWriteBufferPtr(), buffer.getAmountOfBytesToWrite());
for (int i = 0; i < 7; ++i) {
const int readSize = _exec.read(buffer.getWriteBufferPtr(), buffer.getAmountOfBytesToWrite());
if (readSize > 0) {
buffer.addAmountOfBytesWritten(readSize);
// Add data to Filter
if (buffer.isSynced()) {
_deviceData.getFilter().filterData(_feID, buffer, start);
} else if (buffer.trySyncing()) {
_deviceData.getFilter().filterData(_feID, buffer, start < 0 ? -1 : 0);
buffer.trySyncing();
_deviceData.getFilter().filterData(_feID, buffer, _deviceData.isInternalPidFilteringEnabled());
if (buffer.full() || finalCall) {
break;
}
}
std::this_thread::sleep_for(std::chrono::microseconds(5));
}

// Check again if buffer is still not full
return buffer.full();
// Check again if buffer is full or final call before sending
return buffer.full() || (finalCall && buffer.isReadyToSend());
}

bool TSReader::capableOf(const input::InputSystem system) const {
Expand Down Expand Up @@ -185,7 +182,6 @@ bool TSReader::update() {
}
}
updatePIDFilters();
SI_LOG_DEBUG("Frontend: @#1, PIDs Table: @#2", _feID, _deviceData.getFilter().getPidCSV());
SI_LOG_DEBUG("Frontend: @#1, Updating frontend (Finished)", _feID);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/input/childpipe/TSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TSReader :

virtual bool isDataAvailable() final;

virtual bool readFullTSPacket(mpegts::PacketBuffer &buffer) final;
virtual bool readTSPackets(mpegts::PacketBuffer &buffer, bool finalCall) final;

virtual bool capableOf(input::InputSystem msys) const final;

Expand Down
3 changes: 1 addition & 2 deletions src/input/childpipe/TSReaderData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ TSReaderData::TSReaderData() {
doInitialize();
}

TSReaderData::~TSReaderData() {}

// =============================================================================
// -- input::DeviceData --------------------------------------------------------
// =============================================================================
Expand All @@ -53,6 +51,7 @@ void TSReaderData::doInitialize() {

void TSReaderData::doParseStreamString(const FeID UNUSED(id), const TransportParamVector& params) {
const std::string filePath = params.getURIParameter("exec");
// Check did we receive an new path or just the same again
if (filePath.empty() || (hasFilePath() && filePath == _filePath)) {
parseAndUpdatePidsTable(params);
return;
Expand Down
2 changes: 1 addition & 1 deletion src/input/childpipe/TSReaderData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TSReaderData :
public:

TSReaderData();
virtual ~TSReaderData();
virtual ~TSReaderData() = default;

// =========================================================================
// -- input::DeviceData ----------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/input/dvb/Frontend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ bool Frontend::isDataAvailable() {
return false;
}

bool Frontend::readFullTSPacket(mpegts::PacketBuffer &buffer) {
bool Frontend::readTSPackets(mpegts::PacketBuffer &buffer, const bool UNUSED(finalCall)) {
// try read maximum amount of bytes from DMX
const auto readSize = ::read(_fd_dmx, buffer.getWriteBufferPtr(), buffer.getAmountOfBytesToWrite());
if (readSize > 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/input/dvb/Frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class Frontend :

virtual bool isDataAvailable() final;

virtual bool readFullTSPacket(mpegts::PacketBuffer &buffer) final;
virtual bool readTSPackets(mpegts::PacketBuffer &buffer, bool finalCall) final;

virtual bool capableOf(InputSystem system) const final;

Expand Down
2 changes: 1 addition & 1 deletion src/input/file/TSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ bool TSReader::isDataAvailable() {
return true;
}

bool TSReader::readFullTSPacket(mpegts::PacketBuffer &buffer) {
bool TSReader::readTSPackets(mpegts::PacketBuffer &buffer, const bool UNUSED(finalCall)) {
if (!_file.is_open()) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/input/file/TSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TSReader :

virtual bool isDataAvailable() final;

virtual bool readFullTSPacket(mpegts::PacketBuffer &buffer) final;
virtual bool readTSPackets(mpegts::PacketBuffer &buffer, bool finalCall) final;

virtual bool capableOf(input::InputSystem msys) const final;

Expand Down
3 changes: 1 addition & 2 deletions src/input/stream/Streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool Streamer::isDataAvailable() {
return false;
}

bool Streamer::readFullTSPacket(mpegts::PacketBuffer &buffer) {
bool Streamer::readTSPackets(mpegts::PacketBuffer &buffer, const bool UNUSED(finalCall)) {
if (_udpMultiListen.getFD() == -1) {
return false;
}
Expand Down Expand Up @@ -183,7 +183,6 @@ bool Streamer::update() {
}
}
updatePIDFilters();
SI_LOG_DEBUG("Frontend: @#1, PIDs Table: @#2", _feID, _deviceData.getFilter().getPidCSV());
SI_LOG_DEBUG("Frontend: @#1, Updating frontend (Finished)", _feID);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/input/stream/Streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Streamer :

virtual bool isDataAvailable() final;

virtual bool readFullTSPacket(mpegts::PacketBuffer &buffer) final;
virtual bool readTSPackets(mpegts::PacketBuffer &buffer, bool finalCall) final;

virtual bool capableOf(input::InputSystem msys) const final;

Expand Down
14 changes: 8 additions & 6 deletions src/mpegts/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ void Filter::parsePIDString(const std::string &reqPids,
}
}

void Filter::filterData(const FeID id, mpegts::PacketBuffer &buffer, const int start) {
void Filter::filterData(const FeID id, mpegts::PacketBuffer &buffer, const bool filter) {
// base::MutexLock lock(_mutex);
static constexpr std::size_t size = mpegts::PacketBuffer::getNumberOfTSPackets();
for (std::size_t i = start < 0 ? 0 : start; i < size; ++i) {
const std::size_t size = buffer.getNumberOfCompletedPackets();
const std::size_t begin = buffer.getBeginOfUnFilteredPackets();

for (std::size_t i = begin; i < size; ++i) {
const unsigned char *ptr = buffer.getTSPacketPtr(i);
// Check is this the beginning of the TS and no Transport error indicator
if (!(ptr[0] == 0x47 && (ptr[1] & 0x80) != 0x80)) {
if (start >= 0 && !_pidTable.isAllPID()) {
if (filter && !_pidTable.isAllPID()) {
buffer.markTSForPurging(i);
}
continue;
Expand All @@ -93,7 +95,7 @@ void Filter::filterData(const FeID id, mpegts::PacketBuffer &buffer, const int s
const uint16_t pid = ((ptr[1] & 0x1f) << 8) | ptr[2];
// If pid was not opened, skip this one (and perhaps filter out it)
if (!_pidTable.isPIDOpened(pid)) {
if (start >= 0 && !_pidTable.isAllPID()) {
if (filter && !_pidTable.isAllPID()) {
buffer.markTSForPurging(i);
}
continue;
Expand Down Expand Up @@ -187,7 +189,7 @@ void Filter::filterData(const FeID id, mpegts::PacketBuffer &buffer, const int s
_pcr->collectData(id, ptr);
}
}
if (start >= 0) {
if (filter) {
buffer.purge();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/mpegts/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class Filter {
/// optionally purge TS packets from unused pids if start is not negative
/// @param feID specifies the frontend ID
/// @param buffer specifies the mpegts buffer from the frontend
/// @param start enables the pid filtering starting from the packet indicated
void filterData(FeID id, mpegts::PacketBuffer &buffer, const int start = -1);
/// @param filter enables the software pid filtering
void filterData(FeID id, mpegts::PacketBuffer &buffer, bool filter = false);

///
bool isMarkedAsActivePMT(int pid) const;
Expand Down
34 changes: 20 additions & 14 deletions src/mpegts/PacketBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ void PacketBuffer::initialize(const uint32_t ssrc, const long timestamp) {
_buffer[10] = (ssrc >> 8) & 0xff; // synchronization source
_buffer[11] = (ssrc >> 0) & 0xff; // synchronization source

_writeIndex = RTP_HEADER_LEN;
_initialized = true;
}

bool PacketBuffer::trySyncing() {
const std::size_t size = getCurrentBufferSize();
if (size < (TS_PACKET_SIZE * 3)) {
return false;
}
if (isSynced()) {
return true;
}
Expand All @@ -65,6 +68,7 @@ bool PacketBuffer::trySyncing() {
// found sync, now move it to begin of buffer
const size_t cpySize = (MTU_MAX_TS_PACKET_SIZE + RTP_HEADER_LEN) - i;
_writeIndex = _writeIndex - (i - RTP_HEADER_LEN);
_processedIndex = _writeIndex;
std::memmove(_buffer + RTP_HEADER_LEN, cData, cpySize);
return true;
}
Expand All @@ -75,21 +79,21 @@ bool PacketBuffer::trySyncing() {
}

void PacketBuffer::markTSForPurging(std::size_t packetNumber) {
unsigned char *cData = getTSPacketPtr(packetNumber);
// Invalid TS packets are labeled 0xFF _after_ the first SYNC Byte.
if (cData < getWriteBufferPtr()) {
if (packetNumber <= NUMBER_OF_TS_PACKETS) {
// Invalid TS packets are labeled 0xFF _after_ the first SYNC Byte.
unsigned char *cData = getTSPacketPtr(packetNumber);
cData[1] = 0xFF;
_purgePending = true;
++_purgePending;
}
}

void PacketBuffer::purge() {
if (!_purgePending) {
if (_purgePending == 0) {
return;
}
_purgePending = false;
std::size_t bufSize = getBufferSize();
if (bufSize <= 0) {
_purgePending = 0;
const std::size_t bufSize = getCurrentBufferSize();
if (bufSize == 0) {
return;
}
// i: represents the packet number, and not the packet position!
Expand All @@ -102,7 +106,7 @@ void PacketBuffer::purge() {
skipData += TS_PACKET_SIZE;
continue;
}
// Remove current (plus others) packet from the buffer
// Remove current (plus other) packet from the buffer
unsigned char *endData = getWriteBufferPtr();
unsigned char *nextData = cData + skipData + TS_PACKET_SIZE;
if (nextData < endData) {
Expand All @@ -127,15 +131,17 @@ void PacketBuffer::tagRTPHeaderWith(const uint16_t cseq, const long timestamp) {
}

bool PacketBuffer::isReadyToSend() const {
// can only be ready when buffer is full (or ready to flush), so start from there
bool ready = (getBufferSize() > 0) ? true : false;
// ready to send, when there is something in the buffer in TS_PACKET_SIZE chucks
// and if all scramble flags are cleared
bool ready = (getCurrentBufferSize() % TS_PACKET_SIZE) == 0;
if (_decryptPending && ready) {
for (std::size_t i = 0; i < getNumberOfCompletePackets(); ++i) {
const std::size_t size = getNumberOfCompletedPackets();
for (std::size_t i = 0; i < size; ++i) {
const unsigned char *ts = getTSPacketPtr(i);
ready &= ((ts[3] & 0x80) != 0x80);
}
}
return ready;
}

} // namespace
}
Loading

0 comments on commit 4ffe2c9

Please sign in to comment.