-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Design and implement persistence mechanism for log receivers #2287
Comments
**Link to tracking Issue:** This PR partially addresses the following issues: - Resolves: #2265 - Related: #2268, #2282. **Description:** The main idea here is to convert `stanzareceiver` into a helper package for building various other stanza-based receivers. Each of these other receivers will only vary by input operator. Functionality pulled out of `stanzareceiver` was moved into a new `filelogreceiver`. `stanzareceiver` should most likely be renamed and/or moved, but is left in its previous package for this initial PR. `stanzareceiver` defines an interface called `LogReceiverType` which each stanza-based receiver must implement and pass to `stanzareceiver.NewFactory(LogReceiverType) component.ReceiverFactory`. With this interface, each stanza-based receiver should only need a small amount of work to have a fully functional receiver. Support for parsing operations, emission from stanza's internal pipeline, and conversion to pdata format are all handled in the helper package so that these will be standardized across all the full set of stanza-based receivers. **Next Steps** Input operators are _not yet_ isolated to the top level of the configuration. The end goal is: ``` filelog: include: [ receiver/stanzareceiver/testdata/simple.log ] start_at: beginning operators: - type: regex_parser regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$' timestamp: parse_from: time layout: '%Y-%m-%d' severity: parse_from: sev ``` but the current state is still: ``` filelog: operators: - type: file_input include: [ receiver/stanzareceiver/testdata/simple.log ] start_at: beginning - type: regex_parser regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$' timestamp: parse_from: time layout: '%Y-%m-%d' severity: parse_from: sev ``` The primary requirement #2265 is to promote the input operator to the top level of the receiver config. This will be the focus of the next PR. This PR is mostly concerned with splitting up the package. The configuration changes might be a little messy so I wanted to address those separately. On the subject of configuration - the interface defined by `stanzareceiver` has a method `Decode(configmodels.Receiver) (pipeline.Config, error)` which is in my opinion much too loosely defined. Too much responsibility is delegated to each stanza-based receiver. The main reason this is left this way for now is that `stanza` operators do not currently use `mapstructure` for config unmarshaling. There is currently a workaround in place, but once stanza operators are migrated to `mapstructure`, more responsibility for unmarshaling should be extracted back into the helper package, and this interface method should end up a lot cleaner. I'm planning to look into this in the next PR. **Open questions** (which can be addressed in this PR or the next): - Should the helper package be completely standalone, or does it belong in `receivercreator` or similar? - If the helper package should be standalone, what should it be called? (probably not `stanzareceiver`) **Temporarily removed functionality** This functionality will be implemented in the near future. There is some design to do on how exactly this should work when used by multiple receivers: - Offsets database (tracked by #2287) - Plugins (tracked as item on #2264) **Testing:** Unit tests are roughly the same as before. A few cases were dropped because they no longer applied. Certainly more tests will be added as this pattern is solidified. Testbed scenario is unchanged and still passing: ``` > make run-tests ./runtests.sh === RUN TestLog10kDPS === RUN TestLog10kDPS/OTLP ... (abbreviated) === RUN TestLog10kDPS/Stanza ... (abbreviated) --- PASS: TestLog10kDPS (30.73s) --- PASS: TestLog10kDPS/OTLP (15.32s) --- PASS: TestLog10kDPS/Stanza (15.41s) PASS ok github.com/open-telemetry/opentelemetry-collector-contrib/testbed/tests_unstable_exe 31.406s # Test PerformanceResults Started: Mon, 08 Feb 2021 13:35:08 -0500 Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Log10kDPS/OTLP |PASS | 15s| 19.9| 20.6| 39| 47| 149900| 149900| Log10kDPS/Stanza |PASS | 15s| 28.4| 29.3| 40| 48| 150000| 150000| Total duration: 31s ```
Maybe optionally, but I would prefer that the default location works out of the box. (Question: do we need a different location for Mac or just use the same as Linux? On Mac I think that should be /Library/otelcol or something?)
I would say this is probably preferably for end user, provided that internally we can handle synchronization correctly.
I'd prefer not to. What do we gain by that?
I would aim to have a default location so that it is not a required config option and persist by default (maybe have an option to disable). Possible options for the default directory for checkpoints file are:
|
I agree with the above, except that I'm not sure we can allow users to specify a persistence file for each receiver without also allowing multiple persistence files. It might complicate implementation to support independent persistence files but the alternatives seem awkward at best. |
Can we assume that in vast majority of cases the default locations are good enough? Then we can make choosing the non-default somewhat awkward. Possible options:
Neither is ideal, but I can't think of anything else. |
Yes, I think so.
This would be my preferred, until we have a use case that requires otherwise. |
See discussion about a suggested approach here #2716 (comment) @djaglowski since adding the storage to the core can take some time, I suggest that we implement the centralized generic storage as an extension first. We will iron out the interface and the desired functionality on this extension and once we are certain it is what we want we can think about making it a core feature. As an extension it will require a tiny bit of config to be enabled: extensions:
storage:
# optionally specify settings such as directory or file to store and any other storage options.
service:
extensions: [storage] This should be sufficient to enable the extensions. Once enabled the components can find the storage extension via I would expect (All names above are preliminary suggestions to illustrate the idea). Thoughts? |
I think this makes a lot of sense. I should have a design doc ready on Monday, and will begin implementing the extension from there. |
@tigrannajaryan I've compiled our discussion and a small number of additional details into a design doc. There are a few areas which likely still need more input, but I think we have enough clarity that I can get started on an initial implementation tomorrow. |
@djaglowski the design LGTM. Can you please circulate it more widely and ask for comments/reviews? |
Follows #2883 Will resolve #2287 ### Summary This PR includes: - A full implementation of a `file_storage` extension, which can read and write data to the local file system. Any component in the collector may make use of this extension. - Updates to `stanza/internal` to allow stanza-based receivers to use the extension for checkpoints. - A new testbed scenario that has the filelogreceiver using the extension Configuration of the extension is simple. ```yaml file_storage: file_storage/all_settings: directory: /var/lib/otelcol/mydir timeout: 2s ``` The extension is made available to component's via the `host` parameter in their `Start` method: ```go func (r *receiver) Start(ctx context.Context, host component.Host) error { for _, ext := range host.GetExtensions() { if se, ok := ext.(storage.Extension); ok { client, err := se.GetClient(ctx, component.KindReceiver, r.NamedEntity) if err != nil { return err } r.storageClient = client return nil } } r.storageClient = storage.NewNopClient() ... } ```
**Link to tracking Issue:** This PR partially addresses the following issues: - Resolves: #2265 - Related: #2268, #2282. **Description:** The main idea here is to convert `stanzareceiver` into a helper package for building various other stanza-based receivers. Each of these other receivers will only vary by input operator. Functionality pulled out of `stanzareceiver` was moved into a new `filelogreceiver`. `stanzareceiver` should most likely be renamed and/or moved, but is left in its previous package for this initial PR. `stanzareceiver` defines an interface called `LogReceiverType` which each stanza-based receiver must implement and pass to `stanzareceiver.NewFactory(LogReceiverType) component.ReceiverFactory`. With this interface, each stanza-based receiver should only need a small amount of work to have a fully functional receiver. Support for parsing operations, emission from stanza's internal pipeline, and conversion to pdata format are all handled in the helper package so that these will be standardized across all the full set of stanza-based receivers. **Next Steps** Input operators are _not yet_ isolated to the top level of the configuration. The end goal is: ``` filelog: include: [ receiver/stanzareceiver/testdata/simple.log ] start_at: beginning operators: - type: regex_parser regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$' timestamp: parse_from: time layout: '%Y-%m-%d' severity: parse_from: sev ``` but the current state is still: ``` filelog: operators: - type: file_input include: [ receiver/stanzareceiver/testdata/simple.log ] start_at: beginning - type: regex_parser regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$' timestamp: parse_from: time layout: '%Y-%m-%d' severity: parse_from: sev ``` The primary requirement #2265 is to promote the input operator to the top level of the receiver config. This will be the focus of the next PR. This PR is mostly concerned with splitting up the package. The configuration changes might be a little messy so I wanted to address those separately. On the subject of configuration - the interface defined by `stanzareceiver` has a method `Decode(configmodels.Receiver) (pipeline.Config, error)` which is in my opinion much too loosely defined. Too much responsibility is delegated to each stanza-based receiver. The main reason this is left this way for now is that `stanza` operators do not currently use `mapstructure` for config unmarshaling. There is currently a workaround in place, but once stanza operators are migrated to `mapstructure`, more responsibility for unmarshaling should be extracted back into the helper package, and this interface method should end up a lot cleaner. I'm planning to look into this in the next PR. **Open questions** (which can be addressed in this PR or the next): - Should the helper package be completely standalone, or does it belong in `receivercreator` or similar? - If the helper package should be standalone, what should it be called? (probably not `stanzareceiver`) **Temporarily removed functionality** This functionality will be implemented in the near future. There is some design to do on how exactly this should work when used by multiple receivers: - Offsets database (tracked by #2287) - Plugins (tracked as item on #2264) **Testing:** Unit tests are roughly the same as before. A few cases were dropped because they no longer applied. Certainly more tests will be added as this pattern is solidified. Testbed scenario is unchanged and still passing: ``` > make run-tests ./runtests.sh === RUN TestLog10kDPS === RUN TestLog10kDPS/OTLP ... (abbreviated) === RUN TestLog10kDPS/Stanza ... (abbreviated) --- PASS: TestLog10kDPS (30.73s) --- PASS: TestLog10kDPS/OTLP (15.32s) --- PASS: TestLog10kDPS/Stanza (15.41s) PASS ok github.com/open-telemetry/opentelemetry-collector-contrib/testbed/tests_unstable_exe 31.406s # Test PerformanceResults Started: Mon, 08 Feb 2021 13:35:08 -0500 Test |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items| ----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:| Log10kDPS/OTLP |PASS | 15s| 19.9| 20.6| 39| 47| 149900| 149900| Log10kDPS/Stanza |PASS | 15s| 28.4| 29.3| 40| 48| 150000| 150000| Total duration: 31s ```
Follows open-telemetry#2883 Will resolve #2287 ### Summary This PR includes: - A full implementation of a `file_storage` extension, which can read and write data to the local file system. Any component in the collector may make use of this extension. - Updates to `stanza/internal` to allow stanza-based receivers to use the extension for checkpoints. - A new testbed scenario that has the filelogreceiver using the extension Configuration of the extension is simple. ```yaml file_storage: file_storage/all_settings: directory: /var/lib/otelcol/mydir timeout: 2s ``` The extension is made available to component's via the `host` parameter in their `Start` method: ```go func (r *receiver) Start(ctx context.Context, host component.Host) error { for _, ext := range host.GetExtensions() { if se, ok := ext.(storage.Extension); ok { client, err := se.GetClient(ctx, component.KindReceiver, r.NamedEntity) if err != nil { return err } r.storageClient = client return nil } } r.storageClient = storage.NewNopClient() ... } ``` # Conflicts: # go.mod
Related to #2264
stanzareceiver currently supports persistence of log ingestion state (e.g. tracking how much of a file has been tailed) into a bbolt database file.
With the migration to multiple log receivers, we will need to decide how to manage persistence.
The text was updated successfully, but these errors were encountered: