Skip to content

Commit

Permalink
Merge branch 'master' into will/workshop-feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Jun 23, 2023
2 parents 067df25 + 7dafc16 commit 6b77ba0
Show file tree
Hide file tree
Showing 22 changed files with 380 additions and 54 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ jobs:

- name: Run e2e tests
run: |
FILENAME="f99e7b0c/rel-nightly"
export CI_E2E_FILENAME="$FILENAME"
make e2e-conduit
- name: Upload codecov report
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ SRCPATH := $(shell pwd)
export GOPATH := $(shell go env GOPATH)
GOPATH1 := $(firstword $(subst :, ,$(GOPATH)))

# pinned filename can be overridden in CI with an env variable.
CI_E2E_FILENAME ?= f99e7b0c/rel-nightly

GOLDFLAGS += -X github.com/algorand/conduit/version.Hash=$(shell git log -n 1 --pretty="%H")
GOLDFLAGS += -X github.com/algorand/conduit/version.ShortHash=$(shell git log -n 1 --pretty="%h")
GOLDFLAGS += -X github.com/algorand/conduit/version.CompileTime=$(shell date -u +%Y-%m-%dT%H:%M:%S%z)
Expand All @@ -21,9 +24,8 @@ conduit:
install:
cd cmd/conduit && go install -ldflags='${GOLDFLAGS}'

# note: when running e2e tests manually be sure to set the e2e filename: 'export CI_E2E_FILENAME=rel-nightly'
e2e-conduit: conduit
export PATH=$(GOPATH1)/bin:$(PATH); pip3 install e2e_tests/ && e2econduit --s3-source-net ${CI_E2E_FILENAME} --conduit-bin cmd/conduit/conduit
export PATH=$(GOPATH1)/bin:$(PATH); pip3 install e2e_tests/ && e2econduit --s3-source-net $(CI_E2E_FILENAME) --conduit-bin cmd/conduit/conduit

# check that all packages (except tests) compile
check:
Expand Down
3 changes: 3 additions & 0 deletions conduit/data/block_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package data

import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/telemetry"
)

// RoundProvider is the interface which all data types sent to Exporters should implement
Expand All @@ -16,6 +18,7 @@ type InitProvider interface {
GetGenesis() *sdk.Genesis
SetGenesis(*sdk.Genesis)
NextDBRound() sdk.Round
GetTelemetryClient() telemetry.Client
}

// BlockData is provided to the Exporter on each round.
Expand Down
11 changes: 11 additions & 0 deletions conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ type Metrics struct {
Prefix string `yaml:"prefix"`
}

// Telemetry configs for sending Telemetry to OpenSearch
type Telemetry struct {
Enabled bool `yaml:"enabled"`
URI string `yaml:"uri"`
Index string `yaml:"index"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
}

// Config stores configuration specific to the conduit pipeline
type Config struct {
// ConduitArgs are the program inputs. Should not be serialized for config.
Expand All @@ -61,6 +70,8 @@ type Config struct {
RetryCount uint64 `yaml:"retry-count"`
// RetryDelay is a duration amount interpreted from a string
RetryDelay time.Duration `yaml:"retry-delay"`

Telemetry Telemetry `yaml:"telemetry"`
}

// Valid validates pipeline config
Expand Down
1 change: 0 additions & 1 deletion conduit/data/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ exporters:
name: "noop"
config:
connectionstring: ""`, "field exporters not found"},

{"config not configs", `---
log-level: info
importer:
Expand Down
19 changes: 14 additions & 5 deletions conduit/init_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ package conduit

import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/telemetry"
)

// PipelineInitProvider algod based init provider
type PipelineInitProvider struct {
currentRound *sdk.Round
genesis *sdk.Genesis
currentRound *sdk.Round
genesis *sdk.Genesis
telemetryClient telemetry.Client
}

// MakePipelineInitProvider constructs an init provider.
func MakePipelineInitProvider(currentRound *sdk.Round, genesis *sdk.Genesis) *PipelineInitProvider {
func MakePipelineInitProvider(currentRound *sdk.Round, genesis *sdk.Genesis, client telemetry.Client) *PipelineInitProvider {
return &PipelineInitProvider{
currentRound: currentRound,
genesis: genesis,
currentRound: currentRound,
genesis: genesis,
telemetryClient: client,
}
}

Expand All @@ -32,3 +36,8 @@ func (a *PipelineInitProvider) GetGenesis() *sdk.Genesis {
func (a *PipelineInitProvider) NextDBRound() sdk.Round {
return *a.currentRound
}

// GetTelemetryClient gets the telemetry state in the init provider
func (a *PipelineInitProvider) GetTelemetryClient() telemetry.Client {
return a.telemetryClient
}
1 change: 1 addition & 0 deletions conduit/pipeline/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type state struct {
GenesisHash string `json:"genesis-hash"`
Network string `json:"network"`
NextRound uint64 `json:"next-round"`
TelemetryID string `json:"telemetry-id,omitempty"`
}

// encodeToFile writes the state object to the dataDir
Expand Down
3 changes: 3 additions & 0 deletions conduit/pipeline/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ func TestBlockMetaDataFile(t *testing.T) {
assert.Equal(t, pipelineMetadata.GenesisHash, metaData.GenesisHash)
assert.Equal(t, pipelineMetadata.NextRound, metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)
assert.Equal(t, pipelineMetadata.TelemetryID, metaData.TelemetryID)

// Test that file encodes correctly
pipelineMetadata.GenesisHash = "HASH"
pipelineMetadata.NextRound = 7
pipelineMetadata.TelemetryID = "SOME_ID"
err = pipelineMetadata.encodeToFile(datadir)
assert.NoError(t, err)
metaData, err = readBlockMetadata(datadir)
assert.NoError(t, err)
assert.Equal(t, "HASH", metaData.GenesisHash)
assert.Equal(t, uint64(7), metaData.NextRound)
assert.Equal(t, pipelineMetadata.Network, metaData.Network)
assert.Equal(t, pipelineMetadata.TelemetryID, metaData.TelemetryID)
}
43 changes: 41 additions & 2 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/algorand/conduit/conduit/plugins/exporters"
"github.com/algorand/conduit/conduit/plugins/importers"
"github.com/algorand/conduit/conduit/plugins/processors"
"github.com/algorand/conduit/conduit/telemetry"
)

// Pipeline is a struct that orchestrates the entire
Expand Down Expand Up @@ -198,6 +199,25 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
return pluginOverride, nil
}

// initializeTelemetry initializes telemetry and reads or sets the GUID in the metadata.
func (p *pipelineImpl) initializeTelemetry() (telemetry.Client, error) {
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID
}

return telemetryClient, nil
}

// Init prepares the pipeline for processing block data
func (p *pipelineImpl) Init() error {
p.logger.Infof("Starting Pipeline Initialization")
Expand Down Expand Up @@ -257,8 +277,27 @@ func (p *pipelineImpl) Init() error {

// InitProvider
round := sdk.Round(p.pipelineMetadata.NextRound)
// Initial genesis object is nil--gets updated after importer.Init
var initProvider data.InitProvider = conduit.MakePipelineInitProvider(&round, nil)

// Initialize Telemetry
var telemetryClient telemetry.Client
if p.cfg.Telemetry.Enabled {
// If telemetry cannot be initialized, log a warning and continue
// pipeline initialization.
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)
} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
if telemetryErr = telemetryClient.SendEvent(event); telemetryErr != nil {
p.logger.Warnf("failed to send telemetry event: %s", telemetryErr)
}
}
}

// Initial genesis object is nil and gets updated after importer.Init
var initProvider data.InitProvider = conduit.MakePipelineInitProvider(&round, nil, telemetryClient)
p.initProvider = &initProvider

// Initialize Importer
Expand Down
39 changes: 39 additions & 0 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/algorand/conduit/conduit/plugins/exporters"
"github.com/algorand/conduit/conduit/plugins/importers"
"github.com/algorand/conduit/conduit/plugins/processors"
"github.com/algorand/conduit/conduit/telemetry"
)

// a unique block data to validate with tests
Expand Down Expand Up @@ -529,6 +530,44 @@ func TestPipelineMetricsConfigs(t *testing.T) {
assert.Equal(t, pImpl.cfg.Metrics.Prefix, prefixOverride)
}

func TestPipelineTelemetryConfigs(t *testing.T) {
pImpl, _, _, _, _ := mockPipeline(t, "")

// telemetry OFF, check that client is nil
pImpl.cfg.Telemetry = data.Telemetry{
Enabled: false,
}
pImpl.Init()
baseClient := (*pImpl.initProvider).GetTelemetryClient()
assert.Nil(t, baseClient)

// telemetry ON
pImpl.cfg.Telemetry = data.Telemetry{
Enabled: true,
URI: "test-uri",
Index: "test-index",
UserName: "test-username",
Password: "test-password",
}
pImpl.Init()
baseClient = (*pImpl.initProvider).GetTelemetryClient()
client := baseClient.(*telemetry.OpenSearchClient)

assert.NotNil(t, client)
assert.NotNil(t, client.Client)
assert.Equal(t, true, client.TelemetryConfig.Enable)
assert.Equal(t, "test-uri", client.TelemetryConfig.URI)
assert.Equal(t, "test-index", client.TelemetryConfig.Index)
assert.Equal(t, "test-username", client.TelemetryConfig.UserName)
assert.Equal(t, "test-password", client.TelemetryConfig.Password)

event := client.MakeTelemetryStartupEvent()
assert.Equal(t, "starting conduit", event.Message)
assert.NotEmpty(t, event.Time)
assert.NotEmpty(t, event.GUID)
assert.NotEmpty(t, event.Version)
}

func TestRoundOverrideValidConflict(t *testing.T) {
t.Run("processor_no_conflict", func(t *testing.T) {
pImpl, _, mImporter, mProcessor, _ := mockPipeline(t, "")
Expand Down
10 changes: 5 additions & 5 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestExporterInitDefaults(t *testing.T) {
defer fileExp.Close()
pcfg := plugins.MakePluginConfig(fmt.Sprintf("block-dir: %s", tc.blockdir))
pcfg.DataDir = tempdir
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), pcfg, logger)
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), pcfg, logger)
require.NoError(t, err)
})
}
Expand All @@ -92,12 +92,12 @@ func TestExporterInit(t *testing.T) {
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
fileExp.Close()
}
Expand All @@ -121,7 +121,7 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds

// initialize
rnd := sdk.Round(0)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil), plugins.MakePluginConfig(config), logger)
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, err)

// incorrect round
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil), plugins.MakePluginConfig(config), logger)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

Expand Down
14 changes: 7 additions & 7 deletions conduit/plugins/exporters/postgresql/postgresql_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,26 @@ func TestExporterMetadata(t *testing.T) {
func TestConnectDisconnectSuccess(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true\nconnection-string: ''")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))
assert.NoError(t, pgsqlExp.Close())
}

func TestConnectUnmarshalFailure(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("'")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connect failure in unmarshalConfig")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), cfg, logger), "connect failure in unmarshalConfig")
}

func TestConnectDbFailure(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connection string is empty for postgres")
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), cfg, logger), "connection string is empty for postgres")
}

func TestReceiveInvalidBlock(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))
invalidBlock := data.BlockData{
BlockHeader: sdk.BlockHeader{
Round: 1,
Expand All @@ -76,7 +76,7 @@ func TestReceiveInvalidBlock(t *testing.T) {
func TestReceiveAddBlockSuccess(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true")
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger))
assert.NoError(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger))

block := data.BlockData{
BlockHeader: sdk.BlockHeader{},
Expand All @@ -92,7 +92,7 @@ func TestPostgresqlExporterInit(t *testing.T) {
cfg := plugins.MakePluginConfig("test: true")

// genesis hash mismatch
initProvider := conduit.MakePipelineInitProvider(&round, &sdk.Genesis{})
initProvider := conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil)
initProvider.SetGenesis(&sdk.Genesis{
Network: "test",
})
Expand All @@ -101,7 +101,7 @@ func TestPostgresqlExporterInit(t *testing.T) {

// incorrect round
round = 1
err = pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}), cfg, logger)
err = pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, &sdk.Genesis{}, nil), cfg, logger)
assert.Contains(t, err.Error(), "initializing block round 1 but next round to account is 0")
}

Expand Down
Loading

0 comments on commit 6b77ba0

Please sign in to comment.