-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(inputs.firehose): Add new plugin #15988
base: master
Are you sure you want to change the base?
Conversation
feat(inputs/firehose): add input plugin for AWS Data Firehose
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@syedmhashim I do have some comments, mostly on style, so overall this looks quite promising. I urge you to keep the code as simple and less nested as possible to ease review and debugging. And of course some unit-tests would be great!
Thanks for the review. Will definitely go over the comments and update accordingly. Just FYI, I have been using |
Yeah we do have some older code we didn't adapt yet but things changed both on the golang side as well as on us being more strict with the way things are done. ;-) No worries, we will figure it out together. :-) |
@syedmhashim I'm loosing a bit the track in the discussion above, could you please push an update with the stuff that is clear and then we discuss the unclear parts? |
@srebhan Hey! Just pushed some changes and commented "done" on the threads that I resolved. Sorry for the delay. I got occupied with my job |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks much better @syedmhashim! Some more cleanups and comments from my side...
@srebhan Hi! Pushed some changes and added comments where relevant |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@syedmhashim thanks for the update. I tried to put more concrete suggestions into my review. Overall, I think we are almost there, the only thing missing are unit-tests with a mocked sender...
plugins/inputs/firehose/firehose.go
Outdated
ParameterTags []string `toml:"parameter_tags"` | ||
|
||
tlsint.ServerConfig | ||
tlsConf *tls.Config | ||
|
||
once sync.Once | ||
Log telegraf.Logger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ParameterTags []string `toml:"parameter_tags"` | |
tlsint.ServerConfig | |
tlsConf *tls.Config | |
once sync.Once | |
Log telegraf.Logger | |
ParameterTags []string `toml:"parameter_tags"` | |
Log telegraf.Logger `toml:"-"` | |
tlsint.ServerConfig | |
tlsConf *tls.Config | |
once sync.Once |
plugins/inputs/firehose/firehose.go
Outdated
func (f *Firehose) Gather(_ telegraf.Accumulator) error { | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (f *Firehose) Gather(_ telegraf.Accumulator) error { | |
return nil | |
} | |
func (*Firehose) Gather(telegraf.Accumulator) error { | |
return nil | |
} |
plugins/inputs/firehose/firehose.go
Outdated
return &Firehose{ | ||
ServiceAddress: ":8080", | ||
Paths: []string{"/telegraf"}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the timeout initialization here, so the user can overwrite it in the config with whatever he wants
return &Firehose{ | |
ServiceAddress: ":8080", | |
Paths: []string{"/telegraf"}, | |
} | |
return &Firehose{ | |
ServiceAddress: ":8080", | |
Paths: []string{"/telegraf"}, | |
ReadTimeout: config.Duration(time.Second * 5), | |
WriteTimeout: config.Duration(time.Second * 5), | |
} |
plugins/inputs/firehose/firehose.go
Outdated
if f.ReadTimeout < config.Duration(time.Second) { | ||
f.ReadTimeout = config.Duration(time.Second * 5) | ||
} | ||
if f.WriteTimeout < config.Duration(time.Second) { | ||
f.WriteTimeout = config.Duration(time.Second * 5) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move timeout to init()
(see my suggestion there)... This way, the timeouts are initialized with the defaults but the user can override the values with whatever he/she wants...
if f.ReadTimeout < config.Duration(time.Second) { | |
f.ReadTimeout = config.Duration(time.Second * 5) | |
} | |
if f.WriteTimeout < config.Duration(time.Second) { | |
f.WriteTimeout = config.Duration(time.Second * 5) | |
} |
plugins/inputs/firehose/firehose.go
Outdated
func (f *Firehose) Stop() { | ||
err := f.server.Shutdown(context.Background()) | ||
if err != nil { | ||
f.Log.Infof("shutting down server failed: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f.Log.Infof("shutting down server failed: %v", err) | |
f.Log.Errorf("shutting down server failed: %v", err) |
plugins/inputs/firehose/sample.conf
Outdated
## Optional access key to accept for authentication. | ||
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
## Optional access key to accept for authentication. | |
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key | |
## Optional access key to accept for authentication. | |
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key. | |
## If no access_key is provided (default), authentication is completely disabled and | |
## this plugin will accept all request ignoring the provided access-key in the request! | |
plugins/inputs/firehose/firehose.go
Outdated
} | ||
|
||
var metrics []telegraf.Metric | ||
for _, bytes := range data { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't use bytes
as variable name as this is also the name of a built-in Go library. How about sample
, msg
or record
?
plugins/inputs/firehose/firehose.go
Outdated
if err := f.addParameterTags(r, metrics); err != nil { | ||
f.Log.Errorf("add parameter tags for request %q failed: %v", requestID, err) | ||
r.res.statusCode = http.StatusBadRequest | ||
if err = r.sendResponse(res); err != nil { | ||
f.Log.Errorf("sending response to request %q failed: %v", requestID, err) | ||
} | ||
return | ||
} | ||
|
||
for _, m := range metrics { | ||
m.AddTag("firehose_http_path", req.URL.Path) | ||
} | ||
|
||
for _, m := range metrics { | ||
f.acc.AddMetric(m) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
if err := f.addParameterTags(r, metrics); err != nil { | |
f.Log.Errorf("add parameter tags for request %q failed: %v", requestID, err) | |
r.res.statusCode = http.StatusBadRequest | |
if err = r.sendResponse(res); err != nil { | |
f.Log.Errorf("sending response to request %q failed: %v", requestID, err) | |
} | |
return | |
} | |
for _, m := range metrics { | |
m.AddTag("firehose_http_path", req.URL.Path) | |
} | |
for _, m := range metrics { | |
f.acc.AddMetric(m) | |
} | |
tags, err := r.extractParameterTags(f.ParameterTags) | |
if err != nil { | |
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err) | |
r.res.statusCode = http.StatusBadRequest | |
if err = r.sendResponse(res); err != nil { | |
f.Log.Errorf("sending response to request %q failed: %v", requestID, err) | |
} | |
return | |
} | |
for _, m := range metrics { | |
for k, v := range tags { | |
m.AddTag(k, v) | |
} | |
m.AddTag("firehose_http_path", req.URL.Path) | |
f.acc.AddMetric(m) | |
} |
This way you don't need to iterate over the metrics three times and do have a clear code structure where everything related to processing the request is in the request "class"...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense
Download PR build artifacts for linux_amd64.tar.gz, darwin_arm64.tar.gz, and windows_amd64.zip. 📦 Click here to get additional PR build artifactsArtifact URLs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice @syedmhashim! Some very small things with the biggest being that log messages should start with a captial letter... The only thing missing are the unit-tests...
var err error | ||
f.tlsConf, err = f.ServerConfig.TLSConfig() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var err error | |
f.tlsConf, err = f.ServerConfig.TLSConfig() | |
if err != nil { | |
return err | |
} | |
return nil | |
var err error | |
f.tlsConf, err = f.ServerConfig.TLSConfig() | |
return err |
} | ||
}() | ||
|
||
f.Log.Infof("listening on %s", f.listener.Addr().String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f.Log.Infof("listening on %s", f.listener.Addr().String()) | |
f.Log.Infof("Listening on %s", f.listener.Addr().String()) |
err := f.server.Shutdown(context.Background()) | ||
if err != nil { | ||
f.Log.Errorf("shutting down server failed: %v", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err := f.server.Shutdown(context.Background()) | |
if err != nil { | |
f.Log.Errorf("shutting down server failed: %v", err) | |
} | |
if err := f.server.Shutdown(context.Background()); err != nil { | |
f.Log.Errorf("Shutting down server failed: %v", err) | |
} |
r.res.statusCode = http.StatusBadRequest | ||
f.Log.Errorf("x-amz-firehose-request-id header is not set") | ||
if err := r.sendResponse(res); err != nil { | ||
f.Log.Errorf("sending response failed: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For log-messages please capitalize the message for better readability for the user! Same for all other f.Log
calls...
return | ||
} | ||
|
||
r := &request{req: req} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to set the response status-code to a sensible default here? E.g. 500 internal server error so we catch the cases where we forgot to set a status-code? This would make things future proof. :-)
paramTags, err := r.extractParameterTags(f.ParameterTags) | ||
if err != nil { | ||
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err) | ||
r.res.statusCode = http.StatusBadRequest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not setting the status-code in extractParameterTags
like you do in the other functions?
ServiceAddress: ":8080", | ||
Paths: []string{"/telegraf"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove those here as you handle them in Init()
ServiceAddress: ":8080", | |
Paths: []string{"/telegraf"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Just a question. What's the difference between initialising the attributes in init()
and in Init()
and how do we decide which option to choose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The init()
(lowercase) function will be called when importing the package by Go itself. So what you do here is how the structure is registered and that's how each instance looks like before parsing any config options! The Init()
(uppercase) function is called after setting the struct fields from the config options. So you should do everything in Init()
if the nil
value is invalid for the type and option (like strings etc). However, if you do have for example timeouts where a zero-value might mean "wait forever" it is better to set this in init()
(lowercase) as the user might want to overwrite the default with zero. Does that make sense?
Summary
The firehose input plugin would be used to receive data from AWS Data Firehose.
Checklist
Related issues
resolves #15870