Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azureeventhubreceiver] Migrate to new AzureSDK #33134

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
81f0af3
remove continue from timestamp parse failure
nslaughter Feb 13, 2024
a82fee0
Merge branch 'main' into main
nslaughter Feb 14, 2024
ff6c517
add changelog entry
nslaughter Feb 14, 2024
58f987c
ignore ObservedTimestamp in test
nslaughter Feb 14, 2024
7eb742f
Merge branch 'main' into main
nslaughter Feb 14, 2024
948f4b7
Merge branch 'main' into main
nslaughter Feb 15, 2024
200e552
Merge branch 'main' into main
nslaughter Feb 15, 2024
93e588f
Merge branch 'main' into main
nslaughter Feb 15, 2024
4379aac
Merge branch 'main' into main
nslaughter Feb 15, 2024
1946a4b
Merge branch 'main' into main
nslaughter Feb 15, 2024
a686ed0
Merge branch 'main' into main
nslaughter Feb 16, 2024
47d3fd6
first cuts - not passing
nslaughter Apr 13, 2024
81112af
Update deps
nslaughter Apr 14, 2024
14edf92
sync
nslaughter Apr 15, 2024
4268aed
fill in helpers in eventhandler
nslaughter Apr 24, 2024
7781880
refact
nslaughter May 2, 2024
678084c
fix the signatures - interfaces
nslaughter May 2, 2024
7ad522e
add processor as a strategy
nslaughter May 10, 2024
fc80d31
guard nil checks
nslaughter May 10, 2024
e41b3ca
rework structure teeny tiny
nslaughter May 10, 2024
80cdcbe
fix panic
nslaughter May 20, 2024
3e0758d
rm azureventprocessor.go
nslaughter May 20, 2024
05a5515
Delete unused persister consumer
nslaughter May 20, 2024
35f9fef
time as string
nslaughter May 22, 2024
ca27e32
scratch change in azblob
nslaughter May 22, 2024
e785643
fix chlog
nslaughter May 22, 2024
7013455
delete commented out code
nslaughter May 23, 2024
4352622
wrap Collector storageClient
nslaughter May 23, 2024
9bf9c12
add debug for logs
nslaughter May 23, 2024
1320682
fix checkpoints - and test
nslaughter May 23, 2024
9c7fb13
add checkpoints
nslaughter May 23, 2024
74863cf
vet/fmt/goimports
nslaughter May 23, 2024
2b5c1e3
Merge branch 'main' into nslaughter/migrate-azure-sdk-for-eventhub
nslaughter Jun 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
guard nil checks
  • Loading branch information
nslaughter committed May 10, 2024
commit fc80d31b79d7e75519713ed0dd56285a5d4591a5
91 changes: 58 additions & 33 deletions receiver/azureeventhubreceiver/eventhubhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry
import (
"context"
"errors"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
Expand Down Expand Up @@ -65,15 +66,16 @@ func (c *consumerClientWrapperImpl) Close(ctx context.Context) error {
return c.consumerClient.Close(ctx)
}

type processorHandler struct {
processor *azeventhubs.Processor
dataConsumer dataConsumer
config *Config
settings receiver.CreateSettings
cancel context.CancelFunc
}
// type processorHandler struct {
// processor *azeventhubs.Processor
// dataConsumer dataConsumer
// config *Config
// settings receiver.CreateSettings
// cancel context.CancelFunc
// }

type eventhubHandler struct {
processor *azeventhubs.Processor
consumerClient consumerClientWrapper
dataConsumer dataConsumer
config *Config
Expand All @@ -85,12 +87,15 @@ type eventhubHandler struct {
var _ eventHandler = (*eventhubHandler)(nil)

func newEventhubHandler(config *Config, settings receiver.CreateSettings) *eventhubHandler {
return &eventhubHandler{
eh := &eventhubHandler{
config: config,
settings: settings,
// TODO: this isn't where to put feature flags and default behaviors
useProcessor: false,
}
if err := eh.init(context.TODO()); err != nil {
panic(err)
}
return eh
}

func (h *eventhubHandler) init(ctx context.Context) error {
Expand Down Expand Up @@ -130,19 +135,23 @@ func (h *eventhubHandler) runWithProcessor(ctx context.Context) error {
}

func (h *eventhubHandler) dispatchPartitionClients(processor *azeventhubs.Processor) {
var wg sync.WaitGroup
for {
partitionClient := processor.NextPartitionClient(context.TODO())

if partitionClient == nil {
break
}

go func() {
if err := h.processEventsForPartition(partitionClient); err != nil {
wg.Add(1)
go func(pc *azeventhubs.ProcessorPartitionClient) {
defer wg.Done()
if err := h.processEventsForPartition(pc); err != nil {
h.settings.Logger.Error("Error processing partition", zap.Error(err))
}
}()
}(partitionClient)
}
wg.Wait()
}

func (h *eventhubHandler) processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error {
Expand Down Expand Up @@ -212,7 +221,14 @@ func (h *eventhubHandler) setupPartition(ctx context.Context, partitionID string
if err != nil {
return err
}
defer cc.Close(ctx)
if cc == nil {
return errors.New("failed to initialize consumer client")
}
defer func() {
if cc != nil {
cc.Close(ctx)
}
}()

pcOpts := &azeventhubs.PartitionClientOptions{
StartPosition: azeventhubs.StartPosition{
Expand All @@ -224,31 +240,40 @@ func (h *eventhubHandler) setupPartition(ctx context.Context, partitionID string
if err != nil {
return err
}
defer pc.Close(ctx)

go func() {
var wait = 1
for {
rcvCtx, _ := context.WithTimeout(context.TODO(), time.Second*10)
events, err := pc.ReceiveEvents(rcvCtx, h.config.BatchCount, nil)
if err != nil {
h.settings.Logger.Error("Error receiving event", zap.Error(err))
time.Sleep(time.Duration(wait) * time.Second)
wait *= 2
continue
}

for _, event := range events {
if err := h.newMessageHandler(ctx, event); err != nil {
h.settings.Logger.Error("Error handling event", zap.Error(err))
}
}
if pc == nil {
return errors.New("failed to initialize partition client")
}
defer func() {
if pc != nil {
pc.Close(ctx)
}
}()

go h.receivePartitionEvents(ctx, pc)

return nil
}

func (h *eventhubHandler) receivePartitionEvents(ctx context.Context, pc *azeventhubs.PartitionClient) {
var wait = 1
for {
rcvCtx, _ := context.WithTimeout(context.TODO(), time.Second*10)
events, err := pc.ReceiveEvents(rcvCtx, h.config.BatchCount, nil)
if err != nil {
h.settings.Logger.Error("Error receiving event", zap.Error(err))
time.Sleep(time.Duration(wait) * time.Second)
wait *= 2
continue
}

for _, event := range events {
if err := h.newMessageHandler(ctx, event); err != nil {
h.settings.Logger.Error("Error handling event", zap.Error(err))
}
}
}
}

func (h *eventhubHandler) newMessageHandler(ctx context.Context, event *azeventhubs.ReceivedEventData) error {
err := h.dataConsumer.consume(ctx, event)
if err != nil {
Expand Down
Loading