Skip to content

Commit c69655d

Browse files
new commit because lots of changes, generated protos and added metadata, support for lifecycle management
1 parent e9eb057 commit c69655d

File tree

7 files changed

+342
-176
lines changed

7 files changed

+342
-176
lines changed

nativelink-proto/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ load(
1111
PROTO_NAMES = [
1212
"build.bazel.remote.asset.v1",
1313
"build.bazel.remote.execution.v2",
14+
"build.bazel.remote.execution.worker_protocol",
1415
"build.bazel.semver",
1516
"com.github.trace_machina.nativelink.remote_execution",
1617
"com.github.trace_machina.nativelink.events",
@@ -80,6 +81,7 @@ genrule(
8081
srcs = [
8182
"build/bazel/remote/asset/v1/remote_asset.proto",
8283
"build/bazel/remote/execution/v2/remote_execution.proto",
84+
"build/bazel/remote/execution/worker_protocol.proto",
8385
"build/bazel/semver/semver.proto",
8486
"com/github/trace_machina/nativelink/remote_execution/events.proto",
8587
"com/github/trace_machina/nativelink/remote_execution/worker_api.proto",
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2022 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// This file is @generated by prost-build.
16+
/// Work request sent to persistent worker
17+
#[derive(Clone, PartialEq, ::prost::Message)]
18+
pub struct WorkRequest {
19+
/// Command-line arguments for the worker
20+
#[prost(string, repeated, tag = "1")]
21+
pub arguments: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
22+
/// Input files for this request
23+
#[prost(message, repeated, tag = "2")]
24+
pub inputs: ::prost::alloc::vec::Vec<Input>,
25+
/// Unique request ID for this work item
26+
#[prost(string, tag = "3")]
27+
pub request_id: ::prost::alloc::string::String,
28+
/// Whether to cancel this request
29+
#[prost(bool, tag = "4")]
30+
pub cancel: bool,
31+
/// Verbosity level for logging
32+
#[prost(int32, tag = "5")]
33+
pub verbosity: i32,
34+
/// Sandbox directory for this request
35+
#[prost(string, tag = "6")]
36+
pub sandbox_dir: ::prost::alloc::string::String,
37+
}
38+
/// Input file for work request
39+
#[derive(Clone, PartialEq, ::prost::Message)]
40+
pub struct Input {
41+
/// Path to the input file
42+
#[prost(string, tag = "1")]
43+
pub path: ::prost::alloc::string::String,
44+
/// Digest of the file content
45+
#[prost(bytes = "bytes", tag = "2")]
46+
pub digest: ::prost::bytes::Bytes,
47+
}
48+
/// Work response from persistent worker
49+
#[derive(Clone, PartialEq, ::prost::Message)]
50+
pub struct WorkResponse {
51+
/// Exit code of the work (0 = success)
52+
#[prost(int32, tag = "1")]
53+
pub exit_code: i32,
54+
/// Output from the worker
55+
#[prost(string, tag = "2")]
56+
pub output: ::prost::alloc::string::String,
57+
/// Request ID this response corresponds to
58+
#[prost(string, tag = "3")]
59+
pub request_id: ::prost::alloc::string::String,
60+
/// Whether the request was cancelled
61+
#[prost(bool, tag = "4")]
62+
pub was_cancelled: bool,
63+
}

nativelink-proto/genproto/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ pub mod build {
4545
pub mod v2 {
4646
include!("build.bazel.remote.execution.v2.pb.rs");
4747
}
48+
pub mod worker_protocol {
49+
include!("build.bazel.remote.execution.worker_protocol.pb.rs");
50+
}
4851
}
4952
}
5053
pub mod semver {

nativelink-scheduler/src/api_worker_scheduler.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,18 @@ impl ApiWorkerSchedulerImpl {
151151
let worker_id = worker.id.clone();
152152

153153
// Check if this worker supports persistent worker operations
154-
if let Some(persistent_key) = worker.platform_properties.get("persistentWorkerKey") {
154+
if let Some(persistent_key) = worker
155+
.platform_properties
156+
.properties
157+
.get("persistentWorkerKey")
158+
{
159+
let key_str = format!("{persistent_key:?}");
155160
self.persistent_worker_pools
156-
.entry(persistent_key.clone())
157-
.or_insert_with(Vec::new)
161+
.entry(key_str)
162+
.or_default()
158163
.push(worker_id.clone());
159164
tracing::info!(
160-
"Registered worker {} for persistent key: {}",
165+
"Registered worker {} for persistent key: {:?}",
161166
worker_id,
162167
persistent_key
163168
);
@@ -190,7 +195,7 @@ impl ApiWorkerSchedulerImpl {
190195
let result = self.workers.pop(worker_id);
191196

192197
// Remove from persistent worker pools if applicable
193-
for (_, workers) in self.persistent_worker_pools.iter_mut() {
198+
for workers in self.persistent_worker_pools.values_mut() {
194199
workers.retain(|id| id != worker_id);
195200
}
196201

@@ -219,18 +224,22 @@ impl ApiWorkerSchedulerImpl {
219224
platform_properties: &PlatformProperties,
220225
) -> Option<WorkerId> {
221226
// First check if this is a persistent worker request
222-
if let Some(persistent_key) = platform_properties.get("persistentWorkerKey") {
223-
tracing::trace!("Looking for persistent worker with key: {}", persistent_key);
227+
if let Some(persistent_key) = platform_properties.properties.get("persistentWorkerKey") {
228+
tracing::trace!(
229+
"Looking for persistent worker with key: {:?}",
230+
persistent_key
231+
);
224232

233+
let key_str = format!("{persistent_key:?}");
225234
// Try to find a dedicated persistent worker
226-
if let Some(worker_ids) = self.persistent_worker_pools.get(persistent_key) {
235+
if let Some(worker_ids) = self.persistent_worker_pools.get(&key_str) {
227236
for worker_id in worker_ids {
228237
if let Some(worker) = self.workers.peek(worker_id) {
229238
if worker.can_accept_work()
230239
&& platform_properties.is_satisfied_by(&worker.platform_properties)
231240
{
232241
tracing::debug!(
233-
"Found persistent worker {} for key {}",
242+
"Found persistent worker {} for key {:?}",
234243
worker_id,
235244
persistent_key
236245
);
@@ -241,7 +250,7 @@ impl ApiWorkerSchedulerImpl {
241250
}
242251

243252
tracing::info!(
244-
"No available persistent worker found for key: {}, will spawn new one",
253+
"No available persistent worker found for key: {:?}, will spawn new one",
245254
persistent_key
246255
);
247256
}

nativelink-worker/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ rust_library(
1212
srcs = [
1313
"src/lib.rs",
1414
"src/local_worker.rs",
15+
"src/persistent_worker.rs",
16+
"src/persistent_worker_runner.rs",
1517
"src/running_actions_manager.rs",
1618
"src/worker_api_client_wrapper.rs",
1719
"src/worker_utils.rs",
@@ -52,6 +54,8 @@ rust_test_suite(
5254
timeout = "short",
5355
srcs = [
5456
"tests/local_worker_test.rs",
57+
"tests/persistent_worker_runner_test.rs",
58+
"tests/persistent_worker_test.rs",
5559
"tests/running_actions_manager_test.rs",
5660
],
5761
compile_data = [

0 commit comments

Comments
 (0)