Skip to content

Commit

Permalink
Merge pull request #15 from incident-io/rob/emit-stream-level-metadata
Browse files Browse the repository at this point in the history
Add basic stream level metadata to catalog entries
  • Loading branch information
rliddler authored Oct 19, 2023
2 parents 2a88a45 + 4a5f25f commit ab02aa8
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 48 deletions.
40 changes: 36 additions & 4 deletions cmd/tap-incident/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,27 @@ func Run(ctx context.Context) (err error) {

if *discoveryMode {
err = tap.Discover(ctx, logger, ol)
if err != nil {
return err
}
} else {
err = tap.Sync(ctx, logger, ol, cl)
}
if err != nil {
return err
// If we're syncing - check if we were given a catalog
var (
catalog *tap.Catalog
err error
)

if *catalogFile != "" {
catalog, err = loadCatalogOrError(ctx, *catalogFile)
if err != nil {
return err
}
}

err = tap.Sync(ctx, logger, ol, cl, catalog)
if err != nil {
return err
}
}

return nil
Expand All @@ -113,6 +129,22 @@ func versionStanza() string {
)
}

// loadCatalogOrError loads the catalog stored at catalogFile.
//
// On failure it prints a short notice via OUT and returns the underlying
// error wrapped with "loading catalog"; the returned catalog is nil in that
// case.
func loadCatalogOrError(ctx context.Context, catalogFile string) (catalog *tap.Catalog, err error) {
	// The deferred func inspects the named result, so it fires for any
	// error returned below.
	defer func() {
		if err != nil {
			OUT("Failed to load catalog file!\n")
		}
	}()

	loader := tap.CatalogFileLoader(catalogFile)

	loaded, loadErr := loader.Load(ctx)
	if loadErr != nil {
		return nil, errors.Wrap(loadErr, "loading catalog")
	}

	return loaded, nil
}

func loadConfigOrError(ctx context.Context, configFile string) (cfg *config.Config, err error) {
defer func() {
if err == nil {
Expand Down
35 changes: 0 additions & 35 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package config
import (
"context"
"os"
"time"

kitlog "github.com/go-kit/log"
)

type Loader interface {
Expand All @@ -29,35 +26,3 @@ func (l FileLoader) Load(context.Context) (*Config, error) {

return Parse(string(l), data)
}

// NewCachedLoader wraps loader so that the config it returns is kept and
// reused for up to ttl before the wrapped loader is consulted again. The
// logger is used to record each reload.
func NewCachedLoader(logger kitlog.Logger, loader Loader, ttl time.Duration) Loader {
	cached := new(cachedLoader)
	cached.logger = logger
	cached.loader = loader
	cached.ttl = ttl

	return cached
}

// cachedLoader is a Loader that remembers the last config it loaded and only
// goes back to the wrapped loader once that config is older than ttl.
type cachedLoader struct {
// logger receives a message each time the config is (re)loaded.
logger kitlog.Logger
// loader is the wrapped Loader that actually fetches the config.
loader Loader
// ttl is how long a loaded config is served before it is reloaded.
ttl time.Duration
// cfg is the most recently loaded config; nil until the first successful load.
cfg *Config
// lastUpdated is when cfg was last stored.
lastUpdated time.Time
}

// Load returns the cached config, reloading it from the wrapped loader first
// when nothing has been loaded yet or the cached copy is older than the ttl.
//
// If the reload fails the error is returned as-is and the previously cached
// config (if any) is left untouched, so a later call will retry.
//
// NOTE(review): there is no locking around cfg/lastUpdated here; confirm
// callers never invoke Load concurrently.
func (c *cachedLoader) Load(ctx context.Context) (cfg *Config, err error) {
	if c.cfg == nil || time.Since(c.lastUpdated) > c.ttl {
		c.logger.Log("event", "loading_config", "msg", "cache expired, loading config")

		// Assign to the named results rather than shadowing them.
		cfg, err = c.loader.Load(ctx)
		if err != nil {
			return nil, err
		}

		c.cfg = cfg
		c.lastUpdated = time.Now()
	}

	return c.cfg, nil
}
82 changes: 78 additions & 4 deletions tap/catalog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package tap

import "github.com/incident-io/singer-tap/model"
import (
"context"
"encoding/json"
"os"

"github.com/incident-io/singer-tap/model"
"github.com/pkg/errors"
)

// A catalog can contain several streams or "entries"
type CatalogEntry struct {
Expand All @@ -15,7 +22,7 @@ type CatalogEntry struct {
Schema model.Schema `json:"schema"`

// Optional metadata for this stream
// Metadata *[]Metadata `json:"metadata,omitempty"`
Metadata *[]Metadata `json:"metadata,omitempty"`
}

// Actual catalog that we export
Expand All @@ -24,14 +31,50 @@ type Catalog struct {
Streams []CatalogEntry `json:"streams"`
}

func NewCatalog(streams map[string]Stream) *Catalog {
// GetEnabledStreams returns the catalog entries that should be synced, in
// catalog order.
//
// An entry with no metadata at all is always included. Otherwise only
// stream-level metadata (objects with an empty breadcrumb) is consulted: an
// explicit user "selected" value decides, and when it is absent we fall back
// to our own "selected-by-default" flag. An entry whose metadata contains no
// enabling stream-level object is left out.
//
// Each entry is returned at most once, even when several stream-level
// metadata objects would enable it.
func (c *Catalog) GetEnabledStreams() []CatalogEntry {
	var enabledStreams []CatalogEntry

	// Go through all streams registered in the catalog
	for _, entry := range c.Streams {
		// If there is no metadata then just include the stream
		if entry.Metadata == nil {
			enabledStreams = append(enabledStreams, entry)
			continue
		}

		for _, metadata := range *entry.Metadata {
			// Only check the top level metadata
			if len(metadata.Breadcrumb) > 0 {
				continue
			}

			// Start from what WE chose as the default, then let the user's
			// explicit "selected" value override it when present.
			enabled := metadata.Metadata.SelectedByDefault
			if metadata.Metadata.Selected != nil {
				enabled = *metadata.Metadata.Selected
			}

			if enabled {
				enabledStreams = append(enabledStreams, entry)
				// Stop here so a second stream-level metadata object cannot
				// add the same stream again.
				break
			}
		}
	}

	return enabledStreams
}

func NewDefaultCatalog(streams map[string]Stream) *Catalog {
entries := []CatalogEntry{}

for name, stream := range streams {
streamSchema := *stream.Output().Schema
metadata := Metadata{}.DefaultMetadata(streamSchema)

catalogEntry := CatalogEntry{
Stream: name,
TapStreamID: name,
Schema: *stream.Output().Schema,
Schema: streamSchema,
Metadata: &metadata,
}

entries = append(entries, catalogEntry)
Expand All @@ -41,3 +84,34 @@ func NewCatalog(streams map[string]Stream) *Catalog {
Streams: entries,
}
}

// CatalogLoader is implemented by anything that can produce a Catalog.
type CatalogLoader interface {
// Load returns the catalog, or an error if it could not be produced.
Load(context.Context) (*Catalog, error)
}

// CatalogLoaderFunc lets a plain function be used wherever a value with a
// Load(context.Context) (*Catalog, error) method is expected.
type CatalogLoaderFunc func(context.Context) (*Catalog, error)

// Load calls the underlying function with ctx and returns its results
// unchanged.
func (fn CatalogLoaderFunc) Load(ctx context.Context) (*Catalog, error) {
	catalog, err := fn(ctx)
	return catalog, err
}

// CatalogFileLoader is a file path from which a Catalog can be loaded.
type CatalogFileLoader string

// Load reads the file at the loader's path and hands its contents to
// ParseCatalogFile. A read failure is returned unwrapped. The context
// argument is accepted but not used.
func (l CatalogFileLoader) Load(context.Context) (*Catalog, error) {
	path := string(l)

	contents, readErr := os.ReadFile(path)
	if readErr != nil {
		return nil, readErr
	}

	return ParseCatalogFile(path, contents)
}

// ParseCatalogFile decodes data as a JSON-encoded Catalog.
//
// A decoding failure is returned wrapped with "parsing json". The filename
// argument is currently not used by the parser.
func ParseCatalogFile(filename string, data []byte) (*Catalog, error) {
	parsed := new(Catalog)

	err := json.Unmarshal(data, parsed)
	if err != nil {
		return nil, errors.Wrap(err, "parsing json")
	}

	return parsed, nil
}
68 changes: 68 additions & 0 deletions tap/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tap

import "github.com/incident-io/singer-tap/model"

// Metadata is one metadata object of a catalog entry: it pairs a location in
// the stream's schema with the fields that apply at that location.
type Metadata struct {
// Pointer to where in the schema this metadata applies.
// An empty breadcrumb is treated as stream-level (top level) metadata.
Breadcrumb []string `json:"breadcrumb"`

// Fields set for this metadata object
Metadata MetadataFields `json:"metadata"`
}

// MetadataFields holds the individual metadata keywords that can be attached
// to a stream or to a node of its schema.
//
// The JSON names follow the Singer metadata keywords, so a catalog file
// edited by a user (or by an orchestrator) round-trips without losing fields.
type MetadataFields struct {
	/****
	 * NON DISCOVERABLE FIELDS
	 * We don't control these - pull them in and use them
	 ****/

	// Selected: if this node is selected by the user to be emitted.
	// Can be field level or whole stream. Nil means the user did not say.
	Selected *bool `json:"selected,omitempty"`

	// ReplicationMethod: the replication method to use.
	// Ignored for our tap.
	ReplicationMethod *string `json:"replication-method,omitempty"`

	// ReplicationKey: the replication key for this node.
	// Used as a bookmark - ignored for our tap.
	ReplicationKey *string `json:"replication-key,omitempty"`

	// ViewKeyProperties: not sure how this is used.
	// Ignored for our tap.
	ViewKeyProperties *[]string `json:"view-key-properties,omitempty"`

	/****
	 * DISCOVERABLE FIELDS
	 * We can read and write these fields
	 ****/

	// Inclusion: whether we emit this field automatically.
	// Can be available (you choose), automatic (we choose), or unsupported (we don't emit).
	Inclusion string `json:"inclusion"`

	// SelectedByDefault: if the user doesn't specify, should we
	// emit this field by default?
	// This really only applies to the available inclusion setting.
	SelectedByDefault bool `json:"selected-by-default"`

	// ForcedReplicationMethod: we will set to FULL_TABLE for our tap.
	ForcedReplicationMethod string `json:"forced-replication-method"`
}

// DefaultMetadata returns the metadata list attached to a stream by default:
// a single stream-level object. Neither the receiver nor schema is consulted;
// every stream gets the same settings.
func (m Metadata) DefaultMetadata(schema model.Schema) []Metadata {
	streamLevel := Metadata{
		// Empty (but non-nil) breadcrumb marks this as top level metadata.
		Breadcrumb: []string{},
		Metadata: MetadataFields{
			// Always set to available at stream level.
			Inclusion: "available",
			// Let's assume people always want our data.
			SelectedByDefault: true,
			// We only ever do full table replication.
			ForcedReplicationMethod: "FULL_TABLE",
		},
	}

	return []Metadata{streamLevel}
}
19 changes: 14 additions & 5 deletions tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@ import (
"github.com/incident-io/singer-tap/client"
)

func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
for name, stream := range streams {
logger := kitlog.With(logger, "stream", name)
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses, catalog *Catalog) error {
// If we weren't given a catalog, create a default one and use that
if catalog == nil {
catalog = NewDefaultCatalog(streams)
}

// We only want to sync enabled streams
enabledStreams := catalog.GetEnabledStreams()

for _, catalogEntry := range enabledStreams {
stream := streams[catalogEntry.Stream]
logger := kitlog.With(logger, "stream", catalogEntry.Stream)

logger.Log("msg", "outputting schema")
if err := ol.Log(stream.Output()); err != nil {
Expand All @@ -28,7 +37,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
for _, record := range records {
op := &Output{
Type: OutputTypeRecord,
Stream: name,
Stream: catalogEntry.Stream,
Record: record,
TimeExtracted: timeExtracted,
}
Expand All @@ -42,7 +51,7 @@ func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *clien
}

func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
catalog := NewCatalog(streams)
catalog := NewDefaultCatalog(streams)

if err := ol.CataLog(catalog); err != nil {
return err
Expand Down

0 comments on commit ab02aa8

Please sign in to comment.