From 4a5f25f5bbea01deb4f1d367dad1dc35d4db4c6f Mon Sep 17 00:00:00 2001 From: Rob Date: Thu, 19 Oct 2023 16:27:39 +0200 Subject: [PATCH] Add basic stream level metadata to catalog entries This PR makes a few big changes, mainly adding the concept of metadata to the catalog and its entries. We now create and emit a few default fields into entries that let us specify how we will act for a given stream. Users can also edit these values (along with adding others), so we need to read that catalog file back in if one is specified. When reading it in, we parse it back into our catalog object and then use that to filter the streams we will act on. The selection rules are a little unusual: a user can set "selected" (we cannot). If "selected" is not set, we check whether "selected-by-default" is set, as this is something we control and will typically have turned on. If neither of those is set, or the one that applies is false, then we don't include the stream. If we are not given a catalog file at all, then we create our default one (same as running --discover) and use that immediately. 
--- cmd/tap-incident/cmd/app.go | 40 ++++++++++++++++-- config/loader.go | 35 ---------------- tap/catalog.go | 82 +++++++++++++++++++++++++++++++++++-- tap/metadata.go | 68 ++++++++++++++++++++++++++++++ tap/tap.go | 19 ++++++--- 5 files changed, 196 insertions(+), 48 deletions(-) create mode 100644 tap/metadata.go diff --git a/cmd/tap-incident/cmd/app.go b/cmd/tap-incident/cmd/app.go index 3c079d7..fa7812e 100644 --- a/cmd/tap-incident/cmd/app.go +++ b/cmd/tap-incident/cmd/app.go @@ -82,11 +82,27 @@ func Run(ctx context.Context) (err error) { if *discoveryMode { err = tap.Discover(ctx, logger, ol) + if err != nil { + return err + } } else { - err = tap.Sync(ctx, logger, ol, cl) - } - if err != nil { - return err + // If we're syncing - check if we were given a catalog + var ( + catalog *tap.Catalog + err error + ) + + if *catalogFile != "" { + catalog, err = loadCatalogOrError(ctx, *catalogFile) + if err != nil { + return err + } + } + + err = tap.Sync(ctx, logger, ol, cl, catalog) + if err != nil { + return err + } } return nil @@ -113,6 +129,22 @@ func versionStanza() string { ) } +func loadCatalogOrError(ctx context.Context, catalogFile string) (catalog *tap.Catalog, err error) { + defer func() { + if err == nil { + return + } + OUT("Failed to load catalog file!\n") + }() + + catalog, err = tap.CatalogFileLoader(catalogFile).Load(ctx) + if err != nil { + return nil, errors.Wrap(err, "loading catalog") + } + + return catalog, nil +} + func loadConfigOrError(ctx context.Context, configFile string) (cfg *config.Config, err error) { defer func() { if err == nil { diff --git a/config/loader.go b/config/loader.go index 18b8519..ab57abd 100644 --- a/config/loader.go +++ b/config/loader.go @@ -3,9 +3,6 @@ package config import ( "context" "os" - "time" - - kitlog "github.com/go-kit/log" ) type Loader interface { @@ -29,35 +26,3 @@ func (l FileLoader) Load(context.Context) (*Config, error) { return Parse(string(l), data) } - -// NewCachedLoader caches a loader to avoid 
repeated lookups. -func NewCachedLoader(logger kitlog.Logger, loader Loader, ttl time.Duration) Loader { - return &cachedLoader{ - logger: logger, - loader: loader, - ttl: ttl, - } -} - -type cachedLoader struct { - logger kitlog.Logger - loader Loader - ttl time.Duration - cfg *Config - lastUpdated time.Time -} - -func (c *cachedLoader) Load(ctx context.Context) (cfg *Config, err error) { - if c.cfg == nil || time.Since(c.lastUpdated) > c.ttl { - c.logger.Log("event", "loading_cofig", "msg", "cache expired, loading config") - cfg, err := c.loader.Load(ctx) - if err != nil { - return nil, err - } - - c.cfg = cfg - c.lastUpdated = time.Now() - } - - return c.cfg, nil -} diff --git a/tap/catalog.go b/tap/catalog.go index 5fe3c95..36baf65 100644 --- a/tap/catalog.go +++ b/tap/catalog.go @@ -1,6 +1,13 @@ package tap -import "github.com/incident-io/singer-tap/model" +import ( + "context" + "encoding/json" + "os" + + "github.com/incident-io/singer-tap/model" + "github.com/pkg/errors" +) // A catalog can contain several streams or "entries" type CatalogEntry struct { @@ -15,7 +22,7 @@ type CatalogEntry struct { Schema model.Schema `json:"schema"` // Optional metadata for this stream - // Metadata *[]Metadata `json:"metadata,omitempty"` + Metadata *[]Metadata `json:"metadata,omitempty"` } // Actual catalog that we export @@ -24,14 +31,50 @@ type Catalog struct { Streams []CatalogEntry `json:"streams"` } -func NewCatalog(streams map[string]Stream) *Catalog { +func (c *Catalog) GetEnabledStreams() []CatalogEntry { + var enabledStreams []CatalogEntry + + // Go through all streams registered in the catalog + for _, entry := range c.Streams { + // if there is no metadata then just include the stream + if entry.Metadata == nil { + enabledStreams = append(enabledStreams, entry) + } else { + for _, metadata := range *entry.Metadata { + // Only check the top level metadata + if len(metadata.Breadcrumb) > 0 { + continue + } + + // Check if the metadata has the user input "selected" 
bool + if metadata.Metadata.Selected != nil { + // If so, check its set to true! + if *metadata.Metadata.Selected { + enabledStreams = append(enabledStreams, entry) + } + // otherwise check if WE have set to select this by default + } else if metadata.Metadata.SelectedByDefault { + enabledStreams = append(enabledStreams, entry) + } + } + } + } + + return enabledStreams +} + +func NewDefaultCatalog(streams map[string]Stream) *Catalog { entries := []CatalogEntry{} for name, stream := range streams { + streamSchema := *stream.Output().Schema + metadata := Metadata{}.DefaultMetadata(streamSchema) + catalogEntry := CatalogEntry{ Stream: name, TapStreamID: name, - Schema: *stream.Output().Schema, + Schema: streamSchema, + Metadata: &metadata, } entries = append(entries, catalogEntry) @@ -41,3 +84,34 @@ func NewCatalog(streams map[string]Stream) *Catalog { Streams: entries, } } + +type CatalogLoader interface { + Load(context.Context) (*Catalog, error) +} + +type CatalogLoaderFunc func(context.Context) (*Catalog, error) + +func (l CatalogLoaderFunc) Load(ctx context.Context) (*Catalog, error) { + return l(ctx) +} + +// CatalogFileLoader loads Catalog from a filepath +type CatalogFileLoader string + +func (l CatalogFileLoader) Load(context.Context) (*Catalog, error) { + data, err := os.ReadFile(string(l)) + if err != nil { + return nil, err + } + + return ParseCatalogFile(string(l), data) +} + +func ParseCatalogFile(filename string, data []byte) (*Catalog, error) { + var catalog Catalog + if err := json.Unmarshal(data, &catalog); err != nil { + return nil, errors.Wrap(err, "parsing json") + } + + return &catalog, nil +} diff --git a/tap/metadata.go b/tap/metadata.go new file mode 100644 index 0000000..97e8aca --- /dev/null +++ b/tap/metadata.go @@ -0,0 +1,68 @@ +package tap + +import "github.com/incident-io/singer-tap/model" + +type Metadata struct { + // Pointer to where in the schmea this metadata applies + Breadcrumb []string `json:"breadcrumb"` + + // Fields set for 
this metadata object + Metadata MetadataFields `json:"metadata"` +} + +type MetadataFields struct { + /**** + * NON DISCOVERABLE FIELDS + * We don't control these - pull them in and use them + ****/ + + // Selected: if this node is selected by the user to be emitted + // Can be field level or whole stream + Selected *bool `json:"selected,omitempty"` + + // ReplicationMethod: the replication method to use + // we ignored for our tap + ReplicationMethod *string `json:"replicate-method,omitempty"` + + // ReplicationKey: the replicate key for this node + // Used as a bookmark - ignore for our tap + ReplicationKey *string `json:"replication-key,omitempty"` + + // ViewKeyProperties: not sure how this is used + // ignored for our tap + ViewKeyProperties *[]string `json:"view-key-properties,omitempty"` + + /**** + * DISCOVERABLE FIELDS + * We can read and write these fields + ****/ + + // Inclusion: whether we emit this field automatically + // can be available (you choose), automatic (we choose), or unsupported (we don't emit) + Inclusion string `json:"inclusion"` + + // SelectedByDefault: If the user doesn't specify should we + // emit this field by default + // This really only applies to available inclusion setting + SelectedByDefault bool `json:"selected-by-default"` + + // ForcedReplicateMethod: we will set to FULL_TABLE for our tap + ForcedReplicationMethod string `json:"forced-replication-method"` +} + +func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata { + // By default we always include a top level metadata with the same + // settings + var metadata = []Metadata{ + { + Breadcrumb: []string{}, + Metadata: MetadataFields{ + Inclusion: "available", // always set to available at stream level + SelectedByDefault: true, // lets assume people always want our data + ForcedReplicationMethod: "FULL_TABLE", // HIGHWAY TO THE DATA ZONE + }, + }, + } + + return metadata +} diff --git a/tap/tap.go b/tap/tap.go index 975641b..5e0906b 100644 --- a/tap/tap.go +++ 
b/tap/tap.go @@ -8,9 +8,18 @@ import ( "github.com/incident-io/singer-tap/client" ) -func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error { - for name, stream := range streams { - logger := kitlog.With(logger, "stream", name) +func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses, catalog *Catalog) error { + // If we weren't given a catalog, create a default one and use that + if catalog == nil { + catalog = NewDefaultCatalog(streams) + } + + // We only want to sync enabled streams + enabledStreams := catalog.GetEnabledStreams() + + for _, catalogEntry := range enabledStreams { + stream := streams[catalogEntry.Stream] + logger := kitlog.With(logger, "stream", catalogEntry.Stream) logger.Log("msg", "outputting schema") if err := ol.Log(stream.Output()); err != nil { @@ -28,7 +37,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien for _, record := range records { op := &Output{ Type: OutputTypeRecord, - Stream: name, + Stream: catalogEntry.Stream, Record: record, TimeExtracted: timeExtracted, } @@ -42,7 +51,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien } func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error { - catalog := NewCatalog(streams) + catalog := NewDefaultCatalog(streams) if err := ol.CataLog(catalog); err != nil { return err