diff --git a/examples/aerospike/main.go b/examples/aerospike/main.go index b2a1a12..0b520cf 100644 --- a/examples/aerospike/main.go +++ b/examples/aerospike/main.go @@ -36,20 +36,23 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(transform, 1) + + mapFlow := flow.NewMap(transform, 1) sink, err := ext.NewAerospikeSink(ctx, properties, nil) if err != nil { log.Fatal(err) } source. - Via(flow1). + Via(mapFlow). To(sink) } -var transform = func(in interface{}) interface{} { - msg := in.(*aero.Record) +var transform = func(msg *aero.Record) ext.AerospikeKeyBins { log.Println(msg.Bins) msg.Bins["ts"] = util.NowNano() - return ext.AerospikeKeyBins{Key: msg.Key, Bins: msg.Bins} + return ext.AerospikeKeyBins{ + Key: msg.Key, + Bins: msg.Bins, + } } diff --git a/examples/fs/main.go b/examples/fs/main.go index 56b4f4a..86e8803 100644 --- a/examples/fs/main.go +++ b/examples/fs/main.go @@ -9,19 +9,20 @@ import ( func main() { source := ext.NewFileSource("in.txt") - flow := flow.NewMap(reverse, 1) + reverseMapFlow := flow.NewMap(reverse, 1) sink := ext.NewFileSink("out.txt") - source.Via(flow).To(sink) + source. + Via(reverseMapFlow). + To(sink) time.Sleep(time.Second) } -var reverse = func(in interface{}) interface{} { - s := in.(string) +var reverse = func(str string) string { var reverse string - for i := len(s) - 1; i >= 0; i-- { - reverse += string(s[i]) + for i := len(str) - 1; i >= 0; i-- { + reverse += string(str[i]) } return reverse } diff --git a/examples/go.mod b/examples/go.mod index 17b8821..dbf52d9 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -1,6 +1,6 @@ module github.com/reugn/go-streams/examples -go 1.15 +go 1.18 require ( github.com/Shopify/sarama v1.33.0 @@ -10,7 +10,7 @@ require ( github.com/gorilla/websocket v1.4.2 github.com/nats-io/nats.go v1.15.0 github.com/nats-io/stan.go v0.10.2 - github.com/reugn/go-streams v0.7.0 + github.com/reugn/go-streams v0.8.0 github.com/reugn/go-streams/aerospike v0.0.0 github.com/reugn/go-streams/kafka v0.0.0 github.com/reugn/go-streams/nats v0.0.0 @@ -19,6 +19,66 @@ require ( github.com/reugn/go-streams/ws v0.0.0 ) +require ( + github.com/99designs/keyring v1.1.5 // indirect + github.com/AthenZ/athenz v1.10.15 // indirect + github.com/DataDog/zstd v1.4.6-0.20210211175136-c6db21d202f4 // indirect + github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd // indirect + github.com/ardielle/ardielle-go v1.5.2 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/danieljoos/wincred v1.0.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a // indirect + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.0.0 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect + github.com/klauspost/compress v1.15.0 // indirect + github.com/linkedin/goavro/v2 v2.9.8 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mtibben/percent v0.2.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.7.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.10.0 // indirect + github.com/prometheus/procfs v0.2.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/sirupsen/logrus v1.8.1 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/stretchr/testify v1.7.0 // indirect + github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect + go.uber.org/atomic v1.7.0 // indirect + golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect + golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect + golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect + google.golang.org/appengine v1.4.0 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect +) + replace ( github.com/reugn/go-streams/aerospike => ../aerospike github.com/reugn/go-streams/kafka => ../kafka diff --git a/examples/go.sum b/examples/go.sum index c62cf33..bd2030c 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -80,7 +80,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= -github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -122,9 +121,7 @@ github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= -github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -191,7 +188,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= @@ -246,7 +242,6 @@ github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuK github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= -github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= @@ -279,8 +274,9 @@ github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULU github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/reugn/go-streams v0.7.0 h1:z5CaC4SIDd/zCt/K/jP6IG3k1+/p87Ela2GuBLnr7kw= github.com/reugn/go-streams v0.7.0/go.mod h1:OsEa/+BuietQY5sJUCm5nrZujfWtuKIv7KQVCZCXYGg= +github.com/reugn/go-streams v0.8.0 h1:7E/5Dl5U/lH2LND9SRATqRHAUYmYlbjJ1py+jkgzRc8= +github.com/reugn/go-streams v0.8.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/examples/kafka/confluent/kafka.txt b/examples/kafka/confluent/kafka.txt deleted file mode 100644 index c4a5d85..0000000 --- a/examples/kafka/confluent/kafka.txt +++ /dev/null @@ -1,267 +0,0 @@ -// package main - -// import ( -// "fmt" -// "log" -// "os" -// "os/signal" -// "sync" -// "syscall" - -// "github.com/confluentinc/confluent-kafka-go/kafka" -// "github.com/reugn/go-streams" -// "github.com/reugn/go-streams/flow" -// ) - -// // KafkaSource implementation -// // supports rebalance handling for given group.id -// // auto.commit is enabled by default -// type KafkaSource struct { -// consumer *kafka.Consumer -// topics []string -// in chan interface{} -// once sync.Once -// commitFlow *CommitOffset -// } - -// // Commit returns new commit Flow -// func (ks *KafkaSource) Commit() *CommitOffset { -// ks.once.Do(func() { -// ks.commitFlow = NewCommitOffset(ks.consumer) -// }) -// return ks.commitFlow -// } - -// // CommitOffset is a Kafka manual offsets commit flow -// type CommitOffset struct { -// consumer *kafka.Consumer -// in chan interface{} -// } - -// // NewCommitOffset returns new CommitOffset instance -// func NewCommitOffset(consumer *kafka.Consumer) *CommitOffset { -// return &CommitOffset{ -// consumer, -// make(chan interface{}), -// } -// } - -// // Via streams data through given flow -// // panics on invalid message type -// func (co *CommitOffset) Via(flow streams.Flow) streams.Flow { -// go co.loop(flow) -// return flow -// } - -// // To streams data to given sink -// // panics on invalid message type -// func (co *CommitOffset) To(sink streams.Sink) { -// co.loop(sink) -// } - -// func (co *CommitOffset) loop(inlet streams.Inlet) { -// for ev := range co.Out() { -// switch e := ev.(type) { -// case *kafka.Message: -// co.consumer.CommitMessage(e) -// inlet.In() <- e -// default: -// panic("CommitOffset invalid msg type") -// } -// } -// close(inlet.In()) -// } - -// // Out returns channel for sending data -// func (co *CommitOffset) Out() <-chan interface{} { -// return co.in -// } - -// // In returns channel for receiving data -// func (co *CommitOffset) In() chan<- interface{} { -// return co.in -// } - -// // NewKafkaSource returns new KafkaSource instance -// func NewKafkaSource(config *kafka.ConfigMap, topics ...string) *KafkaSource { -// consumer, err := kafka.NewConsumer(config) -// streams.Check(err) -// source := &KafkaSource{ -// consumer, -// topics, -// make(chan interface{}), -// sync.Once{}, -// nil, -// } -// go source.init() -// return source -// } - -// // start main loop -// func (ks *KafkaSource) init() { -// ks.consumer.SubscribeTopics(ks.topics, rebalanceCallback) -// sigchan := make(chan os.Signal, 1) -// signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) -// run := true -// for run == true { -// select { -// case sig := <-sigchan: -// log.Printf("Caught signal %v: terminating\n", sig) -// run = false -// default: -// ev := ks.consumer.Poll(100) -// if ev == nil { -// continue -// } -// switch e := ev.(type) { -// case *kafka.Message: -// log.Printf("%% Message on %s:\n%s\n", e.TopicPartition, string(e.Value)) -// if e.Headers != nil { -// log.Printf("%% Headers: %v\n", e.Headers) -// } -// ks.in <- e -// case kafka.Error: -// fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) -// if e.Code() == kafka.ErrAllBrokersDown { -// run = false -// } -// default: -// log.Printf("Ignored %v\n", e) -// } -// } -// } -// log.Printf("Closing consumer\n") -// close(ks.in) -// ks.consumer.Close() -// } - -// // Via streams data through given flow -// func (ks *KafkaSource) Via(_flow streams.Flow) streams.Flow { -// flow.DoStream(ks, _flow) -// return _flow -// } - -// // Out returns channel for sending data -// func (ks *KafkaSource) Out() <-chan interface{} { -// return ks.in -// } - -// // handle rebalance events -// var rebalanceCallback = func(c *kafka.Consumer, e kafka.Event) error { -// switch evt := e.(type) { -// case *kafka.AssignedPartitions: -// c.Assign(evt.Partitions) -// case *kafka.RevokedPartitions: -// assigned, err := c.Assignment() -// if err != nil { -// return err -// } -// partitions := clearUnassigned(assigned, evt.Partitions) -// c.Assign(partitions) -// } -// return nil -// } - -// // clear unassigned partitions from current assignment -// func clearUnassigned(a []kafka.TopicPartition, u []kafka.TopicPartition) []kafka.TopicPartition { -// var rt []kafka.TopicPartition -// for _, p := range a { -// if !contains(u, p) { -// rt = append(rt, p) -// } -// } -// return rt -// } - -// func contains(s []kafka.TopicPartition, p kafka.TopicPartition) bool { -// for _, sp := range s { -// if sp == p { -// return true -// } -// } -// return false -// } - -// // KafkaSink implementation -// // Produces messages using Round Robin partitioning strategy on empty key -// type KafkaSink struct { -// producer *kafka.Producer -// topic string -// in chan interface{} -// partition int32 -// topicPartitions int32 -// } - -// // NewKafkaSink returns new KafkaSink instance -// func NewKafkaSink(config *kafka.ConfigMap, topic string) *KafkaSink { -// producer, err := kafka.NewProducer(config) -// streams.Check(err) -// sink := &KafkaSink{ -// producer, -// topic, -// make(chan interface{}), -// 0, -// topicPartitionsNumber(producer, topic), -// } -// go sink.init() -// return sink -// } - -// // start main loop -// func (ks *KafkaSink) init() { -// for msg := range ks.in { -// switch m := msg.(type) { -// case *kafka.Message: -// ks.produce(m.Key, m.Value, m.Headers) -// case string: -// ks.produce(nil, []byte(m), []kafka.Header{}) -// } -// } -// log.Printf("Closing producer\n") -// ks.producer.Close() -// } - -// // produce message -// func (ks *KafkaSink) produce(key []byte, value []byte, headers []kafka.Header) error { -// var partition int32 -// if key == nil { -// partition = ks.nextPartition() -// } else { -// partition = ks.keyPartition(key) -// } -// msg := kafka.Message{ -// TopicPartition: kafka.TopicPartition{Topic: &ks.topic, Partition: partition}, -// Value: value, -// Key: key, -// Headers: headers, -// } -// log.Printf("Producing message: %s, to topic: %s\n", msg.Value, msg.TopicPartition.String()) -// return ks.producer.Produce(&msg, nil) -// } - -// // get topic partition number -// func topicPartitionsNumber(producer *kafka.Producer, topic string) int32 { -// metadata, err := producer.GetMetadata(&topic, false, 5000) -// streams.Check(err) -// return int32(len(metadata.Topics[topic].Partitions)) -// } - -// // Round Robin partitioning strategy -// // thread-safe, used from main loop goroutine only -// func (ks *KafkaSink) nextPartition() int32 { -// ks.partition++ -// partition := (ks.partition & 0x7fffffff) % ks.topicPartitions -// log.Printf("Partition: %d from %d\n", partition, ks.topicPartitions) -// return partition -// } - -// // Calculate message partition by key -// func (ks *KafkaSink) keyPartition(key []byte) int32 { -// hashCode := streams.HashCode(key) -// return (int32(hashCode) & 0x7fffffff) % ks.topicPartitions -// } - -// // In returns channel for receiving data -// func (ks *KafkaSink) In() chan<- interface{} { -// return ks.in -// } diff --git a/examples/kafka/confluent/main.txt b/examples/kafka/confluent/main.txt deleted file mode 100644 index 3f00539..0000000 --- a/examples/kafka/confluent/main.txt +++ /dev/null @@ -1,62 +0,0 @@ -// package main - -// import ( -// "os" -// "os/signal" -// "strings" -// "syscall" -// "time" - -// "github.com/confluentinc/confluent-kafka-go/kafka" -// ext "github.com/reugn/go-streams/extension" -// "github.com/reugn/go-streams/flow" -// ) - -// func main() { -// host := "127.0.0.1:9092" -// config := kafka.ConfigMap{ -// "bootstrap.servers": host, -// "group.id": "myGroup", -// "enable.auto.commit": false, -// "auto.offset.reset": "earliest", -// } - -// source := ext.NewKafkaSource(&config, "test") -// flow1 := flow.NewMap(toUpper, 1) -// flow2 := flow.NewFlatMap(appendAsterix, 1) -// sink := ext.NewKafkaSink(&config, "test2") -// throttler := flow.NewThrottler(1, time.Second*1, 50, flow.Discard) -// // slidingWindow := flow.NewSlidingWindow(time.Second*30, time.Second*5) -// tumblingWindow := flow.NewTumblingWindow(time.Second * 5) -// //manual offset commit flow -// commit := source.Commit() - -// source.Via(flow1).Via(commit).Via(throttler).Via(tumblingWindow).Via(flow2).To(sink) -// wait() -// } - -// var toUpper = func(in interface{}) interface{} { -// msg := in.(*kafka.Message) -// msg.Value = []byte(strings.ToUpper(string(msg.Value))) -// return msg -// } - -// var appendAsterix = func(in interface{}) []interface{} { -// arr := in.([]interface{}) -// rt := make([]interface{}, len(arr)) -// for i, item := range arr { -// msg := item.(*kafka.Message) -// msg.Value = []byte(string(msg.Value) + "*") -// rt[i] = msg -// } -// return rt -// } - -// func wait() { -// c := make(chan os.Signal) -// signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) -// select { -// case <-c: -// return -// } -// } diff --git a/examples/kafka/main.go b/examples/kafka/main.go index 4182daa..7bada99 100644 --- a/examples/kafka/main.go +++ b/examples/kafka/main.go @@ -26,35 +26,34 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(toUpper, 1) - flow2 := flow.NewFlatMap(appendAsterisk, 1) + + toUpperMapFlow := flow.NewMap(toUpper, 1) + appendAsteriskFlatMapFlow := flow.NewFlatMap(appendAsterisk, 1) sink, err := ext.NewKafkaSink(hosts, config, "test2") if err != nil { log.Fatal(err) } + throttler := flow.NewThrottler(1, time.Second, 50, flow.Discard) // slidingWindow := flow.NewSlidingWindow(time.Second*30, time.Second*5) tumblingWindow := flow.NewTumblingWindow(time.Second * 5) source. - Via(flow1). + Via(toUpperMapFlow). Via(throttler). Via(tumblingWindow). - Via(flow2). + Via(appendAsteriskFlatMapFlow). To(sink) } -var toUpper = func(in interface{}) interface{} { - msg := in.(*sarama.ConsumerMessage) +var toUpper = func(msg *sarama.ConsumerMessage) *sarama.ConsumerMessage { msg.Value = []byte(strings.ToUpper(string(msg.Value))) return msg } -var appendAsterisk = func(in interface{}) []interface{} { - arr := in.([]interface{}) - outArr := make([]interface{}, len(arr)) - for i, item := range arr { - msg := item.(*sarama.ConsumerMessage) +var appendAsterisk = func(inArr []*sarama.ConsumerMessage) []*sarama.ConsumerMessage { + outArr := make([]*sarama.ConsumerMessage, len(inArr)) + for i, msg := range inArr { msg.Value = []byte(string(msg.Value) + "*") outArr[i] = msg } diff --git a/examples/nats/main.go b/examples/nats/main.go index f6ebaf2..cec468d 100644 --- a/examples/nats/main.go +++ b/examples/nats/main.go @@ -35,7 +35,7 @@ func jetStream() { defer cancel() fileSource := extension.NewFileSource("in.txt") - flow1 := flow.NewMap(toUpperString, 1) + toUpperMapFlow := flow.NewMap(toUpperString, 1) jetSink, err := ext.NewJetStreamSink("stream1", "stream1.subject1", "nats://localhost:4222") if err != nil { log.Fatal(err) @@ -45,11 +45,16 @@ func jetStream() { if err != nil { log.Fatal(err) } - flow2 := flow.NewMap(fetchJetMsg, 1) + fetchJetMsgMapFlow := flow.NewMap(fetchJetMsg, 1) stdOutSInk := extension.NewStdoutSink() - fileSource.Via(flow1).To(jetSink) - jetSource.Via(flow2).To(stdOutSInk) + fileSource. + Via(toUpperMapFlow). + To(jetSink) + + jetSource. + Via(fetchJetMsgMapFlow). + To(stdOutSInk) } // docker run --rm --name nats-streaming -p 4223:4223 -p 8223:8223 nats-streaming -p 4223 -m 8223 @@ -57,7 +62,7 @@ func streaming() { ctx := context.Background() fileSource := extension.NewFileSource("in.txt") - flow1 := flow.NewMap(toUpperString, 1) + toUpperMapFlow := flow.NewMap(toUpperString, 1) prodConn, err := stan.Connect("test-cluster", "test-producer", stan.NatsURL("nats://localhost:4223")) if err != nil { log.Fatal(err) @@ -71,24 +76,26 @@ func streaming() { // This example uses the StartWithLastReceived subscription option // there are more available at https://docs.nats.io/developing-with-nats-streaming/receiving streamingSource := ext.NewStreamingSource(ctx, subConn, stan.StartWithLastReceived(), "topic1") - flow2 := flow.NewMap(fetchStanMsg, 1) + fetchStanMsgMapFlow := flow.NewMap(fetchStanMsg, 1) stdOutSInk := extension.NewStdoutSink() - fileSource.Via(flow1).To(streamingSink) - streamingSource.Via(flow2).To(stdOutSInk) + fileSource. + Via(toUpperMapFlow). + To(streamingSink) + + streamingSource. + Via(fetchStanMsgMapFlow). + To(stdOutSInk) } -var toUpperString = func(in interface{}) interface{} { - msg := in.(string) +var toUpperString = func(msg string) []byte { return []byte(strings.ReplaceAll(strings.ToUpper(msg), "\n", "")) } -var fetchJetMsg = func(in interface{}) interface{} { - msg := in.(*nats.Msg) +var fetchJetMsg = func(msg *nats.Msg) string { return string(msg.Data) } -var fetchStanMsg = func(in interface{}) interface{} { - msg := in.(*stan.Msg) +var fetchStanMsg = func(msg *stan.Msg) string { return string(msg.Data) } diff --git a/examples/net/main.go b/examples/net/main.go index 3f3d017..060ee72 100644 --- a/examples/net/main.go +++ b/examples/net/main.go @@ -25,19 +25,19 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(toUpper, 1) + + toUpperMapFlow := flow.NewMap(toUpper, 1) sink, err := ext.NewNetSink(ext.UDP, "127.0.0.1:3535") if err != nil { log.Fatal(err) } source. - Via(flow1). + Via(toUpperMapFlow). To(sink) } -var toUpper = func(in interface{}) interface{} { - msg := in.(string) +var toUpper = func(msg string) string { log.Printf("Got: %s", msg) return strings.ToUpper(msg) } diff --git a/examples/pulsar/main.go b/examples/pulsar/main.go index 9c15231..8f0aa44 100644 --- a/examples/pulsar/main.go +++ b/examples/pulsar/main.go @@ -24,18 +24,18 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(toUpper, 1) + + toUpperMapFlow := flow.NewMap(toUpper, 1) sink, err := ext.NewPulsarSink(ctx, &clientOptions, &producerOptions) if err != nil { log.Fatal(err) } source. - Via(flow1). + Via(toUpperMapFlow). To(sink) } -var toUpper = func(in interface{}) interface{} { - msg := in.(pulsar.Message) +var toUpper = func(msg pulsar.Message) string { return strings.ToUpper(string(msg.Payload())) } diff --git a/examples/redis/main.go b/examples/redis/main.go index 81175af..c3936c9 100644 --- a/examples/redis/main.go +++ b/examples/redis/main.go @@ -33,15 +33,15 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(toUpper, 1) + + toUpperMapFlow := flow.NewMap(toUpper, 1) sink := ext.NewRedisSink(config, "test2") source. - Via(flow1). + Via(toUpperMapFlow). To(sink) } -var toUpper = func(in interface{}) interface{} { - msg := in.(*redis.Message) +var toUpper = func(msg *redis.Message) string { return strings.ToUpper(msg.Payload) } diff --git a/examples/std/main.go b/examples/std/main.go index debbeb4..ee97f77 100644 --- a/examples/std/main.go +++ b/examples/std/main.go @@ -19,16 +19,15 @@ func (msg *message) String() string { func main() { source := ext.NewChanSource(tickerChan(time.Second * 1)) - flow := flow.NewMap(mapp, 1) + mapFlow := flow.NewMap(addUTC, 1) sink := ext.NewStdoutSink() - source.Via(flow).To(sink) - - select {} + source. + Via(mapFlow). + To(sink) } -var mapp = func(in interface{}) interface{} { - msg := in.(*message) +var addUTC = func(msg *message) *message { msg.Msg += "-UTC" return msg } diff --git a/examples/ws/main.go b/examples/ws/main.go index 0bdc99b..e448995 100644 --- a/examples/ws/main.go +++ b/examples/ws/main.go @@ -120,21 +120,21 @@ func main() { if err != nil { log.Fatal(err) } - flow1 := flow.NewMap(appendAsterix, 1) + + addAsteriskMapFlow := flow.NewMap(addAsterisk, 1) sink, err := ext.NewWebSocketSink(ctx, url) if err != nil { log.Fatal(err) } source. - Via(flow1). + Via(addAsteriskMapFlow). To(sink) log.Print("Exiting...") } -var appendAsterix = func(in interface{}) interface{} { - msg := in.(ext.Message) +var addAsterisk = func(msg ext.Message) string { return string(msg.Payload) + "*" }