Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.firehose): Add new plugin #15988

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

syedmhashim
Copy link

Summary

The firehose input plugin would be used to receive data from AWS Data Firehose.

Checklist

  • No AI generated code was used in this PR

Related issues

resolves #15870

@telegraf-tiger telegraf-tiger bot added the feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin label Oct 7, 2024
@srebhan srebhan changed the title feat(inputs/firehose): add input plugin for AWS Data Firehose feat(inputs.firehose): Add new plugin Oct 8, 2024
@telegraf-tiger telegraf-tiger bot added the plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins label Oct 8, 2024
Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syedmhashim I do have some comments, mostly on style, so overall this looks quite promising. I urge you to keep the code as simple and less nested as possible to ease review and debugging. And of course some unit-tests would be great!

plugins/inputs/firehose/README.md Outdated Show resolved Hide resolved
plugins/inputs/firehose/README.md Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
@srebhan srebhan self-assigned this Oct 8, 2024
@syedmhashim
Copy link
Author

@syedmhashim I do have some comments, mostly on style, so overall this looks quite promising. I urge you to keep the code as simple and less nested as possible to ease review and debugging. And of course some unit-tests would be great!

Thanks for the review. Will definitely go over the comments and update accordingly. Just FYI, I have been using http_listener_v2 input plugin as a reference to write the code. The unit tests will come soon as well

@srebhan
Copy link
Member

srebhan commented Oct 9, 2024

Yeah we do have some older code we didn't adapt yet but things changed both on the golang side as well as on us being more strict with the way things are done. ;-) No worries, we will figure it out together. :-)

@srebhan
Copy link
Member

srebhan commented Oct 10, 2024

@syedmhashim I'm loosing a bit the track in the discussion above, could you please push an update with the stuff that is clear and then we discuss the unclear parts?

@syedmhashim
Copy link
Author

@srebhan Hey! Just pushed some changes and commented "done" on the threads that I resolved. Sorry for the delay. I got occupied with my job

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks much better @syedmhashim! Some more cleanups and comments from my side...

plugins/inputs/firehose/README.md Outdated Show resolved Hide resolved
plugins/inputs/firehose/README.md Outdated Show resolved Hide resolved
plugins/inputs/firehose/README.md Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
plugins/inputs/firehose/firehose_request.go Outdated Show resolved Hide resolved
@syedmhashim
Copy link
Author

@srebhan Hi! Pushed some changes and added comments where relevant

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syedmhashim thanks for the update. I tried to put more concrete suggestions into my review. Overall, I think we are almost there, the only thing missing are unit-tests with a mocked sender...

Comment on lines 34 to 40
ParameterTags []string `toml:"parameter_tags"`

tlsint.ServerConfig
tlsConf *tls.Config

once sync.Once
Log telegraf.Logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ParameterTags []string `toml:"parameter_tags"`
tlsint.ServerConfig
tlsConf *tls.Config
once sync.Once
Log telegraf.Logger
ParameterTags []string `toml:"parameter_tags"`
Log telegraf.Logger `toml:"-"`
tlsint.ServerConfig
tlsConf *tls.Config
once sync.Once

Comment on lines 53 to 55
func (f *Firehose) Gather(_ telegraf.Accumulator) error {
return nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (f *Firehose) Gather(_ telegraf.Accumulator) error {
return nil
}
func (*Firehose) Gather(telegraf.Accumulator) error {
return nil
}

Comment on lines 236 to 239
return &Firehose{
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the timeout initialization here, so the user can overwrite it in the config with whatever he wants

Suggested change
return &Firehose{
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},
}
return &Firehose{
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},
ReadTimeout: config.Duration(time.Second * 5),
WriteTimeout: config.Duration(time.Second * 5),
}

Comment on lines 68 to 73
if f.ReadTimeout < config.Duration(time.Second) {
f.ReadTimeout = config.Duration(time.Second * 5)
}
if f.WriteTimeout < config.Duration(time.Second) {
f.WriteTimeout = config.Duration(time.Second * 5)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move timeout to init() (see my suggestion there)... This way, the timeouts are initialized with the defaults but the user can override the values with whatever he/she wants...

Suggested change
if f.ReadTimeout < config.Duration(time.Second) {
f.ReadTimeout = config.Duration(time.Second * 5)
}
if f.WriteTimeout < config.Duration(time.Second) {
f.WriteTimeout = config.Duration(time.Second * 5)
}

func (f *Firehose) Stop() {
err := f.server.Shutdown(context.Background())
if err != nil {
f.Log.Infof("shutting down server failed: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
f.Log.Infof("shutting down server failed: %v", err)
f.Log.Errorf("shutting down server failed: %v", err)

plugins/inputs/firehose/firehose_request.go Show resolved Hide resolved
Comment on lines 25 to 26
## Optional access key to accept for authentication.
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
## Optional access key to accept for authentication.
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key
## Optional access key to accept for authentication.
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key.
## If no access_key is provided (default), authentication is completely disabled and
## this plugin will accept all request ignoring the provided access-key in the request!

plugins/inputs/firehose/firehose_request.go Show resolved Hide resolved
}

var metrics []telegraf.Metric
for _, bytes := range data {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use bytes as variable name as this is also the name of a built-in Go library. How about sample , msg or record?

Comment on lines 183 to 198
if err := f.addParameterTags(r, metrics); err != nil {
f.Log.Errorf("add parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}

for _, m := range metrics {
m.AddTag("firehose_http_path", req.URL.Path)
}

for _, m := range metrics {
f.acc.AddMetric(m)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
if err := f.addParameterTags(r, metrics); err != nil {
f.Log.Errorf("add parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}
for _, m := range metrics {
m.AddTag("firehose_http_path", req.URL.Path)
}
for _, m := range metrics {
f.acc.AddMetric(m)
}
tags, err := r.extractParameterTags(f.ParameterTags)
if err != nil {
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}
for _, m := range metrics {
for k, v := range tags {
m.AddTag(k, v)
}
m.AddTag("firehose_http_path", req.URL.Path)
f.acc.AddMetric(m)
}

This way you don't need to iterate over the metrics three times and do have a clear code structure where everything related to processing the request is in the request "class"...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

@telegraf-tiger
Copy link
Contributor

telegraf-tiger bot commented Nov 5, 2024

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really nice @syedmhashim! Some very small things with the biggest being that log messages should start with a captial letter... The only thing missing are the unit-tests...

Comment on lines +67 to +73
var err error
f.tlsConf, err = f.ServerConfig.TLSConfig()
if err != nil {
return err
}

return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var err error
f.tlsConf, err = f.ServerConfig.TLSConfig()
if err != nil {
return err
}
return nil
var err error
f.tlsConf, err = f.ServerConfig.TLSConfig()
return err

}
}()

f.Log.Infof("listening on %s", f.listener.Addr().String())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
f.Log.Infof("listening on %s", f.listener.Addr().String())
f.Log.Infof("Listening on %s", f.listener.Addr().String())

Comment on lines +113 to +116
err := f.server.Shutdown(context.Background())
if err != nil {
f.Log.Errorf("shutting down server failed: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := f.server.Shutdown(context.Background())
if err != nil {
f.Log.Errorf("shutting down server failed: %v", err)
}
if err := f.server.Shutdown(context.Background()); err != nil {
f.Log.Errorf("Shutting down server failed: %v", err)
}

r.res.statusCode = http.StatusBadRequest
f.Log.Errorf("x-amz-firehose-request-id header is not set")
if err := r.sendResponse(res); err != nil {
f.Log.Errorf("sending response failed: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For log-messages please capitalize the message for better readability for the user! Same for all other f.Log calls...

return
}

r := &request{req: req}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to set the response status-code to a sensible default here? E.g. 500 internal server error so we catch the cases where we forgot to set a status-code? This would make things future proof. :-)

Comment on lines +161 to +164
paramTags, err := r.extractParameterTags(f.ParameterTags)
if err != nil {
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not setting the status-code in extractParameterTags like you do in the other functions?

Comment on lines +210 to +211
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove those here as you handle them in Init()

Suggested change
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Just a question. What's the difference between initialising the attributes in init()and in Init() and how do we decide which option to choose?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The init() (lowercase) function will be called when importing the package by Go itself. So what you do here is how the structure is registered and that's how each instance looks like before parsing any config options! The Init() (uppercase) function is called after setting the struct fields from the config options. So you should do everything in Init() if the nil value is invalid for the type and option (like strings etc). However, if you do have for example timeouts where a zero-value might mean "wait forever" it is better to set this in init() (lowercase) as the user might want to overwrite the default with zero. Does that make sense?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AWS Data Firehose HTTP Endpoint Input Plugin
2 participants