diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index cca9749e5..3ed99eff4 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -128,11 +128,11 @@ async fn main() { info!("Node Id: {}", enr.node_id()); if args.enr_ip6.is_some() || args.enr_ip4.is_some() { // if the ENR is useful print it - info!("Base64 ENR: {}", enr.to_base64()); info!( - "Local ENR IpV6 socket: {:?}. Local ENR IpV4 socket: {:?}", - enr.udp6_socket(), - enr.udp4_socket() + base64_enr=&enr.to_base64(), + ipv6_socket=?enr.udp6_socket(), + ipv4_socket=?enr.udp4_socket(), + "Local ENR", ); } @@ -142,14 +142,14 @@ async fn main() { // if we know of another peer's ENR, add it known peers for enr in args.remote_peer { info!( - "Remote ENR read. udp4 socket: {:?}, udp6 socket: {:?}, tcp4_port {:?}, tcp6_port: {:?}", - enr.udp4_socket(), - enr.udp6_socket(), - enr.tcp4(), - enr.tcp6() + udp4_socket=?enr.udp4_socket(), + udp6_socket=?enr.udp6_socket(), + tcp4_port=?enr.tcp4(), + tcp6_port=?enr.tcp6(), + "Remote ENR read", ); if let Err(e) = discv5.add_enr(enr) { - warn!("Failed to add remote ENR {}", e); + warn!(error=?e, "Failed to add remote ENR"); // It's unlikely we want to continue in this example after this return; }; @@ -171,17 +171,21 @@ async fn main() { // get metrics let metrics = discv5.metrics(); let connected_peers = discv5.connected_peers(); - info!("Connected peers: {}, Active sessions: {}, Unsolicited requests/s: {:.2}", connected_peers, metrics.active_sessions, metrics.unsolicited_requests_per_second); - info!("Searching for peers..."); + info!( + connected_peers, + active_sessions=metrics.active_sessions, + unsolicited_requests_per_second=format_args!("{:.2}", metrics.unsolicited_requests_per_second), + "Searching for peers..." + ); // execute a FINDNODE query match discv5.find_node(target_random_node_id).await { - Err(e) => warn!("Find Node result failed: {:?}", e), + Err(e) => warn!(error=?e, "Find Node result failed"), Ok(v) => { // found a list of ENR's print their NodeIds let node_ids = v.iter().map(|enr| enr.node_id()).collect::>(); - info!("Nodes found: {}", node_ids.len()); + info!(len=node_ids.len(), "Nodes found"); for node_id in node_ids { - info!("Node: {}", node_id); + info!(%node_id, "Node"); } } } @@ -192,10 +196,10 @@ async fn main() { continue; } match discv5_ev { - Event::Discovered(enr) => info!("Enr discovered {}", enr), - Event::NodeInserted { node_id, replaced: _ } => info!("Node inserted {}", node_id), - Event::SessionEstablished(enr, _) => info!("Session established {}", enr), - Event::SocketUpdated(addr) => info!("Socket updated {}", addr), + Event::Discovered(enr) => info!(%enr, "Enr discovered"), + Event::NodeInserted { node_id, replaced: _ } => info!(%node_id, "Node inserted"), + Event::SessionEstablished(enr, _) => info!(%enr, "Session established"), + Event::SocketUpdated(addr) => info!(%addr, "Socket updated"), Event::TalkRequest(_) => info!("Talk request received"), _ => {} }; diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 5a3b7e37e..dea5aada8 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -339,7 +339,7 @@ impl Handler { if let Err(request_error) = self.send_request::

(contact, HandlerReqId::External(id.clone()), request).await { // If the sending failed report to the application if let Err(e) = self.service_send.send(HandlerOut::RequestFailed(id, request_error)).await { - warn!("Failed to inform that request failed {}", e) + warn!(error=%e, "Failed to inform that request failed") } } } @@ -449,7 +449,7 @@ impl Handler { mut request_call: RequestCall, ) { if request_call.retries() >= self.request_retries { - trace!("Request timed out with {}", node_address); + trace!(%node_address, "Request timed out"); // Remove the request from the awaiting packet_filter self.remove_expected_response(node_address.socket_addr); // The request has timed out. We keep any established session for future use. @@ -458,9 +458,9 @@ impl Handler { } else { // increment the request retry count and restart the timeout trace!( - "Resending message: {} to {}", - request_call.body(), - node_address + body=%request_call.body(), + %node_address, + "Resending message", ); self.send(node_address.clone(), request_call.packet().clone()) .await; @@ -488,7 +488,7 @@ impl Handler { if self.active_challenges.get(&node_address).is_some() || self.is_awaiting_session_to_be_established(&node_address) { - trace!("Request queued for node: {}", node_address); + trace!(%node_address, "Request queued for node"); self.pending_requests .entry(node_address) .or_default() @@ -516,8 +516,8 @@ impl Handler { } else { // No session exists, start a new handshake initiating a new session trace!( - "Starting session. Sending random packet to: {}", - node_address + %node_address, + "Starting session. Sending random packet", ); let packet = Packet::new_random(&self.node_id).map_err(RequestError::EntropyFailure)?; @@ -556,14 +556,15 @@ impl Handler { // Either the session is being established or has expired. We simply drop the // response in this case. return warn!( - "Session is not established. Dropping response {} for node: {}", - response, node_address.node_id + %response, + node=%node_address.node_id, + "Session is not established. Dropping response", ); }; match packet { Ok(packet) => self.send(node_address, packet).await, - Err(e) => warn!("Could not encrypt response: {:?}", e), + Err(e) => warn!(error=?e, "Could not encrypt response"), } } @@ -578,7 +579,7 @@ impl Handler { let message_nonce = wru_ref.1; if self.active_challenges.get(&node_address).is_some() { - warn!("WHOAREYOU already sent. {}", node_address); + warn!(%node_address, "WHOAREYOU already sent."); return; } @@ -598,7 +599,7 @@ impl Handler { let packet = Packet::new_whoareyou(message_nonce, id_nonce, enr_seq); let challenge_data = ChallengeData::try_from(packet.authenticated_data::

().as_slice()) .expect("Must be the correct challenge size"); - debug!("Sending WHOAREYOU to {}", node_address); + debug!(%node_address, "Sending WHOAREYOU"); self.add_expected_response(node_address.socket_addr); self.send(node_address.clone(), packet).await; self.active_challenges.insert( @@ -626,7 +627,12 @@ impl Handler { Some((node_address, request_call)) => { // Verify that the src_addresses match if node_address.socket_addr != src_address { - debug!("Received a WHOAREYOU packet for a message with a non-expected source. Source {}, expected_source: {} message_nonce {}", src_address, node_address.socket_addr, hex::encode(request_nonce)); + debug!( + source=%src_address, + expected_source=%node_address.socket_addr, + message_nonce=hex::encode(request_nonce), + "Received a WHOAREYOU packet for a message with a non-expected source.", + ); // Add the request back if src_address doesn't match self.active_requests.insert(node_address, request_call); return; @@ -634,7 +640,11 @@ impl Handler { request_call } None => { - trace!("Received a WHOAREYOU packet that references an unknown or expired request. Source {}, message_nonce {}", src_address, hex::encode(request_nonce)); + trace!( + source = %src_address, + message_nonce = hex::encode(request_nonce), + "Received a WHOAREYOU packet that references an unknown or expired request." + ); return; } }; @@ -643,14 +653,19 @@ impl Handler { if request_call.packet().message_nonce() != &request_nonce { // This could theoretically happen if a peer uses the same node id across // different connections. - warn!("Received a WHOAREYOU from a non expected source. Source: {}, message_nonce {} , expected_nonce: {}", request_call.contact(), hex::encode(request_call.packet().message_nonce()), hex::encode(request_nonce)); + warn!( + source = %request_call.contact(), + message_nonce = hex::encode(request_call.packet().message_nonce()), + expected_nonce = hex::encode(request_nonce), + "Received a WHOAREYOU from a non expected source." + ); // NOTE: Both mappings are removed in this case. return; } trace!( - "Received a WHOAREYOU packet response. Source: {}", - request_call.contact() + source = %request_call.contact(), + "Received a WHOAREYOU packet response.", ); // We do not allow multiple WHOAREYOU packets for a single challenge request. If we have @@ -658,8 +673,8 @@ impl Handler { // response. if request_call.handshake_sent() { warn!( - "Authentication response already sent. Dropping session. Node: {}", - request_call.contact() + node = %request_call.contact(), + "Authentication response already sent. Dropping session. Node", ); self.fail_request(request_call, RequestError::InvalidRemotePacket, true) .await; @@ -686,7 +701,7 @@ impl Handler { ) { Ok(v) => v, Err(e) => { - error!("Could not generate a session. Error: {:?}", e); + error!(error=?e, "Could not generate a session"); self.fail_request(request_call, RequestError::InvalidRemotePacket, true) .await; return; @@ -722,9 +737,9 @@ impl Handler { // We already know the ENR. Send the handshake response packet trace!( - "Sending Authentication response to node: {} ({:?})", - node_address, - request_call.id() + %node_address, + request_call_id=?request_call.id(), + "Sending Authentication response to node", ); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); @@ -742,7 +757,7 @@ impl Handler { connection_direction, )) .await - .unwrap_or_else(|e| warn!("Error with sending channel: {}", e)); + .unwrap_or_else(|e| warn!(error=%e, "Error with sending channel")); } None => { // Don't know the ENR. Establish the session, but request an ENR also @@ -750,9 +765,9 @@ impl Handler { // Send the Auth response let contact = request_call.contact().clone(); trace!( - "Sending Authentication response to node: {} ({:?})", - node_address, - request_call.id() + %node_address, + request_call_id=?request_call.id(), + "Sending Authentication response to node", ); request_call.update_packet(auth_packet.clone()); request_call.set_handshake_sent(); @@ -767,7 +782,7 @@ impl Handler { .send_request::

(contact, HandlerReqId::Internal(id), request) .await { - warn!("Failed to send Enr request {}", e) + warn!(error=%e, "Failed to send Enr request") } } } @@ -799,7 +814,7 @@ impl Handler { node_id, }) .await - .unwrap_or_else(|e| warn!("Error with sending channel: {}", e)) + .unwrap_or_else(|e| warn!(error=%e, "Error with sending channel")) } /// Handle a message that contains an authentication header. @@ -818,8 +833,8 @@ impl Handler { // This will lead to future outgoing challenges if they proceed to send further encrypted // packets. trace!( - "Received an Authentication header message from: {}", - node_address + from=%node_address, + "Received an Authentication header message", ); if let Some(challenge) = self.active_challenges.remove(&node_address) { @@ -851,7 +866,7 @@ impl Handler { )) .await { - warn!("Failed to inform of established session {}", e) + warn!(error=%e, "Failed to inform of established session") } // When (re-)establishing a session from an outgoing challenge, we do not need // to filter out this request from active requests, so we do not pass @@ -868,10 +883,10 @@ impl Handler { } else { // IP's or NodeAddress don't match. Drop the session. warn!( - "Session has invalid ENR. Enr sockets: {:?}, {:?}. Expected: {}", - enr.udp4_socket(), - enr.udp6_socket(), - node_address + udp4_socket=?enr.udp4_socket(), + udp6_socket=?enr.udp6_socket(), + expected=%node_address, + "Session has invalid ENR", ); self.fail_session(&node_address, RequestError::InvalidRemoteEnr, true) .await; @@ -901,8 +916,8 @@ impl Handler { }; if let Some(request) = maybe_ping_request { debug!( - "Responding to a PING request using a one-time session. node_address: {}", - node_address + %node_address, + "Responding to a PING request using a one-time session.", ); self.one_time_sessions .insert(node_address.clone(), (request.id.clone(), session)); @@ -911,7 +926,7 @@ impl Handler { .send(HandlerOut::Request(node_address.clone(), Box::new(request))) .await { - warn!("Failed to report request to application {}", e); + warn!(error=%e, "Failed to report request to application"); self.one_time_sessions.remove(&node_address); } } @@ -919,16 +934,16 @@ impl Handler { } Err(Error::InvalidChallengeSignature(challenge)) => { warn!( - "Authentication header contained invalid signature. Ignoring packet from: {}", - node_address + %node_address, + "Authentication header contained invalid signature. Ignoring packet from node", ); // insert back the challenge self.active_challenges.insert(node_address, challenge); } Err(e) => { warn!( - "Invalid Authentication header. Dropping session. Error: {:?}", - e + error=?e, + "Invalid Authentication header. Dropping session", ); self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) .await; @@ -951,15 +966,16 @@ impl Handler { .unwrap_or_default(); for req in pending_requests { trace!( - "Sending pending request {} to {node_address}. {}", - RequestId::from(&req.request_id), - req.request, + request_id=%RequestId::from(&req.request_id), + %node_address, + request=%req.request, + "Sending pending request", ); if let Err(request_error) = self .send_request::

(req.contact, req.request_id.clone(), req.request) .await { - warn!("Failed to send next pending request {request_error}"); + warn!(error=%request_error, "Failed to send next pending request"); // Inform the service that the request failed match req.request_id { HandlerReqId::Internal(_) => { @@ -972,7 +988,7 @@ impl Handler { .send(HandlerOut::RequestFailed(id, request_error)) .await { - warn!("Failed to inform that request failed {e}"); + warn!(error=%e, "Failed to inform that request failed"); } } } @@ -990,9 +1006,9 @@ impl Handler { message_nonce: Option, ) { trace!( - "Replaying active requests. {}, {:?}", - node_address, - message_nonce + %node_address, + ?message_nonce, + "Replaying active requests", ); let packets = if let Some(session) = self.sessions.get_mut(node_address) { @@ -1018,8 +1034,8 @@ impl Handler { packets.push((*request_call.packet().message_nonce(), new_packet)); } else { error!( - "Failed to re-encrypt packet while replaying active request with id: {:?}", - request_call.id() + id=?request_call.id(), + "Failed to re-encrypt packet while replaying active request with id", ); } } @@ -1055,7 +1071,7 @@ impl Handler { Ok(m) => match Message::decode(&m) { Ok(p) => p, Err(e) => { - warn!("Failed to decode message. Error: {:?}, {}", e, node_address); + warn!(error=?e, %node_address, "Failed to decode message"); return; } }, @@ -1064,10 +1080,10 @@ impl Handler { // sending this message has dropped their session. In this case, this message is a // Random packet and we should reply with a WHOAREYOU. // This means we need to drop the current session and re-establish. - trace!("Decryption failed. Error {}", e); + trace!(error=%e, "Decryption failed"); debug!( - "Message from node: {} is not encrypted with known session keys.", - node_address + %node_address, + "Message from node is not encrypted with known session keys.", ); self.fail_session(&node_address, RequestError::InvalidRemotePacket, true) .await; @@ -1080,16 +1096,16 @@ impl Handler { .send(HandlerOut::WhoAreYou(whoareyou_ref)) .await { - warn!("Failed to send WhoAreYou to the service {}", e) + warn!(error=%e, "Failed to send WhoAreYou to the service") } } else { - trace!("WHOAREYOU packet already sent: {}", node_address); + trace!(%node_address, "WHOAREYOU packet already sent"); } return; } }; - trace!("Received message from: {}", node_address); + trace!(%node_address, "Received message"); // Remove any associated request from pending_request match message { @@ -1100,7 +1116,7 @@ impl Handler { .send(HandlerOut::Request(node_address, Box::new(request))) .await { - warn!("Failed to report request to application {}", e) + warn!(error=%e, "Failed to report request to application") } } Message::Response(response) => { @@ -1127,7 +1143,7 @@ impl Handler { )) .await { - warn!("Failed to inform established outgoing connection {}", e) + warn!(error=%e, "Failed to inform established outgoing connection") } return; } @@ -1156,7 +1172,7 @@ impl Handler { } } else { // no session exists - trace!("Received a message without a session. {}", node_address); + trace!(%node_address, "Received a message without a session."); trace!("Requesting a WHOAREYOU packet to be sent."); // spawn a WHOAREYOU event to check for highest known ENR let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); @@ -1166,8 +1182,8 @@ impl Handler { .await { warn!( - "Spawn a WHOAREYOU event to check for highest known ENR failed {}", - e + error=%e, + "Spawn a WHOAREYOU event to check for highest known ENR failed", ) } } @@ -1199,7 +1215,7 @@ impl Handler { .send(HandlerOut::Response(node_address, Box::new(response))) .await { - warn!("Failed to inform of response {}", e) + warn!(error=%e, "Failed to inform of response") } return; } @@ -1214,7 +1230,7 @@ impl Handler { .send(HandlerOut::Response(node_address, Box::new(response))) .await { - warn!("Failed to inform of response {}", e) + warn!(error=%e, "Failed to inform of response") } return; } @@ -1233,12 +1249,12 @@ impl Handler { )) .await { - warn!("Failed to inform of response {}", e) + warn!(error=%e, "Failed to inform of response") } } else { // This is likely a late response and we have already failed the request. These get // dropped here. - trace!("Late response from node: {}", node_address); + trace!(%node_address, "Late response from node"); } } @@ -1315,7 +1331,7 @@ impl Handler { .send(HandlerOut::RequestFailed(id.clone(), error.clone())) .await { - warn!("Failed to inform request failure {}", e) + warn!(error=%e, "Failed to inform request failure") } } } @@ -1351,7 +1367,7 @@ impl Handler { .send(HandlerOut::RequestFailed(id, error.clone())) .await { - warn!("Failed to inform request failure {}", e) + warn!(error=%e, "Failed to inform request failure") } } } @@ -1373,7 +1389,7 @@ impl Handler { .send(HandlerOut::RequestFailed(id.clone(), error.clone())) .await { - warn!("Failed to inform request failure {e}") + warn!(error=%e, "Failed to inform request failure") } } } @@ -1388,7 +1404,7 @@ impl Handler { packet, }; if let Err(e) = self.socket.send.send(outbound_packet).await { - warn!("Failed to send outbound packet {}", e) + warn!(error=%e, "Failed to send outbound packet") } } diff --git a/src/handler/session.rs b/src/handler/session.rs index 9c1f96a8f..378bce719 100644 --- a/src/handler/session.rs +++ b/src/handler/session.rs @@ -157,9 +157,9 @@ impl Session { (None, Some(known_enr)) => known_enr, (None, None) => { warn!( - "Peer did not respond with their ENR. Session could not be established. Node: {}", - remote_id - ); + node=%remote_id, + "Peer did not respond with their ENR. Session could not be established", + ); return Err(Error::SessionNotEstablished); } }; diff --git a/src/rpc.rs b/src/rpc.rs index dfa385cc1..3f1bff68f 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -412,7 +412,7 @@ impl Message { body: ResponseBody::Pong { enr_seq, ip, port }, }) } else { - debug!("The port number should be non zero: {raw_port}"); + debug!(raw_port, "The port number should be non zero"); return Err(DecoderError::Custom("PONG response port number invalid")); } } @@ -423,8 +423,8 @@ impl Message { for distance in distances.iter() { if distance > &256u64 { warn!( - "Rejected FindNode request asking for unknown distance {}, maximum 256", - distance + distance, + "Rejected FindNode request asking for unknown distance maximum 256", ); return Err(DecoderError::Custom("FINDNODE request distance invalid")); } diff --git a/src/service.rs b/src/service.rs index d0c5c7e50..b727f9301 100644 --- a/src/service.rs +++ b/src/service.rs @@ -89,12 +89,12 @@ impl Drop for TalkRequest { body: ResponseBody::Talk { response: vec![] }, }; - debug!("Sending empty TALK response to {}", self.node_address); + debug!(node_address=%self.node_address, "Sending empty TALK response"); if let Err(e) = sender.send(HandlerIn::Response( self.node_address.clone(), Box::new(response), )) { - warn!("Failed to send empty talk response {}", e) + warn!(error=%e,"Failed to send empty talk response") } } } @@ -117,7 +117,7 @@ impl TalkRequest { } pub fn respond(mut self, response: Vec) -> Result<(), ResponseError> { - debug!("Sending TALK response to {}", self.node_address); + debug!(node_address=%self.node_address, "Sending TALK response"); let response = Response { id: self.id.clone(), @@ -391,21 +391,21 @@ impl Service { // check what our latest known ENR is for this node. if let Some(known_enr) = self.find_enr(&whoareyou_ref.0.node_id) { if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, Some(known_enr))) { - warn!("Failed to send whoareyou {}", e); + warn!(error=%e, "Failed to send whoareyou"); }; } else { // do not know of this peer - debug!("NodeId unknown, requesting ENR. {}", whoareyou_ref.0); + debug!(node_address=%whoareyou_ref.0, "NodeId unknown, requesting ENR."); if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, None)) { - warn!("Failed to send who are you to unknown enr peer {}", e); + warn!(error=%e, "Failed to send who are you to unknown enr peer"); } } } HandlerOut::RequestFailed(request_id, error) => { if let RequestError::Timeout = error { - debug!("RPC Request timed out. id: {}", request_id); + debug!(id=%request_id, "RPC Request timed out"); } else { - warn!("RPC Request failed: id: {}, error {:?}", request_id, error); + warn!(id=%request_id, ?error, "RPC Request failed"); } self.rpc_failure(request_id, error); } @@ -442,7 +442,7 @@ impl Service { } } if result.target.callback.send(found_enrs).is_err() { - warn!("Callback dropped for query {}. Results dropped", *id); + warn!(query_id=*id, "Callback dropped for query. Results dropped"); } } } @@ -600,8 +600,8 @@ impl Service { Err(NonContactable { enr }) => { debug_unreachable!("Stored ENR is not contactable. {}", enr); error!( - "Stored ENR is not contactable! This should never happen {}", - enr + %enr, + "Stored ENR is not contactable! This should never happen", ); } } @@ -618,15 +618,15 @@ impl Service { port, }, }; - debug!("Sending PONG response to {}", node_address); + debug!(%node_address, "Sending PONG response"); if let Err(e) = self .handler_send .send(HandlerIn::Response(node_address, Box::new(response))) { - warn!("Failed to send response {}", e); + warn!(error=%e, "Failed to send response"); } } else { - warn!("The src port number should be non zero. {src}"); + warn!(%src, "The src port number should be non zero"); } } RequestBody::Talk { protocol, request } => { @@ -650,8 +650,10 @@ impl Service { if let Some(mut active_request) = self.active_requests.remove(&id) { debug!( - "Received RPC response: {} to request: {} from: {}", - response.body, active_request.request_body, active_request.contact + response=%response.body, + request=%active_request.request_body, + from=%active_request.contact, + "Received RPC response", ); // Check that the responder matches the expected request @@ -659,13 +661,18 @@ impl Service { let expected_node_address = active_request.contact.node_address(); if expected_node_address != node_address { debug_unreachable!("Handler returned a response not matching the used socket addr"); - return error!("Received a response from an unexpected address. Expected {}, received {}, request_id {}", expected_node_address, node_address, id); + return error!( + expected=%expected_node_address, + received=%node_address, + request_id=%id, + "Received a response from an unexpected address", + ); } if !response.match_request(&active_request.request_body) { warn!( - "Node gave an incorrect response type. Ignoring response from: {}", - node_address + %node_address, + "Node gave an incorrect response type. Ignoring response" ); return; } @@ -676,6 +683,7 @@ impl Service { ResponseBody::Nodes { total, mut nodes } => { if total > MAX_NODES_RESPONSES as u64 { warn!( + total, "NodesResponse has a total larger than {}, nodes will be truncated", MAX_NODES_RESPONSES ); @@ -690,7 +698,7 @@ impl Service { if let Some(CallbackResponse::Nodes(callback)) = active_request.callback.take() { if let Err(e) = callback.send(Ok(nodes)) { - warn!("Failed to send response in callback {:?}", e) + warn!(error=?e,"Failed to send response in callback") } return; } @@ -704,8 +712,8 @@ impl Service { // we requested an ENR update if nodes.len() > 1 { warn!( - "Peer returned more than one ENR for itself. Blacklisting {}", - node_address + %node_address, + "Peer returned more than one ENR for itself. Blacklisting", ); let ban_timeout = self.config.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); @@ -766,10 +774,10 @@ impl Service { } debug!( - "Received a nodes response of len: {}, total: {}, from: {}", - nodes.len(), + len = nodes.len(), total, - active_request.contact + from = %active_request.contact, + "Received a nodes response", ); // note: If a peer sends an initial NODES response with a total > 1 then // in a later response sends a response with a total of 1, all previous nodes @@ -788,7 +796,7 @@ impl Service { port: port.get(), }; if let Err(e) = callback.send(Ok(response)) { - warn!("Failed to send callback response {:?}", e) + warn!(error=?e, "Failed to send callback response") }; } else { let socket = SocketAddr::new(ip, port.get()); @@ -840,14 +848,11 @@ impl Service { match result { Ok(_) => { updated = true; - info!( - "Local UDP ip6 socket updated to: {}", - new_ip6, - ); + info!(%new_ip6, "Local UDP ip6 socket updated"); self.send_event(Event::SocketUpdated(new_ip6)); } Err(e) => { - warn!("Failed to update local UDP ip6 socket. ip6: {}, error: {:?}", new_ip6, e); + warn!(ip6=%new_ip6, error=?e, "Failed to update local UDP ip6 socket."); } } } @@ -860,11 +865,11 @@ impl Service { match result { Ok(_) => { updated = true; - info!("Local UDP socket updated to: {}", new_ip4); + info!(%new_ip4, "Local UDP socket updated"); self.send_event(Event::SocketUpdated(new_ip4)); } Err(e) => { - warn!("Failed to update local UDP socket. ip: {}, error: {:?}", new_ip4, e); + warn!(ip=%new_ip4, error=?e, "Failed to update local UDP socket."); } } } @@ -879,7 +884,7 @@ impl Service { if let Some(enr) = self.find_enr(&node_id) { if enr.seq() < enr_seq { // request an ENR update - debug!("Requesting an ENR update from: {}", active_request.contact); + debug!(from=%active_request.contact, "Requesting an ENR update"); let request_body = RequestBody::FindNode { distances: vec![0] }; let active_request = ActiveRequest { contact: active_request.contact, @@ -904,7 +909,7 @@ impl Service { match active_request.callback { Some(CallbackResponse::Talk(callback)) => { if let Err(e) = callback.send(Ok(response)) { - warn!("Failed to send callback response {:?}", e) + warn!(error=?e, "Failed to send callback response") }; } _ => error!("Invalid callback for response"), @@ -912,10 +917,7 @@ impl Service { } } } else { - warn!( - "Received an RPC response which doesn't match a request. Id: {}", - id - ); + warn!(%id, "Received an RPC response which doesn't match a request"); } } @@ -940,7 +942,7 @@ impl Service { }; self.send_rpc_request(active_request); } - Err(NonContactable { enr }) => error!("Trying to ping a non-contactable peer {}", enr), + Err(NonContactable { enr }) => error!(%enr, "Trying to ping a non-contactable peer"), } } @@ -1048,14 +1050,14 @@ impl Service { }, }; trace!( - "Sending empty FINDNODES response to: {}", - node_address.node_id + to=%node_address.node_id, + "Sending empty FINDNODES response", ); if let Err(e) = self .handler_send .send(HandlerIn::Response(node_address, Box::new(response))) { - warn!("Failed to send empty FINDNODES response {}", e) + warn!(error=%e, "Failed to send empty FINDNODES response") } } else { // build the NODES response @@ -1081,10 +1083,10 @@ impl Service { if entry_size + total_size < MAX_PACKET_SIZE - 104 { total_size += entry_size; trace!( - "Adding ENR {}, size {}, total size {}", - enr, + %enr, entry_size, - total_size + total_size, + "Adding ENR", ); to_send_nodes[rpc_index].push(enr); } else { @@ -1107,15 +1109,15 @@ impl Service { for response in responses { trace!( - "Sending FINDNODES response to: {}. Response: {} ", - node_address, - response + to=%node_address, + %response, + "Sending FINDNODES response", ); if let Err(e) = self.handler_send.send(HandlerIn::Response( node_address.clone(), Box::new(response), )) { - warn!("Failed to send FINDNODES response {}", e) + warn!(error=%e, "Failed to send FINDNODES response") } } } @@ -1145,10 +1147,11 @@ impl Service { Err(NonContactable { enr }) => { // This can happen quite often in ipv6 only nodes debug!("Query {} has a non contactable enr: {}", *query_id, enr); + debug!(query_id=*query_id, %enr, "Query has a non contactable enr"); } } } else { - error!("Query {} requested an unknown ENR", *query_id); + error!(id = *query_id, "Query requested an unknown ENR"); } // This query request has failed and we must inform the @@ -1170,7 +1173,7 @@ impl Service { }; let contact = active_request.contact.clone(); - debug!("Sending RPC {} to node: {}", request, contact); + debug!(%request, node=%contact,"Sending RPC to node"); if self .handler_send .send(HandlerIn::Request(contact, Box::new(request))) @@ -1224,10 +1227,7 @@ impl Service { self.kbuckets.write().update_node(&key, enr.clone(), None) { self.peers_to_ping.remove(&enr.node_id()); - debug!( - "Failed to update discovered ENR. Node: {}, Reason: {:?}", - source, reason - ); + debug!(node=%source, ?reason, "Failed to update discovered ENR."); return false; // Remove this peer from the discovered list if the update failed } @@ -1273,10 +1273,10 @@ impl Service { } peer_count += 1; } - debug!("{} peers found for query id {:?}", peer_count, query_id); + debug!(peer_count, ?query_id, "peers found for query id"); query.on_success(source, &enrs) } else { - debug!("Response returned for ended query {:?}", query_id) + debug!(?query_id, "Response returned for ended query") } } } @@ -1305,7 +1305,7 @@ impl Service { match insert_result { InsertResult::Inserted => { // We added this peer to the table - debug!("New connected node added to routing table: {}", node_id); + debug!(%node_id, "New connected node added to routing table"); self.peers_to_ping.insert(node_id); // PING immediately if the direction is outgoing. This allows us to receive @@ -1331,14 +1331,14 @@ impl Service { } => { // The node was updated if promoted_to_connected { - debug!("Node promoted to connected: {}", node_id); + debug!(%node_id, "Node promoted to connected"); self.peers_to_ping.insert(node_id); } } InsertResult::ValueUpdated | InsertResult::UpdatedPending => {} InsertResult::Failed(reason) => { self.peers_to_ping.remove(&node_id); - trace!("Could not insert node: {}, reason: {:?}", node_id, reason); + trace!(%node_id, ?reason, "Could not insert node"); } } } @@ -1350,13 +1350,10 @@ impl Service { { UpdateResult::Failed(reason) => { self.peers_to_ping.remove(&node_id); - debug!( - "Could not update ENR from pong. Node: {}, reason: {:?}", - node_id, reason - ); + debug!(node=%node_id, ?reason, "Could not update ENR from pong",); } update => { - debug!("Updated {:?}", update) + debug!(?update, "Updated") } // Updated ENR successfully. } } @@ -1371,13 +1368,14 @@ impl Service { FailureReason::KeyNonExistent => {} others => { warn!( - "Could not update node to disconnected. Node: {}, Reason: {:?}", - node_id, others + node=%node_id, + reason=?others, + "Could not update node to disconnected.", ); } }, _ => { - debug!("Node set to disconnected: {}", node_id) + debug!(%node_id, "Node set to disconnected") } } self.peers_to_ping.remove(&node_id); @@ -1430,17 +1428,14 @@ impl Service { _ => connection_direction, }; - debug!( - "Session established with Node: {}, direction: {}", - node_id, direction - ); + debug!(node=%node_id, %direction, "Session established with Node"); self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction)); } /// A session could not be established or an RPC request timed-out (after a few retries, if /// specified). fn rpc_failure(&mut self, id: RequestId, error: RequestError) { - trace!("RPC Error removing request. Reason: {:?}, id {}", error, id); + trace!(reason=?error, %id, "RPC Error removing request."); if let Some(active_request) = self.active_requests.remove(&id) { // If this is initiated by the user, return an error on the callback. All callbacks // support a request error. @@ -1498,8 +1493,9 @@ impl Service { } } else { debug!( - "Failed RPC request: {}: {} ", - active_request.request_body, active_request.contact + request_body=%active_request.request_body, + node=%active_request.contact, + "Failed RPC request", ); } } @@ -1509,15 +1505,19 @@ impl Service { if let Some(query_id) = active_request.query_id { if let Some(query) = self.queries.get_mut(query_id) { debug!( - "Failed query request: {} for query: {} and {} ", - active_request.request_body, *query_id, active_request.contact + request_body=%active_request.request_body, + query_id=*query_id, + node=%active_request.contact, + "Failed query request", ); query.on_failure(&node_id); } } else { debug!( - "Failed RPC request: {} for node: {}, reason {:?} ", - active_request.request_body, active_request.contact, error + request_body=%active_request.request_body, + node=%active_request.contact, + error=?error, + "Failed RPC request", ); } } diff --git a/src/socket/filter/mod.rs b/src/socket/filter/mod.rs index 7bb54ad77..1a0ebd2d4 100644 --- a/src/socket/filter/mod.rs +++ b/src/socket/filter/mod.rs @@ -91,7 +91,7 @@ impl Filter { } if PERMIT_BAN_LIST.read().ban_ips.get(&src.ip()).is_some() { - debug!("Dropped unsolicited packet from banned src: {:?}", src); + debug!(?src, "Dropped unsolicited packet from banned src"); return false; } @@ -113,7 +113,7 @@ impl Filter { // Check rate limits if let Some(rate_limiter) = self.rate_limiter.as_mut() { if rate_limiter.allows(&LimitKind::Ip(src.ip())).is_err() { - warn!("Banning IP for excessive requests: {:?}", src.ip()); + warn!(ip=?src.ip(), "Banning IP for excessive requests"); // Ban the IP address let ban_timeout = self.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST @@ -124,7 +124,7 @@ impl Filter { } if rate_limiter.allows(&LimitKind::Total).is_err() { - debug!("Dropped unsolicited packet from RPC limit: {:?}", src.ip()); + debug!(ip=?src.ip(), "Dropped unsolicited packet from RPC limit"); return false; } } @@ -148,8 +148,8 @@ impl Filter { .is_some() { debug!( - "Dropped unsolicited packet from banned node_id: {}", - node_address + node_id = %node_address, + "Dropped unsolicited packet from banned node_id", ); return false; } @@ -165,8 +165,8 @@ impl Filter { .is_err() { warn!( - "Node has exceeded its request limit and is now banned {}", - node_address.node_id + node_id = %node_address.node_id, + "Node has exceeded its request limit and is now banned", ); // The node is being banned @@ -211,7 +211,7 @@ impl Filter { }; if known_nodes >= max_nodes_per_ip { - warn!("IP has exceeded its node-id limit and is now banned {}", ip); + warn!(%ip, "IP has exceeded its node-id limit and is now banned"); // The node is being banned let ban_timeout = self.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban_ips.insert(ip, ban_timeout); diff --git a/src/socket/recv.rs b/src/socket/recv.rs index a300a09e0..a2a9f4869 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -165,7 +165,7 @@ impl RecvHandler { // based in additional checks to packets. if let SocketAddr::V6(ref mut v6_socket_addr) = src_address { if v6_socket_addr.flowinfo() != 0 || v6_socket_addr.scope_id() != 0 { - trace!("Zeroing out flowinfo and scope_id for v6 socket address. Original {v6_socket_addr}"); + trace!(original=%v6_socket_addr, "Zeroing out flowinfo and scope_id for v6 socket address"); v6_socket_addr.set_flowinfo(0); v6_socket_addr.set_scope_id(0); } @@ -177,7 +177,7 @@ impl RecvHandler { // Perform the first run of the filter. This checks for rate limits and black listed IP // addresses. if !permitted && !self.filter.initial_pass(&src_address) { - trace!("Packet filtered from source: {:?}", src_address); + trace!(?src_address, "Packet filtered from source"); return; } // Decodes the packet @@ -185,7 +185,7 @@ impl RecvHandler { match Packet::decode::

(&self.node_id, &recv_buffer[..length]) { Ok(p) => p, Err(e) => { - debug!("Packet decoding failed: {:?}", e); // could not decode the packet, drop it + debug!(error=?e, "Packet decoding failed"); // could not decode the packet, drop it return; } }; @@ -216,6 +216,6 @@ impl RecvHandler { self.handler .send(inbound) .await - .unwrap_or_else(|e| warn!("Could not send packet to handler: {}", e)); + .unwrap_or_else(|e| warn!(error=%e,"Could not send packet to handler")); } } diff --git a/src/socket/send.rs b/src/socket/send.rs index 31f52180d..7860eb9be 100644 --- a/src/socket/send.rs +++ b/src/socket/send.rs @@ -65,16 +65,20 @@ impl SendHandler { Some(packet) = self.handler_recv.recv() => { let encoded_packet = packet.packet.encode::

(&packet.node_address.node_id); if encoded_packet.len() > MAX_PACKET_SIZE { - warn!("Sending packet larger than max size: {} max: {}", encoded_packet.len(), MAX_PACKET_SIZE); + warn!( + size=encoded_packet.len(), + max=MAX_PACKET_SIZE, + "Sending packet larger than max size" + ); } let addr = &packet.node_address.socket_addr; if let Err(e) = self.send(&encoded_packet, addr).await { match e { Error::Io(e) => { - trace!("Could not send packet to {addr} . Error: {e}"); + trace!(%addr, error=%e, "Could not send packet."); }, Error::SocketMismatch => { - error!("Socket mismatch attempting to send a packet to {addr}.") + error!(%addr, "Socket mismatch attempting to send a packet.") } } } else {