Skip to content

trace_id and span_id are 0 with channel.consume() generator function #1432

@rayate2410

Description

@rayate2410

Describe your environment

localhost:~ # python3 --version
Python 3.6.15

localhost:~ # pip3 freeze | grep opentelemetry
opentelemetry-api==1.12.0
opentelemetry-instrumentation==0.33b0
opentelemetry-instrumentation-logging==0.33b0
opentelemetry-instrumentation-pika==0.33b0
opentelemetry-sdk==1.12.0
opentelemetry-semantic-conventions==0.33b0

Steps to reproduce

"""Sample RabbiMQ consumer application demonstrating tracing capabilities of using opentelemetry instrumentation libraries."""

import argparse
import logging
import pika
import subprocess
import sys

from opentelemetry import trace
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.trace import TracerProvider

LoggingInstrumentor().instrument(set_logging_format=False)
logging.basicConfig(filename='consumer.log',
                    filemode='w',
                    format='%(asctime)s - %(levelname)s - %(threadName)s - %(filename)s:%(lineno)s - %(funcName)s - [trace_id=%(otelTraceID)s span_id=%(otelSpanID)s resource.service.name=%(otelServiceName)s] - %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.INFO)
trace.set_tracer_provider(TracerProvider())

logger = logging.getLogger("Consumer_App")


# Callback function which is called on incoming messages
def callback(ch, method, properties, body):
    logger.info(" [x] Received Message: " + str(body.decode('utf-8')))
    logger.info(" [x] Header " + str(properties.headers))
    logger.info(" Exiting the process")
    sys.exit(0)

def main():
    logger.info("[START]: Consumer Application")

    parser = argparse.ArgumentParser()
    parser.add_argument("queue", help="queue name", type=str, default="testqueue")    
    args = parser.parse_args()

    # Connect to rabbitmq.
    url = 'amqp://admin:password@localhost:31302'
    params = pika.URLParameters(url)
    connection = pika.BlockingConnection(params)
    mychannel = connection.channel()  # start a channel
    pika_instrumentation = PikaInstrumentor()
    pika_instrumentation.instrument_channel(channel=mychannel)
    mychannel.queue_declare(queue=args.queue, )  # Declare a queue

    mychannel.basic_publish("", args.queue, "This is a test message published to RabbitMQ")

    # Below code with basic_consume and callback function - it does have trace id and span ids for log statements in callback function.
    # set up subscription on the queue
    # mychannel.basic_consume(args.queue, callback, auto_ack=True)

    # # start consuming (blocks)
    # mychannel.start_consuming()

    # I'm using channel.consume method to get the messages from the queue. And expecting the logs should have trace ids and span ids. 
    for method, properties, body in mychannel.consume(queue='testqueue', auto_ack=True, inactivity_timeout=10):
        logger.info(" [x] Received Message: " + str(body.decode('utf-8')))
        logger.info(" [x] Header " + str(properties.headers))
        logger.info(" Exiting the process")
        sys.exit(0)
        
    connection.close()

if __name__ == '__main__':
    main()

What is the expected behavior?
I'm using channel.consume method to get the messages from the queue. And expecting the logs should have trace ids and span ids.
If you see below, trace_id and span_id is available if I use basic_consume with callback.

02:49:21 - INFO - MainThread - TracingConsumer.py:27 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] -  [x] Received Message: Hi_from_publisher
02:49:21 - INFO - MainThread - TracingConsumer.py:28 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] -  [x] Header {'traceparent': '00-0f0e9af13cd7618bddf6ecaf8bc8663a-27c540a6ee6fe32f-01'}
02:49:21 - INFO - MainThread - TracingConsumer.py:29 - callback - [trace_id=0f0e9af13cd7618bddf6ecaf8bc8663a span_id=32921be241a75b82 resource.service.name=] -  Exiting the process

What is the actual behavior?
Actual behavior with channel.consume(...) where trace_id and span_id are 0. Although we can clearly see that the message header contains "traceparent" id.

15:58:43 - INFO - MainThread - TracingConsumerTest.py:60 - main - [trace_id=0 span_id=0 resource.service.name=] -  [x] Received Message: This is a test message published to RabbitMQ
15:58:43 - INFO - MainThread - TracingConsumerTest.py:61 - main - [trace_id=0 span_id=0 resource.service.name=] -  [x] Header {'traceparent': '00-9f016203a363df635b9f3d3c76775c92-d5afde2a271cfe41-01'}
15:58:43 - INFO - MainThread - TracingConsumerTest.py:62 - main - [trace_id=0 span_id=0 resource.service.name=] -  Exiting the process


Additional context
None

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions