Skip to content

Commit

Permalink
fix: read events will be blocked when switching to a new segment (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenfengwang authored Sep 5, 2022
1 parent af0805a commit 714e228
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 21 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ permissions:
# pull-requests: read-
jobs:
codecov:
strategy:
matrix:
go-version: [ 1.17.x, 1.18.x ]
os: [ ubuntu-latest ]
name: codecov
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
steps:
- uses: actions/setup-go@v2
- name: checkout-vanus
uses: actions/checkout@v3
with:
repository: linkall-labs/vanus
token: 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 # `GH_PAT` is a secret that contains your PAT
path: vanus
- name: Generate coverage report
timeout-minutes: 2
run: |
cd vanus && go test -failfast -race -coverprofile=coverage.txt -covermode=atomic -timeout 20s ./internal/...
go test -failfast -race -coverprofile=coverage.txt -covermode=atomic -timeout 20s ./internal/...
- uses: codecov/codecov-action@v2
with:
token: 931ae77e-a303-4e84-9c9f-33083aceaef2 # not required for public repos
working-directory: vanus
files: ./coverage.txt # optional
name: codecov-umbrella # optional
fail_ci_if_error: true # optional (default = false)
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ jobs:
steps:
- name: Check out repository code
uses: actions/checkout@v3

- name: Environmental preparation
run: |
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
Expand Down
7 changes: 5 additions & 2 deletions client/internal/vanus/eventlog/distributed_eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,11 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) {

events, err := r.cur.Read(ctx, r.pos, size)
if err != nil {
if stderr.Is(err, errors.ErrOverflow) && r.switchSegment(ctx) {
return nil, errors.ErrTryAgain
if stderr.Is(err, errors.ErrOverflow) {
r.elog.refreshReadableSegments(ctx)
if r.switchSegment(ctx) {
return nil, errors.ErrTryAgain
}
}
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd
github.com/jedib0t/go-pretty/v6 v6.3.1
github.com/labstack/echo/v4 v4.7.2
github.com/labstack/gommon v0.3.1
github.com/linkall-labs/embed-etcd v0.0.3
github.com/linkall-labs/vanus/client v0.1.0
github.com/linkall-labs/vanus/proto v0.1.0
Expand Down Expand Up @@ -86,6 +85,7 @@ require (
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/labstack/gommon v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.11 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
Topology map[string]string `yaml:"topology"`
Replicas uint `yaml:"replicas"`
SecretEncryptionSalt string `yaml:"secret_encryption_salt"`
SegmentCapacity int64 `yaml:"segment_capacity"`
}

func (c *Config) GetEtcdConfig() embedetcd.Config {
Expand All @@ -51,6 +52,7 @@ func (c *Config) GetEventbusCtrlConfig() eventbus.Config {
KVKeyPrefix: c.MetadataConfig.KeyPrefix,
Replicas: c.Replicas,
Topology: c.Topology,
SegmentCapacity: c.SegmentCapacity,
}
}

Expand Down
16 changes: 12 additions & 4 deletions internal/controller/eventbus/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const (
defaultBlockSize = int64(64 * 1024 * 1024)
minimumBlockSize = int64(4 * 1024 * 1024)
defaultBlockBufferSizePerVolume = 8
)

Expand All @@ -44,8 +45,14 @@ type Allocator interface {
Stop()
}

func NewAllocator(selector VolumeSelector) Allocator {
func NewAllocator(defaultBlockCapacity int64, selector VolumeSelector) Allocator {
if defaultBlockCapacity <= 0 {
defaultBlockCapacity = defaultBlockSize
} else if defaultBlockCapacity < minimumBlockSize {
defaultBlockCapacity = minimumBlockSize
}
return &allocator{
blockCapacity: defaultBlockCapacity,
selector: selector,
allocateTicker: time.NewTicker(time.Second),
}
Expand All @@ -60,6 +67,7 @@ type allocator struct {
cancel func()
cancelCtx context.Context
allocateTicker *time.Ticker
blockCapacity int64
}

func (al *allocator) Run(ctx context.Context, kvCli kv.Client, startDynamicAllocate bool) error {
Expand Down Expand Up @@ -97,7 +105,7 @@ func (al *allocator) Pick(ctx context.Context, num int) ([]*metadata.Block, erro
defer al.mutex.Unlock()
blockArr := make([]*metadata.Block, num)

instances := al.selector.Select(num, defaultBlockSize)
instances := al.selector.Select(num, al.blockCapacity)
if len(instances) == 0 {
return nil, ErrVolumeNotFound
}
Expand All @@ -112,7 +120,7 @@ func (al *allocator) Pick(ctx context.Context, num int) ([]*metadata.Block, erro
}

if !exist || skipList.Len() == 0 {
block, err = ins.CreateBlock(ctx, defaultBlockSize)
block, err = ins.CreateBlock(ctx, al.blockCapacity)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,7 +161,7 @@ func (al *allocator) dynamicAllocateBlockTask(ctx context.Context) {
}
skipList, _ = v.(*skiplist.SkipList)
for skipList.Len() < defaultBlockBufferSizePerVolume {
block, err := instance.CreateBlock(ctx, defaultBlockSize)
block, err := instance.CreateBlock(ctx, al.blockCapacity)
if err != nil {
log.Warning(ctx, "create block failed", map[string]interface{}{
"volume_id": instance.GetMeta().ID,
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/eventbus/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestAllocator_RunWithDynamic(t *testing.T) {

mutex := sync.Mutex{}
var instanceList []server.Instance
alloc := NewAllocator(NewVolumeRoundRobin(func() []server.Instance {
alloc := NewAllocator(0, NewVolumeRoundRobin(func() []server.Instance {
mutex.Lock()
defer mutex.Unlock()
return instanceList
Expand Down Expand Up @@ -317,7 +317,7 @@ func getAllocator(ctrl *gomock.Controller) *allocator {
}, nil
})

alloc := NewAllocator(NewVolumeRoundRobin(func() []server.Instance {
alloc := NewAllocator(0, NewVolumeRoundRobin(func() []server.Instance {
return []server.Instance{srv1, srv2, srv3}
}))
return alloc.(*allocator)
Expand Down
1 change: 1 addition & 0 deletions internal/controller/eventbus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ type Config struct {
EtcdConfig embedetcd.Config `yaml:"etcd"`
Replicas uint `yaml:"replicas"`
Topology map[string]string `yaml:"topology"`
SegmentCapacity int64 `yaml:"segment_capacity"`
}
2 changes: 1 addition & 1 deletion internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewController(cfg Config, member embedetcd.Member) *controller {
stopNotify: make(chan error, 1),
}
c.volumeMgr = volume.NewVolumeManager(c.ssMgr)
c.eventLogMgr = eventlog.NewManager(c.volumeMgr, cfg.Replicas)
c.eventLogMgr = eventlog.NewManager(c.volumeMgr, cfg.Replicas, cfg.SegmentCapacity)
return c
}

Expand Down
4 changes: 2 additions & 2 deletions internal/controller/eventbus/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ type eventlogManager struct {
segmentExpiredTime time.Duration
}

func NewManager(volMgr volume.Manager, replicaNum uint) Manager {
func NewManager(volMgr volume.Manager, replicaNum uint, defaultBlockSize int64) Manager {
mgr.volMgr = volMgr
if replicaNum > 0 {
mgr.segmentReplicaNum = replicaNum
}
mgr.allocator = block.NewAllocator(block.NewVolumeRoundRobin(mgr.volMgr.GetAllActiveVolumes))
mgr.allocator = block.NewAllocator(defaultBlockSize, block.NewVolumeRoundRobin(mgr.volMgr.GetAllActiveVolumes))
return mgr
}

Expand Down

0 comments on commit 714e228

Please sign in to comment.