From 714e228757179987dea5cdbe5e7e901de360e373 Mon Sep 17 00:00:00 2001 From: wenfeng Date: Mon, 5 Sep 2022 17:55:23 +0800 Subject: [PATCH] fix: read events will be blocked when switching to a new segment (#235) --- .github/workflows/codecov.yml | 16 ++++++++-------- .github/workflows/e2e.yml | 1 - .../vanus/eventlog/distributed_eventlog.go | 7 +++++-- go.mod | 2 +- internal/controller/config.go | 2 ++ internal/controller/eventbus/block/block.go | 16 ++++++++++++---- internal/controller/eventbus/block/block_test.go | 4 ++-- internal/controller/eventbus/config.go | 1 + internal/controller/eventbus/controller.go | 2 +- .../controller/eventbus/eventlog/eventlog.go | 4 ++-- 10 files changed, 34 insertions(+), 21 deletions(-) diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 95053fa7a..8cfbfdf55 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -12,24 +12,24 @@ permissions: # pull-requests: read- jobs: codecov: + strategy: + matrix: + go-version: [ 1.17.x, 1.18.x ] + os: [ ubuntu-latest ] name: codecov - runs-on: ubuntu-latest + runs-on: ${{ matrix.os }} + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} steps: - uses: actions/setup-go@v2 - name: checkout-vanus uses: actions/checkout@v3 - with: - repository: linkall-labs/vanus - token: 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 # `GH_PAT` is a secret that contains your PAT - path: vanus - name: Generate coverage report timeout-minutes: 2 run: | - cd vanus && go test -failfast -race -coverprofile=coverage.txt -covermode=atomic -timeout 20s ./internal/... + go test -failfast -race -coverprofile=coverage.txt -covermode=atomic -timeout 20s ./internal/... - uses: codecov/codecov-action@v2 with: - token: 931ae77e-a303-4e84-9c9f-33083aceaef2 # not required for public repos - working-directory: vanus files: ./coverage.txt # optional name: codecov-umbrella # optional fail_ci_if_error: true # optional (default = false) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 1e510f6d1..805f30fe8 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -21,7 +21,6 @@ jobs: steps: - name: Check out repository code uses: actions/checkout@v3 - - name: Environmental preparation run: | curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 diff --git a/client/internal/vanus/eventlog/distributed_eventlog.go b/client/internal/vanus/eventlog/distributed_eventlog.go index edd6a543a..79d3fb900 100644 --- a/client/internal/vanus/eventlog/distributed_eventlog.go +++ b/client/internal/vanus/eventlog/distributed_eventlog.go @@ -382,8 +382,11 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) { events, err := r.cur.Read(ctx, r.pos, size) if err != nil { - if stderr.Is(err, errors.ErrOverflow) && r.switchSegment(ctx) { - return nil, errors.ErrTryAgain + if stderr.Is(err, errors.ErrOverflow) { + r.elog.refreshReadableSegments(ctx) + if r.switchSegment(ctx) { + return nil, errors.ErrTryAgain + } } return nil, err } diff --git a/go.mod b/go.mod index 1a87421b5..134156fd2 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd github.com/jedib0t/go-pretty/v6 v6.3.1 github.com/labstack/echo/v4 v4.7.2 - github.com/labstack/gommon v0.3.1 github.com/linkall-labs/embed-etcd v0.0.3 github.com/linkall-labs/vanus/client v0.1.0 github.com/linkall-labs/vanus/proto v0.1.0 @@ -86,6 +85,7 @@ require ( github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect + github.com/labstack/gommon v0.3.1 // indirect github.com/mattn/go-colorable v0.1.11 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect diff --git a/internal/controller/config.go b/internal/controller/config.go index c74c796f7..cc7e600de 100644 --- a/internal/controller/config.go +++ b/internal/controller/config.go @@ -35,6 +35,7 @@ type Config struct { Topology map[string]string `yaml:"topology"` Replicas uint `yaml:"replicas"` SecretEncryptionSalt string `yaml:"secret_encryption_salt"` + SegmentCapacity int64 `yaml:"segment_capacity"` } func (c *Config) GetEtcdConfig() embedetcd.Config { @@ -51,6 +52,7 @@ func (c *Config) GetEventbusCtrlConfig() eventbus.Config { KVKeyPrefix: c.MetadataConfig.KeyPrefix, Replicas: c.Replicas, Topology: c.Topology, + SegmentCapacity: c.SegmentCapacity, } } diff --git a/internal/controller/eventbus/block/block.go b/internal/controller/eventbus/block/block.go index 8850982c0..d29dd117a 100644 --- a/internal/controller/eventbus/block/block.go +++ b/internal/controller/eventbus/block/block.go @@ -31,6 +31,7 @@ import ( const ( defaultBlockSize = int64(64 * 1024 * 1024) + minimumBlockSize = int64(4 * 1024 * 1024) defaultBlockBufferSizePerVolume = 8 ) @@ -44,8 +45,14 @@ type Allocator interface { Stop() } -func NewAllocator(selector VolumeSelector) Allocator { +func NewAllocator(defaultBlockCapacity int64, selector VolumeSelector) Allocator { + if defaultBlockCapacity <= 0 { + defaultBlockCapacity = defaultBlockSize + } else if defaultBlockCapacity < minimumBlockSize { + defaultBlockCapacity = minimumBlockSize + } return &allocator{ + blockCapacity: defaultBlockCapacity, selector: selector, allocateTicker: time.NewTicker(time.Second), } @@ -60,6 +67,7 @@ type allocator struct { cancel func() cancelCtx context.Context allocateTicker *time.Ticker + blockCapacity int64 } func (al *allocator) Run(ctx context.Context, kvCli kv.Client, startDynamicAllocate bool) error { @@ -97,7 +105,7 @@ func (al *allocator) Pick(ctx context.Context, num int) ([]*metadata.Block, erro defer al.mutex.Unlock() blockArr := make([]*metadata.Block, num) - instances := al.selector.Select(num, defaultBlockSize) + instances := al.selector.Select(num, al.blockCapacity) if len(instances) == 0 { return nil, ErrVolumeNotFound } @@ -112,7 +120,7 @@ func (al *allocator) Pick(ctx context.Context, num int) ([]*metadata.Block, erro } if !exist || skipList.Len() == 0 { - block, err = ins.CreateBlock(ctx, defaultBlockSize) + block, err = ins.CreateBlock(ctx, al.blockCapacity) if err != nil { return nil, err } @@ -153,7 +161,7 @@ func (al *allocator) dynamicAllocateBlockTask(ctx context.Context) { } skipList, _ = v.(*skiplist.SkipList) for skipList.Len() < defaultBlockBufferSizePerVolume { - block, err := instance.CreateBlock(ctx, defaultBlockSize) + block, err := instance.CreateBlock(ctx, al.blockCapacity) if err != nil { log.Warning(ctx, "create block failed", map[string]interface{}{ "volume_id": instance.GetMeta().ID, diff --git a/internal/controller/eventbus/block/block_test.go b/internal/controller/eventbus/block/block_test.go index 3a907b6aa..310a177bd 100644 --- a/internal/controller/eventbus/block/block_test.go +++ b/internal/controller/eventbus/block/block_test.go @@ -210,7 +210,7 @@ func TestAllocator_RunWithDynamic(t *testing.T) { mutex := sync.Mutex{} var instanceList []server.Instance - alloc := NewAllocator(NewVolumeRoundRobin(func() []server.Instance { + alloc := NewAllocator(0, NewVolumeRoundRobin(func() []server.Instance { mutex.Lock() defer mutex.Unlock() return instanceList @@ -317,7 +317,7 @@ func getAllocator(ctrl *gomock.Controller) *allocator { }, nil }) - alloc := NewAllocator(NewVolumeRoundRobin(func() []server.Instance { + alloc := NewAllocator(0, NewVolumeRoundRobin(func() []server.Instance { return []server.Instance{srv1, srv2, srv3} })) return alloc.(*allocator) diff --git a/internal/controller/eventbus/config.go b/internal/controller/eventbus/config.go index 605ba9dbc..f672fe15b 100644 --- a/internal/controller/eventbus/config.go +++ b/internal/controller/eventbus/config.go @@ -24,4 +24,5 @@ type Config struct { EtcdConfig embedetcd.Config `yaml:"etcd"` Replicas uint `yaml:"replicas"` Topology map[string]string `yaml:"topology"` + SegmentCapacity int64 `yaml:"segment_capacity"` } diff --git a/internal/controller/eventbus/controller.go b/internal/controller/eventbus/controller.go index 96b6bd3c0..47fbd2945 100644 --- a/internal/controller/eventbus/controller.go +++ b/internal/controller/eventbus/controller.go @@ -62,7 +62,7 @@ func NewController(cfg Config, member embedetcd.Member) *controller { stopNotify: make(chan error, 1), } c.volumeMgr = volume.NewVolumeManager(c.ssMgr) - c.eventLogMgr = eventlog.NewManager(c.volumeMgr, cfg.Replicas) + c.eventLogMgr = eventlog.NewManager(c.volumeMgr, cfg.Replicas, cfg.SegmentCapacity) return c } diff --git a/internal/controller/eventbus/eventlog/eventlog.go b/internal/controller/eventbus/eventlog/eventlog.go index 93d627c9b..a394e001f 100644 --- a/internal/controller/eventbus/eventlog/eventlog.go +++ b/internal/controller/eventbus/eventlog/eventlog.go @@ -94,12 +94,12 @@ type eventlogManager struct { segmentExpiredTime time.Duration } -func NewManager(volMgr volume.Manager, replicaNum uint) Manager { +func NewManager(volMgr volume.Manager, replicaNum uint, defaultBlockSize int64) Manager { mgr.volMgr = volMgr if replicaNum > 0 { mgr.segmentReplicaNum = replicaNum } - mgr.allocator = block.NewAllocator(block.NewVolumeRoundRobin(mgr.volMgr.GetAllActiveVolumes)) + mgr.allocator = block.NewAllocator(defaultBlockSize, block.NewVolumeRoundRobin(mgr.volMgr.GetAllActiveVolumes)) return mgr }