diff --git a/deploy/all-in-one.yaml b/deploy/all-in-one.yaml index ba23251ce..b9fa85a03 100644 --- a/deploy/all-in-one.yaml +++ b/deploy/all-in-one.yaml @@ -16,7 +16,7 @@ data: - vanus-controller-1.vanus-controller.vanus.svc:2379 - vanus-controller-2.vanus-controller.vanus.svc:2379 data_dir: /data - replicas: 1 + replicas: 3 metadata: key_prefix: /vanus topology: @@ -170,7 +170,7 @@ spec: app: vanus-gateway spec: containers: - - image: public.ecr.aws/vanus/gateway:5952928 + - image: public.ecr.aws/vanus/gateway:v0.0.5 imagePullPolicy: Always name: gateway ports: @@ -192,7 +192,7 @@ metadata: name: vanus-trigger namespace: vanus spec: - replicas: 1 + replicas: 2 selector: matchLabels: app: vanus-trigger @@ -209,7 +209,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/trigger:5952928 + image: public.ecr.aws/vanus/trigger:v0.0.5 imagePullPolicy: Always livenessProbe: failureThreshold: 3 @@ -258,7 +258,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/controller:5952928 + image: public.ecr.aws/vanus/controller:v0.0.5 imagePullPolicy: Always livenessProbe: failureThreshold: 3 @@ -324,7 +324,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - image: public.ecr.aws/vanus/store:5952928 + image: public.ecr.aws/vanus/store:v0.0.5 imagePullPolicy: Always livenessProbe: failureThreshold: 3 diff --git a/internal/store/block/file/block.go b/internal/store/block/file/block.go index 843e4a145..ddf055f8c 100644 --- a/internal/store/block/file/block.go +++ b/internal/store/block/file/block.go @@ -242,7 +242,7 @@ func (b *Block) loadIndexFromFile() error { b.indexes = make([]index, num) for i := range b.indexes { off := int(length) - (i+1)*v1IndexSize - b.indexes[i], _ = unmashalIndex(data[off : off+v1IndexSize]) + b.indexes[i], _ = unmarshalIndex(data[off : off+v1IndexSize]) } return nil diff --git a/internal/store/block/file/index.go b/internal/store/block/file/index.go index 26afeda38..cd303cf96 100644 --- a/internal/store/block/file/index.go +++ b/internal/store/block/file/index.go @@ -47,7 +47,7 @@ func (i index) MarshalTo(data []byte) (int, error) { return v1IndexSize, nil } -func unmashalIndex(data []byte) (index, error) { +func unmarshalIndex(data []byte) (index, error) { if len(data) < v1IndexSize { // TODO(james.yin): correct error. return index{}, bytes.ErrTooLarge diff --git a/internal/store/block/file/index_test.go b/internal/store/block/file/index_test.go index 082122806..8fafde67f 100644 --- a/internal/store/block/file/index_test.go +++ b/internal/store/block/file/index_test.go @@ -13,3 +13,40 @@ // limitations under the License. package file + +import ( + "bytes" + . "github.com/smartystreets/goconvey/convey" + "math/rand" + "testing" + "time" +) + +func TestIndex_Marshal(t *testing.T) { + Convey("test marshall & unmarshall of index", t, func() { + rd := rand.New(rand.NewSource(time.Now().UnixNano())) + idx1 := index{ + offset: rd.Int63(), + length: rd.Int31(), + } + + data := make([]byte, 12) + n, err := idx1.MarshalTo(data) + So(err, ShouldBeNil) + So(n, ShouldEqual, 12) + + nIdx, err := unmarshalIndex(data) + So(err, ShouldBeNil) + So(idx1, ShouldResemble, nIdx) + So(nIdx.StartOffset(), ShouldEqual, nIdx.offset) + So(nIdx.EndOffset(), ShouldEqual, nIdx.offset+int64(nIdx.length)) + + n, err = idx1.MarshalTo(make([]byte, rd.Int31n(12))) + So(n, ShouldBeZeroValue) + So(err, ShouldEqual, bytes.ErrTooLarge) + + nIdx, err = unmarshalIndex(make([]byte, rd.Int31n(12))) + So(n, ShouldBeZeroValue) + So(nIdx, ShouldResemble, index{}) + }) +} diff --git a/test/gateway/main.go b/test/gateway/main.go deleted file mode 100644 index bc6f39205..000000000 --- a/test/gateway/main.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2022 Linkall Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -import ( - "context" - "flag" - "fmt" - cloudevents "github.com/cloudevents/sdk-go/v2" - cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" - "github.com/linkall-labs/eventbus-go" - "io" - "log" -) - -var ( - mode = flag.String("mode", "send", "") - addr = flag.String("addr", "127.0.0.1:8080", "") - eb = flag.String("eb", "test", "") - eventType = flag.String("type", "a", "") - num = flag.Int("num", 100, "") - size = flag.Int("size", 64, "") - offset = flag.Int("offset", 0, "") -) - -func main() { - flag.Parse() - fmt.Printf("params: %s=%s\n", "mode", *mode) - fmt.Printf("params: %s=%s\n", "addr", *addr) - fmt.Printf("params: %s=%s\n", "eb", *eb) - fmt.Printf("params: %s=%d\n", "num", *num) - fmt.Printf("params: %s=%d\n", "size", *size) - fmt.Printf("params: %s=%d\n", "offset", *offset) - if *mode == "send" { - sender() - } else if *mode == "receive" { - receiver() - } -} - -func sender() { - ctx := cloudevents.ContextWithTarget(context.Background(), fmt.Sprintf("http://%s/gateway/%s", *addr, *eb)) - - p, err := cloudevents.NewHTTP() - if err != nil { - log.Fatalf("failed to create protocol: %s", err.Error()) - } - - c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) - if err != nil { - log.Fatalf("failed to create client, %v", err) - } - - data := func() string { - str := "" - for idx := 0; idx < *size; idx++ { - str += "a" - } - return str - }() - for i := 0; i < *num; i++ { - e := cloudevents.NewEvent() - e.SetType(*eventType) - e.SetSource("gw-util") - _ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ - "id": i, - "message": "Hello, World!", - "data": data, - }) - - res := c.Send(ctx, e) - if cloudevents.IsUndelivered(res) { - log.Printf("Failed to send: %v", res) - } else { - var httpResult *cehttp.Result - cloudevents.ResultAs(res, &httpResult) - log.Printf("Sent %d with status code %d, body: %s", i, httpResult.StatusCode, httpResult.Error()) - } - } -} - -func receiver() { - vrn := fmt.Sprintf("vanus://%s/eventbus/%s", *addr, *eb) - ls, err := eventbus.LookupReadableLogs(context.Background(), vrn) - if err != nil { - log.Fatal(err) - } - - r, err := eventbus.OpenLogReader(ls[0].VRN) - if err != nil { - log.Fatal(err) - } - - _, err = r.Seek(context.Background(), int64(*offset), io.SeekStart) - if err != nil { - log.Fatal(err) - } - - events, err := r.Read(context.Background(), int16(*num)) - if err != nil { - log.Fatal(err) - } - - if len(events) == 0 { - log.Printf("no event\n") - return - } - - for i, e := range events { - log.Printf("event %d: \n%s", i, e) - } - - r.Close() -} diff --git a/test/standalone/main.go b/test/standalone/main.go deleted file mode 100644 index adb541b86..000000000 --- a/test/standalone/main.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 Linkall Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package main - -func main() { - // TODO -} diff --git a/test/ut/mock_ut.go b/test/ut/mock_ut.go deleted file mode 100644 index d9fb5b52b..000000000 --- a/test/ut/mock_ut.go +++ /dev/null @@ -1,48 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: test/ut/ut.go - -// Package ut is a generated GoMock package. -package ut - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockFoo is a mock of Foo interface. -type MockFoo struct { - ctrl *gomock.Controller - recorder *MockFooMockRecorder -} - -// MockFooMockRecorder is the mock recorder for MockFoo. -type MockFooMockRecorder struct { - mock *MockFoo -} - -// NewMockFoo creates a new mock instance. -func NewMockFoo(ctrl *gomock.Controller) *MockFoo { - mock := &MockFoo{ctrl: ctrl} - mock.recorder = &MockFooMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockFoo) EXPECT() *MockFooMockRecorder { - return m.recorder -} - -// Bar mocks base method. -func (m *MockFoo) Bar(x int) int { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bar", x) - ret0, _ := ret[0].(int) - return ret0 -} - -// Bar indicates an expected call of Bar. -func (mr *MockFooMockRecorder) Bar(x interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bar", reflect.TypeOf((*MockFoo)(nil).Bar), x) -} diff --git a/test/ut/ut.go b/test/ut/ut.go deleted file mode 100644 index 2b0eb7255..000000000 --- a/test/ut/ut.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2022 Linkall Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ut - -import "fmt" - -//go:generate mockgen --source=test/ut/ut.go --destination=test/ut/mock_ut.go --package=ut -type Foo interface { - Bar(x int) int -} - -func SUT(f Foo) { - fmt.Println(f.Bar(99)) -} - -var doubleInt = func(int int) int { - return int * 2 -} diff --git a/test/ut/ut_test.go b/test/ut/ut_test.go deleted file mode 100644 index 96d262a48..000000000 --- a/test/ut/ut_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2022 Linkall Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ut - -import ( - "github.com/golang/mock/gomock" - "github.com/prashantv/gostub" - . "github.com/smartystreets/goconvey/convey" - "testing" - "time" -) - -func TestFoo(t *testing.T) { - Convey("test gomock framework", t, func() { - Convey("test mock", func() { - ctrl := gomock.NewController(t) - - // Assert that Bar() is invoked. - m := NewMockFoo(ctrl) - - // Asserts that the first and only call to Bar() is passed 99. - // Anything else will fail. - m. - EXPECT(). - Bar(99). - Return(101) - - SUT(m) - }) - - Convey("test stub", func() { - ctrl := gomock.NewController(t) - - m := NewMockFoo(ctrl) - - // Does not make any assertions. Executes the anonymous functions and returns - // its result when Bar is invoked with 99. - m. - EXPECT(). - Bar(gomock.Eq(99)). - DoAndReturn(func(_ int) int { - time.Sleep(1 * time.Second) - return 101 - }). - AnyTimes() - - // Does not make any assertions. Returns 103 when Bar is invoked with 101. - m. - EXPECT(). - Bar(gomock.Eq(101)). - Return(103). - AnyTimes() - - SUT(m) - }) - }) -} - -func TestStub1(t *testing.T) { - Convey("test function stub", t, func() { - Convey("cases", func() { - So(doubleInt(2), ShouldEqual, 4) - stubs := gostub.Stub(&doubleInt, func(i int) int { - return i * 3 - }) - defer stubs.Reset() - So(doubleInt(2), ShouldNotEqual, 4) - So(doubleInt(2), ShouldEqual, 6) - gostub.StubFunc(&doubleInt, 10) - So(doubleInt(2), ShouldNotEqual, 6) - So(doubleInt(2), ShouldEqual, 10) - }) - }) -} diff --git a/vsctl/command/eventbus.go b/vsctl/command/eventbus.go index fded15707..8c2a78d2a 100644 --- a/vsctl/command/eventbus.go +++ b/vsctl/command/eventbus.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "github.com/fatih/color" + "github.com/gogo/protobuf/sortkeys" "github.com/golang/protobuf/ptypes/empty" "github.com/jedib0t/go-pretty/v6/table" "github.com/jedib0t/go-pretty/v6/text" @@ -161,19 +162,19 @@ func getEventbusInfoCommand() *cobra.Command { } } else { if !showBlock { - t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Start", "End"}) + t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Capacity", "Size", "Start", "End"}) for _, res := range busMetas { for idx := 0; idx < len(res.Logs); idx++ { segOfEL := segs[res.Logs[idx].EventLogId] for sIdx, v := range segOfEL { if idx == 0 && sIdx == 0 { - t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog, - v.EndOffsetInLog}) + t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, v.Id, v.Capacity, v.Size, + v.StartOffsetInLog, v.EndOffsetInLog}) } else if sIdx == 0 { - t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, v.Id, v.StartOffsetInLog, - v.EndOffsetInLog}) + t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, v.Id, v.Capacity, v.Size, + v.StartOffsetInLog, v.EndOffsetInLog}) } else { - t.AppendRow(table.Row{"", "", v.Id, v.StartOffsetInLog, + t.AppendRow(table.Row{"", "", v.Id, v.Capacity, v.Size, v.StartOffsetInLog, v.EndOffsetInLog}) } } @@ -185,11 +186,13 @@ func getEventbusInfoCommand() *cobra.Command { {Number: 1, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 2, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 3, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, - {Number: 4, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, - {Number: 5, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 4, VAlign: text.VAlignMiddle, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 5, VAlign: text.VAlignMiddle, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 6, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 7, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, }) } else { - t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Start", "End", "Block", "Leader", "Volume", "Endpoint"}) + t.AppendHeader(table.Row{"Eventbus", "Eventlog", "Segment", "Capacity", "Size", "Start", "End", "Block", "Leader", "Volume", "Endpoint"}) multiReplica := false for _, res := range busMetas { for idx := 0; idx < len(res.Logs); idx++ { @@ -199,18 +202,30 @@ func getEventbusInfoCommand() *cobra.Command { if !multiReplica && len(seg.Replicas) > 1 { multiReplica = true } - for _, blk := range seg.Replicas { + var vols []uint64 + var volMap = map[uint64]*metapb.Block{} + for _, v := range seg.Replicas { + vols = append(vols, v.VolumeID) + volMap[v.VolumeID] = v + } + sortkeys.Uint64s(vols) + for _, k := range vols { + blk := volMap[k] if idx == 0 && sIdx == 0 && tIdx == 0 { - t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog, - seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) + t.AppendRow(table.Row{res.Name, res.Logs[idx].EventLogId, seg.Id, seg.Capacity, + seg.Size, seg.StartOffsetInLog, seg.EndOffsetInLog, blk.Id, + blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) } else if sIdx == 0 && tIdx == 0 { - t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, seg.Id, seg.StartOffsetInLog, - seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) + t.AppendRow(table.Row{"", res.Logs[idx].EventLogId, seg.Id, seg.Capacity, + seg.Size, seg.StartOffsetInLog, seg.EndOffsetInLog, blk.Id, + blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) } else if tIdx == 0 { - t.AppendRow(table.Row{"", "", seg.Id, seg.StartOffsetInLog, - seg.EndOffsetInLog, blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) + t.AppendRow(table.Row{"", "", seg.Id, seg.Capacity, seg.Size, + seg.StartOffsetInLog, seg.EndOffsetInLog, blk.Id, + blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) } else { - t.AppendRow(table.Row{"", "", "", "", "", blk.Id, blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) + t.AppendRow(table.Row{"", "", "", "", "", "", "", blk.Id, + blk.Id == seg.LeaderBlockId, blk.VolumeID, blk.Endpoint}) } tIdx++ } @@ -224,10 +239,12 @@ func getEventbusInfoCommand() *cobra.Command { {Number: 3, VAlign: text.VAlignMiddle, AutoMerge: true, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 4, VAlign: text.VAlignMiddle, AutoMerge: multiReplica, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 5, VAlign: text.VAlignMiddle, AutoMerge: multiReplica, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, - {Number: 6, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, - {Number: 7, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 6, VAlign: text.VAlignMiddle, AutoMerge: multiReplica, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 7, VAlign: text.VAlignMiddle, AutoMerge: multiReplica, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 8, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, {Number: 9, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 10, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, + {Number: 11, Align: text.AlignCenter, AlignHeader: text.AlignCenter}, }) } }