-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First PR - Failover Connector skeleton (#28818)
This is the Part 1 PR for the Failover Connector (split according to the CONTRIBUTING.md doc) Link to tracking Issue: #20766 Testing: Added factory test Note: Full functionality PR exists [here](#27641) and will likely be refactored to serve as the part 2 PR cc: @djaglowski @sethallen @MovieStoreGuy
- Loading branch information
Showing
23 changed files
with
1,106 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: new_component | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: failoverconnector | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: New component that will allow for pipeline failover triggered by the health of target downstream exporters | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [20766] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Validating CODEOWNERS rules …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
# Failover Connector | ||
|
||
<!-- status autogenerated section --> | ||
| Status | | | ||
| ------------- |-----------| | ||
| Distributions | [contrib] | | ||
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Ffailover%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Ffailover) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Ffailover%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Ffailover) | | ||
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@djaglowski](https://www.github.com/djaglowski), [@fatsheep9146](https://www.github.com/fatsheep9146) | | ||
|
||
[development]: https://github.com/open-telemetry/opentelemetry-collector#development | ||
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib | ||
|
||
## Supported Pipeline Types | ||
|
||
| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] | | ||
| ------------------------ | ------------------------ | ----------------- | | ||
| metrics | metrics | [development] | | ||
| traces | traces | [development] | | ||
| logs | logs | [development] | | ||
|
||
[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type | ||
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type | ||
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels | ||
<!-- end autogenerated section --> | ||
|
||
Allows for health based routing between trace, metric, and log pipelines depending on the health of target downstream exporters. | ||
|
||
## Configuration | ||
|
||
If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README]. | ||
|
||
The following settings are available: | ||
|
||
- `priority_levels (required)`: list of pipeline level priorities in a 1 - n configuration, multiple pipelines can sit at a single priority level. | ||
- `retry_interval (optional)`: the frequency at which the pipeline levels will attempt to reestablish connection with all higher priority levels. Default value is 10 minutes. (See Example below for further explanation) | ||
- `retry_gap (optional)`: the amount of time between trying two separate priority levels in a single retry_interval timeframe. Default value is 30 seconds. (See Example below for further explanation) | ||
- `max_retries (optional)`: the maximum retries per level. Default value is 10. | ||
|
||
The connector intakes a list of `priority_levels` each of which can contain multiple pipelines. | ||
If any pipeline at a stable level fails, the level is considered unhealthy and the connector will move down one priority level and route all data to the new level (assuming it is stable). | ||
|
||
The connector will periodically try to reestablish a stable connection with the higher priority levels. `retry_interval` will be the frequency at which the connector will try to iterate through all unhealthy higher priority levels while `retry_gap` is how long it will wait after a failed retry at one level before retrying the next level (if retry_gap is 2m, after trying to reestablish level 1, it will wait 2m before trying level 2) It will retry a maximum of one unhealthy level before returning to the current stable level.) | ||
There is a `max_retries` config param as well that will track how many retries have occurred at each level, and once the max is hit, it will no longer retry that priority level. | ||
|
||
#### Configuration Example: | ||
|
||
```yaml | ||
connectors: | ||
failover: | ||
priority_levels: | ||
- [traces/first, traces/also_first] | ||
- [traces/second] | ||
- [traces/third] | ||
retry_interval: 5m | ||
retry_gap: 1m | ||
max_retries: 10 | ||
|
||
service: | ||
pipelines: | ||
traces: | ||
receivers: [otlp] | ||
exporters: [failover] | ||
traces/first: | ||
receivers: [failover] | ||
exporters: [otlp/first] | ||
traces/second: | ||
receivers: [failover] | ||
exporters: [otlp/second] | ||
traces/third: | ||
receivers: [failover] | ||
exporters: [otlp/third] | ||
traces/also_first: | ||
receivers: [failover] | ||
exporters: [otlp/fourth] | ||
``` | ||
#### Example with Explanation: | ||
```yaml | ||
connectors: | ||
failover: | ||
priority_levels: | ||
- [traces/first] | ||
- [traces/second] | ||
- [traces/third] | ||
- [traces/fourth] | ||
retry_interval: 5m | ||
retry_gap: 1m | ||
max_retries: 10 | ||
``` | ||
Assume the current stable level is level 4 (traces/fourth) on the priority_level list. | ||
At the start of the `retry_interval`, the connector will try to reestablish the pipeline on level 1 (trace/first). If it fails, the connector will return to level 4 (traces/fourth) and wait the 1m as the `retry_gap`, when that 1m passes it will now retry level 2 (traces/second) and if that fails will first return to level 4 before waiting another 1m until trying level 3. | ||
Once it tries level 3 and it fails, it will return to level 4 and wait the 10m retry_interval again before repeating the process. If a retry is successful then the retried level becomes the stable level, and the connector will continue to retry any higher priority levels that haven't exceeded the `max_retries`. | ||
|
||
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md | ||
[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type | ||
[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type | ||
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector" | ||
|
||
import ( | ||
"errors" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
) | ||
|
||
var ( | ||
errNoPipelinePriority = errors.New("No pipelines are defined in the priority list") | ||
errInvalidRetryIntervals = errors.New("Retry interval must be positive, and retry_interval must be greater than retry_gap times the length of the priority list") | ||
) | ||
|
||
type Config struct { | ||
// PipelinePriority is the list of pipeline level priorities in a 1 - n configuration, multiple pipelines can | ||
// sit at a single priority level and will be routed in a fanout. If any pipeline at a level fails, the | ||
// level is considered unhealthy | ||
PipelinePriority [][]component.ID `mapstructure:"priority_levels"` | ||
|
||
// RetryInterval is the frequency at which the pipeline levels will attempt to recover by going over | ||
// all levels below the current | ||
RetryInterval time.Duration `mapstructure:"retry_interval"` | ||
|
||
// RetryGap is how much time will pass between trying two separate priority levels in a single RetryInterval | ||
// If the priority list has 3 levels, the RetryInterval is 5m, and the retryGap is 1m, within the 5m RetryInterval, | ||
// the connector will only try one level every 1m, and will return to the stable level in the interim | ||
RetryGap time.Duration `mapstructure:"retry_gap"` | ||
|
||
// MaxRetry is the maximum retries per level, once this limit is hit for a level, even if the next pipeline level fails, | ||
// it will not try to recover the level that exceeded the maximum retries | ||
MaxRetries int `mapstructure:"max_retries"` | ||
} | ||
|
||
// Validate needs to ensure RetryInterval > # elements in PriorityList * RetryGap | ||
func (c *Config) Validate() error { | ||
if len(c.PipelinePriority) == 0 { | ||
return errNoPipelinePriority | ||
} | ||
retryTime := c.RetryGap * time.Duration(len(c.PipelinePriority)) | ||
if c.RetryGap <= 0 || c.RetryInterval <= 0 || c.RetryInterval <= retryTime { | ||
return errInvalidRetryIntervals | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package failoverconnector | ||
|
||
import ( | ||
"path/filepath" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/confmap/confmaptest" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/metadata" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
testcases := []struct { | ||
id component.ID | ||
expected *Config | ||
}{ | ||
{ | ||
id: component.NewIDWithName(metadata.Type, "default"), | ||
expected: &Config{ | ||
PipelinePriority: [][]component.ID{ | ||
{ | ||
component.NewIDWithName(component.DataTypeTraces, ""), | ||
}, | ||
}, | ||
RetryInterval: 10 * time.Minute, | ||
RetryGap: 30 * time.Second, | ||
MaxRetries: 10, | ||
}, | ||
}, | ||
{ | ||
id: component.NewIDWithName(metadata.Type, "full"), | ||
expected: &Config{ | ||
PipelinePriority: [][]component.ID{ | ||
{ | ||
component.NewIDWithName(component.DataTypeTraces, "first"), | ||
component.NewIDWithName(component.DataTypeTraces, "also_first"), | ||
}, | ||
{ | ||
component.NewIDWithName(component.DataTypeTraces, "second"), | ||
}, | ||
{ | ||
component.NewIDWithName(component.DataTypeTraces, "third"), | ||
}, | ||
{ | ||
component.NewIDWithName(component.DataTypeTraces, "fourth"), | ||
}, | ||
}, | ||
RetryInterval: 5 * time.Minute, | ||
RetryGap: time.Minute, | ||
MaxRetries: 10, | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range testcases { | ||
t.Run(tc.id.String(), func(t *testing.T) { | ||
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) | ||
require.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig() | ||
|
||
sub, err := cm.Sub(tc.id.String()) | ||
require.NoError(t, err) | ||
require.NoError(t, component.UnmarshalConfig(sub, cfg)) | ||
|
||
assert.NoError(t, component.ValidateConfig(cfg)) | ||
assert.Equal(t, tc.expected, cfg) | ||
}) | ||
} | ||
} | ||
|
||
func TestValidateConfig(t *testing.T) { | ||
testcases := []struct { | ||
name string | ||
id component.ID | ||
err error | ||
}{ | ||
{ | ||
name: "no priority levels provided", | ||
id: component.NewIDWithName(metadata.Type, ""), | ||
err: errNoPipelinePriority, | ||
}, | ||
{ | ||
name: "invalid ratio of retry_gap to retry_interval", | ||
id: component.NewIDWithName(metadata.Type, "invalid"), | ||
err: errInvalidRetryIntervals, | ||
}, | ||
} | ||
|
||
for _, tc := range testcases { | ||
t.Run(tc.id.String(), func(t *testing.T) { | ||
t.Run(tc.name, func(t *testing.T) { | ||
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) | ||
require.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig() | ||
|
||
sub, err := cm.Sub(tc.id.String()) | ||
require.NoError(t, err) | ||
require.NoError(t, component.UnmarshalConfig(sub, cfg)) | ||
|
||
assert.EqualError(t, component.ValidateConfig(cfg), tc.err.Error()) | ||
}) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
//go:generate mdatagen metadata.yaml | ||
|
||
package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector" |
Oops, something went wrong.