Exporter/Internal/queue size_channel.pop() is blocked even when queue is full #11015

Closed
timannguyen opened this issue Aug 29, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@timannguyen
Contributor

Describe the bug

size_channel.pop() blocks even when the queue is full. This appears to happen when there is a high number of connections and high throughput.
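For context, the queue in question pairs a Go channel with a separately tracked size counter. The snippet below is a minimal, hypothetical sketch of that pattern (names like `sizedQueue`, `push`, and `pop` are illustrative; this is not the collector's actual `sizedChannel` code). It shows why a `pop` or `push` that blocks on the underlying channel can hang whenever the tracked size and the channel's real length disagree.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// sizedQueue is a simplified, hypothetical sketch of a size-tracked channel:
// `used` counts how much capacity is considered taken, while `ch` holds the
// actual elements. The two are updated separately, which is the crux of the bug.
type sizedQueue[T any] struct {
	used atomic.Int64
	cap  int64
	ch   chan T
}

func newSizedQueue[T any](capacity int64) *sizedQueue[T] {
	return &sizedQueue[T]{cap: capacity, ch: make(chan T, capacity)}
}

// push admits an element based on the tracked size, then sends on the channel.
// If `used` ever under-counts relative to len(ch), this send can block.
func (q *sizedQueue[T]) push(el T) error {
	if q.used.Add(1) > q.cap {
		q.used.Add(-1)
		return errors.New("queue is full")
	}
	q.ch <- el // blocks if ch is already full despite `used` saying otherwise
	return nil
}

// pop blocks on the channel receive; if the tracked size says "full" but ch is
// empty (the desync reported here), this receive never returns.
func (q *sizedQueue[T]) pop() T {
	el := <-q.ch
	q.used.Add(-1)
	return el
}

func main() {
	q := newSizedQueue[int](2)
	_ = q.push(1)
	_ = q.push(2)
	fmt.Println(q.pop(), q.pop())
}
```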

Steps to reproduce

TBA. I will be writing a test to replicate the issue.

What did you expect to see?

size_channel.pop() should receive data and return when the queue is not empty.

What did you see instead?

size_channel.pop() blocks indefinitely even though the queue is full.

What version did you use?

v0.103.0

What config did you use?

settings:
  connectors:
    routing/system:
      error_mode: ignore
      table:
        - pipelines:
            - logs/system_telemetry.routing/system
          statement: route() where attributes["Into"] == ".splunk.edge.system.telemetry_stream_dataset"
        - pipelines:
            - logs/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144.routing/system
          statement: route() where attributes["Into"] == ".splunk.edge.system.unhandled"
  exporters:
    S2S/shared.pipelines.default_splunk_cloud_destination:
      ackEnabled: false
      caCertificate: '[redacted]'
      clientCertificate: '[redacted]'
      clientPrivateKey: '[redacted]'
      compressionLevel: 0
      connectTimeoutMillis: 5000
      connection: scpbridge
      datasetName: shared.pipelines.default_splunk_cloud_destination
      disablePersistentQueue: false
      endpoints: '[redacted]'
      handshakeMaxRetries: 3
      handshakeTimeoutMillis: 1000
      heartbeatIntervalMillis: 30000
      internalName: shared.pipelines.default_splunk_cloud_destination
      retryInitialIntervalMillis: 5000
      retryMaxElapsedTimeMillis: 0
      retryMaxIntervalMillis: 30000
      s2sConnectionTtlDuration: 30s
      s2sConnectionsPerTarget: 0
      s2sFreezeDuration: 10m0s
      sendQueueNumConsumers: 10
      sendQueueSize: 10000
      sendTimeoutMillis: 5000
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    S3/acies_logs:
      awsRegion: us-east-1
      dataFormat: hecJson
      datasetName: product-telemetry
      env: production
      flush_timeout: 1h
      s3Telemetry:
        scpToken:
          accessToken: '[redacted]'
          servicePrincipalID: aa0166c75eef6ca6a30012495547
        urlGetterType: pt-telemetry
        usePresignedUrl: true
      sendBatchSize: 10000
      sending_queue:
        enabled: true
        num_consumers: 10
        queue_size: 10000
        storage: null
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
      timeout: 30s
    file:
      compression: zstd
      path: /opt/splunk-edge/var/log/edge-metrics.json.zst
      rotation:
        max_backups: 100
        max_megabytes: 100
    nop: {}
    nop/.system.devnull:
      datasetName: .system.devnull
      kind: devnull
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    splunk_hec/pt:
      endpoint: '[redacted]'
      hec_metadata_to_otel_attrs:
        host: host.name
        index: com.splunk.index
        source: com.splunk.source
        sourcetype: com.splunk.sourcetype
      index: edge_processor_metrics
      retry_on_failure:
        enabled: true
        initial_interval: 10ms
        max_elapsed_time: "0"
        max_interval: 30s
      sending_queue:
        enabled: true
        num_consumers: 10
        queue_size: 10000
        storage: file_storage/data
      source: 3cfb1516-3b4d-472c-b065-7c739f5f8144
      splunk_app_name: a7bc7b90-609e-11ef-aac8-002248496f87
      token: '[redacted]'
      use_multi_metric_format: true
  extensions:
    file_storage/data:
      compaction:
        directory: /tmp/
        on_start: true
      directory: /opt/splunk-edge/var/data/edge
      timeout: 5s
    file_storage/logs:
      compaction:
        directory: /tmp/
        on_start: true
      directory: /opt/splunk-edge/var/log
      timeout: 5s
    hecauth:
      tokens:
        - f541f5c6-b213-4445-a80b-32300652eca5
    loopback: null
    pprof:
      block_profile_fraction: 1000
      mutex_profile_fraction: 1000
  processors:
    attributes/metrics:
      actions:
        - action: insert
          key: host.name
          value: TestSplunkENDon
        - action: insert
          key: service.instance.id
          value: a7bc7b90-609e-11ef-aac8-002248496f87
        - action: insert
          key: processor.id
          value: 3cfb1516-3b4d-472c-b065-7c739f5f8144
    batch:
      send_batch_max_size: 128
      send_batch_size: 128
      timeout: 10ms
    batch/S3_shared.pipelines.default_splunk_cloud_destination_telemetry:
      send_batch_max_size: 10000
      send_batch_size: 10000
      timeout: 1h
    hec_metadata_processor:
      tokensV2:
        - id: f541f5c6-b213-4445-a80b-32300652eca5
    resource/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
      attributes:
        - action: upsert
          key: com.splunk.datasetName
          value: 3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
        - action: upsert
          key: com.splunk.datasetKind
          value: hecreceiver
    resource/metrics:
      attributes:
        - action: upsert
          key: com.splunk.index
          value: _metrics
        - action: upsert
          key: com.splunk.sourcetype
          value: edge-metrics
        - action: upsert
          key: com.splunk.source
          value: edge
        - action: upsert
          key: host.name
          value: TestSplunkENDon
    routing:
      attribute_source: resource
      drop_resource_routing_attribute: true
      from_attribute: Into
      table:
        - exporters:
            - nop/.system.devnull
          value: .system.devnull
        - exporters:
            - S2S/shared.pipelines.default_splunk_cloud_destination
          value: shared.pipelines.default_splunk_cloud_destination
        - exporters:
            - S2S/shared.pipelines.default_splunk_cloud_destination
          value: internalInfoExporters
    splunk_acies/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.unhandled
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: shared.pipelines.default_splunk_cloud_destination
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies/system_pipeline:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: all_data_ready
                  type: STRING
              - Args: null
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: 3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.unhandled
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies/system_telemetry:
      functions:
        - Args:
            datasource:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: .splunk.edge.system.telemetry_stream_dataset
                  type: STRING
          DispatchIndex: 0
          Function: from
          Next:
            - 1
        - Args:
            target:
              - Args: []
                Type: CONST
                Value:
                  aboolean: false
                  acontainer: null
                  adouble: 0
                  along: 0
                  astring: shared.pipelines.default_splunk_cloud_destination
                  type: STRING
          DispatchIndex: 1
          Function: into
          Next: []
      instanceId: a7bc7b90-609e-11ef-aac8-002248496f87
      recordSchema:
        - Index: 0
          Name: host
        - Index: 2
          Name: sourcetype
        - Index: 1
          Name: source
      sinkDatasetNameToKind:
        .splunk.edge.system.unhandled: stream
        .system.devnull: devnull
        shared.pipelines.default_splunk_cloud_destination: indexer
    splunk_acies_internal_metric: {}
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_combined_wcookie
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_common
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_cdr
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_event
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_messages
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: asterisk_queue
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: cisco_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: db2_diag
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: exim_main
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: exim_reject
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: linux_messages_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: linux_secure
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d\d?:\d\d:\d\d
          shouldLineMerge: true
          sourceType: log4j
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\d{6}\s
          shouldLineMerge: true
          sourceType: mysqld_error
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\d{6}\s
          shouldLineMerge: true
          sourceType: mysqld
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: postfix_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: sendmail_syslog
          truncate: 10000
        - lineBreaker: ""
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: sugarcrm_log4php
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: weblogic_stdout
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^-----
          shouldLineMerge: true
          sourceType: websphere_activity
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^NULL\s
          shouldLineMerge: true
          sourceType: websphere_core
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: websphere_trlog_syserr
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: websphere_trlog_sysout
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: windows_snare_syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: DhcpSrvLog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d+\/\d+/\d+\s+\d+:\d+:\d+\s+(AM|PM)
          shouldLineMerge: true
          sourceType: WinEventLog:Application
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: \d+\/\d+/\d+\s+\d+:\d+:\d+\s+(AM|PM)
          shouldLineMerge: true
          sourceType: WinEventLog:Security
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: cisco:asa
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: panos
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: custom:events
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: syslog
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 10000
          mustBreakBefore: ""
          shouldLineMerge: true
          sourceType: aws:cloudwatchlogs:vpcflow
          truncate: 10000
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ""
          shouldLineMerge: false
          sourceType: access_combined
          truncate: 0
        - lineBreaker: ([\r\n]+)
          maxEvents: 256
          mustBreakBefore: ^\[
          shouldLineMerge: true
          sourceType: apache_error
          truncate: 0
  receivers:
    S2S/all_data_ready:
      caCertificate: '[redacted]'
      datasetName: all_data_ready
      maxChannels: 300
      port: :9997
      processorId: 3cfb1516-3b4d-472c-b065-7c739f5f8144
      serverCertificate: '[redacted]'
      serverPrivateKey: '[redacted]'
      serviceInstanceID: a7bc7b90-609e-11ef-aac8-002248496f87
    filelog/acies:
      attributes:
        processor_id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service_instance_id: a7bc7b90-609e-11ef-aac8-002248496f87
      include:
        - /opt/splunk-edge/var/log/edge.log
      operators:
        - if: 'body contains ''"time":'' '
          regex: ("time":"(?P<timestamp>[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}:[\d]{2}\.?[\dZ]*)")
          timestamp:
            layout: '%Y-%m-%dT%H:%M:%S.%LZ'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: regex_parser
      resource:
        com.splunk.index: _internal
        com.splunk.source: /opt/splunk-edge/var/log/edge.log
        com.splunk.sourcetype: edge-log
      start_at: beginning
      storage: file_storage/logs
    filelog/supervisor:
      attributes:
        processor_id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service_instance_id: a7bc7b90-609e-11ef-aac8-002248496f87
      include:
        - /opt/splunk-edge/var/log/supervisor.log
      operators:
        - regex: (?P<timestamp>^[^\s]*\s[^\s]*)
          timestamp:
            layout: '%Y/%m/%d %H:%M:%S'
            layout_type: strptime
            parse_from: attributes.timestamp
          type: regex_parser
      resource:
        com.splunk.index: _internal
        com.splunk.source: /opt/splunk-edge/var/log/supervisor.log
        com.splunk.sourcetype: edge-log
      start_at: beginning
      storage: file_storage/logs
    hostmetrics:
      collection_interval: 30s
      scrapers:
        cpu:
          metrics:
            system.cpu.frequency:
              enabled: true
            system.cpu.logical.count:
              enabled: true
            system.cpu.physical.count:
              enabled: true
            system.cpu.utilization:
              enabled: true
        filesystem: null
        load:
          metrics:
            system.cpu.load_average.5m:
              enabled: true
        memory:
          metrics:
            system.linux.memory.available:
              enabled: true
            system.memory.limit:
              enabled: true
            system.memory.utilization:
              enabled: true
        network: null
    hostmetrics/pt:
      collection_interval: 10m0s
      scrapers:
        cpu: null
        filesystem: null
        memory: null
        network: null
    prometheus/edge:
      config:
        scrape_configs:
          - job_name: introspection
            scrape_interval: 30s
            static_configs:
              - targets:
                  - 0.0.0.0:8888
    prometheus/pt:
      config:
        scrape_configs:
          - job_name: introspection
            scrape_interval: 10m0s
            static_configs:
              - targets:
                  - 0.0.0.0:8888
    splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
      access_token_passthrough: true
      auth:
        authenticator: hecauth
      endpoint: :8088
      hec_metadata_to_otel_attrs:
        host: host.name
        index: com.splunk.index
        source: com.splunk.source
        sourcetype: com.splunk.sourcetype
      splitting: none
  service:
    extensions:
      - pprof
      - file_storage/logs
      - file_storage/data
      - hecauth
    pipelines:
      logs/acies:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
        receivers:
          - filelog/acies
          - filelog/supervisor
      logs/acies_s3:
        exporters:
          - S3/acies_logs
        processors:
          - batch/S3_shared.pipelines.default_splunk_cloud_destination_telemetry
        receivers:
          - filelog/acies
      logs/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144.routing/system:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
          - splunk_acies/dd_3cfb1516-3b4d-472c-b065-7c739f5f8144
          - routing
        receivers:
          - routing/system
      logs/system_pipeline.S2S/all_data_ready:
        exporters:
          - routing/system
        processors:
          - splunk_acies/system_pipeline
        receivers:
          - S2S/all_data_ready
      logs/system_pipeline.splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver:
        exporters:
          - routing/system
        processors:
          - resource/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
          - splunk_acies/system_pipeline
        receivers:
          - splunk_hec/3cfb1516-3b4d-472c-b065-7c739f5f8144.hecreceiver
      logs/system_telemetry.routing/system:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
        processors:
          - batch
          - splunk_acies/system_telemetry
          - routing
        receivers:
          - routing/system
      metrics/edge:
        exporters:
          - S2S/shared.pipelines.default_splunk_cloud_destination
          - file
        processors:
          - splunk_acies_internal_metric
          - resource/metrics
          - attributes/metrics
          - batch
        receivers:
          - prometheus/edge
          - hostmetrics
      metrics/pt:
        exporters:
          - splunk_hec/pt
        processors:
          - attributes/metrics
          - batch
        receivers:
          - prometheus/pt
          - hostmetrics/pt
    telemetry:
      logs:
        encoding: json
        level: info
      metrics:
        address: localhost:8888
      resource:
        host.name: TestSplunkENDon
        processor.id: 3cfb1516-3b4d-472c-b065-7c739f5f8144
        service.instance.id: a7bc7b90-609e-11ef-aac8-002248496f87
        service.name: edge

Environment

Linux

go 1.21.0

Additional context

  • the persistent queue file is full
  • 20+ TCP connections on the S2S/all_data_ready receiver
@timannguyen timannguyen added the bug Something isn't working label Aug 29, 2024
@timannguyen
Contributor Author

timannguyen commented Sep 4, 2024

Adding a test to replicate: main...timannguyen:opentelemetry-collector:test-pq-concurrency

The issue occurs where:
- throughput is higher with the same number of goroutines for producers and consumers
- throughput is the same with more producer goroutines and the default 10 consumers

The test doesn't reproduce it yet; still trying to recreate it.

@timannguyen
Contributor Author

Updated: main...timannguyen:opentelemetry-collector:test-pq-concurrency

The issue occurs when:

  • the queue is smaller than the throughput
  • the queue becomes full at some point
  • the queue consumes up to a certain point but then stops
  • offers keep retrying indefinitely

Let me know if there is any issue with the test.
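For readers without the linked branch, here is a rough, hypothetical sketch of the shape of such a stress test: more items than the queue can hold, producers retrying on a full queue, and a deadline that turns a hung `Offer` (the reported deadlock) into a test failure. The `queue` interface and the `Offer`/`Consume` names are placeholders, not the collector's API, and this is not the actual test in `test-pq-concurrency`.

```go
package queue_test

import (
	"sync"
	"testing"
	"time"
)

// queue is a placeholder interface for whatever Offer/Consume implementation
// is under test; in the real branch this would be the persistent queue.
type queue interface {
	Offer(item int) error
	Consume() (int, bool)
}

// TestQueueUnderPressure sketches the scenario described above.
func TestQueueUnderPressure(t *testing.T) {
	t.Skip("illustrative sketch only; wire in a real queue implementation to run")
	var q queue // assumed to be constructed from the implementation under test

	const (
		producers        = 20
		itemsPerProducer = 1000
	)

	done := make(chan struct{})
	go func() {
		var wg sync.WaitGroup
		for p := 0; p < producers; p++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for i := 0; i < itemsPerProducer; i++ {
					// Retry while the queue reports full; a correct queue
					// eventually drains, a desynced one never does.
					for q.Offer(i) != nil {
						time.Sleep(time.Millisecond)
					}
				}
			}()
		}
		wg.Wait()
		close(done)
	}()

	// Consumers drain the queue concurrently.
	for c := 0; c < 10; c++ {
		go func() {
			for {
				if _, ok := q.Consume(); !ok {
					return
				}
			}
		}()
	}

	select {
	case <-done:
	case <-time.After(time.Minute):
		t.Fatal("producers stuck: queue stopped draining (possible used/ch desync)")
	}
}
```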

@sfc-gh-sili
Contributor

@timannguyen
Hi Tim, thanks for putting together a test! I was taking a stab at this issue since I have recently been working on exporter queues.

I noticed that in your test, ps.Offer seems to be executed only once when we enter the for loop (line 565 in the attached snapshot). I am assuming this is not intentional?
[Screenshot 2024-09-04 at 19:20:02]

dmitryax added a commit that referenced this issue Sep 6, 2024
#### Description
This change fixes a potential deadlock bug in the persistent queue.

There is a race condition in the persistent queue that causes `used` in
`sizedChannel` to fall out of sync with the length of `ch`. This can leave
`Offer` deadlocked under a specific interleaving. For example:
1. Multiple consumers are calling Consume.
2. Multiple producers are calling Offer to insert into the queue.
   a. All elements are taken by consumers; ch is empty.
3. One consumer completes Consume and calls onProcessingFinished.
   a. Inside sizedChannel, syncSize is invoked and used is reset to 0 while
      other consumers are still waiting on the lock to consume.
4. More Offer calls insert elements -> used and ch len should be equal.
5. As the consumers from step 3a complete, used is decreased -> used becomes
   lower than ch len.
   a. More Offer calls keep inserting since used is below capacity; however,
      ch is full.
   b. The goroutine calling Offer holds the mutex but can't release it because
      ch is full.
   c. No consumer can acquire the mutex to complete the previous
      onProcessingFinished.

This change returns an error if the channel is full instead of waiting for it to unblock.
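As a rough illustration only (building on the simplified `sizedQueue` sketch earlier in this issue, not the actual collector code), the fix amounts to replacing the blocking channel send with a non-blocking `select`, so a full `ch` is reported as an error rather than blocking while the queue's lock is held. The `errQueueFull` sentinel below is a hypothetical name.

```go
// errQueueFull is a hypothetical sentinel; the real queue defines its own error.
var errQueueFull = errors.New("sending queue is full")

// push sketches the fixed behavior: never block on the channel send. If `used`
// says there is room but `ch` is actually full (the desync described above),
// undo the reservation and return an error instead of deadlocking.
func (q *sizedQueue[T]) push(el T) error {
	if q.used.Add(1) > q.cap {
		q.used.Add(-1)
		return errQueueFull
	}
	select {
	case q.ch <- el:
		return nil
	default:
		q.used.Add(-1)
		return errQueueFull
	}
}
```

Callers that would previously block now get an error back and can decide whether to retry or drop the item.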

#### Link to tracking issue
Fixes #11015

#### Testing
- Added a concurrent test in the persistent queue that can reproduce the
  problem (note: it needs to be re-run 100 times as the race condition is not
  consistent).
- Added a unit test for sizedChannel.

#### Documentation
Added a comment in the block explaining it.

---------

Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
@dmitryax dmitryax closed this as completed Sep 6, 2024
splunkericl added a commit to splunkericl/opentelemetry-collector that referenced this issue Sep 6, 2024
…1063)
