Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial packet-stats implementation #77

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Tools to process ARIB TS streams.
[--pre-streaming] [<file>]
mirakc-arib filter-program-metadata [--sid=<sid>] [<file>]
mirakc-arib record-service --sid=<sid> --file=<file>
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
mirakc-arib track-airtime --sid=<sid> --eid=<eid> [<file>]
mirakc-arib seek-start --sid=<sid>
[--max-duration=<ms>] [--max-packets=<num>] [<file>]
Expand Down Expand Up @@ -629,7 +629,7 @@ Record a service stream into a ring buffer file
Usage:
mirakc-arib record-service --sid=<sid> --file=<file>
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
Options:
-h --help
Expand All @@ -651,6 +651,9 @@ Record a service stream into a ring buffer file
--start-pos=<pos> [default: 0]
A file position to start recoring.
The value must be a multiple of the chunk size.
--packet-stats
Enables statistics on TS packets.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ヘルプのJSON Messagesのセクションに,サンプルとフィールドの説明を追加してください.

通知される値は,前回からの増分であることも記述しておいたほうがよいだろうと思います.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

現在の仕様では,通知の間隔を明示していません.これでもドロップ数はカウント可能なので問題ありません.

ただ,特定チャンク内のドロップ数として区間を明示しておいたほうが,情報の利用価値がより高くなるかもしれません.例えば,ある時ドロップが通知されたとして,現仕様では区間が明示されていないので,前回通知されたストリーム位置の「近辺」から,今回通知されたストリーム位置の「近辺」までにドロップがあったということになります.

現実装ではチャンク境界で通知するため,実装上はチャンクごとに通知しています.個人的には,この動作を仕様として通知間隔を仕様上明示したほうが便利に使える場面が増えるような気がします.

この場合,通知メッセージにチャンク番号のフィールドを追加したほうが良いと思います.

{
  "type": "packet-stats",
  "data": {
    // 統計情報の対象チャンク
    "chunk": {
      "timestamp": "<unix-time-ms>",
      "pos": 0
    },
    // 以下の情報はchunk#<chunk.pos>に対する統計情報
    "stats": {
      "errorPackets": 0, // Transport Error Indicatorが1のパケット数
      "droppedPackets": 1, // ドロップしたパケット数
      "scrambledPackets": 0 // Scrambling controlが1のパケット数
    }
  }
}

mirakcのweb endpointでは,レコード単位の統計情報となるため,このように仕様を変更しても何の恩恵もないのですが,mirakc-aribコマンドの仕様としては,通知間隔を明示したほうがよいのではないかと思います.

Copy link
Contributor Author

@vroad vroad Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mirakc-aribコマンドの仕様としては,通知間隔を明示したほうがよいのではないかと思います.

mirakc-aribとJSONLを扱うツール(jq等)を使うスクリプトを自作し、
チャンク毎のドロップの状況を見るような場合を想定してでしょうか?

チャンク境界に到達した時だけでなく、番組が終了したときもpacket-statsの通知があります。
番組毎のドロップを収集する際、番組終了付近でドロップがあると
実際には前の番組のドロップなのにも関わらず、次の番組のドロップが加算されるというような問題を防ぐためです。
この場合、次のチャンクに到達していないにも関わらず通知されます。その後、ドロップ数のカウントもリセットされて再び0からカウントし始めます。

自作スクリプトでチャンク毎で統計取りたい場合には今の仕様だと役立たないかもしれません。

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

確かにそうですね.番組変更時の通知を見落としていました.

番組毎の集計は現在の通知シーケンスで可能なので,チャンク毎の集計はなくても問題なさそうです.通知間隔の明示も必要なく,番組毎に集計する場合,event-end(もしくはevent-start)で累積値をリセットすることで番組毎の集計ができることを記述しておけば十分な気がします.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

チャンク毎の集計はチャンネル毎の統計と番組毎の統計の両方で使うつもりでした。

番組毎の統計はcurl、
チャンネル毎の統計はVictoriaMetrics、prometheus-community/json_exporter、Grafanaを組み合わせて使って値を見るつもりでした。

番組終了時にしかドロップ数を通知しない実装だと、チャンネル毎の統計の更新間隔が長くなり
いつドロップが起きたか分かりにくくなります。
1時間番組なら番組開始から1時間後に
チャンネル毎のドロップ数の更新になってしまうのでは?流石にそれでは更新間隔が長すぎるかと思います。
また、タイムシフト録画を番組途中で止めた場合もメッセージを送らないと、
現在の番組のドロップ数のデータが失われてしまいます。

逆に、番組終了時にはpacket-statsメッセージを送らず、
チャンク境界だけで通知する実装に変えるのはどうでしょうか?
メッセージのフォーマットに番組のIDを含めて、
番組境界付近の場合、メッセージを複数回送るか、
配列にしてドロップ数を複数回送り
前の番組のドロップ数と現在の番組のドロップ数の両方を送った方が
シンプルな実装にできそうに思えてきました。

Copy link
Member

@masnagam masnagam Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

私の説明が良くなかったですね,すみません.

通知タイミングは現実装のチャンク境界と番組終了時で良いと思います.
番組毎に集計できることは必要ですが(メッセージ受信側で集計できるという意味で,mirakc-arib内で集計するという意味ではありません),チャンク毎の集計はできなくても問題ないだろうという私の感想でした.

チャンク境界だけで通知する実装に変えるのはどうでしょうか?

個人的には今の実装のままで良いのではないかと考えます.

例えば,チャンク境界でまとめて通知する場合,チャンク境界に達するまで前番組の集計結果が取得できません.
mirakcにはtimeshift.record-endedというイベントがあるのですが,この通知を受けたクライアントがその番組のドロップ数をweb endpoint経由で取得しようとしても取得できないタイミングが出てきます.

このような機能を実際に作って欲しいと要望しているわけではないのですが,mirakc-aribの通知タイミングを設計時に工夫しておけば,mirakc-aribの修正なしに後からmirakcに機能を追加することもある程度可能なのではないかと思っています.

Arguments:
<file>
Expand Down Expand Up @@ -1221,6 +1224,7 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
static const std::string kChunkSize = "--chunk-size";
static const std::string kNumChunks = "--num-chunks";
static const std::string kStartPos = "--start-pos";
static const std::string kPacketStats = "--packet-stats";

opt->sid = static_cast<uint16_t>(args.at(kSid).asLong());
opt->file = args.at(kFile).asString();
Expand Down Expand Up @@ -1258,9 +1262,13 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
std::abort();
}
}
if (args.at(kPacketStats).asBool()) {
opt->packet_stats = true;
}
MIRAKC_ARIB_INFO(
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={}",
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos);
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={} "
"packet-stats={}",
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos, opt->packet_stats);
}

void LoadOption(const Args& args, AirtimeTrackerOption* opt) {
Expand Down
102 changes: 102 additions & 0 deletions src/packet_stats_collector.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#pragma once

#include <string.h>
#include <array>
#include <cstdint>
#include "base.hh"

namespace {

class PacketStatsCollector final {
public:
PacketStatsCollector() {}
~PacketStatsCollector() {}

void CollectPacketStats(const ts::TSPacket& packet) {
auto pid = packet.getPID();
auto last_cc = stats[pid].last_cc;
auto cc = packet.getCC();
auto has_payload = packet.hasPayload();
auto tei = packet.getTEI();
stats[pid].last_cc = cc;
stats[pid].last_packet = packet;

if (packet.getScrambling()) {
++scrambled_packets_;
}

auto duplicate_found = false;

if (packet.getDiscontinuityIndicator() || packet.getPID() == ts::PID_NULL) {
// do nothing
} else if (tei) {
++error_packets_;
} else if (last_cc != ts::INVALID_CC) {
if (has_payload) {
if (last_cc == cc) {
if (stats[pid].last_packet != packet) {
// non-duplicate packet
++dropped_packets_;
} else {
// duplicate packet
duplicate_found = true;
}
} else {
// regular packet
uint8_t expectedCC = (last_cc + 1) & ts::CC_MASK;
if (expectedCC != cc) {
++dropped_packets_;
}
}
} else {
// Continuity counter should not increment if packet has no payload
if (last_cc != cc) {
++dropped_packets_;
}
}
}

if (duplicate_found) {
// duplicate packet is only allowed once
++stats[pid].duplicate_packets;
if (stats[pid].duplicate_packets > 1) {
++dropped_packets_;
}
} else {
stats[pid].duplicate_packets = 0;
}
}

void ResetPacketStats() {
error_packets_ = 0;
dropped_packets_ = 0;
scrambled_packets_ = 0;
}

uint64_t GetErrorPackets() const {
return error_packets_;
}

uint64_t GetDroppedPackets() const {
return dropped_packets_;
}

uint64_t GetScrambledPackets() const {
return scrambled_packets_;
}

private:
struct PacketStat {
uint8_t last_cc = ts::INVALID_CC;
uint8_t duplicate_packets = 0;
ts::TSPacket last_packet;
};
masnagam marked this conversation as resolved.
Show resolved Hide resolved

MIRAKC_ARIB_NON_COPYABLE(PacketStatsCollector);
std::array<PacketStat, ts::PID_MAX> stats;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

すべてのPIDが使われるわけではないので,無駄にメモリーを消費してしまう気がします.

    //!
    //! Maximum number of PID's (8192).
    //!
    constexpr PID PID_MAX = 1 << PID_BITS;

sizeof(PacketStat)は200B以下なので,1.6MB程度消費します.

大した量ではないので,このままでも私は構いませんが,気になるようならハッシュマップなどを使ってください.

uint64_t error_packets_ = 0;
uint64_t dropped_packets_ = 0;
uint64_t scrambled_packets_ = 0;
};

} // namespace
46 changes: 42 additions & 4 deletions src/service_recorder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "jsonl_source.hh"
#include "logging.hh"
#include "packet_sink.hh"
#include "packet_stats_collector.hh"
#include "tsduck_helper.hh"

#define MIRAKC_ARIB_SERVICE_RECORDER_TRACE(...) MIRAKC_ARIB_TRACE("service-recorder: " __VA_ARGS__)
Expand All @@ -28,6 +29,7 @@ struct ServiceRecorderOption final {
size_t chunk_size = 0;
size_t num_chunks = 0;
uint64_t start_pos = 0;
bool packet_stats = false;
};

class ServiceRecorder final : public PacketSink,
Expand Down Expand Up @@ -91,6 +93,10 @@ class ServiceRecorder final : public PacketSink,

demux_.feedPacket(packet);

if (option_.packet_stats) {
packet_stats_collector_.CollectPacketStats(packet);
}

switch (state_) {
case State::kPreparing:
return OnPreparing(packet);
Expand All @@ -113,6 +119,7 @@ class ServiceRecorder final : public PacketSink,
// The application may purge expired programs in the message handler for
// the `chunk` message. So, the program data must be updated before that.
SendEventUpdateMessage(eit_, now, pos);
SendPacketStatsMessage();
SendChunkMessage(now, pos);
}

Expand Down Expand Up @@ -310,17 +317,15 @@ class ServiceRecorder final : public PacketSink,
if (event_changed) {
MIRAKC_ARIB_SERVICE_RECORDER_WARN("Event#{:04X} has started before Event#{:04X} ends",
GetEvent(new_eit).event_id, GetEvent(eit).event_id);
UpdateEventBoundary(now, sink_->pos());
SendEventEndMessage(eit);
HandleEventEnd(now, eit);
SendEventStartMessage(new_eit);
} else {
if (IsUnspecifiedEventEndTime(GetEvent(eit))) {
// Continue recording as the current program until the event changes.
} else {
auto end_time = GetEventEndTime(GetEvent(eit));
if (now >= end_time) {
UpdateEventBoundary(end_time, sink_->pos());
SendEventEndMessage(eit);
HandleEventEnd(end_time, eit);
event_started_ = false; // wait for new event
}
}
Expand All @@ -340,6 +345,12 @@ class ServiceRecorder final : public PacketSink,
event_boundary_pos_ = pos;
}

void HandleEventEnd(const ts::Time& endTime, const std::shared_ptr<ts::EIT>& eit) {
UpdateEventBoundary(endTime, sink_->pos());
SendPacketStatsMessage();
Copy link
Member

@masnagam masnagam Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

別のところでもコメントしましたが,この通知タイミングを仕様とする場合は,後で間違ってメソッドの呼び出しを移動してしまわないように,チャンク単位で集計情報を通知する旨のコメントをここに追加してください.

チャンク単位の集計は不要.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

番組毎に集計する場合,集計値通知後にSendEventEndMessageが呼び出されることが前提となるため,何らかコメントを残しておいたほうが良いかもしれません.

SendEventEndMessage(eit);
}

void SendStartMessage() {
MIRAKC_ARIB_SERVICE_RECORDER_INFO("Started recording SID#{:04X}", option_.sid);

Expand Down Expand Up @@ -411,6 +422,32 @@ class ServiceRecorder final : public PacketSink,
SendEventMessage("event-end", eit, event_boundary_time_, event_boundary_pos_);
}

void SendPacketStatsMessage() {
if (!option_.packet_stats) {
return;
}

auto error_packets = packet_stats_collector_.GetErrorPackets();
auto dropped_packets = packet_stats_collector_.GetDroppedPackets();
auto scrambled_packets = packet_stats_collector_.GetScrambledPackets();
MIRAKC_ARIB_SERVICE_RECORDER_INFO("PacketStats: Error: {}, Dropped {}, Scrambled: {}",
error_packets, dropped_packets, scrambled_packets);

rapidjson::Document doc(rapidjson::kObjectType);
auto& allocator = doc.GetAllocator();

rapidjson::Value data(rapidjson::kObjectType);
data.AddMember("errorPackets", error_packets, allocator);
data.AddMember("droppedPackets", dropped_packets, allocator);
data.AddMember("scrambledPackets", scrambled_packets, allocator);

doc.AddMember("type", "packet-stats", allocator);
doc.AddMember("data", data, allocator);

FeedDocument(doc);
packet_stats_collector_.ResetPacketStats();
}

void SendEventMessage(const std::string& type, const std::shared_ptr<ts::EIT>& eit,
const ts::Time& time, uint64_t pos) {
MIRAKC_ARIB_ASSERT(eit);
Expand Down Expand Up @@ -466,6 +503,7 @@ class ServiceRecorder final : public PacketSink,
ts::PID pmt_pid_ = ts::PID_NULL;
State state_ = State::kPreparing;
bool event_started_ = false;
PacketStatsCollector packet_stats_collector_;

MIRAKC_ARIB_NON_COPYABLE(ServiceRecorder);
};
Expand Down
1 change: 1 addition & 0 deletions test/cli_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ assert 0 "$MIRAKC_ARIB filter-program-metadata --sid=0xFFFF"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1 --start-pos=0"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --start-pos=8192"
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --packet-stats"
if [ -z "$CI" ]
then
# This test fails in GitHub Actions.
Expand Down
55 changes: 48 additions & 7 deletions test/service_recorder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ constexpr size_t kNumChunks = 2;
constexpr size_t kChunkSize = RingFileSink::kBufferSize * kNumBuffers;
constexpr uint64_t kRingSize = kChunkSize * kNumChunks;
const ServiceRecorderOption kOption{"/dev/null", 3, kChunkSize, kNumChunks};
struct ServiceRecorderTest : testing::TestWithParam<ServiceRecorderOption> {};
} // namespace

TEST(ServiceRecorderTest, NoPacket) {
Expand Down Expand Up @@ -136,8 +137,8 @@ TEST(ServiceRecorderTest, EventStart) {
EXPECT_TRUE(src.IsEmpty());
}

TEST(ServiceRecorderTest, EventProgress) {
ServiceRecorderOption option = kOption;
TEST_P(ServiceRecorderTest, EventProgress) {
ServiceRecorderOption option = GetParam();

TableSource src;
auto ring_sink = std::make_unique<MockRingSink>(option.chunk_size, option.num_chunks);
Expand Down Expand Up @@ -226,6 +227,18 @@ TEST(ServiceRecorderTest, EventProgress) {
MockJsonlSink::Stringify(doc));
return true;
});
if (option.packet_stats) {
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
R"("errorPackets":0,)"
R"("droppedPackets":0,)"
R"("scrambledPackets":0)"
R"(})"
R"(})",
MockJsonlSink::Stringify(doc));
return true;
});
}
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)"
R"("timestamp":1609426800000,"pos":16384)"
Expand Down Expand Up @@ -253,6 +266,18 @@ TEST(ServiceRecorderTest, EventProgress) {
MockJsonlSink::Stringify(doc));
return true;
});
if (option.packet_stats) {
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
R"("errorPackets":0,)"
R"("droppedPackets":0,)"
R"("scrambledPackets":0)"
R"(})"
R"(})",
MockJsonlSink::Stringify(doc));
return true;
});
}
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)"
R"("timestamp":1609426800000,"pos":0)"
Expand All @@ -267,16 +292,16 @@ TEST(ServiceRecorderTest, EventProgress) {
EXPECT_CALL(*ring_sink, End).WillOnce(testing::Return());
}

auto recorder = std::make_unique<ServiceRecorder>(kOption);
auto recorder = std::make_unique<ServiceRecorder>(option);
recorder->ServiceRecorder::Connect(std::move(ring_sink));
recorder->JsonlSource::Connect(std::move(json_sink));
src.Connect(std::move(recorder));
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
EXPECT_TRUE(src.IsEmpty());
}

TEST(ServiceRecorderTest, EventEnd) {
ServiceRecorderOption option = kOption;
TEST_P(ServiceRecorderTest, EventEnd) {
ServiceRecorderOption option = GetParam();

TableSource src;
auto file = std::make_unique<MockFile>();
Expand Down Expand Up @@ -353,6 +378,18 @@ TEST(ServiceRecorderTest, EventEnd) {
MockJsonlSink::Stringify(doc));
return true;
});
if (option.packet_stats) {
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
R"("errorPackets":0,)"
R"("droppedPackets":1,)"
R"("scrambledPackets":0)"
R"(})"
R"(})",
MockJsonlSink::Stringify(doc));
return true;
});
}
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
EXPECT_EQ(R"({"type":"event-end","data":{)"
R"("originalNetworkId":1,)"
Expand Down Expand Up @@ -406,13 +443,13 @@ TEST(ServiceRecorderTest, EventEnd) {

auto ring =
std::make_unique<RingFileSink>(std::move(file), option.chunk_size, option.num_chunks);
auto recorder = std::make_unique<ServiceRecorder>(kOption);
auto recorder = std::make_unique<ServiceRecorder>(option);
recorder->ServiceRecorder::Connect(std::move(ring));
recorder->JsonlSource::Connect(std::move(json_sink));
src.Connect(std::move(recorder));
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
EXPECT_TRUE(src.IsEmpty());
}
};

TEST(ServiceRecorderTest, EventStartBeforeEventEnd) {
ServiceRecorderOption option = kOption;
Expand Down Expand Up @@ -791,3 +828,7 @@ TEST(ServiceRecorderTest, UnspecifiedEventEnd) {
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
EXPECT_TRUE(src.IsEmpty());
}

INSTANTIATE_TEST_SUITE_P(EventTestWithPacketStats, ServiceRecorderTest,
testing::Values(ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, false},
ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, true}));