Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make memqueue.Broker internal (now memqueue.broker) #16667

Merged
merged 3 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(log,
return memqueue.NewQueue(log,
memqueue.Settings{
Eventer: e,
Events: 20,
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package memqueue
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *Broker
broker *broker
sig chan batchAckMsg
lst chanList

Expand All @@ -36,7 +36,7 @@ type ackLoop struct {
processACK func(chanList, int)
}

func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
Expand Down
20 changes: 10 additions & 10 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var Feature = queue.Feature("mem",
feature.Stable),
)

type Broker struct {
type broker struct {
done chan struct{}

logger logger
Expand Down Expand Up @@ -97,21 +97,21 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que
logger = logp.L()
}

return NewBroker(logger, Settings{
return NewQueue(logger, Settings{
Eventer: eventer,
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
FlushTimeout: config.FlushTimeout,
}), nil
}

// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// NewQueue creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
func NewQueue(
logger logger,
settings Settings,
) *Broker {
) queue.Queue {
// define internal channel size for producer/client requests
// to the broker
chanSize := 20
Expand All @@ -137,7 +137,7 @@ func NewBroker(
logger = logp.NewLogger("memqueue")
}

b := &Broker{
b := &broker{
done: make(chan struct{}),
logger: logger,

Expand Down Expand Up @@ -182,25 +182,25 @@ func NewBroker(
return b
}

func (b *Broker) Close() error {
func (b *broker) Close() error {
close(b.done)
if b.waitOnClose {
b.wg.Wait()
}
return nil
}

func (b *Broker) BufferConfig() queue.BufferConfig {
func (b *broker) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{
Events: b.bufSize,
}
}

func (b *Broker) Producer(cfg queue.ProducerConfig) queue.Producer {
func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel)
}

func (b *Broker) Consumer() queue.Consumer {
func (b *broker) Consumer() queue.Consumer {
return newConsumer(b)
}

Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type consumer struct {
broker *Broker
broker *broker
resp chan getResponse

done chan struct{}
Expand All @@ -49,7 +49,7 @@ const (
batchACK
)

func newConsumer(b *Broker) *consumer {
func newConsumer(b *broker) *consumer {
return &consumer{
broker: b,
resp: make(chan getResponse),
Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// directEventLoop implements the broker main event loop. It buffers events,
// but tries to forward events as early as possible.
type directEventLoop struct {
broker *Broker
broker *broker

buf ringBuffer

Expand All @@ -45,7 +45,7 @@ type directEventLoop struct {
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
broker *Broker
broker *broker

buf *batchBuffer
flushList flushList
Expand Down Expand Up @@ -77,7 +77,7 @@ type flushList struct {
count int
}

func newDirectEventLoop(b *Broker, size int) *directEventLoop {
func newDirectEventLoop(b *broker, size int) *directEventLoop {
l := &directEventLoop{
broker: b,
events: b.events,
Expand Down Expand Up @@ -285,7 +285,7 @@ func (l *directEventLoop) processACK(lst chanList, N int) {
}
}

func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
l := &bufferingEventLoop{
broker: b,
maxEvents: size,
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
)

type forgetfulProducer struct {
broker *Broker
broker *broker
openState openState
}

type ackProducer struct {
broker *Broker
broker *broker
cancel bool
seq uint32
state produceState
Expand All @@ -53,7 +53,7 @@ type produceState struct {

type ackHandler func(count int)

func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
isOpen: atomic.MakeBool(true),
Expand Down