Skip to content

Commit 5c18293

Browse files
committed
a way to store arbitrarily complex data with tracing
1 parent bc69804 commit 5c18293

File tree

6 files changed

+164
-17
lines changed

6 files changed

+164
-17
lines changed

Cargo.lock

Lines changed: 85 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gitoxide-core/Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ estimate-hours = ["dep:itertools", "dep:fs-err", "dep:crossbeam-channel", "dep:s
2323
query = ["dep:rusqlite"]
2424
## Run algorithms on a corpus of repositories and store their results for later comparison and intelligence gathering.
2525
## *Note that* we need `organize` for finding git repositories fast.
26-
corpus = ["dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel"]
26+
corpus = ["dep:rusqlite", "dep:sysinfo", "organize", "dep:crossbeam-channel", "dep:tracing-forest", "dep:serde_json", "dep:tracing-subscriber", "dep:tracing"]
2727

2828
#! ### Mutually Exclusive Networking
2929
#! If both are set, _blocking-client_ will take precedence, allowing `--all-features` to be used.
@@ -36,7 +36,7 @@ async-client = ["gix/async-network-client-async-std", "gix-transport-configurati
3636

3737
#! ### Other
3838
## Data structures implement `serde::Serialize` and `serde::Deserialize`.
39-
serde = ["gix/serde", "serde_json", "dep:serde", "bytesize/serde"]
39+
serde = ["gix/serde", "dep:serde_json", "dep:serde", "bytesize/serde"]
4040

4141

4242
[dependencies]
@@ -48,7 +48,6 @@ serde = { version = "1.0.114", optional = true, default-features = false, featur
4848
anyhow = "1.0.42"
4949
thiserror = "1.0.34"
5050
bytesize = "1.0.1"
51-
serde_json = { version = "1.0.65", optional = true }
5251
tempfile = "3.1.0"
5352

5453
# for async-client
@@ -74,6 +73,10 @@ rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
7473

7574
# for 'corpus'
7675
sysinfo = { version = "0.29.2", optional = true, default-features = false }
76+
tracing-forest = { version = "0.1.5", features = ["serde"], optional = true }
77+
serde_json = { version = "1.0.65", optional = true }
78+
tracing-subscriber = { version = "0.3.17", optional = true }
79+
tracing = { version = "0.1.37", optional = true }
7780

7881
# for svg graph output
7982
layout-rs = "0.1.1"

gitoxide-core/src/corpus/db.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Con
131131
insertion_time integer NOT NULL, -- in seconds since UNIX epoch
132132
duration real, -- in seconds or NULL if not yet finished (either successful or with failure)
133133
error text, -- or NULL if there was no error
134+
spans_json text, -- all spans collected while performing the run
134135
FOREIGN KEY (repository) REFERENCES repository (id),
135136
FOREIGN KEY (task) REFERENCES task (id),
136137
FOREIGN KEY (runner) REFERENCES runner (id),

gitoxide-core/src/corpus/engine.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,16 @@ impl Engine {
5858
task_progress.set_name("run");
5959
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
6060
let threads = gix::parallel::num_threads(threads);
61+
let db_path = self.con.path().expect("opened from path on disk").to_owned();
6162
for (task_id, task) in tasks {
6263
let task_start = Instant::now();
6364
let mut repo_progress = task_progress.add_child(format!("run '{}'", task.short_name));
6465
repo_progress.init(Some(repos.len()), gix::progress::count("repos"));
6566

6667
if task.execute_exclusive || threads == 1 {
6768
let mut run_progress = repo_progress.add_child("set later");
69+
let (_guard, current_id) = corpus::trace::override_thread_subscriber(db_path.as_str())?;
70+
6871
for repo in &repos {
6972
if gix::interrupt::is_triggered() {
7073
bail!("interrupted by user");
@@ -76,14 +79,19 @@ impl Engine {
7679
.expect("corpus contains repo")
7780
.display()
7881
));
82+
83+
// TODO: wait for new release to be able to provide run_id via span attributes
7984
let mut run = Self::insert_run(&self.con, gitoxide_id, runner_id, *task_id, repo.id)?;
80-
task.perform(
81-
&mut run,
82-
&repo.path,
83-
&mut run_progress,
84-
Some(threads),
85-
&gix::interrupt::IS_INTERRUPTED,
86-
);
85+
current_id.store(run.id, Ordering::SeqCst);
86+
tracing::info_span!("run", run_id = run.id).in_scope(|| {
87+
task.perform(
88+
&mut run,
89+
&repo.path,
90+
&mut run_progress,
91+
Some(threads),
92+
&gix::interrupt::IS_INTERRUPTED,
93+
);
94+
});
8795
Self::update_run(&self.con, run)?;
8896
repo_progress.inc();
8997
}
@@ -96,22 +104,31 @@ impl Engine {
96104
Some(threads),
97105
{
98106
let shared_repo_progress = repo_progress.clone();
99-
let path = self.con.path().expect("opened from path on disk").to_owned();
107+
let db_path = db_path.clone();
100108
move |tid| {
101109
(
110+
corpus::trace::override_thread_subscriber(db_path.as_str()),
102111
gix::threading::lock(&shared_repo_progress).add_child(format!("{tid}")),
103-
rusqlite::Connection::open(&path),
112+
rusqlite::Connection::open(&db_path),
104113
)
105114
}
106115
},
107-
|repo, (progress, con), _threads_left, should_interrupt| -> anyhow::Result<()> {
116+
|repo, (subscriber, progress, con), _threads_left, should_interrupt| -> anyhow::Result<()> {
108117
progress.set_name(format!(
109118
"{}",
110119
repo.path
111120
.strip_prefix(corpus_path)
112121
.expect("corpus contains repo")
113122
.display()
114123
));
124+
let current_id = match subscriber {
125+
Ok((_guard, current_id)) => current_id,
126+
Err(err) => {
127+
progress.fail(format!("{err:#?}"));
128+
should_interrupt.store(true, Ordering::SeqCst);
129+
return Ok(());
130+
}
131+
};
115132
let con = match con {
116133
Ok(con) => con,
117134
Err(err) => {
@@ -121,7 +138,10 @@ impl Engine {
121138
}
122139
};
123140
let mut run = Self::insert_run(con, gitoxide_id, runner_id, *task_id, repo.id)?;
124-
task.perform(&mut run, &repo.path, progress, Some(1), should_interrupt);
141+
current_id.store(run.id, Ordering::SeqCst);
142+
tracing::info_span!("run", run_id = run.id).in_scope(|| {
143+
task.perform(&mut run, &repo.path, progress, Some(1), should_interrupt);
144+
});
125145
Self::update_run(con, run)?;
126146
if let Some(counter) = counter.as_ref() {
127147
counter.fetch_add(1, Ordering::SeqCst);

gitoxide-core/src/corpus/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,46 @@ pub(crate) struct Run {
3838
error: Option<String>,
3939
}
4040

41+
pub(crate) mod trace {
42+
use rusqlite::params;
43+
use std::path::Path;
44+
use std::sync::atomic::{AtomicU32, Ordering};
45+
use std::sync::{Arc, Mutex};
46+
use tracing_forest::tree::Tree;
47+
use tracing_subscriber::layer::SubscriberExt;
48+
49+
pub fn override_thread_subscriber(
50+
db_path: impl AsRef<Path>,
51+
) -> anyhow::Result<(tracing::subscriber::DefaultGuard, Arc<AtomicU32>)> {
52+
let current_id = Arc::new(AtomicU32::default());
53+
let processor = tracing_forest::Printer::new().formatter(StoreTreeToDb {
54+
con: Arc::new(Mutex::new(rusqlite::Connection::open(&db_path)?)),
55+
run_id: current_id.clone(),
56+
});
57+
let subscriber = tracing_subscriber::Registry::default().with(tracing_forest::ForestLayer::from(processor));
58+
let guard = tracing::subscriber::set_default(subscriber);
59+
Ok((guard, current_id))
60+
}
61+
62+
pub struct StoreTreeToDb {
63+
pub con: Arc<Mutex<rusqlite::Connection>>,
64+
pub run_id: Arc<AtomicU32>,
65+
}
66+
impl tracing_forest::printer::Formatter for StoreTreeToDb {
67+
type Error = rusqlite::Error;
68+
69+
fn fmt(&self, tree: &Tree) -> Result<String, Self::Error> {
70+
let json = serde_json::to_string_pretty(&tree).expect("serialization to string always works");
71+
let run_id = self.run_id.load(Ordering::SeqCst);
72+
self.con
73+
.lock()
74+
.unwrap()
75+
.execute("UPDATE run SET spans_json = ?1 WHERE id = ?2", params![json, run_id])?;
76+
Ok(String::new())
77+
}
78+
}
79+
}
80+
4181
pub(crate) mod run {
4282
use crate::corpus;
4383
use crate::corpus::{Run, Task};

gitoxide-core/src/repository/odb.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ pub fn statistics(
166166
cancelled,
167167
),
168168
thread_limit,
169-
move |_| (repo.objects.clone().into_inner(), counter.clone()),
169+
move |_| (repo.objects.clone().into_inner(), counter),
170170
|ids, (handle, counter)| {
171171
let ids = ids?;
172172
if let Some(counter) = counter {

0 commit comments

Comments
 (0)