Skip to content

Commit

Permalink
Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka plain text (
Browse files Browse the repository at this point in the history
  • Loading branch information
tsonglew authored Jun 29, 2024
1 parent 02dc53c commit 93dd9b3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- **Tentative**: Set upper bound <=5.9.5 for psutil package due to test failure. (#326)
- Remove `DeprecationWarning` from `pkg_resources` by replace it with `importlib_metadata` (#329)
- Fix unexpected 'decode' AttributeError when MySQLdb module is mapped by PyMySQL (#336)
- Fix SkyWalking agent failed to start if using kafka protocol with sasl_mechanism=PLAIN. (#343)

### 1.0.1

Expand Down
30 changes: 20 additions & 10 deletions skywalking/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def __init__(self):
def __bootstrap(self):
# when forking, already instrumented modules must not be instrumented again
# otherwise it will cause double instrumentation! (we should provide an un-instrument method)

# Initialize queues for segment, log, meter and profiling snapshots
self.__init_queues()

if config.agent_protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
self.__protocol = GrpcProtocol()
Expand All @@ -129,18 +133,29 @@ def __bootstrap(self):
from skywalking.agent.protocol.kafka import KafkaProtocol
self.__protocol = KafkaProtocol()

# Initialize queues for segment, log, meter and profiling snapshots
self.__segment_queue: Optional[Queue] = None
# Start reporter threads and register queues
self.__init_threading()

def __init_queues(self) -> None:
"""
This method initializes all the queues for the agent and reporters.
"""
self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
self.__log_queue: Optional[Queue] = None
self.__meter_queue: Optional[Queue] = None
self.__snapshot_queue: Optional[Queue] = None

# Start reporter threads and register queues
self.__init_threading()
if config.agent_meter_reporter_active:
self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
if config.agent_log_reporter_active:
self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
if config.agent_profile_active:
self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)


def __init_threading(self) -> None:
"""
This method initializes all the queues and threads for the agent and reporters.
This method initializes all the threads for the agent and reporters.
Upon os.fork(), callback will reinitialize threads and queues by calling this method
Heartbeat thread is started by default.
Expand All @@ -152,12 +167,10 @@ def __init_threading(self) -> None:
__heartbeat_thread = Thread(name='HeartbeatThread', target=self.__heartbeat, daemon=True)
__heartbeat_thread.start()

self.__segment_queue = Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
__segment_report_thread = Thread(name='SegmentReportThread', target=self.__report_segment, daemon=True)
__segment_report_thread.start()

if config.agent_meter_reporter_active:
self.__meter_queue = Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
__meter_report_thread = Thread(name='MeterReportThread', target=self.__report_meter, daemon=True)
__meter_report_thread.start()

Expand All @@ -173,7 +186,6 @@ def __init_threading(self) -> None:
ThreadDataSource().register()

if config.agent_log_reporter_active:
self.__log_queue = Queue(maxsize=config.agent_log_reporter_max_buffer_size)
__log_report_thread = Thread(name='LogReportThread', target=self.__report_log, daemon=True)
__log_report_thread.start()

Expand All @@ -183,8 +195,6 @@ def __init_threading(self) -> None:
daemon=True)
__command_dispatch_thread.start()

self.__snapshot_queue = Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)

__query_profile_thread = Thread(name='QueryProfileCommandThread', target=self.__query_profile_command,
daemon=True)
__query_profile_thread.start()
Expand Down

0 comments on commit 93dd9b3

Please sign in to comment.