diff --git a/aerospike/aerospike.go b/aerospike/aerospike.go index c195aed..fba5d5f 100644 --- a/aerospike/aerospike.go +++ b/aerospike/aerospike.go @@ -5,9 +5,6 @@ import ( "crypto/sha256" "encoding/json" "log" - "os" - "os/signal" - "syscall" "time" aero "github.com/aerospike/aerospike-client-go/v6" @@ -15,219 +12,290 @@ import ( "github.com/reugn/go-streams/flow" ) -// AerospikeProperties represents configuration properties for an Aerospike connector. -type AerospikeProperties struct { - Policy *aero.ClientPolicy - Hostname string - Port int - Namespase string - SetName string -} - -// ChangeNotificationProperties contains the configuration for polling Aerospike cluster events. -type ChangeNotificationProperties struct { +// PollingConfig contains the configuration for polling Aerospike cluster events. +type PollingConfig struct { + // PollingInterval specifies the interval at which the database should be + // polled for changes. If not specified, the entire dataset will be queried + // once. PollingInterval time.Duration + // QueryPolicy encapsulates parameters for policy attributes used in query + // operations (optional). + QueryPolicy *aero.QueryPolicy + // SecondaryIndexFilter specifies a query filter definition (optional). + SecondaryIndexFilter *aero.Filter + // Namespace determines query namespace. + Namespace string + // SetName determines query set name (optional). + SetName string + // BinNames detemines which bins to retrieve (optional). + BinNames []string + + filterExpression *aero.Expression } -// AerospikeSource represents an Aerospike source connector. -type AerospikeSource struct { - client *aero.Client - recordsChannel chan *aero.Result - scanPolicy *aero.ScanPolicy - out chan any - ctx context.Context - properties *AerospikeProperties - changeNotificationProperties *ChangeNotificationProperties +// PollingSource is an Aerospike source connector that regularly checks the +// database and transmits any recently updated records downstream. +type PollingSource struct { + client *aero.Client + config PollingConfig + statement *aero.Statement + recordsChan chan *aero.Result + out chan any } -// NewAerospikeSource returns a new AerospikeSource instance. -// Set changeNotificationProperties to nil to scan the entire namespace/set. -func NewAerospikeSource(ctx context.Context, - properties *AerospikeProperties, - scanPolicy *aero.ScanPolicy, - changeNotificationProperties *ChangeNotificationProperties) (*AerospikeSource, error) { +var _ streams.Source = (*PollingSource)(nil) - client, err := aero.NewClientWithPolicy(properties.Policy, properties.Hostname, properties.Port) - if err != nil { - return nil, err +// NewPollingSource returns a new PollingSource instance. +func NewPollingSource(ctx context.Context, client *aero.Client, + config PollingConfig) *PollingSource { + if config.QueryPolicy == nil { + config.QueryPolicy = aero.NewQueryPolicy() + } else { + config.filterExpression = config.QueryPolicy.FilterExpression } - - if scanPolicy == nil { - scanPolicy = aero.NewScanPolicy() + statement := &aero.Statement{ + Namespace: config.Namespace, + SetName: config.SetName, + Filter: config.SecondaryIndexFilter, + BinNames: config.BinNames, } - - records := make(chan *aero.Result) - source := &AerospikeSource{ - client: client, - recordsChannel: records, - scanPolicy: scanPolicy, - out: make(chan any), - ctx: ctx, - properties: properties, - changeNotificationProperties: changeNotificationProperties, + source := &PollingSource{ + client: client, + config: config, + statement: statement, + recordsChan: make(chan *aero.Result), + out: make(chan any), } - go source.poll() - go source.init() - return source, nil + go source.pollChanges(ctx) + go source.streamRecords(ctx) + + return source } -func (as *AerospikeSource) poll() { - if as.changeNotificationProperties == nil { - // scan the entire namespace/set - as.doScan() - close(as.recordsChannel) +func (ps *PollingSource) pollChanges(ctx context.Context) { + if ps.config.PollingInterval == 0 { + // retrieve the entire namespace/set once + ps.query() + close(ps.recordsChan) return } - // get change notifications by polling - ticker := time.NewTicker(as.changeNotificationProperties.PollingInterval) + // obtain updates about data changes through scheduled queries + ticker := time.NewTicker(ps.config.PollingInterval) + defer ticker.Stop() loop: for { select { - case <-as.ctx.Done(): + case <-ctx.Done(): break loop - case t := <-ticker.C: - ts := t.UnixNano() - as.changeNotificationProperties.PollingInterval.Nanoseconds() - as.scanPolicy.FilterExpression = aero.ExpGreater( + lastUpdate := t.Add(-ps.config.PollingInterval) + // filter records by the time they were last updated + lastUpdatedExp := aero.ExpGreater( aero.ExpLastUpdate(), - aero.ExpIntVal(ts), + aero.ExpIntVal(lastUpdate.UnixNano()), ) - log.Printf("Polling records from %d", ts) - - as.doScan() + if ps.config.filterExpression == nil { + ps.config.QueryPolicy.FilterExpression = lastUpdatedExp + } else { + ps.config.QueryPolicy.FilterExpression = aero.ExpAnd( + lastUpdatedExp, + ps.config.filterExpression, + ) + } + log.Printf("Polling records from: %s", lastUpdate) + // execute the query command + ps.query() } } } -func (as *AerospikeSource) doScan() { - recordSet, err := as.client.ScanAll(as.scanPolicy, as.properties.Namespase, as.properties.SetName) +func (ps *PollingSource) query() { + recordSet, err := ps.client.Query(ps.config.QueryPolicy, ps.statement) if err != nil { - log.Printf("Aerospike client.ScanAll failed with: %s", err) - } else { - for result := range recordSet.Results() { - as.recordsChannel <- result - } + log.Printf("Aerospike polling query failed: %s", err) + return + } + for result := range recordSet.Results() { + ps.recordsChan <- result } } -// init starts the main loop -func (as *AerospikeSource) init() { - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - +func (ps *PollingSource) streamRecords(ctx context.Context) { loop: for { select { - case <-sigchan: - break loop - - case <-as.ctx.Done(): + case <-ctx.Done(): break loop - - case result, ok := <-as.recordsChannel: + case result, ok := <-ps.recordsChan: if !ok { break loop } if result.Err == nil { - as.out <- result.Record + ps.out <- result.Record // send the record downstream } else { - log.Printf("Aerospike scan record error %s", result.Err) + log.Printf("Aerospike query record error: %s", result.Err) } } } - - log.Printf("Closing Aerospike consumer") - close(as.out) - as.client.Close() + log.Printf("Closing Aerospike polling connector") + close(ps.out) } -// Via streams data through the given flow -func (as *AerospikeSource) Via(_flow streams.Flow) streams.Flow { - flow.DoStream(as, _flow) - return _flow +// Via streams data to a specified operator and returns it. +func (ps *PollingSource) Via(operator streams.Flow) streams.Flow { + flow.DoStream(ps, operator) + return operator } -// Out returns an output channel for sending data -func (as *AerospikeSource) Out() <-chan any { - return as.out +// Out returns the output channel of the PollingSource connector. +func (ps *PollingSource) Out() <-chan any { + return ps.out } -// AerospikeKeyBins represents an Aerospike Key and BinMap container. -// Use it to stream records to an AerospikeSink. -type AerospikeKeyBins struct { +// A Record encapsulates an Aerospike Key and BinMap container. +// It is intended to be used to stream records to the Aerospike sink connector. +type Record struct { Key *aero.Key Bins aero.BinMap } -// AerospikeSink represents an Aerospike sink connector. -type AerospikeSink struct { - client *aero.Client - in chan any - ctx context.Context - properties *AerospikeProperties - writePolicy *aero.WritePolicy +// batchWrite creates and returns a batch write operation for the record. +func (r *Record) batchWrite(policy *aero.BatchWritePolicy) *aero.BatchWrite { + ops := make([]*aero.Operation, 0, len(r.Bins)) + for k, v := range r.Bins { + ops = append(ops, aero.PutOp(aero.NewBin(k, v))) + } + return aero.NewBatchWrite(policy, r.Key, ops...) } -// NewAerospikeSink returns a new AerospikeSink instance. -func NewAerospikeSink(ctx context.Context, - properties *AerospikeProperties, writePolicy *aero.WritePolicy) (*AerospikeSink, error) { - client, err := aero.NewClientWithPolicy(properties.Policy, properties.Hostname, properties.Port) - if err != nil { - return nil, err - } +// SinkConfig contains the configuration for the Aerospike sink connector. +type SinkConfig struct { + // WritePolicy encapsulates parameters for policy attributes used in + // write operations. Used in single write operations and ignored if + // BatchSize is larger than one (optional). + WritePolicy *aero.WritePolicy + // BatchSize controls the size of the batch when writing records. If not + // specified or set to a value less than two, a single write operation + // will be used for each record (optional). + BatchSize int + // BufferFlushInterval defines the maximum duration records can be buffered + // before being flushed. Used with BatchSize larger than one (optional). + BufferFlushInterval time.Duration + // BatchPolicy encapsulates parameters for policy attributes used in + // write operations. Used with BatchSize larger than one (optional). + BatchPolicy *aero.BatchPolicy + // BatchWritePolicy attributes used in batch write commands. Used with + // BatchSize larger than one (optional). + BatchWritePolicy *aero.BatchWritePolicy + // Namespace determines the target namespace. + Namespace string + // SetName determines the target set name (optional). + SetName string +} - if writePolicy == nil { - writePolicy = aero.NewWritePolicy(0, 0) - } +// Sink represents an Aerospike sink connector. +type Sink struct { + client *aero.Client + config SinkConfig + buf []*Record + in chan any +} - source := &AerospikeSink{ - client: client, - in: make(chan any), - ctx: ctx, - properties: properties, - writePolicy: writePolicy, +var _ streams.Sink = (*Sink)(nil) + +// NewSink returns a new Sink instance. +func NewSink(client *aero.Client, config SinkConfig) *Sink { + sink := &Sink{ + client: client, + config: config, + in: make(chan any), + } + // initialize the buffer for batch writes + if config.BatchSize > 1 { + sink.buf = make([]*Record, 0, config.BatchSize) } + // begin processing upstream records + go sink.processStream() - go source.init() - return source, nil + return sink } -// init starts the main loop -func (as *AerospikeSink) init() { - for msg := range as.in { - switch m := msg.(type) { - case AerospikeKeyBins: - if err := as.client.Put(as.writePolicy, m.Key, m.Bins); err != nil { - log.Printf("Aerospike client.Put failed with: %s", err) +func (as *Sink) processStream() { + var flushTickerChan <-chan time.Time + if as.config.BatchSize > 1 && as.config.BufferFlushInterval > 0 { + ticker := time.NewTicker(as.config.BufferFlushInterval) + defer ticker.Stop() + flushTickerChan = ticker.C + } +loop: + for { + select { + case msg, ok := <-as.in: // read upstream messages + if !ok { + break loop } - - case aero.BinMap: - jsonStr, err := json.Marshal(m) - if err == nil { - var key *aero.Key - // use BinMap sha256 checksum as record key - key, err = aero.NewKey(as.properties.Namespase, - as.properties.SetName, - sha256.Sum256(jsonStr)) + switch message := msg.(type) { + case *Record: + as.writeRecord(message) + case Record: + as.writeRecord(&message) + case aero.BinMap: + encoded, err := json.Marshal(message) if err == nil { - err = as.client.Put(as.writePolicy, key, m) + var key *aero.Key + // use the sha256 checksum of the bin map as the record key + key, err = aero.NewKey(as.config.Namespace, as.config.SetName, + sha256.Sum256(encoded)) + if err == nil { + as.writeRecord(&Record{key, message}) + } + } + if err != nil { + log.Printf("Error parsing bin map: %s", err) } + default: + log.Printf("Unsupported message type %v", message) } + case <-flushTickerChan: + as.flushBuffer() + } + } + as.flushBuffer() // write buffered records in batch mode +} - if err != nil { - log.Printf("Error processing Aerospike message: %s", err) - } +func (as *Sink) writeRecord(record *Record) { + if as.config.BatchSize > 1 { // batch mode + if len(as.buf) == as.config.BatchSize { + as.flushBuffer() + } + // add the record to the buffer + as.buf = append(as.buf, record) + } else { + // use single record put operation + if err := as.client.Put(as.config.WritePolicy, record.Key, record.Bins); err != nil { + log.Printf("Failed to write record: %s", err) + } + } +} - default: - log.Printf("Unsupported message type %v", m) +func (as *Sink) flushBuffer() { + if as.config.BatchSize > 1 && len(as.buf) > 0 { + // write records as a batch + records := make([]aero.BatchRecordIfc, 0, as.config.BatchSize) + for _, rec := range as.buf { + records = append(records, rec.batchWrite(as.config.BatchWritePolicy)) + } + log.Printf("Writing batch of %d records", len(records)) + if err := as.client.BatchOperate(as.config.BatchPolicy, records); err != nil { + log.Printf("Failed to write batch of records: %s", err) } + as.buf = as.buf[:0] // clear the buffer } - as.client.Close() } -// In returns an input channel for receiving data -func (as *AerospikeSink) In() chan<- any { +// In returns the input channel of the Sink connector. +func (as *Sink) In() chan<- any { return as.in } diff --git a/aerospike/go.mod b/aerospike/go.mod index b5c9ea4..ca58096 100644 --- a/aerospike/go.mod +++ b/aerospike/go.mod @@ -3,18 +3,17 @@ module github.com/reugn/go-streams/aerospike go 1.18 require ( - github.com/aerospike/aerospike-client-go/v6 v6.15.0 + github.com/aerospike/aerospike-client-go/v6 v6.15.1 github.com/reugn/go-streams v0.10.0 ) require ( - github.com/golang/protobuf v1.5.3 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.33.0 // indirect ) diff --git a/aerospike/go.sum b/aerospike/go.sum index b8e32fd..7ff6cfc 100644 --- a/aerospike/go.sum +++ b/aerospike/go.sum @@ -1,35 +1,28 @@ -github.com/aerospike/aerospike-client-go/v6 v6.15.0 h1:UF9GaKRjgwLsfRPfzgO14ivr9+DWIV8SkHhnb4AjbD4= -github.com/aerospike/aerospike-client-go/v6 v6.15.0/go.mod h1:76do+aMM6LwPAaWuMQKsOKM65NN/2P7InSmQqsuwJv4= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/aerospike/aerospike-client-go/v6 v6.15.1 h1:meQQ3dVNImi8+EcHJFe4f1+mF6wpg2qgv7dPNAp0L+4= +github.com/aerospike/aerospike-client-go/v6 v6.15.1/go.mod h1:8GzCrqAEvZig6Cr/dz5nwPucIOAZXJTHkt6L7WBZFaA= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= +github.com/google/pprof v0.0.0-20240319011627-a57c5dfe54fd h1:LjW4RcTwfcqOYGmD7UpFrn1gfBZ9mgu7QN5mSeFkCog= +github.com/onsi/ginkgo/v2 v2.17.0 h1:kdnunFXpBjbzN56hcJHrXZ8M+LOkenKA7NnBzTNigTI= +github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ= github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/examples/aerospike/main.go b/examples/aerospike/main.go index f87a883..11183db 100644 --- a/examples/aerospike/main.go +++ b/examples/aerospike/main.go @@ -5,53 +5,60 @@ import ( "log" "time" - ext "github.com/reugn/go-streams/aerospike" - aero "github.com/aerospike/aerospike-client-go/v6" + "github.com/reugn/go-streams/aerospike" "github.com/reugn/go-streams/flow" ) func main() { - properties := &ext.AerospikeProperties{ - Policy: nil, - Hostname: "localhost", - Port: 3000, - Namespase: "test", - SetName: "streams", + client, err := aero.NewClient("localhost", 3000) + if err != nil { + log.Fatal(err) } - ctx, cancelFunc := context.WithCancel(context.Background()) + ctx, cancelFunc := context.WithCancel(context.Background()) timer := time.NewTimer(time.Minute) go func() { <-timer.C cancelFunc() }() - cnProperties := &ext.ChangeNotificationProperties{ - PollingInterval: time.Second * 3, - } - - source, err := ext.NewAerospikeSource(ctx, properties, nil, cnProperties) - if err != nil { - log.Fatal(err) - } + queryPolicy := aero.NewQueryPolicy() + queryPolicy.SendKey = true // send user defined key + source := aerospike.NewPollingSource(ctx, client, aerospike.PollingConfig{ + PollingInterval: 5 * time.Second, + QueryPolicy: queryPolicy, + Namespace: "test", + SetName: "source", + }) mapFlow := flow.NewMap(transform, 1) - sink, err := ext.NewAerospikeSink(ctx, properties, nil) - if err != nil { - log.Fatal(err) - } + + batchWritePolicy := aero.NewBatchWritePolicy() + batchWritePolicy.SendKey = true // send user defined key + sink := aerospike.NewSink(client, aerospike.SinkConfig{ + BatchSize: 3, + BufferFlushInterval: 10 * time.Second, + BatchWritePolicy: batchWritePolicy, + Namespace: "test", + SetName: "sink", + }) source. Via(mapFlow). To(sink) + + client.Close() // close the Aerospike client } -var transform = func(msg *aero.Record) ext.AerospikeKeyBins { - log.Println(msg.Bins) - msg.Bins["ts"] = time.Now().UnixNano() - return ext.AerospikeKeyBins{ - Key: msg.Key, - Bins: msg.Bins, +var transform = func(rec *aero.Record) aerospike.Record { + rec.Bins["ts"] = time.Now().UnixNano() + key, err := aero.NewKey(rec.Key.Namespace(), "sink", rec.Key.Value()) + if err != nil { + log.Fatal(err) + } + return aerospike.Record{ + Key: key, + Bins: rec.Bins, } } diff --git a/examples/go.mod b/examples/go.mod index 997eac1..4b8d074 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/IBM/sarama v1.43.0 - github.com/aerospike/aerospike-client-go/v6 v6.15.0 + github.com/aerospike/aerospike-client-go/v6 v6.15.1 github.com/apache/pulsar-client-go v0.12.1 github.com/gorilla/websocket v1.5.1 github.com/nats-io/nats.go v1.33.1 @@ -40,7 +40,7 @@ require ( github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.1+incompatible // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect @@ -74,18 +74,18 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect go.uber.org/atomic v1.7.0 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.22.0 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/oauth2 v0.11.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.17.0 // indirect - golang.org/x/term v0.17.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/oauth2 v0.17.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/grpc v1.63.2 // indirect + google.golang.org/protobuf v1.33.0 // indirect ) replace ( diff --git a/examples/go.sum b/examples/go.sum index 79fca6b..52124b5 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -11,8 +11,8 @@ github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo= github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc= github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM= -github.com/aerospike/aerospike-client-go/v6 v6.15.0 h1:UF9GaKRjgwLsfRPfzgO14ivr9+DWIV8SkHhnb4AjbD4= -github.com/aerospike/aerospike-client-go/v6 v6.15.0/go.mod h1:76do+aMM6LwPAaWuMQKsOKM65NN/2P7InSmQqsuwJv4= +github.com/aerospike/aerospike-client-go/v6 v6.15.1 h1:meQQ3dVNImi8+EcHJFe4f1+mF6wpg2qgv7dPNAp0L+4= +github.com/aerospike/aerospike-client-go/v6 v6.15.1/go.mod h1:8GzCrqAEvZig6Cr/dz5nwPucIOAZXJTHkt6L7WBZFaA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -66,7 +66,7 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= @@ -89,8 +89,9 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -102,8 +103,8 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/pprof v0.0.0-20240319011627-a57c5dfe54fd h1:LjW4RcTwfcqOYGmD7UpFrn1gfBZ9mgu7QN5mSeFkCog= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= @@ -198,8 +199,8 @@ github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5cc github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= -github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= +github.com/onsi/ginkgo/v2 v2.17.0 h1:kdnunFXpBjbzN56hcJHrXZ8M+LOkenKA7NnBzTNigTI= +github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -279,8 +280,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -292,7 +293,6 @@ golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -306,11 +306,11 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= -golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= +golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= +golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -318,8 +318,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -344,20 +344,21 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= +golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -368,18 +369,18 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= +golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= -google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -388,8 +389,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=