Skip to content

Commit

Permalink
Add basic stream level metadata to catalog entries
Browse files Browse the repository at this point in the history
This PR makes a few big changes, mainly adding the concept of metadata to
the catalog and its entries.

We create and emit a few default fields into entries now that let us
specify how we will act for a given stream - users can also edit these
values (along with adding others) so we need to read that catalog file
back in if specified.

When reading it in we parse it back into our catalog object and then use
that to try and filter the streams we will act on.

The options are a little weird here but basically, a user can set
"selected" (we cannot). If "selected" is not set, then we check whether
"selected-by-default" is set, as this is something we typically control
and will have turned on.

If none of those match or are false then we don't include the stream.

If we do not find a catalog file at all - then we will create our
default one (same as running --discover) and just use that immediately.
  • Loading branch information
rliddler committed Oct 19, 2023
1 parent 2a88a45 commit 4a5f25f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 48 deletions.
40 changes: 36 additions & 4 deletions cmd/tap-incident/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,27 @@ func Run(ctx context.Context) (err error) {

if *discoveryMode {
err = tap.Discover(ctx, logger, ol)
if err != nil {
return err
}
} else {
err = tap.Sync(ctx, logger, ol, cl)
}
if err != nil {
return err
// If we're syncing - check if we were given a catalog
var (
catalog *tap.Catalog
err error
)

if *catalogFile != "" {
catalog, err = loadCatalogOrError(ctx, *catalogFile)
if err != nil {
return err
}
}

err = tap.Sync(ctx, logger, ol, cl, catalog)
if err != nil {
return err
}
}

return nil
Expand All @@ -113,6 +129,22 @@ func versionStanza() string {
)
}

// loadCatalogOrError loads the catalog stored at catalogFile.
//
// On failure it prints a short notice via OUT and returns the underlying
// error wrapped with "loading catalog"; on success it returns the loaded
// catalog and a nil error.
func loadCatalogOrError(ctx context.Context, catalogFile string) (catalog *tap.Catalog, err error) {
	// The deferred check inspects the named result, so it fires for any
	// error returned below.
	defer func() {
		if err != nil {
			OUT("Failed to load catalog file!\n")
		}
	}()

	loaded, loadErr := tap.CatalogFileLoader(catalogFile).Load(ctx)
	if loadErr != nil {
		return nil, errors.Wrap(loadErr, "loading catalog")
	}

	return loaded, nil
}

func loadConfigOrError(ctx context.Context, configFile string) (cfg *config.Config, err error) {
defer func() {
if err == nil {
Expand Down
35 changes: 0 additions & 35 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package config
import (
"context"
"os"
"time"

kitlog "github.com/go-kit/log"
)

type Loader interface {
Expand All @@ -29,35 +26,3 @@ func (l FileLoader) Load(context.Context) (*Config, error) {

return Parse(string(l), data)
}

// NewCachedLoader wraps loader so that a loaded config is reused for up to
// ttl before the wrapped loader is consulted again. The logger is used to
// record when a reload happens.
func NewCachedLoader(logger kitlog.Logger, loader Loader, ttl time.Duration) Loader {
	cached := new(cachedLoader)
	cached.logger = logger
	cached.loader = loader
	cached.ttl = ttl

	return cached
}

// cachedLoader is a Loader that remembers the last config it loaded and only
// goes back to the wrapped loader once that config is older than ttl.
type cachedLoader struct {
// logger receives a log line each time the config is (re)loaded
logger kitlog.Logger
// loader is the underlying loader that actually produces the config
loader Loader
// ttl is how long a loaded config is served before it is reloaded
ttl time.Duration
// cfg is the most recently loaded config; nil until the first successful load
cfg *Config
// lastUpdated is when cfg was last successfully loaded
lastUpdated time.Time
}

// Load returns the cached config, refreshing it from the wrapped loader when
// nothing has been cached yet or the cached value is older than the TTL.
//
// If the refresh fails the error is returned and the previously cached config
// (if any) is left untouched, so the next call attempts another refresh.
//
// NOTE(review): there is no locking around the cached fields, so this looks
// unsafe for concurrent callers — confirm how it is used.
func (c *cachedLoader) Load(ctx context.Context) (cfg *Config, err error) {
	// Fast path: we have a config and it is still within its TTL.
	if c.cfg != nil && time.Since(c.lastUpdated) <= c.ttl {
		return c.cfg, nil
	}

	c.logger.Log("event", "loading_config", "msg", "cache expired, loading config")

	// Assign to the named results rather than shadowing them with :=.
	cfg, err = c.loader.Load(ctx)
	if err != nil {
		return nil, err
	}

	c.cfg = cfg
	c.lastUpdated = time.Now()

	return c.cfg, nil
}
82 changes: 78 additions & 4 deletions tap/catalog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package tap

import "github.com/incident-io/singer-tap/model"
import (
"context"
"encoding/json"
"os"

"github.com/incident-io/singer-tap/model"
"github.com/pkg/errors"
)

// A catalog can contain several streams or "entries"
type CatalogEntry struct {
Expand All @@ -15,7 +22,7 @@ type CatalogEntry struct {
Schema model.Schema `json:"schema"`

// Optional metadata for this stream
// Metadata *[]Metadata `json:"metadata,omitempty"`
Metadata *[]Metadata `json:"metadata,omitempty"`
}

// Actual catalog that we export
Expand All @@ -24,14 +31,50 @@ type Catalog struct {
Streams []CatalogEntry `json:"streams"`
}

func NewCatalog(streams map[string]Stream) *Catalog {
// GetEnabledStreams returns the catalog entries that should be synced.
//
// An entry is enabled when:
//   - it carries no metadata at all, or
//   - a stream-level metadata item (one with an empty breadcrumb) has the
//     user-supplied "selected" flag set to true, or
//   - a stream-level metadata item leaves "selected" unset and has
//     "selected-by-default" set to true.
//
// Field-level metadata (non-empty breadcrumb) is ignored, and entries whose
// metadata contains no enabling stream-level item are excluded. Each entry is
// returned at most once, in catalog order, even if several stream-level
// metadata items would enable it.
func (c *Catalog) GetEnabledStreams() []CatalogEntry {
	var enabledStreams []CatalogEntry

	for _, entry := range c.Streams {
		// No metadata at all means nothing has opted the stream out.
		enabled := entry.Metadata == nil

		if !enabled {
			for _, item := range *entry.Metadata {
				// Only stream-level metadata decides whether we sync.
				if len(item.Breadcrumb) > 0 {
					continue
				}

				fields := item.Metadata
				if fields.Selected != nil {
					// An explicit user choice wins over our default.
					enabled = *fields.Selected
				} else {
					enabled = fields.SelectedByDefault
				}

				// Stop at the first enabling item so the entry is never
				// appended more than once.
				if enabled {
					break
				}
			}
		}

		if enabled {
			enabledStreams = append(enabledStreams, entry)
		}
	}

	return enabledStreams
}

func NewDefaultCatalog(streams map[string]Stream) *Catalog {
entries := []CatalogEntry{}

for name, stream := range streams {
streamSchema := *stream.Output().Schema
metadata := Metadata{}.DefaultMetadata(streamSchema)

catalogEntry := CatalogEntry{
Stream: name,
TapStreamID: name,
Schema: *stream.Output().Schema,
Schema: streamSchema,
Metadata: &metadata,
}

entries = append(entries, catalogEntry)
Expand All @@ -41,3 +84,34 @@ func NewCatalog(streams map[string]Stream) *Catalog {
Streams: entries,
}
}

// CatalogLoader is anything that can produce a Catalog.
type CatalogLoader interface {
// Load returns the catalog, or an error if it could not be produced.
Load(context.Context) (*Catalog, error)
}

// CatalogLoaderFunc adapts a plain function to the CatalogLoader interface.
type CatalogLoaderFunc func(context.Context) (*Catalog, error)

// Load calls the underlying function with ctx and returns its result.
func (l CatalogLoaderFunc) Load(ctx context.Context) (*Catalog, error) {
return l(ctx)
}

// CatalogFileLoader is a CatalogLoader backed by a file on disk; the string
// value is the path to that file.
type CatalogFileLoader string

// Load reads the whole file at the loader's path and hands its contents to
// ParseCatalogFile. The context argument is not used. A read failure is
// returned as-is.
func (l CatalogFileLoader) Load(context.Context) (*Catalog, error) {
	path := string(l)

	contents, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}

	return ParseCatalogFile(path, contents)
}

// ParseCatalogFile decodes the JSON in data into a Catalog.
//
// filename is only used to give context in the returned error, so that a
// malformed catalog can be traced back to the file it came from. On failure
// the returned catalog is nil and the error wraps the JSON decoding error.
func ParseCatalogFile(filename string, data []byte) (*Catalog, error) {
	var catalog Catalog
	if err := json.Unmarshal(data, &catalog); err != nil {
		return nil, errors.Wrapf(err, "parsing json in catalog file %q", filename)
	}

	return &catalog, nil
}
68 changes: 68 additions & 0 deletions tap/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tap

import "github.com/incident-io/singer-tap/model"

// Metadata is a single metadata item on a catalog entry: a breadcrumb saying
// which part of the schema it applies to, plus the fields set for that part.
// An item with an empty breadcrumb is treated as stream-level metadata.
type Metadata struct {
// Pointer to where in the schema this metadata applies
Breadcrumb []string `json:"breadcrumb"`

// Fields set for this metadata object
Metadata MetadataFields `json:"metadata"`
}

type MetadataFields struct {
/****
* NON DISCOVERABLE FIELDS
* We don't control these - pull them in and use them
****/

// Selected: if this node is selected by the user to be emitted
// Can be field level or whole stream
Selected *bool `json:"selected,omitempty"`

// ReplicationMethod: the replication method to use
// we ignored for our tap
ReplicationMethod *string `json:"replicate-method,omitempty"`

// ReplicationKey: the replicate key for this node
// Used as a bookmark - ignore for our tap
ReplicationKey *string `json:"replication-key,omitempty"`

// ViewKeyProperties: not sure how this is used
// ignored for our tap
ViewKeyProperties *[]string `json:"view-key-properties,omitempty"`

/****
* DISCOVERABLE FIELDS
* We can read and write these fields
****/

// Inclusion: whether we emit this field automatically
// can be available (you choose), automatic (we choose), or unsupported (we don't emit)
Inclusion string `json:"inclusion"`

// SelectedByDefault: If the user doesn't specify should we
// emit this field by default
// This really only applies to available inclusion setting
SelectedByDefault bool `json:"selected-by-default"`

// ForcedReplicateMethod: we will set to FULL_TABLE for our tap
ForcedReplicationMethod string `json:"forced-replication-method"`
}

// DefaultMetadata returns the metadata list attached to a stream by default:
// a single stream-level item (empty breadcrumb) marking the stream as
// available, selected by default, and replicated with FULL_TABLE.
//
// Neither the receiver nor the schema argument is consulted at present.
func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
	streamLevel := Metadata{
		// An empty breadcrumb addresses the stream as a whole.
		Breadcrumb: []string{},
		Metadata: MetadataFields{
			// The stream itself is always offered as "available".
			Inclusion: "available",
			// Assume people want the stream unless they say otherwise.
			SelectedByDefault: true,
			// Our tap only ever does full-table replication.
			ForcedReplicationMethod: "FULL_TABLE",
		},
	}

	return []Metadata{streamLevel}
}
19 changes: 14 additions & 5 deletions tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@ import (
"github.com/incident-io/singer-tap/client"
)

func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
for name, stream := range streams {
logger := kitlog.With(logger, "stream", name)
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses, catalog *Catalog) error {
// If we weren't given a catalog, create a default one and use that
if catalog == nil {
catalog = NewDefaultCatalog(streams)
}

// We only want to sync enabled streams
enabledStreams := catalog.GetEnabledStreams()

for _, catalogEntry := range enabledStreams {
stream := streams[catalogEntry.Stream]
logger := kitlog.With(logger, "stream", catalogEntry.Stream)

logger.Log("msg", "outputting schema")
if err := ol.Log(stream.Output()); err != nil {
Expand All @@ -28,7 +37,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
for _, record := range records {
op := &Output{
Type: OutputTypeRecord,
Stream: name,
Stream: catalogEntry.Stream,
Record: record,
TimeExtracted: timeExtracted,
}
Expand All @@ -42,7 +51,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
}

func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
catalog := NewCatalog(streams)
catalog := NewDefaultCatalog(streams)

if err := ol.CataLog(catalog); err != nil {
return err
Expand Down

0 comments on commit 4a5f25f

Please sign in to comment.