From 7294b58d182458b69cf1e5d9d9a3d7fa18503111 Mon Sep 17 00:00:00 2001 From: axfor Date: Sun, 19 Mar 2023 01:56:53 +0800 Subject: [PATCH] fixed some err of integration test Signed-off-by: axfor --- cmd/ingester/app/consumer/consumer.go | 5 +++++ plugin/storage/integration/kafka_test.go | 3 +++ 2 files changed, 8 insertions(+) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 6c43f84b5cc..c57a86c0f5b 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -77,7 +77,11 @@ func (c *Consumer) Start() { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel c.doneWg.Add(1) + + var firstStart sync.WaitGroup + firstStart.Add(1) go func() { + firstStart.Done() defer c.doneWg.Done() for { select { @@ -92,6 +96,7 @@ func (c *Consumer) Start() { } } }() + firstStart.Wait() } // Close closes the Consumer and underlying sarama consumer diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 29f87a89545..7b019d7856f 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -99,6 +99,9 @@ func (s *KafkaIntegrationTestSuite) initialize() error { } spanConsumer.Start() + //wait consumer running + time.Sleep(time.Second * 5) + s.SpanWriter = spanWriter s.SpanReader = &ingester{traceStore} s.Refresh = func() error { return nil }