Skip to content
This repository was archived by the owner on Dec 3, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions .github/codecov.yml

This file was deleted.

82 changes: 82 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
name: Build

on:
push:
branches:
- main
pull_request:

permissions:
contents: read

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
check:
timeout-minutes: 10
runs-on: runs-on=${{ github.run_id }}/runner=2cpu-linux-x64/extras=s3-cache

steps:
- uses: runs-on/action@v2
with:
sccache: s3
- uses: actions/checkout@v6
- uses: actions-rust-lang/setup-rust-toolchain@1780873c7b576612439a134613cc4cc74ce5538c # 1.15.2
with:
components: clippy,rustfmt
- uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # 0.0.9

- run: cargo fmt --all -- --check
- run: cargo clippy

test:
runs-on: runs-on=${{ github.run_id }}/runner=2cpu-linux-x64/extras=s3-cache
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
feature_set: [ opentelemetry, opentelemetry_30 ]
services:
postgres:
image: postgres:18
env:
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpassword
POSTGRES_DB: testdb
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- uses: runs-on/action@v2
with:
sccache: s3
- uses: actions/checkout@v6
- uses: actions-rust-lang/setup-rust-toolchain@1780873c7b576612439a134613cc4cc74ce5538c # 1.15.2
with:
components: clippy,rustfmt
- uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # 0.0.9

- name: Run tests (${{ matrix.feature_set }})
run: cargo test --all --features ${{ matrix.feature_set }}
env:
DATABASE_URL: postgres://testuser:testpassword@localhost:5432/testdb

publish:
runs-on: runs-on=${{ github.run_id }}/runner=4cpu-linux-x64/extras=s3-cache
needs:
- check
- test

if: ${{ github.event_name == 'push' && github.event.ref == 'refs/heads/main' }}
permissions:
contents: write

steps:
- uses: runs-on/action@v2
- uses: actions/checkout@v5
- uses: anothrNick/github-tag-action@1
env:
GITHUB_TOKEN: ${{ secrets.CI_TOKEN }}
DEFAULT_BUMP: none
28 changes: 0 additions & 28 deletions .github/workflows/check.yml

This file was deleted.

46 changes: 0 additions & 46 deletions .github/workflows/coverage.yml

This file was deleted.

24 changes: 0 additions & 24 deletions .github/workflows/release.yml

This file was deleted.

36 changes: 0 additions & 36 deletions .github/workflows/test.yml

This file was deleted.

12 changes: 12 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ tls-rustls = [
"graphile_worker_crontab_runner/tls-rustls",
"graphile_worker_migrations/tls-rustls",
]
opentelemetry_0_30 = [
"opentelemetry-30",
"tracing-opentelemetry-30",
]
opentelemetry_0_31 = [
"opentelemetry",
"tracing-opentelemetry",
]
# For now we don't support async std
# runtime-async-std-rustls = ["sqlx/runtime-async-std-rustls"]
# runtime-async-std-native-tls = ["sqlx/runtime-async-std-native-tls"]
Expand Down Expand Up @@ -89,6 +97,10 @@ hex = "0.4.3"
cfg-if = "1.0.4"
indoc = "2.0.7"
derive_builder = "0.20.2"
opentelemetry-30 = { package = "opentelemetry", version = "0.30", optional = true }
opentelemetry = { version = "0.31", optional = true }
tracing-opentelemetry-30 = { package = "tracing-opentelemetry", version = "0.31", optional = true }
tracing-opentelemetry = { version = "0.32", optional = true }

[dev-dependencies]
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod streams;
pub mod utils;

pub mod context_ext;
mod tracing;
/// Utility functions for job management
pub mod worker_utils;

Expand Down
21 changes: 19 additions & 2 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use graphile_worker_extensions::ReadOnlyExtensions;
use graphile_worker_job::Job;
use graphile_worker_shutdown_signal::ShutdownSignal;
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace, warn, Instrument, Span};

use crate::builder::WorkerOptions;
use crate::sql::complete_job::complete_job;
use crate::tracing::link_to_job_create_span;
use crate::{sql::fail_job::fail_job, streams::StreamSource};

/// Type alias for task handler functions.
Expand Down Expand Up @@ -425,7 +426,19 @@ enum RunJobError {
/// A `Result` indicating whether the job was successfully executed:
/// - `Ok(())` if the task handler completed successfully
/// - `Err(RunJobError)` if an error occurred during execution
#[tracing::instrument(
"run_job",
skip(job, worker, source),
fields(
job_id = job.id(),
messaging.system = "graphile-worker",
messaging.operation.name = "run_job",
messaging.destination.name = tracing::field::Empty,
otel.name = tracing::field::Empty
)
)]
async fn run_job(job: &Job, worker: &Worker, source: &StreamSource) -> Result<(), RunJobError> {
link_to_job_create_span(job.payload().clone());
let task_id = job.task_id();

// Look up the task identifier (string) from the task ID (integer)
Expand All @@ -434,6 +447,10 @@ async fn run_job(job: &Job, worker: &Worker, source: &StreamSource) -> Result<()
.get(task_id)
.ok_or_else(|| RunJobError::IdentifierNotFound(*task_id))?;

let span = Span::current();
span.record("otel.name", task_identifier.as_str());
span.record("messaging.destination.name", task_identifier.as_str());

// Find the handler function for this task identifier
let task_fn = worker
.jobs()
Expand All @@ -460,7 +477,7 @@ async fn run_job(job: &Job, worker: &Worker, source: &StreamSource) -> Result<()
let start = Instant::now();

// Spawn the task on a separate Tokio task
let job_task = tokio::spawn(task_fut);
let job_task = tokio::spawn(task_fut.instrument(span));
let abort_handle = job_task.abort_handle();

// Set up a shutdown handler that waits for the shutdown signal
Expand Down
1 change: 1 addition & 0 deletions src/sql/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use sqlx::{query_as, PgExecutor};
use tracing::info;

/// Add a job to the queue
#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn add_job(
executor: impl for<'e> PgExecutor<'e>,
escaped_schema: &str,
Expand Down
1 change: 1 addition & 0 deletions src/sql/complete_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::errors::GraphileWorkerError;

use crate::Job;

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn complete_job(
executor: impl for<'e> PgExecutor<'e>,
job: &Job,
Expand Down
1 change: 1 addition & 0 deletions src/sql/fail_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::errors::GraphileWorkerError;

use crate::Job;

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn fail_job(
executor: impl for<'e> PgExecutor<'e>,
job: &Job,
Expand Down
1 change: 1 addition & 0 deletions src/sql/get_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graphile_worker_job::{DbJob, Job};

use super::task_identifiers::TaskDetails;

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn get_job<'e>(
executor: impl PgExecutor<'e>,
task_details: &TaskDetails,
Expand Down
1 change: 1 addition & 0 deletions src/sql/task_identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl From<Vec<TaskRow>> for TaskDetails {
}
}

#[tracing::instrument(skip_all, err, fields(otel.kind="client", db.system="postgresql"))]
pub async fn get_tasks_details<'e>(
executor: impl PgExecutor<'e> + Clone,
escaped_schema: &str,
Expand Down
Loading
Loading