From 3311aecbea3197fa91fb14041255e364fc37eb8f Mon Sep 17 00:00:00 2001 From: Bart van Deenen Date: Wed, 11 Aug 2021 14:34:16 +0200 Subject: [PATCH] feat(sim): added demo schema random simulator --- cmd/strm/main.go | 2 +- pkg/cmd/sim.go | 25 +++- pkg/demoschema/array_int.go | 54 ++++++++ pkg/demoschema/demo_event.go | 168 +++++++++++++++++++++++ pkg/demoschema/strm_meta.go | 184 ++++++++++++++++++++++++++ pkg/demoschema/union_null_int.go | 96 ++++++++++++++ pkg/demoschema/union_null_long.go | 96 ++++++++++++++ pkg/demoschema/union_null_string.go | 96 ++++++++++++++ pkg/entity/schema/schema.go | 11 +- pkg/sim/legacy_schemas.go | 32 +++++ pkg/sim/randomsim/cmd.go | 2 +- pkg/sim/randomsim/event_generators.go | 38 ++++++ pkg/sim/randomsim/randomsim.go | 68 +++++----- pkg/sim/sims.go | 41 ++++++ pkg/util/utils.go | 12 ++ 15 files changed, 877 insertions(+), 48 deletions(-) create mode 100644 pkg/demoschema/array_int.go create mode 100644 pkg/demoschema/demo_event.go create mode 100644 pkg/demoschema/strm_meta.go create mode 100644 pkg/demoschema/union_null_int.go create mode 100644 pkg/demoschema/union_null_long.go create mode 100644 pkg/demoschema/union_null_string.go create mode 100644 pkg/sim/legacy_schemas.go create mode 100644 pkg/sim/randomsim/event_generators.go diff --git a/cmd/strm/main.go b/cmd/strm/main.go index ddf6a4b..e91bef1 100644 --- a/cmd/strm/main.go +++ b/cmd/strm/main.go @@ -110,7 +110,7 @@ func setupVerbs() { RootCmd.AddCommand(cmd.DeleteCmd) RootCmd.AddCommand(cmd.ListCmd) RootCmd.AddCommand(cmd.CompletionCmd) - RootCmd.AddCommand(cmd.SimCmd) + RootCmd.AddCommand(cmd.SimCmd()) RootCmd.AddCommand(cmd.EgressCmd) RootCmd.AddCommand(cmd.AuthCmd) RootCmd.AddCommand(cmd.VersionCmd) diff --git a/pkg/cmd/sim.go b/pkg/cmd/sim.go index 98cba15..27525f3 100644 --- a/pkg/cmd/sim.go +++ b/pkg/cmd/sim.go @@ -2,15 +2,28 @@ package cmd import ( "github.com/spf13/cobra" + "streammachine.io/strm/pkg/sim" "streammachine.io/strm/pkg/sim/randomsim" ) -// SimCmd represents the create command -var SimCmd = &cobra.Command{ - Use: "sim", - Short: "Simulate events", +func SimCmd() (cmd *cobra.Command) { + // SimCmd represents the create command + simCmd := &cobra.Command{ + Use: "sim", + Short: "Simulate events", + } + + flags := simCmd.PersistentFlags() + flags.String(sim.SchemaFlag, "streammachine/demo/1.0.2", "what schema to simulate") + _ = simCmd.RegisterFlagCompletionFunc(sim.SchemaFlag, schemaCompletion) + simCmd.AddCommand(randomsim.RunCmd()) + return simCmd } -func init() { - SimCmd.AddCommand(randomsim.RunCmd()) +func schemaCompletion(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { + keys := make([]string, 0, len(randomsim.EventGenerators)) + for key := range randomsim.EventGenerators { + keys = append(keys, key) + } + return keys, cobra.ShellCompDirectiveDefault } diff --git a/pkg/demoschema/array_int.go b/pkg/demoschema/array_int.go new file mode 100644 index 0000000..cd22535 --- /dev/null +++ b/pkg/demoschema/array_int.go @@ -0,0 +1,54 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "io" + + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +func writeArrayInt(r []int32, w io.Writer) error { + err := vm.WriteLong(int64(len(r)), w) + if err != nil || len(r) == 0 { + return err + } + for _, e := range r { + err = vm.WriteInt(e, w) + if err != nil { + return err + } + } + return vm.WriteLong(0, w) +} + +type ArrayIntWrapper struct { + Target *[]int32 +} + +func (_ *ArrayIntWrapper) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetInt(v int32) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetLong(v int64) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetString(v string) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) SetUnionElem(v int64) { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) Get(i int) types.Field { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *ArrayIntWrapper) Finalize() {} +func (_ *ArrayIntWrapper) SetDefault(i int) { panic("Unsupported operation") } +func (r *ArrayIntWrapper) NullField(i int) { + panic("Unsupported operation") +} + +func (r *ArrayIntWrapper) AppendArray() types.Field { + var v int32 + + *r.Target = append(*r.Target, v) + return &types.Int{Target: &(*r.Target)[len(*r.Target)-1]} +} diff --git a/pkg/demoschema/demo_event.go b/pkg/demoschema/demo_event.go new file mode 100644 index 0000000..2bb3207 --- /dev/null +++ b/pkg/demoschema/demo_event.go @@ -0,0 +1,168 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "github.com/actgardner/gogen-avro/v7/compiler" + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" + "io" +) + +type DemoEvent struct { + StrmMeta *StrmMeta `json:"strmMeta"` + // any value. For illustration purposes: use a value that is consistent over time like a customer or device ID. + UniqueIdentifier *UnionNullString `json:"uniqueIdentifier"` + // any value. For illustration purposes: use a value that is consistent over a limited period like a session. + ConsistentValue string `json:"consistentValue"` + // any value. For illustration purposes: use a value that could identify a user over time based on behavior, like browsing behavior (e.g. urls). + SomeSensitiveValue *UnionNullString `json:"someSensitiveValue"` + // any value. For illustration purposes: use a value that is not sensitive at all, like the rank of an item in a set. + NotSensitiveValue *UnionNullString `json:"notSensitiveValue"` +} + +const DemoEventAvroCRC64Fingerprint = "\xf7\x84\xf6薠\x8fT" + +func NewDemoEvent() *DemoEvent { + return &DemoEvent{} +} + +func DeserializeDemoEvent(r io.Reader) (*DemoEvent, error) { + t := NewDemoEvent() + deser, err := compiler.CompileSchemaBytes([]byte(t.Schema()), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func DeserializeDemoEventFromSchema(r io.Reader, schema string) (*DemoEvent, error) { + t := NewDemoEvent() + + deser, err := compiler.CompileSchemaBytes([]byte(schema), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func writeDemoEvent(r *DemoEvent, w io.Writer) error { + var err error + err = writeStrmMeta(r.StrmMeta, w) + if err != nil { + return err + } + err = writeUnionNullString(r.UniqueIdentifier, w) + if err != nil { + return err + } + err = vm.WriteString(r.ConsistentValue, w) + if err != nil { + return err + } + err = writeUnionNullString(r.SomeSensitiveValue, w) + if err != nil { + return err + } + err = writeUnionNullString(r.NotSensitiveValue, w) + if err != nil { + return err + } + return err +} + +func (r *DemoEvent) Serialize(w io.Writer) error { + return writeDemoEvent(r, w) +} + +func (r *DemoEvent) Schema() string { + return "{\"fields\":[{\"name\":\"strmMeta\",\"type\":{\"fields\":[{\"name\":\"eventContractRef\",\"type\":\"string\"},{\"default\":null,\"name\":\"nonce\",\"type\":[\"null\",\"int\"]},{\"default\":null,\"logicalType\":\"date\",\"name\":\"timestamp\",\"type\":[\"null\",\"long\"]},{\"default\":null,\"name\":\"keyLink\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"billingId\",\"type\":[\"null\",\"string\"]},{\"name\":\"consentLevels\",\"type\":{\"items\":\"int\",\"type\":\"array\"}}],\"name\":\"StrmMeta\",\"type\":\"record\"}},{\"default\":null,\"doc\":\"any value. For illustration purposes: use a value that is consistent over time like a customer or device ID.\",\"name\":\"uniqueIdentifier\",\"type\":[\"null\",\"string\"]},{\"doc\":\"any value. For illustration purposes: use a value that is consistent over a limited period like a session.\",\"name\":\"consistentValue\",\"type\":\"string\"},{\"default\":null,\"doc\":\"any value. For illustration purposes: use a value that could identify a user over time based on behavior, like browsing behavior (e.g. urls).\",\"name\":\"someSensitiveValue\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"doc\":\"any value. For illustration purposes: use a value that is not sensitive at all, like the rank of an item in a set.\",\"name\":\"notSensitiveValue\",\"type\":[\"null\",\"string\"]}],\"name\":\"io.streammachine.schemas.demo.v1.DemoEvent\",\"type\":\"record\"}" +} + +func (r *DemoEvent) SchemaName() string { + return "io.streammachine.schemas.demo.v1.DemoEvent" +} + +func (_ *DemoEvent) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *DemoEvent) SetInt(v int32) { panic("Unsupported operation") } +func (_ *DemoEvent) SetLong(v int64) { panic("Unsupported operation") } +func (_ *DemoEvent) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *DemoEvent) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *DemoEvent) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *DemoEvent) SetString(v string) { panic("Unsupported operation") } +func (_ *DemoEvent) SetUnionElem(v int64) { panic("Unsupported operation") } + +func (r *DemoEvent) Get(i int) types.Field { + switch i { + case 0: + r.StrmMeta = NewStrmMeta() + + return r.StrmMeta + case 1: + r.UniqueIdentifier = NewUnionNullString() + + return r.UniqueIdentifier + case 2: + return &types.String{Target: &r.ConsistentValue} + case 3: + r.SomeSensitiveValue = NewUnionNullString() + + return r.SomeSensitiveValue + case 4: + r.NotSensitiveValue = NewUnionNullString() + + return r.NotSensitiveValue + } + panic("Unknown field index") +} + +func (r *DemoEvent) SetDefault(i int) { + switch i { + case 1: + r.UniqueIdentifier = nil + return + case 3: + r.SomeSensitiveValue = nil + return + case 4: + r.NotSensitiveValue = nil + return + } + panic("Unknown field index") +} + +func (r *DemoEvent) NullField(i int) { + switch i { + case 1: + r.UniqueIdentifier = nil + return + case 3: + r.SomeSensitiveValue = nil + return + case 4: + r.NotSensitiveValue = nil + return + } + panic("Not a nullable field index") +} + +func (_ *DemoEvent) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *DemoEvent) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *DemoEvent) Finalize() {} + +func (_ *DemoEvent) AvroCRC64Fingerprint() []byte { + return []byte(DemoEventAvroCRC64Fingerprint) +} diff --git a/pkg/demoschema/strm_meta.go b/pkg/demoschema/strm_meta.go new file mode 100644 index 0000000..b3618e9 --- /dev/null +++ b/pkg/demoschema/strm_meta.go @@ -0,0 +1,184 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "github.com/actgardner/gogen-avro/v7/compiler" + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" + "io" +) + +type StrmMeta struct { + EventContractRef string `json:"eventContractRef"` + + Nonce *UnionNullInt `json:"nonce"` + + Timestamp *UnionNullLong `json:"timestamp"` + + KeyLink *UnionNullString `json:"keyLink"` + + BillingId *UnionNullString `json:"billingId"` + + ConsentLevels []int32 `json:"consentLevels"` +} + +const StrmMetaAvroCRC64Fingerprint = "\xa4\xfa\x89!\x1fD\xc4\x1b" + +func NewStrmMeta() *StrmMeta { + return &StrmMeta{} +} + +func DeserializeStrmMeta(r io.Reader) (*StrmMeta, error) { + t := NewStrmMeta() + deser, err := compiler.CompileSchemaBytes([]byte(t.Schema()), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func DeserializeStrmMetaFromSchema(r io.Reader, schema string) (*StrmMeta, error) { + t := NewStrmMeta() + + deser, err := compiler.CompileSchemaBytes([]byte(schema), []byte(t.Schema())) + if err != nil { + return nil, err + } + + err = vm.Eval(r, deser, t) + if err != nil { + return nil, err + } + return t, err +} + +func writeStrmMeta(r *StrmMeta, w io.Writer) error { + var err error + err = vm.WriteString(r.EventContractRef, w) + if err != nil { + return err + } + err = writeUnionNullInt(r.Nonce, w) + if err != nil { + return err + } + err = writeUnionNullLong(r.Timestamp, w) + if err != nil { + return err + } + err = writeUnionNullString(r.KeyLink, w) + if err != nil { + return err + } + err = writeUnionNullString(r.BillingId, w) + if err != nil { + return err + } + err = writeArrayInt(r.ConsentLevels, w) + if err != nil { + return err + } + return err +} + +func (r *StrmMeta) Serialize(w io.Writer) error { + return writeStrmMeta(r, w) +} + +func (r *StrmMeta) Schema() string { + return "{\"fields\":[{\"name\":\"eventContractRef\",\"type\":\"string\"},{\"default\":null,\"name\":\"nonce\",\"type\":[\"null\",\"int\"]},{\"default\":null,\"logicalType\":\"date\",\"name\":\"timestamp\",\"type\":[\"null\",\"long\"]},{\"default\":null,\"name\":\"keyLink\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"billingId\",\"type\":[\"null\",\"string\"]},{\"name\":\"consentLevels\",\"type\":{\"items\":\"int\",\"type\":\"array\"}}],\"name\":\"io.streammachine.schemas.demo.v1.StrmMeta\",\"type\":\"record\"}" +} + +func (r *StrmMeta) SchemaName() string { + return "io.streammachine.schemas.demo.v1.StrmMeta" +} + +func (_ *StrmMeta) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *StrmMeta) SetInt(v int32) { panic("Unsupported operation") } +func (_ *StrmMeta) SetLong(v int64) { panic("Unsupported operation") } +func (_ *StrmMeta) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *StrmMeta) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *StrmMeta) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *StrmMeta) SetString(v string) { panic("Unsupported operation") } +func (_ *StrmMeta) SetUnionElem(v int64) { panic("Unsupported operation") } + +func (r *StrmMeta) Get(i int) types.Field { + switch i { + case 0: + return &types.String{Target: &r.EventContractRef} + case 1: + r.Nonce = NewUnionNullInt() + + return r.Nonce + case 2: + r.Timestamp = NewUnionNullLong() + + return r.Timestamp + case 3: + r.KeyLink = NewUnionNullString() + + return r.KeyLink + case 4: + r.BillingId = NewUnionNullString() + + return r.BillingId + case 5: + r.ConsentLevels = make([]int32, 0) + + return &ArrayIntWrapper{Target: &r.ConsentLevels} + } + panic("Unknown field index") +} + +func (r *StrmMeta) SetDefault(i int) { + switch i { + case 1: + r.Nonce = nil + return + case 2: + r.Timestamp = nil + return + case 3: + r.KeyLink = nil + return + case 4: + r.BillingId = nil + return + } + panic("Unknown field index") +} + +func (r *StrmMeta) NullField(i int) { + switch i { + case 1: + r.Nonce = nil + return + case 2: + r.Timestamp = nil + return + case 3: + r.KeyLink = nil + return + case 4: + r.BillingId = nil + return + } + panic("Not a nullable field index") +} + +func (_ *StrmMeta) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *StrmMeta) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *StrmMeta) Finalize() {} + +func (_ *StrmMeta) AvroCRC64Fingerprint() []byte { + return []byte(StrmMetaAvroCRC64Fingerprint) +} diff --git a/pkg/demoschema/union_null_int.go b/pkg/demoschema/union_null_int.go new file mode 100644 index 0000000..1f89c39 --- /dev/null +++ b/pkg/demoschema/union_null_int.go @@ -0,0 +1,96 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type UnionNullIntTypeEnum int + +const ( + UnionNullIntTypeEnumInt UnionNullIntTypeEnum = 1 +) + +type UnionNullInt struct { + Null *types.NullVal + Int int32 + UnionType UnionNullIntTypeEnum +} + +func writeUnionNullInt(r *UnionNullInt, w io.Writer) error { + + if r == nil { + err := vm.WriteLong(0, w) + return err + } + + err := vm.WriteLong(int64(r.UnionType), w) + if err != nil { + return err + } + switch r.UnionType { + case UnionNullIntTypeEnumInt: + return vm.WriteInt(r.Int, w) + } + return fmt.Errorf("invalid value for *UnionNullInt") +} + +func NewUnionNullInt() *UnionNullInt { + return &UnionNullInt{} +} + +func (_ *UnionNullInt) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetInt(v int32) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetString(v string) { panic("Unsupported operation") } +func (r *UnionNullInt) SetLong(v int64) { + r.UnionType = (UnionNullIntTypeEnum)(v) +} +func (r *UnionNullInt) Get(i int) types.Field { + switch i { + case 0: + return r.Null + case 1: + return &types.Int{Target: (&r.Int)} + } + panic("Unknown field index") +} +func (_ *UnionNullInt) NullField(i int) { panic("Unsupported operation") } +func (_ *UnionNullInt) SetDefault(i int) { panic("Unsupported operation") } +func (_ *UnionNullInt) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *UnionNullInt) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *UnionNullInt) Finalize() {} + +func (r *UnionNullInt) MarshalJSON() ([]byte, error) { + if r == nil { + return []byte("null"), nil + } + switch r.UnionType { + case UnionNullIntTypeEnumInt: + return json.Marshal(map[string]interface{}{"int": r.Int}) + } + return nil, fmt.Errorf("invalid value for *UnionNullInt") +} + +func (r *UnionNullInt) UnmarshalJSON(data []byte) error { + var fields map[string]json.RawMessage + if err := json.Unmarshal(data, &fields); err != nil { + return err + } + if value, ok := fields["int"]; ok { + r.UnionType = 1 + return json.Unmarshal([]byte(value), &r.Int) + } + return fmt.Errorf("invalid value for *UnionNullInt") +} diff --git a/pkg/demoschema/union_null_long.go b/pkg/demoschema/union_null_long.go new file mode 100644 index 0000000..b9b783e --- /dev/null +++ b/pkg/demoschema/union_null_long.go @@ -0,0 +1,96 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type UnionNullLongTypeEnum int + +const ( + UnionNullLongTypeEnumLong UnionNullLongTypeEnum = 1 +) + +type UnionNullLong struct { + Null *types.NullVal + Long int64 + UnionType UnionNullLongTypeEnum +} + +func writeUnionNullLong(r *UnionNullLong, w io.Writer) error { + + if r == nil { + err := vm.WriteLong(0, w) + return err + } + + err := vm.WriteLong(int64(r.UnionType), w) + if err != nil { + return err + } + switch r.UnionType { + case UnionNullLongTypeEnumLong: + return vm.WriteLong(r.Long, w) + } + return fmt.Errorf("invalid value for *UnionNullLong") +} + +func NewUnionNullLong() *UnionNullLong { + return &UnionNullLong{} +} + +func (_ *UnionNullLong) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetInt(v int32) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetString(v string) { panic("Unsupported operation") } +func (r *UnionNullLong) SetLong(v int64) { + r.UnionType = (UnionNullLongTypeEnum)(v) +} +func (r *UnionNullLong) Get(i int) types.Field { + switch i { + case 0: + return r.Null + case 1: + return &types.Long{Target: (&r.Long)} + } + panic("Unknown field index") +} +func (_ *UnionNullLong) NullField(i int) { panic("Unsupported operation") } +func (_ *UnionNullLong) SetDefault(i int) { panic("Unsupported operation") } +func (_ *UnionNullLong) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *UnionNullLong) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *UnionNullLong) Finalize() {} + +func (r *UnionNullLong) MarshalJSON() ([]byte, error) { + if r == nil { + return []byte("null"), nil + } + switch r.UnionType { + case UnionNullLongTypeEnumLong: + return json.Marshal(map[string]interface{}{"long": r.Long}) + } + return nil, fmt.Errorf("invalid value for *UnionNullLong") +} + +func (r *UnionNullLong) UnmarshalJSON(data []byte) error { + var fields map[string]json.RawMessage + if err := json.Unmarshal(data, &fields); err != nil { + return err + } + if value, ok := fields["long"]; ok { + r.UnionType = 1 + return json.Unmarshal([]byte(value), &r.Long) + } + return fmt.Errorf("invalid value for *UnionNullLong") +} diff --git a/pkg/demoschema/union_null_string.go b/pkg/demoschema/union_null_string.go new file mode 100644 index 0000000..6b4952b --- /dev/null +++ b/pkg/demoschema/union_null_string.go @@ -0,0 +1,96 @@ +// Code generated by github.com/actgardner/gogen-avro/v7. DO NOT EDIT. +/* + * SOURCE: + * demo.avsc + */ +package demoschema + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/actgardner/gogen-avro/v7/vm" + "github.com/actgardner/gogen-avro/v7/vm/types" +) + +type UnionNullStringTypeEnum int + +const ( + UnionNullStringTypeEnumString UnionNullStringTypeEnum = 1 +) + +type UnionNullString struct { + Null *types.NullVal + String string + UnionType UnionNullStringTypeEnum +} + +func writeUnionNullString(r *UnionNullString, w io.Writer) error { + + if r == nil { + err := vm.WriteLong(0, w) + return err + } + + err := vm.WriteLong(int64(r.UnionType), w) + if err != nil { + return err + } + switch r.UnionType { + case UnionNullStringTypeEnumString: + return vm.WriteString(r.String, w) + } + return fmt.Errorf("invalid value for *UnionNullString") +} + +func NewUnionNullString() *UnionNullString { + return &UnionNullString{} +} + +func (_ *UnionNullString) SetBoolean(v bool) { panic("Unsupported operation") } +func (_ *UnionNullString) SetInt(v int32) { panic("Unsupported operation") } +func (_ *UnionNullString) SetFloat(v float32) { panic("Unsupported operation") } +func (_ *UnionNullString) SetDouble(v float64) { panic("Unsupported operation") } +func (_ *UnionNullString) SetBytes(v []byte) { panic("Unsupported operation") } +func (_ *UnionNullString) SetString(v string) { panic("Unsupported operation") } +func (r *UnionNullString) SetLong(v int64) { + r.UnionType = (UnionNullStringTypeEnum)(v) +} +func (r *UnionNullString) Get(i int) types.Field { + switch i { + case 0: + return r.Null + case 1: + return &types.String{Target: (&r.String)} + } + panic("Unknown field index") +} +func (_ *UnionNullString) NullField(i int) { panic("Unsupported operation") } +func (_ *UnionNullString) SetDefault(i int) { panic("Unsupported operation") } +func (_ *UnionNullString) AppendMap(key string) types.Field { panic("Unsupported operation") } +func (_ *UnionNullString) AppendArray() types.Field { panic("Unsupported operation") } +func (_ *UnionNullString) Finalize() {} + +func (r *UnionNullString) MarshalJSON() ([]byte, error) { + if r == nil { + return []byte("null"), nil + } + switch r.UnionType { + case UnionNullStringTypeEnumString: + return json.Marshal(map[string]interface{}{"string": r.String}) + } + return nil, fmt.Errorf("invalid value for *UnionNullString") +} + +func (r *UnionNullString) UnmarshalJSON(data []byte) error { + var fields map[string]json.RawMessage + if err := json.Unmarshal(data, &fields); err != nil { + return err + } + if value, ok := fields["string"]; ok { + r.UnionType = 1 + return json.Unmarshal([]byte(value), &r.String) + } + return fmt.Errorf("invalid value for *UnionNullString") +} diff --git a/pkg/entity/schema/schema.go b/pkg/entity/schema/schema.go index c18bc5a..d94eda8 100644 --- a/pkg/entity/schema/schema.go +++ b/pkg/entity/schema/schema.go @@ -44,11 +44,16 @@ func SetupClient(clientConnection *grpc.ClientConn, ctx context.Context) { func list() { req := &schemas.ListSchemasRequest{BillingId: common.BillingId} - sinksList, err := client.ListSchemas(apiContext, req) + util.Print(List(req)) +} + +func List(req *schemas.ListSchemasRequest) *schemas.ListSchemasResponse { + schemasList, err := client.ListSchemas(apiContext, req) common.CliExit(err) - util.Print(sinksList) + return schemasList } + func get(name *string, cmd *cobra.Command) { flags := cmd.Flags() clusterRef, err := getClusterRef(flags) @@ -130,4 +135,4 @@ func NamesCompletion(cmd *cobra.Command, args []string, complete string) ([]stri } return names, cobra.ShellCompDirectiveNoFileComp -} +} \ No newline at end of file diff --git a/pkg/sim/legacy_schemas.go b/pkg/sim/legacy_schemas.go new file mode 100644 index 0000000..6d2f900 --- /dev/null +++ b/pkg/sim/legacy_schemas.go @@ -0,0 +1,32 @@ +package sim + +import ( + "bytes" + "fmt" + "io" + "net/http" + "streammachine.io/strm/pkg/common" +) + +type LegacySender struct { + Gateway, Schema string + Client http.Client +} + +func (s LegacySender) Send(event StreammachineEvent, token string) { + b := &bytes.Buffer{} + err := event.Serialize(b) + common.CliExit(err) + req, err := http.NewRequest("POST", s.Gateway, b) + common.CliExit(err) + req.Header.Set("Strm-Schema-Id", s.Schema) + req.Header.Set("Authorization", "Bearer "+token) + resp, err := s.Client.Do(req) + if err != nil || resp.StatusCode != 204 { + if resp != nil { + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + fmt.Printf("%v %s\n", err, string(body)) + } + } +} diff --git a/pkg/sim/randomsim/cmd.go b/pkg/sim/randomsim/cmd.go index 76b71eb..6f22531 100644 --- a/pkg/sim/randomsim/cmd.go +++ b/pkg/sim/randomsim/cmd.go @@ -25,6 +25,6 @@ Uses a saved stream definition if available, otherwise, client id and secret are flags.String(sim.ClientIdFlag, "", "Client id to be used for sending data") flags.String(sim.ClientSecretFlag, "", "Client secret to be used for sending data") flags.Bool(sim.QuietFlag, false, "don't spam stderr") - flags.StringSlice(sim.ConsentLevelsFlag, []string{"0", "0/1", "0/1/2", "0/1/2/3"}, "consent levels to be simulated") + flags.StringSlice(sim.ConsentLevelsFlag, []string{"", "0", "0/1", "0/1/2", "0/1/2/3"}, "consent levels to be simulated") return simCmd } diff --git a/pkg/sim/randomsim/event_generators.go b/pkg/sim/randomsim/event_generators.go new file mode 100644 index 0000000..ccc71b8 --- /dev/null +++ b/pkg/sim/randomsim/event_generators.go @@ -0,0 +1,38 @@ +package randomsim + +import ( + "fmt" + "math/rand" + "streammachine.io/strm/pkg/clickstream" + "streammachine.io/strm/pkg/demoschema" + "streammachine.io/strm/pkg/sim" + "streammachine.io/strm/pkg/util" +) + +func createRandomDemo102Event(consentLevels []int32, sessionId string) sim.StreammachineEvent { + event := demoschema.NewDemoEvent() + const eventContractRef = "streammachine/example/1.3.0" + event.StrmMeta = &demoschema.StrmMeta{ + ConsentLevels: consentLevels, + EventContractRef: eventContractRef, + } + event.ConsistentValue = sessionId + event.UniqueIdentifier = util.CreateUnionString(fmt.Sprintf("unique-%d", rand.Intn(100))) + event.SomeSensitiveValue = util.CreateUnionString(fmt.Sprintf("sensitive-%d", rand.Intn(100))) + event.NotSensitiveValue = util.CreateUnionString(fmt.Sprintf("not-sensitive-%d", rand.Intn(100))) + return event +} + +func createRandomClickstreamEvent(consentLevels []int32, sessionId string) sim.StreammachineEvent { + event := clickstream.NewClickstreamEvent() + event.StrmMeta = &clickstream.StrmMeta{ConsentLevels: consentLevels} + event.ProducerSessionId = sessionId + event.Customer = &clickstream.Customer{Id: "customer-" + event.ProducerSessionId} + event.Url = "https://www.streammachine.io/rules" + return event +} + +var EventGenerators = map[string]func([]int32, string) sim.StreammachineEvent{ + "clickstream": createRandomClickstreamEvent, + "streammachine/demo/1.0.2": createRandomDemo102Event, +} diff --git a/pkg/sim/randomsim/randomsim.go b/pkg/sim/randomsim/randomsim.go index 804b903..79b2835 100644 --- a/pkg/sim/randomsim/randomsim.go +++ b/pkg/sim/randomsim/randomsim.go @@ -1,22 +1,20 @@ package randomsim import ( - "bytes" "fmt" + "math/rand" + "net/http" + "strings" + "time" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/streammachineio/api-definitions-go/api/entities/v1" - "io" - "math/rand" - "net/http" "streammachine.io/strm/pkg/auth" - "streammachine.io/strm/pkg/clickstream" "streammachine.io/strm/pkg/common" "streammachine.io/strm/pkg/entity/stream" "streammachine.io/strm/pkg/sim" "streammachine.io/strm/pkg/util" - "strings" - "time" ) // start a random simulator @@ -47,28 +45,39 @@ func run(cmd *cobra.Command, streamName *string) { quiet := util.GetBoolAndErr(flags, sim.QuietFlag) consentLevels, err := flags.GetStringSlice(sim.ConsentLevelsFlag) common.CliExit(err) + + schema := util.GetStringAndErr(flags, sim.SchemaFlag) + f := EventGenerators[schema] + if f == nil { + common.CliExit(fmt.Sprintf("Can't simulate for schema %s", schema)) + } + if len(consentLevels) == 0 { log.Fatalf("%v is not a valid set of consent levels", consentLevels) } authClient := &auth.Auth{Uri: sts} authClient.AuthenticateEvent(s.Ref.BillingId, s.Credentials[0].ClientId, s.Credentials[0].ClientSecret) if !quiet { - println("Starting sim to stream '"+*streamName+"'. Sending 1 event every", interval, "ms") + fmt.Printf("Starting to simulate random %s events to stream %s. ", + schema, *streamName) + fmt.Printf("Sending one event every %d ms.\n", interval) } client := http.Client{} + var sender sim.Sender + if schema == "clickstream" { + sender = sim.LegacySender{Client: client, Gateway: gateway, Schema: schema} + } else { + sender = sim.ModernSender{Client: client, Gateway: gateway, Schema: schema} + } + var ct = 0 now := time.Now() for { - event := clickstream.NewClickstreamEvent() - event.StrmMeta = &clickstream.StrmMeta{ - ConsentLevels: randomConsentLevels(consentLevels), - } - event.ProducerSessionId = fmt.Sprintf("%s-%d", sessionPrefix, rand.Intn(sessionRange)) - event.Customer = &clickstream.Customer{Id: "customer-" + event.ProducerSessionId} - event.Url = "https://www.streammachine.io/rules" + sessionId := fmt.Sprintf("%s-%d", sessionPrefix, rand.Intn(sessionRange)) + event := f(randomConsentLevels(consentLevels), sessionId) token, _ := authClient.GetToken(quiet) - go sendEvent(client, event, gateway, token) + go sender.Send(event, token) ct += 1 time.Sleep(interval * time.Millisecond) if !quiet && time.Now().Sub(now) > 5*time.Second { @@ -79,28 +88,13 @@ func run(cmd *cobra.Command, streamName *string) { } // randomConsentLevels returns a slice of integers for the simulated event. -// It starts with [ "0", "0/1", "3/8", "3/7/10", ...] so a slice of strings that define +// It starts with [ "", "0", "0/1", "3/8", "3/7/10", ...] so a slice of strings that define // what we want to send. This method picks a random one. func randomConsentLevels(levels []string) []int32 { l := strings.Split(levels[rand.Intn(len(levels))], "/") - return util.StringsArrayToInt32(l) -} - -func sendEvent(client http.Client, event *clickstream.ClickstreamEvent, - gateway string, token string) { - b := &bytes.Buffer{} - err := event.Serialize(b) - common.CliExit(err) - req, err := http.NewRequest("POST", gateway, b) - common.CliExit(err) - req.Header.Set("Strm-Schema-Id", "clickstream") - req.Header.Set("Authorization", "Bearer "+token) - resp, err := client.Do(req) - if err != nil || resp.StatusCode != 204 { - if resp != nil { - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - fmt.Printf("%v %s\n", err, string(body)) - } + if len(l) == 1 && l[0] == "" { + // No consent at all + return []int32{} } -} + return util.StringsArrayToInt32(l) +} \ No newline at end of file diff --git a/pkg/sim/sims.go b/pkg/sim/sims.go index bbdfa9b..7648040 100644 --- a/pkg/sim/sims.go +++ b/pkg/sim/sims.go @@ -1,5 +1,13 @@ package sim +import ( + "bytes" + "fmt" + "io" + "net/http" + "streammachine.io/strm/pkg/common" +) + const ( IntervalFlag = "interval" EventGatewayFlag = "events-gateway" @@ -9,4 +17,37 @@ const ( ClientSecretFlag = "client-secret" ConsentLevelsFlag = "consent-levels" QuietFlag = "quiet" + SchemaFlag = "schema" ) + +type StreammachineEvent interface { + Serialize(w io.Writer) error +} + +type Sender interface { + Send(event StreammachineEvent, token string) +} + +type ModernSender struct { + Gateway, Schema string + Client http.Client +} + +// new school send events. +func (s ModernSender) Send(event StreammachineEvent, token string) { + b := &bytes.Buffer{} + err := event.Serialize(b) + common.CliExit(err) + req, err := http.NewRequest("POST", s.Gateway, b) + common.CliExit(err) + req.Header.Set("Strm-Schema-Ref", s.Schema) + req.Header.Set("Authorization", "Bearer "+token) + resp, err := s.Client.Do(req) + if err != nil || resp.StatusCode != 204 { + if resp != nil { + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + fmt.Printf("%v %s\n", err, string(body)) + } + } +} diff --git a/pkg/util/utils.go b/pkg/util/utils.go index c47a386..2352465 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strconv" "streammachine.io/strm/pkg/common" + "streammachine.io/strm/pkg/demoschema" ) var ConfigPath string @@ -56,6 +57,10 @@ func MapStringsToInt(vs []string, f func(string) int) []int { return vsm } func MapStringsToInt32(vs []string, f func(string) int32) []int32 { + if len(vs) == 0 { + return []int32{} + + } vsm := make([]int32, len(vs)) for i, v := range vs { vsm[i] = f(v) @@ -130,3 +135,10 @@ func getSaveFilename(m proto.Message, name *string) string { cat := fmt.Sprint(m.ProtoReflect().Descriptor().Name()) return path.Join(ConfigPath, cat, *name+".json") } + +func CreateUnionString(s string) *demoschema.UnionNullString { + v := demoschema.NewUnionNullString() + v.UnionType = demoschema.UnionNullStringTypeEnumString + v.String = s + return v +}