Migrate all logging to the tracing library (#871)
This is a significant internal change that removes the last uses of
the "log" crate and moves all logging to "tracing".

The major advantage is that we can now trace functions across
future boundaries using tracing's instrumentation.

We now require all uses of tokio::spawn to be properly
instrumented.

closes #870
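
For illustration, a minimal sketch of the spawn-instrumentation pattern this migration enables; the function, span name, and field are hypothetical and not code from this commit. The grpc_scheduler change below uses the same pattern with error_span!("stream_state").

    use tracing::{event, info_span, Instrument, Level};

    // Hypothetical handler: events emitted inside it inherit the span's
    // context across .await points because the future is instrumented.
    async fn handle_request(id: u64) {
        event!(Level::INFO, id, "handling request");
    }

    fn spawn_handler(id: u64) {
        // Attach a span to the spawned future rather than logging a bare message.
        tokio::spawn(handle_request(id).instrument(info_span!("handle_request", id)));
    }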
allada authored Apr 22, 2024
1 parent 5e4c32c commit 523ee33
Showing 31 changed files with 764 additions and 522 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 6 additions & 2 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
@@ -35,7 +35,7 @@ use tokio::select;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tonic::Request;
use tracing::warn;
use tracing::{event, Level};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;
@@ -180,7 +180,11 @@ impl ActionScheduler for CacheLookupScheduler {
return;
}
Err(err) => {
warn!("Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function: {}", err);
event!(
Level::WARN,
?err,
"Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function"
);
}
_ => {}
}
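The hunk above shows the recurring shape of the migration: a formatted `warn!` call is replaced with `event!`, and the error is recorded as a structured field (`?err` captures it via its Debug impl) instead of being interpolated into the message string. A standalone sketch of the before/after shape, with a hypothetical helper and message:

    use tracing::{event, Level};

    // Hypothetical helper illustrating the before/after shape of the migration.
    fn report_lookup_failure<E: std::fmt::Debug>(err: &E) {
        // Before (log crate): the error was interpolated into the message.
        //     warn!("Error while calling `has` on `ac_store`: {}", err);

        // After (tracing): `?err` records the error as a structured field
        // (via Debug), leaving the message itself static.
        event!(Level::WARN, ?err, "Error while calling `has` on `ac_store`");
    }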
31 changes: 23 additions & 8 deletions nativelink-scheduler/src/grpc_scheduler.rs
@@ -40,7 +40,7 @@ use tokio::select;
use tokio::sync::watch;
use tokio::time::sleep;
use tonic::{Request, Streaming};
use tracing::{error, info, warn};
use tracing::{error_span, event, Instrument, Level};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;
@@ -123,7 +123,10 @@ impl GrpcScheduler {
loop {
select!(
_ = tx.closed() => {
info!("Client disconnected in GrpcScheduler");
event!(
Level::INFO,
"Client disconnected in GrpcScheduler"
);
return;
}
response = result_stream.message() => {
@@ -135,16 +138,27 @@
match response.try_into() {
Ok(response) => {
if let Err(err) = tx.send(Arc::new(response)) {
info!("Client disconnected in GrpcScheduler: {}", err);
event!(
Level::INFO,
?err,
"Client error in GrpcScheduler"
);
return;
}
}
Err(err) => error!("Error converting response to ActionState in GrpcScheduler: {}", err),
Err(err) => {
event!(
Level::ERROR,
?err,
"Error converting response to ActionState in GrpcScheduler"
);
},
}
}
)
}
});
}
.instrument(error_span!("stream_state")));
return Ok(rx);
}
Err(make_err!(
@@ -264,9 +278,10 @@ impl ActionScheduler for GrpcScheduler {
match result_stream {
Ok(result_stream) => Some(result_stream),
Err(err) => {
warn!(
"Error response looking up action with upstream scheduler: {}",
err
event!(
Level::WARN,
?err,
"Error looking up action with upstream scheduler"
);
None
}
126 changes: 91 additions & 35 deletions nativelink-scheduler/src/simple_scheduler.rs
@@ -38,7 +38,7 @@ use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tracing::{error, warn};
use tracing::{event, Level};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;
@@ -122,10 +122,12 @@ impl Workers {
let res = worker
.send_initial_connection_result()
.err_tip(|| "Failed to send initial connection result to worker");
if let Err(e) = &res {
error!(
"Worker connection appears to have been closed while adding to pool : {:?}",
e
if let Err(err) = &res {
event!(
Level::ERROR,
?worker_id,
?err,
"Worker connection appears to have been closed while adding to pool"
);
}
res
@@ -379,15 +381,22 @@ impl SimpleSchedulerImpl {
// Don't remove this task, instead we keep them around for a bit just in case
// the client disconnected and will reconnect and ask for same job to be executed
// again.
warn!(
"Action {} has no more listeners during evict_worker()",
action_info.digest().hash_str()
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during evict_worker()"
);
}
}
None => {
self.metrics.retry_action_but_action_missing.inc();
error!("Worker stated it was running an action, but it was not in the active_actions : Worker: {:?}, ActionInfo: {:?}", worker_id, action_info);
event!(
Level::ERROR,
?action_info,
?worker_id,
"Worker stated it was running an action, but it was not in the active_actions"
);
}
}
}
@@ -435,9 +444,10 @@ impl SimpleSchedulerImpl {
self.queued_actions.keys().rev().cloned().collect();
for action_info in action_infos {
let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else {
error!(
"queued_actions out of sync with itself for action {}",
action_info.digest().hash_str()
event!(
Level::ERROR,
?action_info,
"queued_actions out of sync with itself"
);
continue;
};
@@ -454,13 +464,20 @@
worker.notify_update(WorkerUpdate::RunAction(action_info.clone()));
if notify_worker_result.is_err() {
// Remove worker, as it is no longer receiving messages and let it try to find another worker.
let err = make_err!(
Code::Internal,
"Worker command failed, removing worker {}",
worker_id
event!(
Level::WARN,
?worker_id,
?action_info,
"Worker command failed, removing worker",
);
self.immediate_evict_worker(
&worker_id,
make_err!(
Code::Internal,
"Worker command failed, removing worker {}",
worker_id
),
);
warn!("{:?}", err);
self.immediate_evict_worker(&worker_id, err);
return;
}

@@ -482,9 +499,11 @@
// Don't remove this task, instead we keep them around for a bit just in case
// the client disconnected and will reconnect and ask for same job to be executed
// again.
warn!(
"Action {} has no more listeners",
awaited_action.action_info.digest().hash_str()
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during do_try_match()"
);
}
awaited_action.attempts += 1;
@@ -505,7 +524,12 @@
self.metrics
.update_action_with_internal_error_no_action
.inc();
error!("Could not find action info in active actions : {action_info_hash_key:?}");
event!(
Level::ERROR,
?action_info_hash_key,
?worker_id,
"Could not find action info in active actions"
);
return;
};

Expand All @@ -518,13 +542,24 @@ impl SimpleSchedulerImpl {
running_action.attempts -= 1;
}
let Some(running_action_worker_id) = running_action.worker_id else {
return error!(
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}"
event!(
Level::ERROR,
?action_info_hash_key,
?worker_id,
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker",
);
return;
};
if running_action_worker_id == *worker_id {
// Don't set the error on an action that's running somewhere else.
warn!("Internal error for worker {}: {}", worker_id, err);
event!(
Level::WARN,
?action_info_hash_key,
?worker_id,
?running_action_worker_id,
?err,
"Internal worker error",
);
running_action.last_error = Some(err.clone());
} else {
self.metrics
@@ -561,11 +596,17 @@
) -> Result<(), Error> {
if !action_stage.has_action_result() {
self.metrics.update_action_missing_action_result.inc();
event!(
Level::ERROR,
?action_info_hash_key,
?worker_id,
?action_stage,
"Worker sent error while updating action. Removing worker"
);
let err = make_err!(
Code::Internal,
"Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
);
error!("{:?}", err);
self.immediate_evict_worker(worker_id, err.clone());
return Err(err);
}
@@ -590,7 +631,14 @@
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
),
};
error!("{:?}", err);
event!(
Level::ERROR,
?action_info,
?worker_id,
?running_action.worker_id,
?err,
"Got a result from a worker that should not be running the action, Removing worker"
);
// First put it back in our active_actions or we will drop the task.
self.active_actions.insert(action_info, running_action);
self.immediate_evict_worker(worker_id, err.clone());
@@ -606,9 +654,11 @@
if !running_action.current_state.stage.is_finished() {
if send_result.is_err() {
self.metrics.update_action_no_more_listeners.inc();
warn!(
"Action {} has no more listeners during update_action()",
action_info.digest().hash_str()
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during update_action()"
);
}
// If the operation is not finished it means the worker is still working on it, so put it
@@ -903,12 +953,18 @@ impl WorkerScheduler for SimpleScheduler {
})
.collect();
for worker_id in &worker_ids_to_remove {
let err = make_err!(
Code::Internal,
"Worker {worker_id} timed out, removing from pool"
event!(
Level::WARN,
?worker_id,
"Worker timed out, removing from pool"
);
inner.immediate_evict_worker(
worker_id,
make_err!(
Code::Internal,
"Worker {worker_id} timed out, removing from pool"
),
);
warn!("{:?}", err);
inner.immediate_evict_worker(worker_id, err);
}

Ok(())
1 change: 0 additions & 1 deletion nativelink-service/BUILD.bazel
@@ -29,7 +29,6 @@ rust_library(
"@crates//:bytes",
"@crates//:futures",
"@crates//:hyper",
"@crates//:log",
"@crates//:parking_lot",
"@crates//:prost",
"@crates//:rand",
1 change: 0 additions & 1 deletion nativelink-service/Cargo.toml
@@ -15,7 +15,6 @@ bytes = "1.6.0"
futures = "0.3.30"
hyper = { version = "0.14.28" }
serde_json5 = "0.1.0"
log = "0.4.21"
parking_lot = "0.12.1"
prost = "0.12.4"
rand = "0.8.5"
