diff --git a/eventkitd-bigquery/bigquery/config.go b/eventkitd-bigquery/bigquery/config.go new file mode 100644 index 0000000..2941f5d --- /dev/null +++ b/eventkitd-bigquery/bigquery/config.go @@ -0,0 +1,122 @@ +package bigquery + +import ( + "context" + "strconv" + "strings" + "time" + + "github.com/zeebo/errs/v2" + + "storj.io/eventkit" +) + +// CreateDestination creates eventkit destination based on complex configuration. +// Example configurations: +// +// 127.0.0.1:1234 +// bigquery:app=...,project=...,dataset=... +// bigquery:app=...,project=...,dataset=...|batch:queueSize=111,flashSize=111,flushInterval=111 +// bigquery:app=...,project=...,dataset=...|parallel:runners=10|batch:queueSize=111,flashSize=111,flushInterval=111 +func CreateDestination(ctx context.Context, config string) (eventkit.Destination, error) { + layers := strings.Split(config, "|") + var lastLayer func() (eventkit.Destination, error) + for _, layer := range layers { + layer = strings.TrimSpace(layer) + if layer == "" { + continue + } + typeName, params, found := strings.Cut(layer, ":") + if !found { + return nil, errs.Errorf("eventkit destination parameters should be defined in the form type:param=value,...") + } + switch typeName { + case "bigquery", "bq": + var appName, project, dataset string + for _, param := range strings.Split(params, ",") { + key, value, found := strings.Cut(param, "=") + if !found { + return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format") + } + switch key { + case "appName": + appName = value + case "project": + project = value + case "dataset": + dataset = value + default: + return nil, errs.Errorf("Unknown parameter for bigquery destination %s. Please use appName/project/dataset", key) + } + + } + lastLayer = func() (eventkit.Destination, error) { + return NewBigQueryDestination(ctx, appName, project, dataset) + } + case "parallel": + var workers int + for _, param := range strings.Split(params, ",") { + key, value, found := strings.Cut(param, "=") + if !found { + return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format") + } + switch key { + case "workers": + var err error + workers, err = strconv.Atoi(value) + if err != nil { + return nil, errs.Errorf("workers parameter of parallel destination should be a number and not %s", value) + } + default: + return nil, errs.Errorf("Unknown parameter for parallel destination %s. Please use appName/project/dataset", value) + } + } + + ll := lastLayer + lastLayer = func() (eventkit.Destination, error) { + return NewParallel(ll, workers), nil + } + + case "batch": + var queueSize, batchSize int + var flushInterval time.Duration + var err error + for _, param := range strings.Split(params, ",") { + key, value, found := strings.Cut(param, "=") + if !found { + return nil, errs.Errorf("eventkit destination parameters should be defined in param2=value2 format") + } + switch key { + case "queueSize": + queueSize, err = strconv.Atoi(value) + if err != nil { + return nil, errs.Errorf("queueSize parameter of batch destination should be a number and not %s", value) + } + case "batchSize": + batchSize, err = strconv.Atoi(value) + if err != nil { + return nil, errs.Errorf("batchSize parameter of batch destination should be a number and not %s", value) + } + case "flushInterval": + flushInterval, err = time.ParseDuration(value) + if err != nil { + return nil, errs.Errorf("flushInterval parameter of batch destination should be a duration and not %s", value) + } + default: + return nil, errs.Errorf("Unknown parameter for batch destination %s. Please use queueSize/batchSize/flushInterval", key) + } + } + destination, err := lastLayer() + if err != nil { + return nil, err + } + lastLayer = func() (eventkit.Destination, error) { + return NewBatchQueue(destination, queueSize, batchSize, flushInterval), nil + } + } + } + if lastLayer == nil { + return nil, errs.Errorf("No evenkit destinatino is defined") + } + return lastLayer() +}