Skip to content

Commit

Permalink
Merge branch 'main' into gateway
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <jyjiangkai@163.com>
  • Loading branch information
hwjiangkai committed Jan 6, 2023
2 parents 7f667e0 + 1395ee8 commit 44073ee
Show file tree
Hide file tree
Showing 105 changed files with 3,237 additions and 1,785 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ jobs:
startsWith(github.event.pull_request.title, 'fix') ||
startsWith(github.event.pull_request.title, 'feat') ||
startsWith(github.event.pull_request.title, 'refactor') ||
startsWith(github.event.pull_request.title, 'perf')
startsWith(github.event.pull_request.title, 'perf') ||
startsWith(github.event.pull_request.title, 'test')
runs-on: ${{ matrix.os }}
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion build/images/controller/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ARG TARGETOS
ARG TARGETARCH
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH make build-controller

FROM alpine:3.15.4
FROM ubuntu:22.10
WORKDIR /vanus
COPY --from=builder /workspace/bin/controller bin/controller
ENTRYPOINT ["bin/controller"]
Expand Down
2 changes: 1 addition & 1 deletion build/images/gateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ARG TARGETOS
ARG TARGETARCH
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH make build-gateway

FROM alpine:3.15.4
FROM ubuntu:22.10
WORKDIR /vanus
COPY --from=builder /workspace/bin/gateway bin/gateway
ENTRYPOINT ["bin/gateway"]
Expand Down
2 changes: 1 addition & 1 deletion build/images/store/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ARG TARGETOS
ARG TARGETARCH
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH make build-store

FROM alpine:3.15.4
FROM ubuntu:22.10
WORKDIR /vanus
COPY --from=builder /workspace/bin/store bin/store
ENTRYPOINT ["bin/store"]
Expand Down
2 changes: 1 addition & 1 deletion build/images/timer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ARG TARGETOS
ARG TARGETARCH
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH make build-timer

FROM alpine:3.15.4
FROM ubuntu:22.10
WORKDIR /vanus
COPY --from=builder /workspace/bin/timer bin/timer
ENTRYPOINT ["bin/timer"]
Expand Down
2 changes: 1 addition & 1 deletion build/images/trigger/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ARG TARGETOS
ARG TARGETARCH
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH make build-trigger

FROM alpine:3.15.4
FROM ubuntu:22.10
WORKDIR /vanus
COPY --from=builder /workspace/bin/trigger bin/trigger
ENTRYPOINT ["bin/trigger"]
Expand Down
6 changes: 3 additions & 3 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.18
require (
github.com/cloudevents/sdk-go/v2 v2.12.0
github.com/golang/mock v1.6.0
github.com/linkall-labs/vanus/observability v0.5.1
github.com/linkall-labs/vanus/pkg v0.5.1
github.com/linkall-labs/vanus/proto v0.5.1
github.com/linkall-labs/vanus/observability v0.5.6
github.com/linkall-labs/vanus/pkg v0.5.6
github.com/linkall-labs/vanus/proto v0.5.6
github.com/scylladb/go-set v1.0.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4
go.opentelemetry.io/otel/trace v1.11.2
Expand Down
4 changes: 2 additions & 2 deletions client/internal/vanus/store/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"time"

"github.com/linkall-labs/vanus/client/pkg/codec"

"github.com/linkall-labs/vanus/observability/tracing"
"go.opentelemetry.io/otel/trace"

Expand All @@ -30,8 +32,6 @@ import (
cepb "github.com/linkall-labs/vanus/proto/pkg/cloudevents"
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"

// this project
"github.com/linkall-labs/vanus/client/internal/vanus/codec"
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
"github.com/linkall-labs/vanus/client/pkg/primitive"
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (w *busWriter) AppendBatch(ctx context.Context, events *cloudevents.CloudEv
_ctx, span := w.tracer.Start(ctx, "CloudEventBatch")
defer span.End()

var writeOpts *api.WriteOptions = w.opts
var writeOpts = w.opts
if len(opts) > 0 {
writeOpts = w.opts.Copy()
for _, opt := range opts {
Expand All @@ -475,7 +475,7 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
_ctx, span := w.tracer.Start(ctx, "AppendOne")
defer span.End()

var writeOpts *api.WriteOptions = w.opts
var writeOpts = w.opts
if len(opts) > 0 {
writeOpts = w.opts.Copy()
for _, opt := range opts {
Expand Down
41 changes: 36 additions & 5 deletions client/pkg/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package policy

import (
"context"
"sort"
"sync"
"sync/atomic"

"github.com/linkall-labs/vanus/client/pkg/api"
Expand All @@ -30,8 +32,10 @@ func NewRoundRobinWritePolicy(eb api.Eventbus) api.WritePolicy {
}

type roundRobinWritePolicy struct {
bus api.Eventbus
idx uint64
bus api.Eventbus
idx uint64
cached []api.Eventlog
mutex sync.Mutex
}

func (w *roundRobinWritePolicy) Type() api.PolicyType {
Expand All @@ -47,6 +51,18 @@ func (w *roundRobinWritePolicy) NextLog(ctx context.Context) (api.Eventlog, erro
if len(logs) == 0 {
continue
}

if len(logs) == len(w.cached) {
logs = w.cached
} else {
w.mutex.Lock()
sort.Slice(logs, func(i, j int) bool {
return logs[i].ID() > logs[j].ID()
})
w.cached = logs
w.mutex.Unlock()
}

l := len(logs)
i := atomic.AddUint64(&w.idx, 1) % uint64(l)
return logs[i], nil
Expand Down Expand Up @@ -82,28 +98,43 @@ type roundRobinReadPolicy struct {
bus api.Eventbus
idx uint64
offset int64
cached []api.Eventlog
mutex sync.Mutex
}

func (r roundRobinReadPolicy) Type() api.PolicyType {
func (r *roundRobinReadPolicy) Type() api.PolicyType {
return api.RoundRobin
}

func (r roundRobinReadPolicy) NextLog(ctx context.Context) (api.Eventlog, error) {
func (r *roundRobinReadPolicy) NextLog(ctx context.Context) (api.Eventlog, error) {
for {
logs, err := r.bus.ListLog(ctx)
if err != nil {
return nil, err
}

if len(logs) == 0 {
continue
}

if len(logs) == len(r.cached) {
logs = r.cached
} else {
r.mutex.Lock()
sort.Slice(logs, func(i, j int) bool {
return logs[i].ID() > logs[j].ID()
})
r.cached = logs
r.mutex.Unlock()
}

l := len(logs)
i := atomic.AddUint64(&r.idx, 1) % uint64(l)
return logs[i], nil
}
}

func (r roundRobinReadPolicy) Offset() int64 {
func (r *roundRobinReadPolicy) Offset() int64 {
return atomic.LoadInt64(&r.offset)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func main() {
}

//trigger controller
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), cfg.GetControllerAddrs(), etcd)
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), etcd)
if err = triggerCtrlStv.Start(); err != nil {
log.Error(ctx, "start trigger controller fail", map[string]interface{}{
log.KeyError: err,
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ require (
github.com/jedib0t/go-pretty/v6 v6.3.1
github.com/json-iterator/go v1.1.12
github.com/linkall-labs/embed-etcd v0.1.2
github.com/linkall-labs/vanus/client v0.5.1
github.com/linkall-labs/vanus/observability v0.5.1
github.com/linkall-labs/vanus/pkg v0.5.1
github.com/linkall-labs/vanus/proto v0.5.1
github.com/linkall-labs/vanus/raft v0.5.1
github.com/linkall-labs/vanus/client v0.5.6
github.com/linkall-labs/vanus/observability v0.5.6
github.com/linkall-labs/vanus/pkg v0.5.6
github.com/linkall-labs/vanus/proto v0.5.6
github.com/linkall-labs/vanus/raft v0.5.6
github.com/ncw/directio v1.0.5
github.com/ohler55/ojg v1.14.5
github.com/panjf2000/ants/v2 v2.7.1
github.com/pkg/errors v0.9.1
github.com/prashantv/gostub v1.1.0
github.com/prometheus/client_golang v1.14.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M=
github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
1 change: 1 addition & 0 deletions internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (c *Config) GetTriggerConfig() trigger.Config {
ServerList: c.EtcdEndpoints,
},
SecretEncryptionSalt: c.SecretEncryptionSalt,
ControllerAddr: c.GetControllerAddrs(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/controller/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ type Config struct {
Storage primitive.KvStorageConfig

SecretEncryptionSalt string

ControllerAddr []string
}
Loading

0 comments on commit 44073ee

Please sign in to comment.