From 9a0d11198549c139f8bc81ba53ced33a7736ec8a Mon Sep 17 00:00:00 2001 From: vroad <396351+vroad@users.noreply.github.com> Date: Tue, 13 Jun 2023 09:45:14 +0000 Subject: [PATCH] feat: initial packet-stats implementation --- src/main.cc | 16 ++++-- src/packet_stats_collector.hh | 102 ++++++++++++++++++++++++++++++++++ src/service_recorder.hh | 46 +++++++++++++-- test/cli_tests.sh | 1 + test/service_recorder_test.cc | 55 +++++++++++++++--- 5 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 src/packet_stats_collector.hh diff --git a/src/main.cc b/src/main.cc index 20c4748..61b669e 100644 --- a/src/main.cc +++ b/src/main.cc @@ -72,7 +72,7 @@ Tools to process ARIB TS streams. [--pre-streaming] [] mirakc-arib filter-program-metadata [--sid=] [] mirakc-arib record-service --sid= --file= - --chunk-size= --num-chunks= [--start-pos=] [] + --chunk-size= --num-chunks= [--start-pos=] [--packet-stats] [] mirakc-arib track-airtime --sid= --eid= [] mirakc-arib seek-start --sid= [--max-duration=] [--max-packets=] [] @@ -629,7 +629,7 @@ Record a service stream into a ring buffer file Usage: mirakc-arib record-service --sid= --file= - --chunk-size= --num-chunks= [--start-pos=] [] + --chunk-size= --num-chunks= [--start-pos=] [--packet-stats] [] Options: -h --help @@ -651,6 +651,9 @@ Record a service stream into a ring buffer file --start-pos= [default: 0] A file position to start recoring. The value must be a multiple of the chunk size. + + --packet-stats + Enables statistics on TS packets. Arguments: @@ -1221,6 +1224,7 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) { static const std::string kChunkSize = "--chunk-size"; static const std::string kNumChunks = "--num-chunks"; static const std::string kStartPos = "--start-pos"; + static const std::string kPacketStats = "--packet-stats"; opt->sid = static_cast(args.at(kSid).asLong()); opt->file = args.at(kFile).asString(); @@ -1258,9 +1262,13 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) { std::abort(); } } + if (args.at(kPacketStats).asBool()) { + opt->packet_stats = true; + } MIRAKC_ARIB_INFO( - "ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={}", - opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos); + "ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={} " + "packet-stats={}", + opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos, opt->packet_stats); } void LoadOption(const Args& args, AirtimeTrackerOption* opt) { diff --git a/src/packet_stats_collector.hh b/src/packet_stats_collector.hh new file mode 100644 index 0000000..93a04a8 --- /dev/null +++ b/src/packet_stats_collector.hh @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include "base.hh" + +namespace { + +class PacketStatsCollector final { + public: + PacketStatsCollector() {} + ~PacketStatsCollector() {} + + void CollectPacketStats(const ts::TSPacket& packet) { + auto pid = packet.getPID(); + auto last_cc = stats[pid].last_cc; + auto cc = packet.getCC(); + auto has_payload = packet.hasPayload(); + auto tei = packet.getTEI(); + stats[pid].last_cc = cc; + stats[pid].last_packet = packet; + + if (packet.getScrambling()) { + ++scrambled_packets_; + } + + auto duplicate_found = false; + + if (packet.getDiscontinuityIndicator() || packet.getPID() == ts::PID_NULL) { + // do nothing + } else if (tei) { + ++error_packets_; + } else if (last_cc != ts::INVALID_CC) { + if (has_payload) { + if (last_cc == cc) { + if (stats[pid].last_packet != packet) { + // non-duplicate packet + ++dropped_packets_; + } else { + // duplicate packet + duplicate_found = true; + } + } else { + // regular packet + uint8_t expectedCC = (last_cc + 1) & ts::CC_MASK; + if (expectedCC != cc) { + ++dropped_packets_; + } + } + } else { + // Continuity counter should not increment if packet has no payload + if (last_cc != cc) { + ++dropped_packets_; + } + } + } + + if (duplicate_found) { + // duplicate packet is only allowed once + ++stats[pid].duplicate_packets; + if (stats[pid].duplicate_packets > 1) { + ++dropped_packets_; + } + } else { + stats[pid].duplicate_packets = 0; + } + } + + void ResetPacketStats() { + error_packets_ = 0; + dropped_packets_ = 0; + scrambled_packets_ = 0; + } + + uint64_t GetErrorPackets() const { + return error_packets_; + } + + uint64_t GetDroppedPackets() const { + return dropped_packets_; + } + + uint64_t GetScrambledPackets() const { + return scrambled_packets_; + } + + private: + struct PacketStat { + uint8_t last_cc = ts::INVALID_CC; + uint8_t duplicate_packets = 0; + ts::TSPacket last_packet; + }; + + MIRAKC_ARIB_NON_COPYABLE(PacketStatsCollector); + std::array stats; + uint64_t error_packets_ = 0; + uint64_t dropped_packets_ = 0; + uint64_t scrambled_packets_ = 0; +}; + +} // namespace diff --git a/src/service_recorder.hh b/src/service_recorder.hh index cf3845e..731a89d 100644 --- a/src/service_recorder.hh +++ b/src/service_recorder.hh @@ -12,6 +12,7 @@ #include "jsonl_source.hh" #include "logging.hh" #include "packet_sink.hh" +#include "packet_stats_collector.hh" #include "tsduck_helper.hh" #define MIRAKC_ARIB_SERVICE_RECORDER_TRACE(...) MIRAKC_ARIB_TRACE("service-recorder: " __VA_ARGS__) @@ -28,6 +29,7 @@ struct ServiceRecorderOption final { size_t chunk_size = 0; size_t num_chunks = 0; uint64_t start_pos = 0; + bool packet_stats = false; }; class ServiceRecorder final : public PacketSink, @@ -91,6 +93,10 @@ class ServiceRecorder final : public PacketSink, demux_.feedPacket(packet); + if (option_.packet_stats) { + packet_stats_collector_.CollectPacketStats(packet); + } + switch (state_) { case State::kPreparing: return OnPreparing(packet); @@ -113,6 +119,7 @@ class ServiceRecorder final : public PacketSink, // The application may purge expired programs in the message handler for // the `chunk` message. So, the program data must be updated before that. SendEventUpdateMessage(eit_, now, pos); + SendPacketStatsMessage(); SendChunkMessage(now, pos); } @@ -310,8 +317,7 @@ class ServiceRecorder final : public PacketSink, if (event_changed) { MIRAKC_ARIB_SERVICE_RECORDER_WARN("Event#{:04X} has started before Event#{:04X} ends", GetEvent(new_eit).event_id, GetEvent(eit).event_id); - UpdateEventBoundary(now, sink_->pos()); - SendEventEndMessage(eit); + HandleEventEnd(now, eit); SendEventStartMessage(new_eit); } else { if (IsUnspecifiedEventEndTime(GetEvent(eit))) { @@ -319,8 +325,7 @@ class ServiceRecorder final : public PacketSink, } else { auto end_time = GetEventEndTime(GetEvent(eit)); if (now >= end_time) { - UpdateEventBoundary(end_time, sink_->pos()); - SendEventEndMessage(eit); + HandleEventEnd(end_time, eit); event_started_ = false; // wait for new event } } @@ -340,6 +345,12 @@ class ServiceRecorder final : public PacketSink, event_boundary_pos_ = pos; } + void HandleEventEnd(const ts::Time& endTime, const std::shared_ptr& eit) { + UpdateEventBoundary(endTime, sink_->pos()); + SendPacketStatsMessage(); + SendEventEndMessage(eit); + } + void SendStartMessage() { MIRAKC_ARIB_SERVICE_RECORDER_INFO("Started recording SID#{:04X}", option_.sid); @@ -411,6 +422,32 @@ class ServiceRecorder final : public PacketSink, SendEventMessage("event-end", eit, event_boundary_time_, event_boundary_pos_); } + void SendPacketStatsMessage() { + if (!option_.packet_stats) { + return; + } + + auto error_packets = packet_stats_collector_.GetErrorPackets(); + auto dropped_packets = packet_stats_collector_.GetDroppedPackets(); + auto scrambled_packets = packet_stats_collector_.GetScrambledPackets(); + MIRAKC_ARIB_SERVICE_RECORDER_INFO("PacketStats: Error: {}, Dropped {}, Scrambled: {}", + error_packets, dropped_packets, scrambled_packets); + + rapidjson::Document doc(rapidjson::kObjectType); + auto& allocator = doc.GetAllocator(); + + rapidjson::Value data(rapidjson::kObjectType); + data.AddMember("errorPackets", error_packets, allocator); + data.AddMember("droppedPackets", dropped_packets, allocator); + data.AddMember("scrambledPackets", scrambled_packets, allocator); + + doc.AddMember("type", "packet-stats", allocator); + doc.AddMember("data", data, allocator); + + FeedDocument(doc); + packet_stats_collector_.ResetPacketStats(); + } + void SendEventMessage(const std::string& type, const std::shared_ptr& eit, const ts::Time& time, uint64_t pos) { MIRAKC_ARIB_ASSERT(eit); @@ -466,6 +503,7 @@ class ServiceRecorder final : public PacketSink, ts::PID pmt_pid_ = ts::PID_NULL; State state_ = State::kPreparing; bool event_started_ = false; + PacketStatsCollector packet_stats_collector_; MIRAKC_ARIB_NON_COPYABLE(ServiceRecorder); }; diff --git a/test/cli_tests.sh b/test/cli_tests.sh index f7b4557..4c674d6 100644 --- a/test/cli_tests.sh +++ b/test/cli_tests.sh @@ -72,6 +72,7 @@ assert 0 "$MIRAKC_ARIB filter-program-metadata --sid=0xFFFF" assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1" assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1 --start-pos=0" assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --start-pos=8192" +assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --packet-stats" if [ -z "$CI" ] then # This test fails in GitHub Actions. diff --git a/test/service_recorder_test.cc b/test/service_recorder_test.cc index 087c5c7..a51b9cc 100644 --- a/test/service_recorder_test.cc +++ b/test/service_recorder_test.cc @@ -15,6 +15,7 @@ constexpr size_t kNumChunks = 2; constexpr size_t kChunkSize = RingFileSink::kBufferSize * kNumBuffers; constexpr uint64_t kRingSize = kChunkSize * kNumChunks; const ServiceRecorderOption kOption{"/dev/null", 3, kChunkSize, kNumChunks}; +struct ServiceRecorderTest : testing::TestWithParam {}; } // namespace TEST(ServiceRecorderTest, NoPacket) { @@ -136,8 +137,8 @@ TEST(ServiceRecorderTest, EventStart) { EXPECT_TRUE(src.IsEmpty()); } -TEST(ServiceRecorderTest, EventProgress) { - ServiceRecorderOption option = kOption; +TEST_P(ServiceRecorderTest, EventProgress) { + ServiceRecorderOption option = GetParam(); TableSource src; auto ring_sink = std::make_unique(option.chunk_size, option.num_chunks); @@ -226,6 +227,18 @@ TEST(ServiceRecorderTest, EventProgress) { MockJsonlSink::Stringify(doc)); return true; }); + if (option.packet_stats) { + EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { + EXPECT_EQ(R"({"type":"packet-stats","data":{)" + R"("errorPackets":0,)" + R"("droppedPackets":0,)" + R"("scrambledPackets":0)" + R"(})" + R"(})", + MockJsonlSink::Stringify(doc)); + return true; + }); + } EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)" R"("timestamp":1609426800000,"pos":16384)" @@ -253,6 +266,18 @@ TEST(ServiceRecorderTest, EventProgress) { MockJsonlSink::Stringify(doc)); return true; }); + if (option.packet_stats) { + EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { + EXPECT_EQ(R"({"type":"packet-stats","data":{)" + R"("errorPackets":0,)" + R"("droppedPackets":0,)" + R"("scrambledPackets":0)" + R"(})" + R"(})", + MockJsonlSink::Stringify(doc)); + return true; + }); + } EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)" R"("timestamp":1609426800000,"pos":0)" @@ -267,7 +292,7 @@ TEST(ServiceRecorderTest, EventProgress) { EXPECT_CALL(*ring_sink, End).WillOnce(testing::Return()); } - auto recorder = std::make_unique(kOption); + auto recorder = std::make_unique(option); recorder->ServiceRecorder::Connect(std::move(ring_sink)); recorder->JsonlSource::Connect(std::move(json_sink)); src.Connect(std::move(recorder)); @@ -275,8 +300,8 @@ TEST(ServiceRecorderTest, EventProgress) { EXPECT_TRUE(src.IsEmpty()); } -TEST(ServiceRecorderTest, EventEnd) { - ServiceRecorderOption option = kOption; +TEST_P(ServiceRecorderTest, EventEnd) { + ServiceRecorderOption option = GetParam(); TableSource src; auto file = std::make_unique(); @@ -353,6 +378,18 @@ TEST(ServiceRecorderTest, EventEnd) { MockJsonlSink::Stringify(doc)); return true; }); + if (option.packet_stats) { + EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { + EXPECT_EQ(R"({"type":"packet-stats","data":{)" + R"("errorPackets":0,)" + R"("droppedPackets":1,)" + R"("scrambledPackets":0)" + R"(})" + R"(})", + MockJsonlSink::Stringify(doc)); + return true; + }); + } EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) { EXPECT_EQ(R"({"type":"event-end","data":{)" R"("originalNetworkId":1,)" @@ -406,13 +443,13 @@ TEST(ServiceRecorderTest, EventEnd) { auto ring = std::make_unique(std::move(file), option.chunk_size, option.num_chunks); - auto recorder = std::make_unique(kOption); + auto recorder = std::make_unique(option); recorder->ServiceRecorder::Connect(std::move(ring)); recorder->JsonlSource::Connect(std::move(json_sink)); src.Connect(std::move(recorder)); EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets()); EXPECT_TRUE(src.IsEmpty()); -} +}; TEST(ServiceRecorderTest, EventStartBeforeEventEnd) { ServiceRecorderOption option = kOption; @@ -791,3 +828,7 @@ TEST(ServiceRecorderTest, UnspecifiedEventEnd) { EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets()); EXPECT_TRUE(src.IsEmpty()); } + +INSTANTIATE_TEST_SUITE_P(EventTestWithPacketStats, ServiceRecorderTest, + testing::Values(ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, false}, + ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, true}));