Skip to content

Commit

Permalink
Refactor CompilerThreadPool out of OpenGLDriver (#7067)
Browse files Browse the repository at this point in the history
  • Loading branch information
bejado authored Aug 12, 2023
1 parent f537f62 commit 4d703e3
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 139 deletions.
2 changes: 2 additions & 0 deletions filament/backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(SRCS
src/CircularBuffer.cpp
src/CommandBufferQueue.cpp
src/CommandStream.cpp
src/CompilerThreadPool.cpp
src/Driver.cpp
src/Handle.cpp
src/HandleAllocator.cpp
Expand All @@ -55,6 +56,7 @@ set(PRIVATE_HDRS
include/private/backend/PlatformFactory.h
include/private/backend/SamplerGroup.h
src/CommandStreamDispatcher.h
src/CompilerThreadPool.h
src/DataReshaper.h
src/DriverBase.h
)
Expand Down
131 changes: 131 additions & 0 deletions filament/backend/src/CompilerThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "CompilerThreadPool.h"

#include <memory>

namespace filament::backend {

using namespace utils;

ProgramToken::~ProgramToken() = default;

CompilerThreadPool::CompilerThreadPool() noexcept = default;

CompilerThreadPool::~CompilerThreadPool() noexcept {
assert_invariant(mCompilerThreads.empty());
assert_invariant(mQueues[0].empty());
assert_invariant(mQueues[1].empty());
}

void CompilerThreadPool::init(uint32_t threadCount, JobSystem::Priority priority,
ThreadSetup&& threadSetup) noexcept {
auto setup = std::make_shared<ThreadSetup>(std::move(threadSetup));

for (size_t i = 0; i < threadCount; i++) {
mCompilerThreads.emplace_back([this, priority, setup]() {
// give the thread a name
JobSystem::setThreadName("CompilerThreadPool");
// run at a slightly lower priority than other filament threads
JobSystem::setThreadPriority(priority);

(*setup)();

// process jobs from the queue until we're asked to exit
while (!mExitRequested) {
std::unique_lock lock(mQueueLock);
mQueueCondition.wait(lock, [this]() {
return mExitRequested ||
(!std::all_of( std::begin(mQueues), std::end(mQueues),
[](auto&& q) { return q.empty(); }));
});
if (!mExitRequested) {
Job job;
// use the first queue that's not empty
auto& queue = [this]() -> auto& {
for (auto& q: mQueues) {
if (!q.empty()) {
return q;
}
}
return mQueues[0]; // we should never end-up here.
}();
assert_invariant(!queue.empty());
std::swap(job, queue.front().second);
queue.pop_front();

// execute the job without holding any locks
lock.unlock();
job();
}
}
});

}
}

auto CompilerThreadPool::find(program_token_t const& token) -> std::pair<Queue&, Queue::iterator> {
for (auto&& q: mQueues) {
auto pos = std::find_if(q.begin(), q.end(), [&token](auto&& item) {
return item.first == token;
});
if (pos != q.end()) {
return { q, pos };
}
}
// this can happen if the program is being processed right now
return { mQueues[0], mQueues[0].end() };
}

auto CompilerThreadPool::dequeue(program_token_t const& token) -> Job {
std::unique_lock const lock(mQueueLock);
Job job;
auto&& [q, pos] = find(token);
if (pos != q.end()) {
std::swap(job, pos->second);
q.erase(pos);
}
return job;
}

void CompilerThreadPool::queue(CompilerPriorityQueue priorityQueue,
program_token_t const& token, Job&& job) {
std::unique_lock const lock(mQueueLock);
mQueues[size_t(priorityQueue)].emplace_back(token, std::move(job));
mQueueCondition.notify_one();
}

void CompilerThreadPool::terminate() noexcept {
std::unique_lock lock(mQueueLock);
mExitRequested = true;
mQueueCondition.notify_all();
lock.unlock();

for (auto& thread: mCompilerThreads) {
if (thread.joinable()) {
thread.join();
}
}
mCompilerThreads.clear();

// Clear all the queues, dropping the remaining jobs. This relies on the jobs being cancelable.
for (auto&& q : mQueues) {
q.clear();
}
}

} // namespace filament::backend
68 changes: 68 additions & 0 deletions filament/backend/src/CompilerThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef TNT_FILAMENT_BACKEND_COMPILERTHREADPOOL_H
#define TNT_FILAMENT_BACKEND_COMPILERTHREADPOOL_H

#include <backend/DriverEnums.h>

#include <utils/Invocable.h>
#include <utils/JobSystem.h>

#include <array>
#include <atomic>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

namespace filament::backend {

struct ProgramToken {
virtual ~ProgramToken();
};

using program_token_t = std::shared_ptr<ProgramToken>;

class Platform;

class CompilerThreadPool {
public:
CompilerThreadPool() noexcept;
~CompilerThreadPool() noexcept;
using Job = utils::Invocable<void()>;
using ThreadSetup = utils::Invocable<void()>;
void init(uint32_t threadCount, utils::JobSystem::Priority priority,
ThreadSetup&& threadSetup) noexcept;
void terminate() noexcept;
void queue(CompilerPriorityQueue priorityQueue, program_token_t const& token, Job&& job);
Job dequeue(program_token_t const& token);

private:
using Queue = std::deque<std::pair<program_token_t, Job>>;
std::vector<std::thread> mCompilerThreads;
std::atomic_bool mExitRequested{false};
std::mutex mQueueLock;
std::condition_variable mQueueCondition;
std::array<Queue, 2> mQueues;
// lock must be held for methods below
std::pair<Queue&, Queue::iterator> find(program_token_t const& token);
};

} // namespace filament::backend

#endif // TNT_FILAMENT_BACKEND_COMPILERTHREADPOOL_H

Loading

0 comments on commit 4d703e3

Please sign in to comment.