Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions crates/iota-graphql-rpc/src/extensions/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{fmt::Write, net::SocketAddr, sync::Arc};
use std::{
fmt::Write,
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};

use async_graphql::{
PathSegment, Response, ServerError, ServerResult, ValidationResult, Variables,
extensions::{
Extension, ExtensionContext, ExtensionFactory, NextExecute, NextParseQuery, NextResolve,
NextValidation, ResolveInfo,
NextSubscribe, NextValidation, ResolveInfo,
},
parser::types::{ExecutableDocument, OperationType, Selection},
};
use async_graphql_value::ConstValue;
use futures::stream::BoxStream;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -44,16 +52,31 @@ impl ExtensionFactory for Logger {
fn create(&self) -> Arc<dyn Extension> {
Arc::new(LoggerExtension {
config: self.config.clone(),
is_subscription: AtomicBool::default(),
})
}
}

struct LoggerExtension {
config: LoggerConfig,
/// Marks if the current request is a subscription.
is_subscription: AtomicBool,
}

#[async_trait::async_trait]
impl Extension for LoggerExtension {
fn subscribe<'s>(
&self,
ctx: &ExtensionContext<'_>,
stream: BoxStream<'s, Response>,
next: NextSubscribe<'_>,
) -> BoxStream<'s, Response> {
// flag this operation as a subscription so later hooks (execute) can
// adjust logging.
self.is_subscription.store(true, Ordering::Relaxed);
next.run(ctx, stream)
}

// This hook is used to get the top level node name for recording in the metrics
// which top level nodes are being called.
async fn resolve(
Expand Down Expand Up @@ -199,10 +222,19 @@ impl Extension for LoggerExtension {
"[Schema] {}", resp.data
);
}
_ => info!(
_ if self.is_subscription.load(Ordering::Relaxed) => {
// a subscription can emit many payloads; to avoid flooding normal response
// logs we log subscription payloads at debug level.
debug!(
%query_id,
%session_id,
"[Response] {}", resp.data
"[Subscription] {}", resp.data
);
}
_ => info!(
%query_id,
%session_id,
"[Response] {}", resp.data
),
}
}
Expand Down
Loading