Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(model): extract raw tuple #2835

Merged
merged 2 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions contract/api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,38 @@ import (
"time"
)

// MessageTuple is an interface of the below interfaces
type MessageTuple interface {
ReadonlyMessage
}

type RawTuple interface {
Raw() []byte
}

// ReadonlyMessage Message is the interface that wraps each record.
// Use this interface to exchange data between different components.
// It is used in sink
type ReadonlyMessage interface {
Get(key string) (value any, ok bool)
Range(f func(key string, value any) bool)
// ToMap todo remove after eliminate map
Value(key, table string) (any, bool)
ToMap() map[string]any
}

type MetaInfo interface {
Meta() ReadonlyMessage
Timestamp() time.Time
Meta(key, table string) (any, bool)
Created() time.Time
AllMeta() map[string]any
}

// Tuple is the record passing in source and sink
type Tuple interface {
Message() ReadonlyMessage
MetaInfo
type HasDynamicProps interface {
// DynamicProps return the transformed dynamic properties (typically in sink).
// The transformation should be done in transform op
DynamicProps(template string) (string, bool)
AllProps() map[string]string
}

type RawTuple interface {
Raw() []byte
MetaInfo
type SinkTupleList interface {
RangeOfTuples(f func(index int, tuple MessageTuple) bool)
Len() int
ToMaps() []map[string]any
}
12 changes: 3 additions & 9 deletions contract/api/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,11 @@ type Sink interface {

type BytesCollector interface {
Sink
Collect(ctx StreamContext, item []byte) error
Collect(ctx StreamContext, item RawTuple) error
}

type TupleCollector interface {
Sink
Collect(ctx StreamContext, item Tuple) error
CollectList(ctx StreamContext, items []Tuple) error
}

type ResendSink interface {
Sink
// CollectResend Called when the sink cache resend is triggered
CollectResend(ctx StreamContext, data interface{}) error
Collect(ctx StreamContext, item MessageTuple) error
CollectList(ctx StreamContext, items SinkTupleList) error
}
2 changes: 1 addition & 1 deletion contract/api/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ type LookupSource interface {
// read from the yaml
Configure(datasource string, props map[string]interface{}) error
// Lookup receive lookup values to construct the query and return query results
Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) ([]Tuple, error)
Lookup(ctx StreamContext, fields []string, keys []string, values []interface{}) (SinkTupleList, error)
Closable
}
5 changes: 3 additions & 2 deletions internal/io/file/file_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/compressor"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/message"
)

Expand Down Expand Up @@ -113,12 +114,12 @@ func TestFileSinkCompress_Collect(t *testing.T) {
t.Fatal(err)
}

if err := sink.Collect(ctx, []byte("{\"key\":\"value1\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value1\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}

// Test collecting another map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value2\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value2\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}
if err = sink.Close(ctx); err != nil {
Expand Down
13 changes: 9 additions & 4 deletions internal/io/file/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ func (m *fileSink) Connect(ctx api.StreamContext) error {
return nil
}

func (m *fileSink) Collect(ctx api.StreamContext, item []byte) error {
func (m *fileSink) Collect(ctx api.StreamContext, tuple api.RawTuple) error {
item := tuple.Raw()
ctx.GetLogger().Debugf("file sink receive %s", item)
fn, err := ctx.ParseTemplate(m.c.Path, item)
if err != nil {
return err
fn := m.c.Path
if dp, ok := tuple.(api.HasDynamicProps); ok {
t, transformed := dp.DynamicProps(fn)
if transformed {
fn = t
}
}
ctx.GetLogger().Debugf("writing to file path %s", fn)
fw, err := m.GetFws(ctx, fn, item)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/io/file/sink_rolling_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/message"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestCollectRolling(t *testing.T) {
m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
b, err := json.Marshal(m)
assert.NoError(t, err)
if err := sink.Collect(ctx, b); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: b}); err != nil {
t.Errorf("unexpected error: %s", err)
}
}
Expand Down
15 changes: 8 additions & 7 deletions internal/io/file/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/encryptor"
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/message"
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
)
Expand Down Expand Up @@ -317,12 +318,12 @@ func TestFileSink_Collect(t *testing.T) {
t.Fatal(err)
}

if err := sink.Collect(ctx, []byte("{\"key\":\"value1\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value1\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}

// Test collecting another map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value2\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value2\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}
if err = sink.Close(ctx); err != nil {
Expand Down Expand Up @@ -469,7 +470,7 @@ func TestFileSinkRolling_Collect(t *testing.T) {
m := map[string]interface{}{"key": "value" + strconv.Itoa(i), "ts": c.Now().UnixMilli()}
b, err := json.Marshal(m)
assert.NoError(t, err)
if err := sink.Collect(ctx, b); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: b}); err != nil {
t.Errorf("unexpected error: %s", err)
}
}
Expand Down Expand Up @@ -545,7 +546,7 @@ func TestFileSinkReopen(t *testing.T) {
}

// Test collecting a map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value1\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value1\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}
sink.Close(ctx)
Expand Down Expand Up @@ -575,7 +576,7 @@ func TestFileSinkReopen(t *testing.T) {
t.Fatal(err)
}
// Test collecting another map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value2\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value2\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}
if err = sink.Close(ctx); err != nil {
Expand Down Expand Up @@ -654,12 +655,12 @@ func TestFileCompressAndEncrypt(t *testing.T) {
}

// Test collecting a map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value1\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value1\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}

// Test collecting another map item
if err := sink.Collect(ctx, []byte("{\"key\":\"value2\"}")); err != nil {
if err := sink.Collect(ctx, &xsql.RawTuple{Rawdata: []byte("{\"key\":\"value2\"}")}); err != nil {
t.Errorf("unexpected error: %s", err)
}

Expand Down
67 changes: 37 additions & 30 deletions internal/io/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/topo/state"
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/model"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

Expand Down Expand Up @@ -63,16 +62,24 @@ func TestSharedInmemoryNode(t *testing.T) {
t.Error(err)
}

rawTuple := model.NewDefaultSourceTuple(xsql.Message{"temp": 12}, nil, timex.GetNow())
rawTuple := &xsql.Tuple{
Message: map[string]any{"temp": 20},
Metadata: nil,
}
mockclock.GetMockClock().Add(100)
go func() {
err = snk.CollectList(ctx, []api.Tuple{rawTuple})
err = snk.CollectList(ctx, &xsql.MemTupleList{Content: []api.MessageTuple{rawTuple}})
if err != nil {
t.Error(err)
}
}()
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
expected := []api.Tuple{model.NewDefaultSourceTuple(rawTuple.Message(), xsql.Message{"topic": id}, timex.GetNow())}
expected := []*xsql.Tuple{{
Emitter: "",
Timestamp: timex.GetNowInMilli(),
Metadata: map[string]any{"topic": id},
Message: rawTuple.Message,
}}
assert.Equal(t, expected, res)
cancel()
}, nil)
Expand Down Expand Up @@ -140,39 +147,39 @@ func TestMultipleTopics(t *testing.T) {
},
},
}
expected = [][]api.Tuple{
expected = [][]*xsql.Tuple{
{ // 0 "h/d1/c1/s2",
model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
{Message: map[string]any{"id": 4, "color": "red"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 5, "color": "red"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 6, "color": "green"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
},
{ // 1 "h/+/+/s1",
model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
{Message: map[string]any{"id": 1, "temp": 23}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 2, "temp": 34}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 3, "temp": 28}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},

model.NewDefaultSourceTuple(xsql.Message{"id": 7, "hum": 67.5}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 8, "hum": 77.1}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 9, "hum": 90.3}, xsql.Message{"topic": "h/d2/c2/s1"}, timex.GetNow()),
{Message: map[string]any{"id": 7, "hum": 67.5}, Metadata: map[string]any{"topic": "h/d2/c2/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 8, "hum": 77.1}, Metadata: map[string]any{"topic": "h/d2/c2/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 9, "hum": 90.3}, Metadata: map[string]any{"topic": "h/d2/c2/s1"}, Timestamp: timex.GetNowInMilli()},

model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
{Message: map[string]any{"id": 10, "status": "on"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 11, "status": "off"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 12, "status": "on"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
},
{ // 2 "h/d3/#",
model.NewDefaultSourceTuple(xsql.Message{"id": 10, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 11, "status": "off"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 12, "status": "on"}, xsql.Message{"topic": "h/d3/c3/s1"}, timex.GetNow()),
{Message: map[string]any{"id": 10, "status": "on"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 11, "status": "off"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 12, "status": "on"}, Metadata: map[string]any{"topic": "h/d3/c3/s1"}, Timestamp: timex.GetNowInMilli()},
},
{ // 3 "h/d1/c1/s2",
model.NewDefaultSourceTuple(xsql.Message{"id": 4, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 5, "color": "red"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 6, "color": "green"}, xsql.Message{"topic": "h/d1/c1/s2"}, timex.GetNow()),
{Message: map[string]any{"id": 4, "color": "red"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 5, "color": "red"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 6, "color": "green"}, Metadata: map[string]any{"topic": "h/d1/c1/s2"}, Timestamp: timex.GetNowInMilli()},
},
{ // 4 "h/+/c1/s1"
model.NewDefaultSourceTuple(xsql.Message{"id": 1, "temp": 23}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 2, "temp": 34}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
model.NewDefaultSourceTuple(xsql.Message{"id": 3, "temp": 28}, xsql.Message{"topic": "h/d1/c1/s1"}, timex.GetNow()),
{Message: map[string]any{"id": 1, "temp": 23}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 2, "temp": 34}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},
{Message: map[string]any{"id": 3, "temp": 28}, Metadata: map[string]any{"topic": "h/d1/c1/s1"}, Timestamp: timex.GetNowInMilli()},
},
}
)
Expand Down Expand Up @@ -201,12 +208,12 @@ func TestMultipleTopics(t *testing.T) {
err := src.Provision(ctx, map[string]any{"datasource": topic})
assert.NoError(t, err)
limit := len(expected[i])
result := make([]api.Tuple, 0, limit)
result := make([]*xsql.Tuple, 0, limit)
nc, cancel := ctx.WithMeta("rule1", fmt.Sprintf("op%d", i), &state.MemoryStore{}).WithCancel()
err = src.Subscribe(nc, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
rid, _ := res.(api.Tuple).Message().Get("id")
rid, _ := res.(*xsql.Tuple).Message["id"]
fmt.Printf("%d(%s) receive %v\n", i, topic, rid)
result = append(result, res.(api.Tuple))
result = append(result, res.(*xsql.Tuple))
limit--
if limit == 0 {
assert.Equal(t, result, expected[i], i)
Expand All @@ -221,7 +228,7 @@ func TestMultipleTopics(t *testing.T) {
topic := sinkTopics[i]
for _, mm := range v {
time.Sleep(10 * time.Millisecond)
pubsub.Produce(ctx, topic, model.NewDefaultSourceTuple(xsql.Message(mm), xsql.Message{"topic": topic}, timex.GetNow()))
pubsub.Produce(ctx, topic, &xsql.Tuple{Message: mm, Metadata: map[string]any{"topic": topic}, Timestamp: timex.GetNowInMilli()})
fmt.Printf("send to topic %s: %v\n", topic, mm["id"])
}

Expand Down
14 changes: 3 additions & 11 deletions internal/io/memory/pubsub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
)

const IdProperty = "topic"
Expand Down Expand Up @@ -109,20 +110,11 @@ func RemovePub(topic string) {
}
}

func ProduceUpdatable(ctx api.StreamContext, topic string, data api.Tuple, rowkind string, keyval interface{}) {
// TODO fix updatable
//doProduce(ctx, topic, &UpdatableTuple{
// DefaultSourceTuple: api.NewDefaultSourceTupleWithTime(data, map[string]interface{}{"topic": topic}, timex.GetNow()),
// Rowkind: rowkind,
// Keyval: keyval,
//})
}

func Produce(ctx api.StreamContext, topic string, data api.Tuple) {
func Produce(ctx api.StreamContext, topic string, data *xsql.Tuple) {
doProduce(ctx, topic, data)
}

func ProduceList(ctx api.StreamContext, topic string, list []api.Tuple) {
func ProduceList(ctx api.StreamContext, topic string, list []*xsql.Tuple) {
doProduce(ctx, topic, list)
}

Expand Down
Loading
Loading