Skip to content

Commit 2d42c74

Browse files
author
Chris Yang
authored
Replace Pipeline's ProduceToFront with ProduceIfEmpty to handle thread merging. (#17122)
1 parent 77e6d32 commit 2d42c74

File tree

3 files changed

+31
-35
lines changed

3 files changed

+31
-35
lines changed

shell/common/pipeline.h

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,16 +111,22 @@ class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> {
111111
GetNextPipelineTraceID()}; // trace id
112112
}
113113

114-
// Pushes task to the front of the pipeline.
115-
//
116-
// If we exceed the depth completing this continuation, we drop the
117-
// last frame to preserve the depth of the pipeline.
118-
//
119-
// Note: Use |Pipeline::Produce| where possible. This should only be
120-
// used to en-queue high-priority resources.
121-
ProducerContinuation ProduceToFront() {
114+
// Create a `ProducerContinuation` that will only push the task if the queue
115+
// is empty.
116+
// Prefer using |Produce|. ProducerContinuation returned by this method
117+
// doesn't guarantee that the frame will be rendered.
118+
ProducerContinuation ProduceIfEmpty() {
119+
if (!empty_.TryWait()) {
120+
return {};
121+
}
122+
++inflight_;
123+
FML_TRACE_COUNTER("flutter", "Pipeline Depth",
124+
reinterpret_cast<int64_t>(this), //
125+
"frames in flight", inflight_.load() //
126+
);
127+
122128
return ProducerContinuation{
123-
std::bind(&Pipeline::ProducerCommitFront, this, std::placeholders::_1,
129+
std::bind(&Pipeline::ProducerCommitIfEmpty, this, std::placeholders::_1,
124130
std::placeholders::_2), // continuation
125131
GetNextPipelineTraceID()}; // trace id
126132
}
@@ -181,13 +187,16 @@ class Pipeline : public fml::RefCountedThreadSafe<Pipeline<R>> {
181187
available_.Signal();
182188
}
183189

184-
void ProducerCommitFront(ResourcePtr resource, size_t trace_id) {
190+
void ProducerCommitIfEmpty(ResourcePtr resource, size_t trace_id) {
185191
{
186192
std::scoped_lock lock(queue_mutex_);
187-
queue_.emplace_front(std::move(resource), trace_id);
188-
while (queue_.size() > depth_) {
189-
queue_.pop_back();
193+
if (!queue_.empty()) {
194+
// Bail if the queue is not empty, opens up spaces to produce other
195+
// frames.
196+
empty_.Signal();
197+
return;
190198
}
199+
queue_.emplace_back(std::move(resource), trace_id);
191200
}
192201

193202
// Ensure the queue mutex is not held as that would be a pessimization.

shell/common/pipeline_unittests.cc

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -89,46 +89,34 @@ TEST(PipelineTest, PushingMultiProcessesInOrder) {
8989
ASSERT_EQ(consume_result_2, PipelineConsumeResult::Done);
9090
}
9191

92-
TEST(PipelineTest, PushingToFrontOverridesOrder) {
92+
TEST(PipelineTest, ProduceIfEmptyDoesNotConsumeWhenQueueIsNotEmpty) {
9393
const int depth = 2;
9494
fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
9595

9696
Continuation continuation_1 = pipeline->Produce();
97-
Continuation continuation_2 = pipeline->ProduceToFront();
97+
Continuation continuation_2 = pipeline->ProduceIfEmpty();
9898

9999
const int test_val_1 = 1, test_val_2 = 2;
100100
continuation_1.Complete(std::make_unique<int>(test_val_1));
101101
continuation_2.Complete(std::make_unique<int>(test_val_2));
102102

103103
PipelineConsumeResult consume_result_1 = pipeline->Consume(
104-
[&test_val_2](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_2); });
105-
ASSERT_EQ(consume_result_1, PipelineConsumeResult::MoreAvailable);
106-
107-
PipelineConsumeResult consume_result_2 = pipeline->Consume(
108104
[&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
109-
ASSERT_EQ(consume_result_2, PipelineConsumeResult::Done);
105+
ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
110106
}
111107

112-
TEST(PipelineTest, PushingToFrontDropsLastResource) {
113-
const int depth = 2;
108+
TEST(PipelineTest, ProduceIfEmptySuccessfulIfQueueIsEmpty) {
109+
const int depth = 1;
114110
fml::RefPtr<IntPipeline> pipeline = fml::MakeRefCounted<IntPipeline>(depth);
115111

116-
Continuation continuation_1 = pipeline->Produce();
117-
Continuation continuation_2 = pipeline->Produce();
118-
Continuation continuation_3 = pipeline->ProduceToFront();
112+
Continuation continuation_1 = pipeline->ProduceIfEmpty();
119113

120-
const int test_val_1 = 1, test_val_2 = 2, test_val_3 = 3;
114+
const int test_val_1 = 1;
121115
continuation_1.Complete(std::make_unique<int>(test_val_1));
122-
continuation_2.Complete(std::make_unique<int>(test_val_2));
123-
continuation_3.Complete(std::make_unique<int>(test_val_3));
124116

125117
PipelineConsumeResult consume_result_1 = pipeline->Consume(
126-
[&test_val_3](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_3); });
127-
ASSERT_EQ(consume_result_1, PipelineConsumeResult::MoreAvailable);
128-
129-
PipelineConsumeResult consume_result_2 = pipeline->Consume(
130118
[&test_val_1](std::unique_ptr<int> v) { ASSERT_EQ(*v, test_val_1); });
131-
ASSERT_EQ(consume_result_2, PipelineConsumeResult::Done);
119+
ASSERT_EQ(consume_result_1, PipelineConsumeResult::Done);
132120
}
133121

134122
} // namespace testing

shell/common/rasterizer.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,8 @@ void Rasterizer::Draw(fml::RefPtr<Pipeline<flutter::LayerTree>> pipeline) {
127127
// if the raster status is to resubmit the frame, we push the frame to the
128128
// front of the queue and also change the consume status to more available.
129129
if (raster_status == RasterStatus::kResubmit) {
130-
auto front_continuation = pipeline->ProduceToFront();
130+
auto front_continuation = pipeline->ProduceIfEmpty();
131131
front_continuation.Complete(std::move(resubmitted_layer_tree_));
132-
consume_result = PipelineConsumeResult::MoreAvailable;
133132
} else if (raster_status == RasterStatus::kEnqueuePipeline) {
134133
consume_result = PipelineConsumeResult::MoreAvailable;
135134
}

0 commit comments

Comments
 (0)