-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingester Consumer #942
Ingester Consumer #942
Conversation
2f2d3a2
to
b5716d7
Compare
b5716d7
to
548097c
Compare
548097c
to
8d73c56
Compare
8d73c56
to
346630d
Compare
Codecov Report
@@ Coverage Diff @@
## master #942 +/- ##
=====================================
Coverage 100% 100%
=====================================
Files 138 138
Lines 6343 6362 +19
=====================================
+ Hits 6343 6362 +19
Continue to review full report at Codecov.
|
ce5ea8a
to
3ddb0ce
Compare
config.ConsumerBuilder | ||
} | ||
|
||
// Consumer uses sarama to consume messages from kafka and handle |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this sentence doesn't sound right
pkg/kafka/config/config.go
Outdated
type Configuration struct { | ||
Brokers []string | ||
// Consumer is an interface to features of Sarama that are necessary for the consumer | ||
type Consumer interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a big fan of having the interface definition inside the config file. Having said that, I'm not a big fan of it's original location either. Keep this here for now but try to come up with a better location to define this.
pkg/kafka/config/config.go
Outdated
@@ -15,22 +15,52 @@ | |||
package config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we split this file into consumer.go and producer.go?
pkg/kafka/config/config.go
Outdated
} | ||
|
||
// ProducerBuilder builds a new kafka producer | ||
type ProducerBuilder interface { | ||
NewProducer() (sarama.AsyncProducer, error) | ||
} | ||
|
||
// ConsumerBuilder builds a new kafka producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/producer/consumer
pkg/kafka/config/config.go
Outdated
Brokers []string | ||
Topic string | ||
GroupID string | ||
Consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I'm missing something but I don't see need to embed the Consumer interface in this config
cmd/ingester/app/consumer/options.go
Outdated
) | ||
|
||
const ( | ||
configPrefix = "ingester-consumer" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this variable? I know all our other configs follow this format but I don't see a reason to do so here. Do you foresee a need for us to consume from different kafka topics? This might be more a question for Prithvi
io.Closer | ||
// New is a constructor for a Consumer | ||
func New(params Params) (*Consumer, error) { | ||
saramaConsumer, err := params.ConsumerBuilder.NewConsumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does params.NewConsumer()
not work?
e61f47c
to
d1a1ca0
Compare
pkg/kafka/config/consumer/config.go
Outdated
@@ -0,0 +1,48 @@ | |||
// Copyright (c) 2018 The Jaeger Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mb, I meant pkg/kafka/config/consumer.go
and pkg/kafka/config/producer.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, keep as is, I prefer producer.Configuration
over config.ProducerConfiguration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I actually did it the first way, then noticed that it would be inconsistent with ES and Cassandra and changed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need the path of this file to be pkg/kafka/config/consumer/config.go
; having this be pkg/kafka/consumer/config.go
conveys the same information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make the change once the producer PR #957 is merged
86cd708
to
73bb2ea
Compare
f9330f3
to
53b46cf
Compare
pkg/kafka/config/producer/config.go
Outdated
|
||
import ( | ||
"github.com/Shopify/sarama" | ||
) | ||
|
||
// Builder builds a new kafka producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why move this from 26-30
to here? (I feel that this unnecessarily makes the code review bigger)
plugin/storage/kafka/factory.go
Outdated
producer sarama.AsyncProducer | ||
marshaller Marshaller | ||
producer.Builder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the producer changes required for this commit?
Could they be better factored into a separate commit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won had commented about splitting the config file into consumer and producer packages, would it be better to put everything into config.go in this PR and creating a new PR for the split?
53b46cf
to
0f74051
Compare
} | ||
|
||
// Close closes the Consumer and underlying sarama consumer | ||
func (c *Consumer) Close() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason for moving this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the producer PR you recommended this folder structure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I mean moving Close
from L106-L110 to here.
ProcessorFactory ProcessorFactory | ||
Factory metrics.Factory | ||
Logger *zap.Logger | ||
SaramaConsumer consumer.Consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seeing that this consumer is an interface that we control, perhaps internalConsumer
is an apt variable name
metricsFactory metrics.Factory | ||
logger *zap.Logger | ||
baseProcessor processor.SpanProcessor | ||
parallelism int | ||
} | ||
|
||
func (c *processorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { | ||
// NewFactory constructs a new ProcessorFactory | ||
func NewFactory(params FactoryParams) (ProcessorFactory, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return a pointer to ProcessorFactor instead. I'd rename this to NewProcessorFactory
, and do the same for L30
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Removed to reduce the scope of this PR. Will be added once producer and consumer are merged. Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
29817a1
to
10eda27
Compare
Which problem is this PR solving?
Short description of the changes