-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: initial packet-stats implementation #77
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
#pragma once | ||
|
||
#include <string.h> | ||
#include <array> | ||
#include <cstdint> | ||
#include "base.hh" | ||
|
||
namespace { | ||
|
||
class PacketStatsCollector final { | ||
public: | ||
PacketStatsCollector() {} | ||
~PacketStatsCollector() {} | ||
|
||
void CollectPacketStats(const ts::TSPacket& packet) { | ||
auto pid = packet.getPID(); | ||
auto last_cc = stats[pid].last_cc; | ||
auto cc = packet.getCC(); | ||
auto has_payload = packet.hasPayload(); | ||
auto tei = packet.getTEI(); | ||
stats[pid].last_cc = cc; | ||
stats[pid].last_packet = packet; | ||
|
||
if (packet.getScrambling()) { | ||
++scrambled_packets_; | ||
} | ||
|
||
auto duplicate_found = false; | ||
|
||
if (packet.getDiscontinuityIndicator() || packet.getPID() == ts::PID_NULL) { | ||
// do nothing | ||
} else if (tei) { | ||
++error_packets_; | ||
} else if (last_cc != ts::INVALID_CC) { | ||
if (has_payload) { | ||
if (last_cc == cc) { | ||
if (stats[pid].last_packet != packet) { | ||
// non-duplicate packet | ||
++dropped_packets_; | ||
} else { | ||
// duplicate packet | ||
duplicate_found = true; | ||
} | ||
} else { | ||
// regular packet | ||
uint8_t expectedCC = (last_cc + 1) & ts::CC_MASK; | ||
if (expectedCC != cc) { | ||
++dropped_packets_; | ||
} | ||
} | ||
} else { | ||
// Continuity counter should not increment if packet has no payload | ||
if (last_cc != cc) { | ||
++dropped_packets_; | ||
} | ||
} | ||
} | ||
|
||
if (duplicate_found) { | ||
// duplicate packet is only allowed once | ||
++stats[pid].duplicate_packets; | ||
if (stats[pid].duplicate_packets > 1) { | ||
++dropped_packets_; | ||
} | ||
} else { | ||
stats[pid].duplicate_packets = 0; | ||
} | ||
} | ||
|
||
void ResetPacketStats() { | ||
error_packets_ = 0; | ||
dropped_packets_ = 0; | ||
scrambled_packets_ = 0; | ||
} | ||
|
||
uint64_t GetErrorPackets() const { | ||
return error_packets_; | ||
} | ||
|
||
uint64_t GetDroppedPackets() const { | ||
return dropped_packets_; | ||
} | ||
|
||
uint64_t GetScrambledPackets() const { | ||
return scrambled_packets_; | ||
} | ||
|
||
private: | ||
struct PacketStat { | ||
uint8_t last_cc = ts::INVALID_CC; | ||
uint8_t duplicate_packets = 0; | ||
ts::TSPacket last_packet; | ||
}; | ||
masnagam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
MIRAKC_ARIB_NON_COPYABLE(PacketStatsCollector); | ||
std::array<PacketStat, ts::PID_MAX> stats; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. すべてのPIDが使われるわけではないので,無駄にメモリーを消費してしまう気がします. //!
//! Maximum number of PID's (8192).
//!
constexpr PID PID_MAX = 1 << PID_BITS; で 大した量ではないので,このままでも私は構いませんが,気になるようならハッシュマップなどを使ってください. |
||
uint64_t error_packets_ = 0; | ||
uint64_t dropped_packets_ = 0; | ||
uint64_t scrambled_packets_ = 0; | ||
}; | ||
|
||
} // namespace |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
#include "jsonl_source.hh" | ||
#include "logging.hh" | ||
#include "packet_sink.hh" | ||
#include "packet_stats_collector.hh" | ||
#include "tsduck_helper.hh" | ||
|
||
#define MIRAKC_ARIB_SERVICE_RECORDER_TRACE(...) MIRAKC_ARIB_TRACE("service-recorder: " __VA_ARGS__) | ||
|
@@ -28,6 +29,7 @@ struct ServiceRecorderOption final { | |
size_t chunk_size = 0; | ||
size_t num_chunks = 0; | ||
uint64_t start_pos = 0; | ||
bool packet_stats = false; | ||
}; | ||
|
||
class ServiceRecorder final : public PacketSink, | ||
|
@@ -91,6 +93,10 @@ class ServiceRecorder final : public PacketSink, | |
|
||
demux_.feedPacket(packet); | ||
|
||
if (option_.packet_stats) { | ||
packet_stats_collector_.CollectPacketStats(packet); | ||
} | ||
|
||
switch (state_) { | ||
case State::kPreparing: | ||
return OnPreparing(packet); | ||
|
@@ -113,6 +119,7 @@ class ServiceRecorder final : public PacketSink, | |
// The application may purge expired programs in the message handler for | ||
// the `chunk` message. So, the program data must be updated before that. | ||
SendEventUpdateMessage(eit_, now, pos); | ||
SendPacketStatsMessage(); | ||
SendChunkMessage(now, pos); | ||
} | ||
|
||
|
@@ -310,17 +317,15 @@ class ServiceRecorder final : public PacketSink, | |
if (event_changed) { | ||
MIRAKC_ARIB_SERVICE_RECORDER_WARN("Event#{:04X} has started before Event#{:04X} ends", | ||
GetEvent(new_eit).event_id, GetEvent(eit).event_id); | ||
UpdateEventBoundary(now, sink_->pos()); | ||
SendEventEndMessage(eit); | ||
HandleEventEnd(now, eit); | ||
SendEventStartMessage(new_eit); | ||
} else { | ||
if (IsUnspecifiedEventEndTime(GetEvent(eit))) { | ||
// Continue recording as the current program until the event changes. | ||
} else { | ||
auto end_time = GetEventEndTime(GetEvent(eit)); | ||
if (now >= end_time) { | ||
UpdateEventBoundary(end_time, sink_->pos()); | ||
SendEventEndMessage(eit); | ||
HandleEventEnd(end_time, eit); | ||
event_started_ = false; // wait for new event | ||
} | ||
} | ||
|
@@ -340,6 +345,12 @@ class ServiceRecorder final : public PacketSink, | |
event_boundary_pos_ = pos; | ||
} | ||
|
||
void HandleEventEnd(const ts::Time& endTime, const std::shared_ptr<ts::EIT>& eit) { | ||
UpdateEventBoundary(endTime, sink_->pos()); | ||
SendPacketStatsMessage(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
チャンク単位の集計は不要. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 番組毎に集計する場合,集計値通知後に |
||
SendEventEndMessage(eit); | ||
} | ||
|
||
void SendStartMessage() { | ||
MIRAKC_ARIB_SERVICE_RECORDER_INFO("Started recording SID#{:04X}", option_.sid); | ||
|
||
|
@@ -411,6 +422,32 @@ class ServiceRecorder final : public PacketSink, | |
SendEventMessage("event-end", eit, event_boundary_time_, event_boundary_pos_); | ||
} | ||
|
||
void SendPacketStatsMessage() { | ||
if (!option_.packet_stats) { | ||
return; | ||
} | ||
|
||
auto error_packets = packet_stats_collector_.GetErrorPackets(); | ||
auto dropped_packets = packet_stats_collector_.GetDroppedPackets(); | ||
auto scrambled_packets = packet_stats_collector_.GetScrambledPackets(); | ||
MIRAKC_ARIB_SERVICE_RECORDER_INFO("PacketStats: Error: {}, Dropped {}, Scrambled: {}", | ||
error_packets, dropped_packets, scrambled_packets); | ||
|
||
rapidjson::Document doc(rapidjson::kObjectType); | ||
auto& allocator = doc.GetAllocator(); | ||
|
||
rapidjson::Value data(rapidjson::kObjectType); | ||
data.AddMember("errorPackets", error_packets, allocator); | ||
data.AddMember("droppedPackets", dropped_packets, allocator); | ||
data.AddMember("scrambledPackets", scrambled_packets, allocator); | ||
|
||
doc.AddMember("type", "packet-stats", allocator); | ||
doc.AddMember("data", data, allocator); | ||
|
||
FeedDocument(doc); | ||
packet_stats_collector_.ResetPacketStats(); | ||
} | ||
|
||
void SendEventMessage(const std::string& type, const std::shared_ptr<ts::EIT>& eit, | ||
const ts::Time& time, uint64_t pos) { | ||
MIRAKC_ARIB_ASSERT(eit); | ||
|
@@ -466,6 +503,7 @@ class ServiceRecorder final : public PacketSink, | |
ts::PID pmt_pid_ = ts::PID_NULL; | ||
State state_ = State::kPreparing; | ||
bool event_started_ = false; | ||
PacketStatsCollector packet_stats_collector_; | ||
|
||
MIRAKC_ARIB_NON_COPYABLE(ServiceRecorder); | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ヘルプの
JSON Messages
のセクションに,サンプルとフィールドの説明を追加してください.通知される値は,前回からの増分であることも記述しておいたほうがよいだろうと思います.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
現在の仕様では,通知の間隔を明示していません.これでもドロップ数はカウント可能なので問題ありません.
ただ,特定チャンク内のドロップ数として区間を明示しておいたほうが,情報の利用価値がより高くなるかもしれません.例えば,ある時ドロップが通知されたとして,現仕様では区間が明示されていないので,前回通知されたストリーム位置の「近辺」から,今回通知されたストリーム位置の「近辺」までにドロップがあったということになります.
現実装ではチャンク境界で通知するため,実装上はチャンクごとに通知しています.個人的には,この動作を仕様として通知間隔を仕様上明示したほうが便利に使える場面が増えるような気がします.
この場合,通知メッセージにチャンク番号のフィールドを追加したほうが良いと思います.
mirakcのweb endpointでは,レコード単位の統計情報となるため,このように仕様を変更しても何の恩恵もないのですが,mirakc-aribコマンドの仕様としては,通知間隔を明示したほうがよいのではないかと思います.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mirakc-aribとJSONLを扱うツール(jq等)を使うスクリプトを自作し、
チャンク毎のドロップの状況を見るような場合を想定してでしょうか?
チャンク境界に到達した時だけでなく、番組が終了したときもpacket-statsの通知があります。
番組毎のドロップを収集する際、番組終了付近でドロップがあると
実際には前の番組のドロップなのにも関わらず、次の番組のドロップが加算されるというような問題を防ぐためです。
この場合、次のチャンクに到達していないにも関わらず通知されます。その後、ドロップ数のカウントもリセットされて再び0からカウントし始めます。
自作スクリプトでチャンク毎で統計取りたい場合には今の仕様だと役立たないかもしれません。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
確かにそうですね.番組変更時の通知を見落としていました.
番組毎の集計は現在の通知シーケンスで可能なので,チャンク毎の集計はなくても問題なさそうです.通知間隔の明示も必要なく,番組毎に集計する場合,
event-end
(もしくはevent-start
)で累積値をリセットすることで番組毎の集計ができることを記述しておけば十分な気がします.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
チャンク毎の集計はチャンネル毎の統計と番組毎の統計の両方で使うつもりでした。
番組毎の統計はcurl、
チャンネル毎の統計はVictoriaMetrics、prometheus-community/json_exporter、Grafanaを組み合わせて使って値を見るつもりでした。
番組終了時にしかドロップ数を通知しない実装だと、チャンネル毎の統計の更新間隔が長くなり
いつドロップが起きたか分かりにくくなります。
1時間番組なら番組開始から1時間後に
チャンネル毎のドロップ数の更新になってしまうのでは?流石にそれでは更新間隔が長すぎるかと思います。
また、タイムシフト録画を番組途中で止めた場合もメッセージを送らないと、
現在の番組のドロップ数のデータが失われてしまいます。
逆に、番組終了時にはpacket-statsメッセージを送らず、
チャンク境界だけで通知する実装に変えるのはどうでしょうか?
メッセージのフォーマットに番組のIDを含めて、
番組境界付近の場合、メッセージを複数回送るか、
配列にしてドロップ数を複数回送り
前の番組のドロップ数と現在の番組のドロップ数の両方を送った方が
シンプルな実装にできそうに思えてきました。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
私の説明が良くなかったですね,すみません.
通知タイミングは現実装のチャンク境界と番組終了時で良いと思います.
番組毎に集計できることは必要ですが(メッセージ受信側で集計できるという意味で,mirakc-arib内で集計するという意味ではありません),チャンク毎の集計はできなくても問題ないだろうという私の感想でした.
個人的には今の実装のままで良いのではないかと考えます.
例えば,チャンク境界でまとめて通知する場合,チャンク境界に達するまで前番組の集計結果が取得できません.
mirakcにはtimeshift.record-endedというイベントがあるのですが,この通知を受けたクライアントがその番組のドロップ数をweb endpoint経由で取得しようとしても取得できないタイミングが出てきます.
このような機能を実際に作って欲しいと要望しているわけではないのですが,mirakc-aribの通知タイミングを設計時に工夫しておけば,mirakc-aribの修正なしに後からmirakcに機能を追加することもある程度可能なのではないかと思っています.