Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Internal pid filtering doesn't work with last changes #174

Open
lars18th opened this issue Nov 13, 2022 · 9 comments
Open

[BUG] Internal pid filtering doesn't work with last changes #174

lars18th opened this issue Nov 13, 2022 · 9 comments
Labels
Fixed Testers Wanted If like to help out, please test this

Comments

@lars18th
Copy link
Contributor

Hi @Barracuda09 ,

I see your efforts to cleanup the internal pid filtering code. And I agree with the changes. However, with last changes the internal pid filtering with the Child PIPE is broken (and I feel with the Streamer input too). The problem is this:

  • When some packets are filtered out, then the buffer is not flushed when needed. And the observed behaviour is that the SAT>IP client loses the correct synchronization to reproduce the content.

The root cause is that you've removed the loop to re-read more data when the buffer is not full. I can understand that you want to maintain the code clean and easy to change it. However, I feel we need to accept a compromise. Let me first to comment one edge case that is the cause to implement the previously commented function mpegts::PacketBuffer::markToFlush() that now is removed:

  • The edge case is when you're reading a TS stream with a very low bitrate. One example is when the SAT>IP client requests only pid=0 (the PAT) table. This table could have a repetition rate of seconds, perhaps every 5 seconds. And the input perhaps have only PSI data. Then it could be (and this is true in several cases) that you receive some RTP packets time to time. For example, you receive 1 or 2 chunks of 7*188 TS packets at a rate of 1 per second. But imagine that you filter out unnecessary packets and leave only 1 (the PAT table). In this case the you're waiting until you complete one full RTP packet. And that's 7 TS packets, that in time represents 35 seconds. Because you only receive one PAT packet every 5 seconds. So the solution here is to flush the PacketBuffer at some rate even it is not full (aka with 7 TS packets).

Perhaps you could find this example very edge. But nothing more far than the reality. When a SAT>IP client is doing a scanning, the client is only interested on some PSI tables. And in some networks the repetition rate is very scarse. So in this case the scanning time is very long or it fails if you don't flush the TS packets very soon.

I hope you now understand the need for flush PacketBuffer content when it has less than 7 TS packets. Therefore, related to this question you agree with the idea to restore the function mpegts::PacketBuffer::markToFlush() and the associated code? If not then please comment about the alternatives that you see.

Another question is how to do in the concrete functions of Device::readFullTSPacket() when we apply the internal pid filtering. I feel you think that when you return false, then in this case in the next loop the code will continue filling the same PacketBuffer at the same point. But from my realworld tests this is not true. And for this reason I've implemented a code that rereads more data if the buffer is not full... until it will be completed or if a lot of time has passed. In this last case a flushing of the packet it's necessary. This is my behaviour for the Child PIPE input. And my idea is to complete the code (not already implemented) for the rest of inputs. For example, for the Streamer input my code will not check if more data is pending in the socket. And instead of doing that the buffer is always flushed. However, this is WIP code. So please not consider it finished.

The question is that we need to be sure of the best behaviour when some packets are filtered out from the reading buffer. What's your point of view? Perhaps I'm completly wrong in my assumptions. And as you have created the code, then I'm missing some important aspects. So please comment your opinion.

Regards.

@Barracuda09
Copy link
Owner

Hi @lars18th

Yes I understand your comment about low bitrate streams, but I feel that this has always been like this (I realize now).

But my point is, we need to change this in a different way. And I think it needs to go here (some how):

void StreamThreadBase::readDataFromInputDevice(StreamClient &client) {
const input::SpDevice inputDevice = _stream.getInputDevice();
size_t availableSize = (MAX_BUF - (_writeIndex - _readIndex));
if (availableSize > MAX_BUF) {
availableSize %= MAX_BUF;
}
// SI_LOG_DEBUG("Frontend: @#1, PacketBuffer MAX @#2 W @#3 R @#4 S @#5", _stream.getFeID(), MAX_BUF, _writeIndex, _readIndex, availableSize);
if (inputDevice->isDataAvailable() && availableSize > 1) {
if (inputDevice->readFullTSPacket(_tsBuffer[_writeIndex])) {
#ifdef LIBDVBCSA
decrypt::dvbapi::SpClient decrypt = _stream.getDecryptDevice();
if (decrypt != nullptr) {
decrypt->decrypt(_stream.getFeIndex(), _stream.getFeID(), _tsBuffer[_writeIndex]);
}
#endif
// goto next, so inc write index
++_writeIndex;
_writeIndex %= MAX_BUF;
// reset next
_tsBuffer[_writeIndex].reset();
}
}
// calculate interval
_t2 = std::chrono::steady_clock::now();
const unsigned long interval = std::chrono::duration_cast<std::chrono::microseconds>(_t2 - _t1).count();
if (interval > _sendInterval) {
_t1 = _t2;
if (_tsBuffer[_readIndex].isReadyToSend()) {
if (writeDataToOutputDevice(_tsBuffer[_readIndex], client)) {
// inc read index only when send is successful
++_readIndex;
_readIndex %= MAX_BUF;
}
} else if (_signalLock) {
writeDataToOutputDevice(_tsEmpty, client);
}
}
}

Here is already a time interval that will send null packets and maybe we need to flush it here. Then it makes more sense to me.

@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

Thank you for your answer. From it I feel you're with me in the idea to discuss this in deep before implementing something uggly. However, first of all: the current code doesn't work. The internal filtering at time is buggy (I need to deactivate it in the configuration of the Frontend to obtain a working output). So please, take this in consideration. From my point of view is not a problem to leave it in this situation until the final implementation will be architectured. And I understand that you have the best idea of the code, so I'll follow your suggestions.

With the objective to architecture the best I'll suggest to discuss these questions:

  1. Restore or not the function mpegts::PacketBuffer::markToFlush(). Even though we don't use it at all (now or in the future), I feel is not a problem to maintain it. The only related code is the function PacketBuffer::isReadyToSend(). And perhaps this function can help implementing something (now or in the future). This is acceptable for you?
  2. Who needs to manage the timeout of the reading buffer? The StreamThreadBase class or the concrete Device? You suggest the first. But from my point of view, two different timeouts have sense. The READ timeout and the WRITE timeout. The write timeout it's necessary to send something to the client when no data is available. Because if you don't do it then the client could think that the stream is stalled or the connection is lost. But this is different from a READ timeout from the source. Why? Because if you're doing some filtering in the middle, then the read device requires to send something to the client if insufficient data is available. I'm right with this idea? The fact of sending a null packet to the client because the WRITE timeout I feel is not the same as sending an incomplete (less than 7 TS packets) datagram to the client. Do you think so?
  3. Even though you want to maintain the code well structured I feel that the reading in the function Device::readFullTSPacket() with any device requires to be very efficient. Therefore, I feel that is not questionable that if after the filtering it's more space available in the (reading) buffer, then a checking of the incoming source has sense to try to reread more data if it's available. And this could be independent of any upper code in StreamThreadBase::readDataFromInputDevice() to handle a flush of the buffer. It's that in line with your proposed idea?

Please, try to clarify these questions.
Regards.

@Barracuda09
Copy link
Owner

Hi @lars18th

My main problem (i saw) with not handeling full buffers is that this will skew the count of packets because it will go multiple times through the filterData(..) and for that matter also if you filter out PIDs. And maybe even collecting table data will be affected.
So maybe there needs to be something to check the packet is already processed (filtered and collected).

But I understand now this was always a problem with slow pid rates (scanning probably) that wil not fill the buffer that fast.

I have to think about it some more

@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

OK. Then the problem is more complex. However, please try to confirm or rebate this:

  • We can agree that to be efficient then every concrete device requires to reread after filtering if more data is available in the function Device::readFullTSPacket()?

With this assuption we can reimplement the code of the function Filter::filterData() doing this:

  • First filter out, and not collect data if the buffer is not full (7 packets) or is not flushed.
  • In case of full buffer or flushing buffer, then collect (process) the data one time.

You can see that then, when the device is reading, it can call to Filter::filterData() more than one time without troubles. And the only requirement is that it ends the internal loop with a full buffer or flushing it.

What you think about this approach?

@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

The internal pid filtering is so relevant, bacause after supporting it (wihtout errors or troubles), my next objective is to support the external pid filtering for the Child PIPE input. I see your recent commit 2a5899a and I'm using HDHRs too. So what I want is to drive them including the control of the pid filtering inside the device. I've more or less the idea to complete it. So please, could you try to define how to resolve the current issues with the internal pid filtering? I'll follow your rescomendations but I feel that we need to advance and not stay busy. You agree?

Regards.

@Barracuda09
Copy link
Owner

Hi @lars18th

Yes I think too we need to resolve this. The problem is that I still trying to determind how to handle this.
During my test this weekend the current soft filter did not work as expected, so I changed it to what it is now and which seem to work beter in my situation. But your argument with slow data rates is something I have not thought about, but whas probably always an issue.

Problem is when not handeling full buffers

  • skew in CC and CC-Errors (more cosmetic but ugly)

I will look again into the markToFlush method you had introduced.

Kind regards

@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

I can't say why your tests aren't working as expected. In my case is the opposite: the current code doesn't work. My environment it's one HDHRs with DVB-T/C tuners. I configure them (using a script) to send the Full Transport Stream. Then with the Child PIPE and mapping.m3u I'm serving some DVB-S SAT>IP clients. Whats the problem now? With the current code state a lot of CC-Errors appears. With my original code all is going fine. The difference? Your last change of the ::readFullTSPacket() function for the Child PIPE input (also your similar previous change for the Stream input). Now after the filtering, if more data is available you aren't rereading to complete the buffer. That this is root cause of the trouble. And not the low bitrate edge case. They're different problems!

So, please try to accept some simple changes to advance. My proposal:

  1. In the implementation of Device::readFullTSPacket() for Child PIPE restablish the loop for rereading.
  2. Restore the function mpegts::PacketBuffer::markToFlush() even if we don't use it. Because in the meantime during the develop we can use it for example for sending less than 7 TS packets. I admit that perhaps in the final implementation it could be preferable to use another best strategy. However, the function is useful as a failback.

Please consider this.
Thank you.

@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

Please check my PR #176 . In this I've simplified a lot the code. And I've incorporated your idea to leave to the function StreamThreadBase::readDataFromInputDevice() the work of flushing a packet. Therefore, the only difference with the previous code (apart of the flush by the timeout) is that I've added the reread in Device::readFullTSPacket(). This not only is more efficient, but also less error-prone.

So please, check it. With this I've restored the functionality for the Child PIPE input.
If you agree with it and merge it, then after I'll update for the Stream input.
Regards.

@Barracuda09 Barracuda09 added Fixed Testers Wanted If like to help out, please test this labels Nov 24, 2022
@lars18th
Copy link
Contributor Author

Hi @Barracuda09 ,

Please read my comment about last commit: #180 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Fixed Testers Wanted If like to help out, please test this
Projects
None yet
Development

No branches or pull requests

2 participants