Design and implement persistence mechanism for log receivers #2287

Closed
djaglowski opened this issue Feb 5, 2021 · 8 comments · Fixed by #3087

@djaglowski
Member

Related to #2264

stanzareceiver currently supports persistence of log ingestion state (e.g. tracking how much of a file has been tailed) into a bbolt database file.

With the migration to multiple log receivers, we will need to decide how to manage persistence.

  • Will each receiver have the ability to point to a persistence file?
  • Is it ok for receivers to specify the same file? (I think so, but needs validation)
  • Is it reasonable to allow each receiver to have its own persistence file? (Probably, but this raises questions of clutter management)
  • Is there a reasonable default location for such a file or files?
    • If so, what is it?
    • If not, is the default to not persist state?
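To make "persistence of log ingestion state" concrete, here is a minimal sketch of saving and restoring per-file read offsets. It is not how stanzareceiver does it: the real implementation uses a bbolt database, while this sketch substitutes a JSON file so that it needs only the standard library. The names `offsets`, `loadOffsets`, `saveOffsets`, and `demo` are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// offsets maps a tailed file's path to the number of bytes already read.
type offsets map[string]int64

// loadOffsets reads saved offsets; a missing file means a fresh start.
func loadOffsets(path string) (offsets, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return offsets{}, nil
	}
	if err != nil {
		return nil, err
	}
	out := offsets{}
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// saveOffsets writes to a temporary file and then renames it, so a
// failed write does not replace the previous checkpoint.
func saveOffsets(path string, o offsets) error {
	data, err := json.Marshal(o)
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// demo saves one offset in a temporary directory and reads it back.
func demo() (int64, error) {
	dir, err := os.MkdirTemp("", "offsets")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "offsets.json")

	o, err := loadOffsets(path)
	if err != nil {
		return 0, err
	}
	o["/var/log/app.log"] = 1024
	if err := saveOffsets(path, o); err != nil {
		return 0, err
	}
	restored, err := loadOffsets(path)
	if err != nil {
		return 0, err
	}
	return restored["/var/log/app.log"], nil
}

func main() {
	offset, err := demo()
	fmt.Println(offset, err)
}
```

If several receivers shared one such file, writes would also need to be serialized, which is the synchronization concern behind the second question above.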
tigrannajaryan pushed a commit that referenced this issue Feb 11, 2021
**Link to tracking Issue:** 
This PR partially addresses the following issues:
- Resolves: #2265 
- Related:  #2268, #2282.

**Description:**

The main idea here is to convert `stanzareceiver` into a helper package for building various other stanza-based receivers. Each of these other receivers will only vary by input operator. Functionality pulled out of `stanzareceiver` was moved into a new `filelogreceiver`. `stanzareceiver` should most likely be renamed and/or moved, but is left in its previous package for this initial PR. 

`stanzareceiver` defines an interface called `LogReceiverType` which each stanza-based receiver must implement and pass to `stanzareceiver.NewFactory(LogReceiverType) component.ReceiverFactory`. 

With this interface, each stanza-based receiver should only need a small amount of work to have a fully functional receiver. Support for parsing operations, emission from stanza's internal pipeline, and conversion to pdata format are all handled in the helper package so that these will be standardized across the full set of stanza-based receivers.

**Next Steps**
Input operators are _not yet_ isolated to the top level of the configuration. The end goal is: 
```
filelog:
  include: [ receiver/stanzareceiver/testdata/simple.log ]
  start_at: beginning
  operators:
    - type: regex_parser
      regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
      timestamp:
        parse_from: time
        layout: '%Y-%m-%d'
      severity:
        parse_from: sev
```

but the current state is still:
```
filelog:
  operators:
    - type: file_input
      include: [ receiver/stanzareceiver/testdata/simple.log ]
      start_at: beginning
    - type: regex_parser
      regex: '^(?P<time>\d{4}-\d{2}-\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
      timestamp:
        parse_from: time
        layout: '%Y-%m-%d'
      severity:
        parse_from: sev
```

The primary requirement of #2265 is to promote the input operator to the top level of the receiver config. This will be the focus of the next PR. This PR is mostly concerned with splitting up the package. The configuration changes might be a little messy, so I wanted to address those separately.

On the subject of configuration - the interface defined by `stanzareceiver` has a method `Decode(configmodels.Receiver) (pipeline.Config, error)` which is in my opinion much too loosely defined. Too much responsibility is delegated to each stanza-based receiver. The main reason this is left this way for now is that `stanza` operators do not currently use `mapstructure` for config unmarshaling. There is currently a workaround in place, but once stanza operators are migrated to `mapstructure`, more responsibility for unmarshaling should be extracted back into the helper package, and this interface method should end up a lot cleaner. I'm planning to look into this in the next PR.

**Open questions** (which can be addressed in this PR or the next):
- Should the helper package be completely standalone, or does it belong in `receivercreator` or similar?
- If the helper package should be standalone, what should it be called? (probably not `stanzareceiver`)

**Temporarily removed functionality**
This functionality will be implemented in the near future. There is some design to do on how exactly this should work when used by multiple receivers:
- Offsets database (tracked by #2287)
- Plugins (tracked as item on #2264)

**Testing:** 
Unit tests are roughly the same as before. A few cases were dropped because they no longer applied. Certainly more tests will be added as this pattern is solidified. 

Testbed scenario is unchanged and still passing:
```
> make run-tests
./runtests.sh
=== RUN   TestLog10kDPS
=== RUN   TestLog10kDPS/OTLP
... (abbreviated)
=== RUN   TestLog10kDPS/Stanza
... (abbreviated)
--- PASS: TestLog10kDPS (30.73s)
    --- PASS: TestLog10kDPS/OTLP (15.32s)
    --- PASS: TestLog10kDPS/Stanza (15.41s)
PASS
ok      github.com/open-telemetry/opentelemetry-collector-contrib/testbed/tests_unstable_exe    31.406s
# Test PerformanceResults
Started: Mon, 08 Feb 2021 13:35:08 -0500

Test                                    |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
----------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
Log10kDPS/OTLP                          |PASS  |     15s|    19.9|    20.6|         39|         47|    149900|        149900|
Log10kDPS/Stanza                        |PASS  |     15s|    28.4|    29.3|         40|         48|    150000|        150000|

Total duration: 31s
```
@tigrannajaryan
Member

> Will each receiver have the ability to point to a persistence file?

Maybe optionally, but I would prefer that the default location works out of the box.

(Question: do we need a different location for Mac or just use the same as Linux? On Mac I think that should be /Library/otelcol or something?)

> Is it ok for receivers to specify the same file? (I think so, but needs validation)

I would say this is probably preferable for the end user, provided that internally we can handle synchronization correctly.

> Is it reasonable to allow each receiver to have its own persistence file? (Probably, but this raises questions of clutter management)

I'd prefer not to. What do we gain by that?

> Is there a reasonable default location for such a file or files?
> If so, what is it?
> If not, is the default to not persist state?

I would aim to have a default location, so that it is not a required config option, and to persist by default (maybe with an option to disable). Possible options for the default directory for the checkpoint file are:

  1. *nix: $TEMP, Windows: %TEMP% (Probably not a good idea).
  2. *nix: /var/lib/otelcol, Windows: %ProgramData%/otelcol
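A minimal sketch of how option 2 could be resolved at runtime. The function name `defaultStorageDir` is hypothetical, the `C:\ProgramData` fallback for an unset variable is an assumption, and macOS is treated like other *nix systems only because the question above is still open.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
)

// defaultStorageDir returns a candidate default checkpoint directory.
// goos is a runtime.GOOS-style value; programData is the value of the
// %ProgramData% environment variable (only used on Windows).
func defaultStorageDir(goos, programData string) string {
	if goos == "windows" {
		if programData == "" {
			// Assumption: fall back to the conventional location.
			programData = `C:\ProgramData`
		}
		return programData + `\otelcol`
	}
	// Assumption: macOS and other Unix-like systems share this path.
	return "/var/lib/otelcol"
}

func main() {
	fmt.Println(defaultStorageDir(runtime.GOOS, os.Getenv("ProgramData")))
}
```

Passing the OS name and environment value as parameters keeps the function easy to test on any platform.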

@djaglowski
Member Author

I agree with the above, except that I'm not sure we can allow users to specify a persistence file for each receiver without also allowing multiple persistence files.

It might complicate the implementation to support independent persistence files, but the alternatives seem awkward at best.

@tigrannajaryan
Member

tigrannajaryan commented Feb 11, 2021

Can we assume that in the vast majority of cases the default locations are good enough? Then we can make choosing the non-default somewhat awkward. Possible options:

  • Allow any receiver to override the default location. Receivers can specify different locations. I assume implementation-wise this is not difficult?
  • Allow receivers to override the default location. They must all specify the same location or it is a config error.
  • Configure the single location via an extension. All receivers will get the same location from the extension.

None of these is ideal, but I can't think of anything else.
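The second option (overrides allowed, but all must agree) can be sketched as a config validation step. This is only an illustration; `resolveStorageDir` and its map-based input are hypothetical and not part of any collector API.

```go
package main

import (
	"fmt"
	"sort"
)

// resolveStorageDir takes per-receiver overrides keyed by receiver name
// (an empty string means "no override") and returns the single location
// to use, or an error if two receivers disagree.
func resolveStorageDir(defaultDir string, overrides map[string]string) (string, error) {
	// Sort names so that error messages are deterministic.
	names := make([]string, 0, len(overrides))
	for name := range overrides {
		names = append(names, name)
	}
	sort.Strings(names)

	chosen, chosenBy := "", ""
	for _, name := range names {
		dir := overrides[name]
		if dir == "" {
			continue
		}
		if chosen == "" {
			chosen, chosenBy = dir, name
			continue
		}
		if dir != chosen {
			return "", fmt.Errorf("receivers %q and %q specify different storage locations: %q vs %q",
				chosenBy, name, chosen, dir)
		}
	}
	if chosen == "" {
		return defaultDir, nil
	}
	return chosen, nil
}

func main() {
	dir, err := resolveStorageDir("/var/lib/otelcol", map[string]string{
		"filelog": "/data/otel",
		"syslog":  "", // no override
	})
	fmt.Println(dir, err)

	_, err = resolveStorageDir("/var/lib/otelcol", map[string]string{
		"filelog": "/a",
		"syslog":  "/b",
	})
	fmt.Println(err)
}
```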

@djaglowski
Member Author

> Can we assume that in the vast majority of cases the default locations are good enough?

Yes, I think so.

> They must all specify the same location or it is a config error.

This would be my preferred option, until we have a use case that requires otherwise.

djaglowski self-assigned this Feb 17, 2021
tigrannajaryan added this to the Basic Logs Support milestone Mar 3, 2021
tigrannajaryan changed the title from "Design and implement distributed persistence mechanism for log receivers" to "Design and implement persistence mechanism for log receivers" Mar 17, 2021
@tigrannajaryan
Member

See discussion about a suggested approach here #2716 (comment)

@djaglowski since adding the storage to the core can take some time, I suggest that we implement the centralized generic storage as an extension first. We will iron out the interface and the desired functionality on this extension and once we are certain it is what we want we can think about making it a core feature.

As an extension it will require a tiny bit of config to be enabled:

extensions:
  storage:
    # optionally specify settings such as directory or file to store and any other storage options.
service:
  extensions: [storage]

This should be sufficient to enable the extension. Once enabled, components can find the storage extension via a Host.GetExtensions() call: iterate over the extensions and check whether any of them implements the Storage interface (TBD). If the extension is not found, the components fall back to storage-less mode (i.e. for filelog it means no offsets are saved, and we can print a warning in the logs in this case).

I would expect Storage to allow fetching something like a ComponentStore given the component name, and ComponentStore to expose the functions we need (set(key, value), get(key), delete(key), etc).

(All names above are preliminary suggestions to illustrate the idea).

Thoughts?
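A minimal Go sketch of the shape described in this comment. Everything here is illustrative: `Storage` and `ComponentStore` follow the preliminary names above, while `StoreFor`, `memStorage`, `memStore`, and `findStorage` are hypothetical. The slice passed to `findStorage` stands in for whatever Host.GetExtensions() returns, and the in-memory map stands in for a real on-disk store.

```go
package main

import (
	"fmt"
	"sync"
)

// ComponentStore is a per-component key/value store.
type ComponentStore interface {
	Get(key string) ([]byte, bool)
	Set(key string, value []byte)
	Delete(key string)
}

// Storage hands out a ComponentStore for a given component name.
type Storage interface {
	StoreFor(componentName string) ComponentStore
}

// memStore is an in-memory ComponentStore guarded by a mutex.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *memStore) Get(key string) ([]byte, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.data[key]
	return v, ok
}

func (s *memStore) Set(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

func (s *memStore) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

// memStorage keeps one isolated memStore per component name.
type memStorage struct {
	mu     sync.Mutex
	stores map[string]*memStore
}

func newMemStorage() *memStorage {
	return &memStorage{stores: map[string]*memStore{}}
}

func (m *memStorage) StoreFor(name string) ComponentStore {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.stores[name]
	if !ok {
		s = &memStore{data: map[string][]byte{}}
		m.stores[name] = s
	}
	return s
}

// findStorage returns the first extension that implements Storage.
func findStorage(extensions []interface{}) (Storage, bool) {
	for _, ext := range extensions {
		if s, ok := ext.(Storage); ok {
			return s, true
		}
	}
	return nil, false
}

func main() {
	exts := []interface{}{"some other extension", newMemStorage()}
	storage, ok := findStorage(exts)
	if !ok {
		fmt.Println("warning: no storage extension found; offsets will not be saved")
		return
	}
	store := storage.StoreFor("filelog")
	store.Set("offset:/var/log/app.log", []byte("1024"))
	v, _ := store.Get("offset:/var/log/app.log")
	fmt.Println(string(v)) // prints 1024
}
```

Keying stores by component name is what lets several receivers share one storage location without their keys colliding.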

@djaglowski
Member Author

I think this makes a lot of sense.

I should have a design doc ready on Monday, and will begin implementing the extension from there.

@djaglowski
Member Author

@tigrannajaryan I've compiled our discussion and a small number of additional details into a design doc.

There are a few areas which likely still need more input, but I think we have enough clarity that I can get started on an initial implementation tomorrow.

@tigrannajaryan
Member

@djaglowski the design LGTM. Can you please circulate it more widely and ask for comments/reviews?

tigrannajaryan pushed a commit that referenced this issue Apr 16, 2021
Follows #2883 

Will resolve #2287

### Summary

This PR includes:
- A full implementation of a `file_storage` extension, which can read and write data to the local file system. Any component in the collector may make use of this extension.
- Updates to `stanza/internal` to allow stanza-based receivers to use the extension for checkpoints.
- A new testbed scenario that has the filelogreceiver using the extension

Configuration of the extension is simple. 
```yaml
extensions:
  file_storage:
  file_storage/all_settings:
    directory: /var/lib/otelcol/mydir
    timeout: 2s
```

The extension is made available to components via the `host` parameter in their `Start` method:
```go
func (r *receiver) Start(ctx context.Context, host component.Host) error {
	for _, ext := range host.GetExtensions() {
		if se, ok := ext.(storage.Extension); ok {
			client, err := se.GetClient(ctx, component.KindReceiver, r.NamedEntity)
			if err != nil {
				return err
			}
			r.storageClient = client
			return nil
		}
	}
	r.storageClient = storage.NewNopClient()
	...
}
```
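Once a receiver holds a storage client, checkpointing can be sketched as below. This is a self-contained illustration, not the extension's code: the `client` interface is a local stand-in with only the two methods the sketch needs, and `nopClient`, `memClient`, `saveOffset`, `loadOffset`, and `roundTrip` are hypothetical names. The no-op client mirrors the fallback in the snippet above: writes are dropped, so a restart begins from offset 0.

```go
package main

import (
	"context"
	"encoding/binary"
	"fmt"
)

// client is a local stand-in for a storage client.
type client interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte) error
}

// nopClient discards writes and never returns data.
type nopClient struct{}

func (nopClient) Get(context.Context, string) ([]byte, error) { return nil, nil }
func (nopClient) Set(context.Context, string, []byte) error   { return nil }

// memClient keeps values in memory, standing in for a file-backed client.
type memClient struct{ data map[string][]byte }

func newMemClient() *memClient { return &memClient{data: map[string][]byte{}} }

func (m *memClient) Get(_ context.Context, key string) ([]byte, error) {
	return m.data[key], nil
}

func (m *memClient) Set(_ context.Context, key string, value []byte) error {
	m.data[key] = value
	return nil
}

// saveOffset stores the read offset for a file as 8 big-endian bytes.
func saveOffset(ctx context.Context, c client, file string, offset int64) error {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(offset))
	return c.Set(ctx, file, buf)
}

// loadOffset returns the saved offset, or 0 when no checkpoint exists.
func loadOffset(ctx context.Context, c client, file string) (int64, error) {
	buf, err := c.Get(ctx, file)
	if err != nil {
		return 0, err
	}
	if len(buf) != 8 {
		return 0, nil
	}
	return int64(binary.BigEndian.Uint64(buf)), nil
}

// roundTrip saves an offset and immediately reads it back.
func roundTrip(c client, file string, offset int64) (int64, error) {
	ctx := context.Background()
	if err := saveOffset(ctx, c, file, offset); err != nil {
		return 0, err
	}
	return loadOffset(ctx, c, file)
}

func main() {
	persisted, _ := roundTrip(newMemClient(), "/var/log/app.log", 2048)
	dropped, _ := roundTrip(nopClient{}, "/var/log/app.log", 2048)
	fmt.Println(persisted, dropped) // prints 2048 0
}
```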
pmatyjasek-sumo referenced this issue in pmatyjasek-sumo/opentelemetry-collector-contrib Apr 28, 2021
pmatyjasek-sumo referenced this issue in pmatyjasek-sumo/opentelemetry-collector-contrib Apr 28, 2021
# Conflicts:
#	go.mod