Skip to content

Commit

Permalink
feat: add resource detail view (#188)
Browse files Browse the repository at this point in the history
This PR introduces the resoruce detail view. It also brings a couple of
additional changes: 

- we now rely on context of entered spans rather than explicit parent -
  child relationships
- we expose internal resources (`BatchSempahore`)
- we show the relationships between resources (`Mutex` -> `BatchSemaphore`)
- we show the async ops that are live on a specific resource
- we also show the tasks that are awaiting on these async ops, which
  allows the user to draw a relationship between tasks and resources
- there are more examples added in the examples folder (semaphore and
  mutex)

Some screenshots: 

<img width="1050" alt="Screenshot 2021-11-23 at 19 12 28" src="https://user-images.githubusercontent.com/4391506/143072764-f940ed45-350d-4be4-941b-95ef5650c4d8.png">
<img width="1388" alt="Screenshot 2021-11-23 at 19 12 38" src="https://user-images.githubusercontent.com/4391506/143072791-ee5d485f-ea1e-4609-8946-16a61bf5776f.png">

<img width="1371" alt="Screenshot 2021-11-23 at 19 12 50" src="https://user-images.githubusercontent.com/4391506/143072807-4b577d3f-17d6-4ade-8dcc-006b44fc1014.png">

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev authored Dec 15, 2021
1 parent a79c505 commit 1aa9b59
Show file tree
Hide file tree
Showing 25 changed files with 1,423 additions and 222 deletions.
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ members = [
"console-subscriber",
"console-api"
]
resolver = "2"
resolver = "2"
16 changes: 13 additions & 3 deletions console-api/proto/async_ops.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ message AsyncOp {
// The source of this async operation. Most commonly this should be the name
// of the method where the instantiation of this op has happened.
string source = 3;
// The ID of the parent async op.
//
// This field is only set if this async op was created while inside of another
// async op. For example, `tokio::sync`'s `Mutex::lock` internally calls
// `Semaphore::acquire`.
//
// This field can be empty; if it is empty, this async op is not a child of another
// async op.
common.Id parent_async_op_id = 4;
// The resources's ID.
common.Id resource_id = 5;
}

// Statistics associated with a given async operation.
Expand All @@ -49,12 +60,11 @@ message Stats {
google.protobuf.Timestamp created_at = 1;
// Timestamp of when the async op was dropped.
google.protobuf.Timestamp dropped_at = 2;
// The resource Id this `AsyncOp` is associated with. Note that both
// `resource_id` and `task_id` can be None if this async op has not been polled yet
common.Id resource_id = 3;
// The Id of the task that is awaiting on this op.
common.Id task_id = 4;
// Contains the operation poll stats.
common.PollStats poll_stats = 5;
// State attributes of the async op.
repeated common.Attribute attributes = 6;
}

15 changes: 15 additions & 0 deletions console-api/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,18 @@ message PollStats {
// has spent *waiting* to be polled.
google.protobuf.Duration busy_time = 6;
}

// State attributes of an entity. These are dependent on the type of the entity.
//
// For example, a timer resource will have a duration, while a semaphore resource may
// have a permit count. Likewise, the async ops of a semaphore may have attributes
// indicating how many permits they are trying to acquire vs how many are acquired.
// These values may change over time. Therefore, they live in the runtime stats rather
// than the static data describing the entity.
message Attribute {
// The key-value pair for the attribute
common.Field field = 1;
// Some values carry a unit of measurement. For example, a duration
// carries an associated unit of time, such as "ms" for milliseconds.
optional string unit = 2;
}
18 changes: 8 additions & 10 deletions console-api/proto/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ message Resource {
Kind kind = 4;
// The location in code where the resource was created.
common.Location location = 5;
// The ID of the parent resource.
common.Id parent_resource_id = 6;
// Is the resource an internal component of another resource?
//
// For example, a `tokio::time::Interval` resource might contain a
// `tokio::time::Sleep` resource internally.
bool is_internal = 7;

// The kind of resource (e.g. timer, mutex).
message Kind {
Expand Down Expand Up @@ -70,16 +77,7 @@ message Stats {
// have permits as an attribute. These values may change over time as the state of
// the resource changes. Therefore, they live in the runtime stats rather than the
// static data describing the resource.
repeated Attribute attributes = 3;

// A single key-value pair associated with a resource.
message Attribute {
// The key-value pair for the attribute
common.Field field = 1;
// Some values carry a unit of measurement. For example, a duration
// carries an associated unit of time, such as "ms" for milliseconds.
optional string unit = 2;
}
repeated common.Attribute attributes = 3;
}

// A `PollOp` describes each poll operation that completes within the async
Expand Down
2 changes: 1 addition & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]

[dependencies]

tokio = { version = "^1.13", features = ["sync", "time", "macros", "tracing"] }
tokio = { version = "^1.15", features = ["sync", "time", "macros", "tracing"] }
tokio-stream = "0.1"
thread_local = "1.1.3"
console-api = { path = "../console-api", features = ["transport"] }
Expand Down
40 changes: 40 additions & 0 deletions console-subscriber/examples/barrier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
console_subscriber::init();
task::Builder::default()
.name("main-task")
.spawn(async move {
let mut handles = Vec::with_capacity(30);
let barrier = Arc::new(Barrier::new(30));
for i in 0..30 {
let c = barrier.clone();
let task_name = format!("task-{}", i);
handles.push(task::Builder::default().name(&task_name).spawn(async move {
tokio::time::sleep(Duration::from_secs(i)).await;
let wait_result = c.wait().await;
wait_result
}));
}

// Will not resolve until all "after wait" messages have been printed
let mut num_leaders = 0;
for handle in handles {
let wait_result = handle.await.unwrap();
if wait_result.is_leader() {
num_leaders += 1;
}
}

tokio::time::sleep(Duration::from_secs(10)).await;
// Exactly one barrier will resolve as the "leader"
assert_eq!(num_leaders, 1);
})
.await?;

Ok(())
}
31 changes: 31 additions & 0 deletions console-subscriber/examples/mutex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::{sync::Mutex, task};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
console_subscriber::init();
task::Builder::default()
.name("main-task")
.spawn(async move {
let count = Arc::new(Mutex::new(0));
for i in 0..5 {
let my_count = Arc::clone(&count);
let task_name = format!("increment-{}", i);
tokio::task::Builder::default()
.name(&task_name)
.spawn(async move {
for _ in 0..10 {
let mut lock = my_count.lock().await;
*lock += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

while *count.lock().await < 50 {}
})
.await?;

Ok(())
}
37 changes: 37 additions & 0 deletions console-subscriber/examples/rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::{sync::RwLock, task};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
console_subscriber::init();
task::Builder::default()
.name("main-task")
.spawn(async move {
let count = Arc::new(RwLock::new(0));
for i in 0..5 {
let my_count = Arc::clone(&count);
let task_name = format!("increment-{}", i);
tokio::task::Builder::default()
.name(&task_name)
.spawn(async move {
for _ in 0..10 {
let mut lock = my_count.write().await;
*lock += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

loop {
let c = count.read().await;
tokio::time::sleep(Duration::from_secs(1)).await;
if *c >= 50 {
break;
}
}
})
.await?;

Ok(())
}
40 changes: 40 additions & 0 deletions console-subscriber/examples/semaphore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
console_subscriber::init();
task::Builder::default()
.name("main-task")
.spawn(async move {
let sem = Arc::new(tokio::sync::Semaphore::new(0));
let mut tasks = Vec::default();
for i in 0..5 {
let acquire_sem = Arc::clone(&sem);
let add_sem = Arc::clone(&sem);
let acquire_task_name = format!("acquire-{}", i);
let add_task_name = format!("add-{}", i);
tasks.push(
tokio::task::Builder::default()
.name(&acquire_task_name)
.spawn(async move {
let _permit = acquire_sem.acquire_many(i).await.unwrap();
tokio::time::sleep(Duration::from_secs(i as u64 * 2)).await;
}),
);
tasks.push(tokio::task::Builder::default().name(&add_task_name).spawn(
async move {
tokio::time::sleep(Duration::from_secs(i as u64 * 5)).await;
add_sem.add_permits(i as usize);
},
));
}

let all_tasks = futures::future::try_join_all(tasks);
all_tasks.await.unwrap();
})
.await?;

Ok(())
}
Loading

0 comments on commit 1aa9b59

Please sign in to comment.