Skip to content

Commit

Permalink
Use Buffered channels to avoid blocking on callback (apache#5336)
Browse files Browse the repository at this point in the history
  • Loading branch information
hrsakai authored and merlimat committed Oct 9, 2019
1 parent db15d32 commit 5df6488
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
14 changes: 7 additions & 7 deletions pulsar-client-go/pulsar/c_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
c := make(chan struct {
Producer
error
})
}, 1)

client.CreateProducerAsync(options, nil, func(producer Producer, err error) {
c <- struct {
Expand All @@ -214,7 +214,7 @@ func (client *client) CreateProducerWithSchema(options ProducerOptions, schema S
c := make(chan struct {
Producer
error
})
}, 1)

client.CreateProducerAsync(options, schema, func(producer Producer, err error) {
c <- struct {
Expand All @@ -236,7 +236,7 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
c := make(chan struct {
Consumer
error
})
}, 1)

client.SubscribeAsync(options, nil, func(consumer Consumer, err error) {
c <- struct {
Expand All @@ -254,7 +254,7 @@ func (client *client) SubscribeWithSchema(options ConsumerOptions, schema Schema
c := make(chan struct {
Consumer
error
})
}, 1)

client.SubscribeAsync(options, schema, func(consumer Consumer, err error) {
c <- struct {
Expand All @@ -276,7 +276,7 @@ func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
c := make(chan struct {
Reader
error
})
}, 1)

client.CreateReaderAsync(options, nil, func(reader Reader, err error) {
c <- struct {
Expand All @@ -294,7 +294,7 @@ func (client *client) CreateReaderWithSchema(options ReaderOptions, schema Schem
c := make(chan struct {
Reader
error
})
}, 1)

client.CreateReaderAsync(options, schema, func(reader Reader, err error) {
c <- struct {
Expand Down Expand Up @@ -335,7 +335,7 @@ func (client *client) TopicPartitions(topic string) ([]string, error) {
c := make(chan struct {
partitions []string
err error
})
}, 1)

topicPartitionsAsync(client, topic, func(partitions []string, err error) {
c <- struct {
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-go/pulsar/c_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (c *consumer) Subscription() string {
}

func (c *consumer) Unsubscribe() error {
channel := make(chan error)
channel := make(chan error, 1)
c.UnsubscribeAsync(func(err error) {
channel <- err
close(channel)
Expand Down Expand Up @@ -320,7 +320,7 @@ func (c *consumer) NackID(msgId MessageID) error {
}

func (c *consumer) Close() error {
channel := make(chan error)
channel := make(chan error, 1)
c.CloseAsync(func(err error) { channel <- err; close(channel) })
return <-channel
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (c *consumer) RedeliverUnackedMessages() {
}

func (c *consumer) Seek(msgID MessageID) error {
channel := make(chan error)
channel := make(chan error, 1)
c.SeekAsync(msgID, func(err error) {
channel <- err
close(channel)
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-go/pulsar/c_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (p *producer) LastSequenceID() int64 {
}

func (p *producer) Send(ctx context.Context, msg ProducerMessage) error {
c := make(chan error)
c := make(chan error, 1)
p.SendAsync(ctx, msg, func(msg ProducerMessage, err error) { c <- err; close(c) })

select {
Expand Down Expand Up @@ -283,7 +283,7 @@ func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback
}

func (p *producer) Close() error {
c := make(chan error)
c := make(chan error, 1)
p.CloseAsync(func(err error) { c <- err; close(c) })
return <-c
}
Expand All @@ -304,7 +304,7 @@ func pulsarProducerCloseCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
}

func (p *producer) Flush() error {
f := make(chan error)
f := make(chan error, 1)
p.FlushAsync(func(err error) {
f <- err
close(f)
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/c_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (r *reader) HasNext() (bool, error) {
}

func (r *reader) Close() error {
channel := make(chan error)
channel := make(chan error, 1)
r.CloseAsync(func(err error) { channel <- err; close(channel) })
return <-channel
}
Expand Down

0 comments on commit 5df6488

Please sign in to comment.