Skip to content

Commit

Permalink
feat: initial packet-stats implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vroad committed Jun 13, 2023
1 parent e5b4e51 commit de6df08
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 12 deletions.
16 changes: 12 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Tools to process ARIB TS streams.
[--pre-streaming] [<file>]
mirakc-arib filter-program-metadata [--sid=<sid>] [<file>]
mirakc-arib record-service --sid=<sid> --file=<file>
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
mirakc-arib track-airtime --sid=<sid> --eid=<eid> [<file>]
mirakc-arib seek-start --sid=<sid>
[--max-duration=<ms>] [--max-packets=<num>] [<file>]
Expand Down Expand Up @@ -629,7 +629,7 @@ Record a service stream into a ring buffer file
Usage:
mirakc-arib record-service --sid=<sid> --file=<file>
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
Options:
-h --help
Expand All @@ -651,6 +651,9 @@ Record a service stream into a ring buffer file
--start-pos=<pos> [default: 0]
A file position to start recoring.
The value must be a multiple of the chunk size.
--packet-stats
Enables statistics on TS packets.
Arguments:
<file>
Expand Down Expand Up @@ -1221,6 +1224,7 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
static const std::string kChunkSize = "--chunk-size";
static const std::string kNumChunks = "--num-chunks";
static const std::string kStartPos = "--start-pos";
static const std::string kPacketStats = "--packet-stats";

opt->sid = static_cast<uint16_t>(args.at(kSid).asLong());
opt->file = args.at(kFile).asString();
Expand Down Expand Up @@ -1258,9 +1262,13 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
std::abort();
}
}
if (args.at(kPacketStats).asBool()) {
opt->packet_stats = true;
}
MIRAKC_ARIB_INFO(
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={}",
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos);
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={} "
"packet-stats={}",
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos, opt->packet_stats);
}

void LoadOption(const Args& args, AirtimeTrackerOption* opt) {
Expand Down
102 changes: 102 additions & 0 deletions src/packet_stats_collector.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#pragma once

#include <string.h>
#include <array>
#include <cstdint>
#include "base.hh"

namespace {

class PacketStatsCollector final {
public:
PacketStatsCollector() {}
~PacketStatsCollector() {}

void CollectPacketStats(const ts::TSPacket& packet) {
auto pid = packet.getPID();
auto last_cc = stats[pid].last_cc;
auto cc = packet.getCC();
auto has_payload = packet.hasPayload();
auto tei = packet.getTEI();
stats[pid].last_cc = cc;
stats[pid].last_packet = packet;

if (packet.getScrambling()) {
++scrambled_packets_;
}

auto duplicate_found = false;

if (packet.getDiscontinuityIndicator() || packet.getPID() == ts::PID_NULL) {
// do nothing
} else if (tei) {
++error_packets_;
} else if (last_cc != ts::INVALID_CC) {
if (has_payload) {
if (last_cc == cc) {
if (stats[pid].last_packet != packet) {
// non-duplicate packet
++dropped_packets_;
} else {
// duplicate packet
duplicate_found = true;
}
} else {
// regular packet
uint8_t expectedCC = (last_cc + 1) & ts::CC_MASK;
if (expectedCC != cc) {
++dropped_packets_;
}
}
} else {
// Continuity counter should not increment if packet has no payload
if (last_cc != cc) {
++dropped_packets_;
}
}
}

if (duplicate_found) {
// duplicate packet is only allowed once
++stats[pid].duplicate_packets;
if (stats[pid].duplicate_packets > 1) {
++dropped_packets_;
}
} else {
stats[pid].duplicate_packets = 0;
}
}

void ResetPacketStats() {
error_packets_ = 0;
dropped_packets_ = 0;
scrambled_packets_ = 0;
}

uint64_t GetErrorPackets() const {
return error_packets_;
}

uint64_t GetDroppedPackets() const {
return dropped_packets_;
}

uint64_t GetScrambledPackets() const {
return scrambled_packets_;
}

private:
struct PacketStat {
uint8_t last_cc = ts::INVALID_CC;
uint8_t duplicate_packets = 0;
ts::TSPacket last_packet;
};

MIRAKC_ARIB_NON_COPYABLE(PacketStatsCollector);
std::array<PacketStat, ts::PID_MAX> stats;
uint64_t error_packets_ = 0;
uint64_t dropped_packets_ = 0;
uint64_t scrambled_packets_ = 0;
};

} // namespace
46 changes: 42 additions & 4 deletions src/service_recorder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "jsonl_source.hh"
#include "logging.hh"
#include "packet_sink.hh"
#include "packet_stats_collector.hh"
#include "tsduck_helper.hh"

#define MIRAKC_ARIB_SERVICE_RECORDER_TRACE(...) MIRAKC_ARIB_TRACE("service-recorder: " __VA_ARGS__)
Expand All @@ -28,6 +29,7 @@ struct ServiceRecorderOption final {
size_t chunk_size = 0;
size_t num_chunks = 0;
uint64_t start_pos = 0;
bool packet_stats = false;
};

class ServiceRecorder final : public PacketSink,
Expand Down Expand Up @@ -91,6 +93,10 @@ class ServiceRecorder final : public PacketSink,

demux_.feedPacket(packet);

if (option_.packet_stats) {
packet_stats_collector_.CollectPacketStats(packet);
}

switch (state_) {
case State::kPreparing:
return OnPreparing(packet);
Expand All @@ -113,6 +119,7 @@ class ServiceRecorder final : public PacketSink,
// The application may purge expired programs in the message handler for
// the `chunk` message. So, the program data must be updated before that.
SendEventUpdateMessage(eit_, now, pos);
SendPacketStatsMessage();
SendChunkMessage(now, pos);
}

Expand Down Expand Up @@ -310,17 +317,15 @@ class ServiceRecorder final : public PacketSink,
if (event_changed) {
MIRAKC_ARIB_SERVICE_RECORDER_WARN("Event#{:04X} has started before Event#{:04X} ends",
GetEvent(new_eit).event_id, GetEvent(eit).event_id);
UpdateEventBoundary(now, sink_->pos());
SendEventEndMessage(eit);
HandleEventEnd(now, eit);
SendEventStartMessage(new_eit);
} else {
if (IsUnspecifiedEventEndTime(GetEvent(eit))) {
// Continue recording as the current program until the event changes.
} else {
auto end_time = GetEventEndTime(GetEvent(eit));
if (now >= end_time) {
UpdateEventBoundary(end_time, sink_->pos());
SendEventEndMessage(eit);
HandleEventEnd(end_time, eit);
event_started_ = false; // wait for new event
}
}
Expand All @@ -340,6 +345,12 @@ class ServiceRecorder final : public PacketSink,
event_boundary_pos_ = pos;
}

void HandleEventEnd(const ts::Time& endTime, const std::shared_ptr<ts::EIT>& eit) {
UpdateEventBoundary(endTime, sink_->pos());
SendPacketStatsMessage();
SendEventEndMessage(eit);
}

void SendStartMessage() {
MIRAKC_ARIB_SERVICE_RECORDER_INFO("Started recording SID#{:04X}", option_.sid);

Expand Down Expand Up @@ -411,6 +422,32 @@ class ServiceRecorder final : public PacketSink,
SendEventMessage("event-end", eit, event_boundary_time_, event_boundary_pos_);
}

void SendPacketStatsMessage() {
if (!option_.packet_stats) {
return;
}

auto error_packets = packet_stats_collector_.GetErrorPackets();
auto dropped_packets = packet_stats_collector_.GetDroppedPackets();
auto scrambled_packets = packet_stats_collector_.GetScrambledPackets();
MIRAKC_ARIB_SERVICE_RECORDER_INFO("PacketStats: Error: {}, Dropped {}, Scrambled: {}",
error_packets, dropped_packets, scrambled_packets);

rapidjson::Document doc(rapidjson::kObjectType);
auto& allocator = doc.GetAllocator();

rapidjson::Value data(rapidjson::kObjectType);
data.AddMember("errorPackets", error_packets, allocator);
data.AddMember("droppedPackets", dropped_packets, allocator);
data.AddMember("scrambledPackets", scrambled_packets, allocator);

doc.AddMember("type", "packet-stats", allocator);
doc.AddMember("data", data, allocator);

FeedDocument(doc);
packet_stats_collector_.ResetPacketStats();
}

void SendEventMessage(const std::string& type, const std::shared_ptr<ts::EIT>& eit,
const ts::Time& time, uint64_t pos) {
MIRAKC_ARIB_ASSERT(eit);
Expand Down Expand Up @@ -466,6 +503,7 @@ class ServiceRecorder final : public PacketSink,
ts::PID pmt_pid_ = ts::PID_NULL;
State state_ = State::kPreparing;
bool event_started_ = false;
PacketStatsCollector packet_stats_collector_;

MIRAKC_ARIB_NON_COPYABLE(ServiceRecorder);
};
Expand Down
1 change: 1 addition & 0 deletions test/cli_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ assert 0 "$MIRAKC_ARIB filter-program-metadata --sid=0xFFFF"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1 --start-pos=0"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --start-pos=8192"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --packet-stats"
if [ -z "$CI" ]
then
# This test fails in GitHub Actions.
Expand Down
25 changes: 21 additions & 4 deletions test/service_recorder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ constexpr size_t kNumChunks = 2;
constexpr size_t kChunkSize = RingFileSink::kBufferSize * kNumBuffers;
constexpr uint64_t kRingSize = kChunkSize * kNumChunks;
const ServiceRecorderOption kOption{"/dev/null", 3, kChunkSize, kNumChunks};
struct ServiceRecorderTest : testing::TestWithParam<ServiceRecorderOption> {};
} // namespace

TEST(ServiceRecorderTest, NoPacket) {
Expand Down Expand Up @@ -275,8 +276,8 @@ TEST(ServiceRecorderTest, EventProgress) {
EXPECT_TRUE(src.IsEmpty());
}

TEST(ServiceRecorderTest, EventEnd) {
ServiceRecorderOption option = kOption;
TEST_P(ServiceRecorderTest, EventEnd) {
ServiceRecorderOption option = GetParam();

TableSource src;
auto file = std::make_unique<MockFile>();
Expand Down Expand Up @@ -353,6 +354,18 @@ TEST(ServiceRecorderTest, EventEnd) {
MockJsonlSink::Stringify(doc));
return true;
});
if (option.packet_stats) {
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
R"("errorPackets":0,)"
R"("droppedPackets":1,)"
R"("scrambledPackets":0)"
R"(})"
R"(})",
MockJsonlSink::Stringify(doc));
return true;
});
}
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"event-end","data":{)"
R"("originalNetworkId":1,)"
Expand Down Expand Up @@ -406,13 +419,17 @@ TEST(ServiceRecorderTest, EventEnd) {

auto ring =
std::make_unique<RingFileSink>(std::move(file), option.chunk_size, option.num_chunks);
auto recorder = std::make_unique<ServiceRecorder>(kOption);
auto recorder = std::make_unique<ServiceRecorder>(option);
recorder->ServiceRecorder::Connect(std::move(ring));
recorder->JsonlSource::Connect(std::move(json_sink));
src.Connect(std::move(recorder));
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
EXPECT_TRUE(src.IsEmpty());
}
};

INSTANTIATE_TEST_SUITE_P(EventEndWithParams, ServiceRecorderTest,
testing::Values(ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, false},
ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, true}));

TEST(ServiceRecorderTest, EventStartBeforeEventEnd) {
ServiceRecorderOption option = kOption;
Expand Down

0 comments on commit de6df08

Please sign in to comment.