Skip to content

Commit

Permalink
kubernetes leader election and membership added (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan authored Dec 13, 2022
1 parent e2aaeab commit 7427d27
Show file tree
Hide file tree
Showing 45 changed files with 1,747 additions and 332 deletions.
44 changes: 22 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,30 @@ $ go get github.com/Trendyol/go-dcp-client

### Configuration

| Variable | Type | Is Required |
|----------------------------------|-------------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `userAgent` | string | yes |
| `compression` | boolean *(true/false)* | yes |
| `metadataBucket` | string | yes |
| `connectTimeout` | integer *(second)* | yes |
| `dcp.connectTimeout` | integer *(second)* | yes |
| `dcp.flowControlBuffer` | integer | yes |
| `dcp.persistencePollingInterval` | integer *(millisecond)* | yes |
| `dcp.group.name` | string | yes |
| `dcp.membership.type` | string | yes |
| `dcp.membership.memberNumber` | integer | yes |
| `dcp.membership.totalMembers` | integer | yes |
| `api.port` | integer | yes |
| `metric.enabled` | boolean *(true/false)* | yes |
| `metric.path` | string | yes |
| Variable | Type | Is Required |
|-------------------------------------|-----------------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `metadataBucket` | string | yes |
| `dcp.group.name` | string | yes |
| `dcp.group.membership.type` | string | yes |
| `dcp.group.membership.memberNumber` | integer | no |
| `dcp.group.membership.totalMembers` | integer | no |
| `api.port` | integer | yes |
| `metric.enabled` | boolean *(true/false)* | yes |
| `metric.path` | string | yes |
| `leaderElection.enabled` | boolean *(true/false)* | yes |
| `leaderElection.type` | string | no |
| `leaderElection.config` | string/string key value map | no |
| `leaderElection.rpc.port` | integer | yes |

---

### Examples

- [main.go](example/main.go)
- [config.yml](example/config.yml)
- [example with static membership](example/main.go)
- [static membership config](example/config.yml)
- [kubernetesStatefulSet membership config](example/config_k8s_stateful_set.yml)
- [kubernetesHa membership config](example/config_k8s_leader_election.yml)
42 changes: 33 additions & 9 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package godcpclient

import (
"fmt"
"github.com/Trendyol/go-dcp-client/helpers"
"github.com/Trendyol/go-dcp-client/serviceDiscovery"
"github.com/gofiber/fiber/v2"
"log"
)
Expand All @@ -12,9 +14,11 @@ type Api interface {
}

type api struct {
app *fiber.App
config Config
observer Observer
app *fiber.App
config helpers.Config
client Client
stream Stream
serviceDiscovery serviceDiscovery.ServiceDiscovery
}

func (s *api) Listen() {
Expand All @@ -40,26 +44,46 @@ func (s *api) Shutdown() {
}

func (s *api) status(c *fiber.Ctx) error {
_, err := s.client.Ping()

if err != nil {
return err
}

return c.SendString("OK")
}

func (s *api) observerState(c *fiber.Ctx) error {
return c.JSON(s.observer.GetState())
return c.JSON(s.stream.GetObserver().GetState())
}

func (s *api) rebalance(c *fiber.Ctx) error {
s.stream.Rebalance()

return c.SendString("OK")
}

func (s *api) followers(c *fiber.Ctx) error {
return c.JSON(s.serviceDiscovery.GetAll())
}

func NewApi(config Config, observer Observer) Api {
func NewApi(config helpers.Config, client Client, stream Stream, serviceDiscovery serviceDiscovery.ServiceDiscovery) Api {
app := fiber.New(fiber.Config{DisableStartupMessage: true})

app.Use(NewMetricMiddleware(app, config, observer))
app.Use(NewMetricMiddleware(app, config, stream.GetObserver()))

api := &api{
app: app,
config: config,
observer: observer,
app: app,
config: config,
client: client,
stream: stream,
serviceDiscovery: serviceDiscovery,
}

app.Get("/status", api.status)
app.Get("/states/observer", api.observerState)
app.Get("/states/followers", api.followers)
app.Post("/rebalance", api.rebalance)

return api
}
10 changes: 8 additions & 2 deletions asyncOp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"github.com/couchbase/gocbcore/v10"
)

type AsyncOp interface {
Reject()
Resolve()
Wait(op gocbcore.PendingOp, err error) error
}

type asyncOp struct {
signal chan struct{}
wasResolved bool
Expand Down Expand Up @@ -32,10 +38,10 @@ func (m *asyncOp) Wait(op gocbcore.PendingOp, err error) error {
<-m.signal
}

return nil
return m.ctx.Err()
}

func newAsyncOp(ctx context.Context) *asyncOp {
func NewAsyncOp(ctx context.Context) AsyncOp {
if ctx == nil {
ctx = context.Background()
}
Expand Down
114 changes: 84 additions & 30 deletions cbMetadata.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package godcpclient

import (
"context"
"encoding/json"
"errors"
"github.com/Trendyol/go-dcp-client/helpers"
"github.com/couchbase/gocbcore/v10"
"github.com/couchbase/gocbcore/v10/memd"
"log"
"time"
)

type cbMetadata struct {
agent *gocbcore.Agent
config Config
config helpers.Config
}

func (s *cbMetadata) upsertXattrs(id string, path string, xattrs interface{}) error {
opm := newAsyncOp(nil)
func (s *cbMetadata) upsertXattrs(ctx context.Context, id string, path string, xattrs interface{}) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := json.Marshal(xattrs)

Expand All @@ -30,37 +35,57 @@ func (s *cbMetadata) upsertXattrs(id string, path string, xattrs interface{}) er
Value: payload,
},
},
Deadline: deadline,
}, func(result *gocbcore.MutateInResult, err error) {
ch <- err
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return err
}

err = <-ch

return opm.Wait(op, err)
return err
}

func (s *cbMetadata) deleteDocument(id string) {
opm := newAsyncOp(nil)
func (s *cbMetadata) deleteDocument(ctx context.Context, id string) {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Delete(gocbcore.DeleteOptions{
Key: []byte(id),
Key: []byte(id),
Deadline: deadline,
}, func(result *gocbcore.DeleteResult, err error) {
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return
}

_ = opm.Wait(op, err)
err = <-ch

if err != nil {
return
}
}

func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (CheckpointDocument, error) {
opm := newAsyncOp(nil)
func (s *cbMetadata) getXattrs(ctx context.Context, id string, path string, bucketUuid string) (CheckpointDocument, error) {
opm := NewAsyncOp(nil)

deadline, _ := ctx.Deadline()

errorCh := make(chan error)
documentCh := make(chan CheckpointDocument)
Expand All @@ -74,7 +99,10 @@ func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (Check
Path: path,
},
},
Deadline: deadline,
}, func(result *gocbcore.LookupInResult, err error) {
opm.Resolve()

if err == nil {
document := CheckpointDocument{}
err = json.Unmarshal(result.Ops[0].Value, &document)
Expand All @@ -89,58 +117,81 @@ func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (Check
}

errorCh <- err

opm.Resolve()
})

err = opm.Wait(op, err)

if err != nil {
return NewEmptyCheckpointDocument(bucketUuid), err
}

document := <-documentCh

err = opm.Wait(op, <-errorCh)
err = <-errorCh

return document, err
}

func (s *cbMetadata) createEmptyDocument(id string) error {
opm := newAsyncOp(nil)
func (s *cbMetadata) createEmptyDocument(ctx context.Context, id string) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Set(gocbcore.SetOptions{
Key: []byte(id),
Value: []byte{},
Flags: 50333696,
Key: []byte(id),
Value: []byte{},
Flags: 50333696,
Deadline: deadline,
}, func(result *gocbcore.StoreResult, err error) {
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return err
}

return opm.Wait(op, err)
return <-ch
}

func (s *cbMetadata) Save(state map[uint16]CheckpointDocument, _ string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

for vbId, checkpointDocument := range state {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)
err := s.upsertXattrs(ctx, id, helpers.Name, checkpointDocument)

var kvErr *gocbcore.KeyValueError
if err != nil && errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
err = s.createEmptyDocument(ctx, id)

err := s.upsertXattrs(id, helpers.Name, checkpointDocument)
if err == nil {
err = s.upsertXattrs(ctx, id, helpers.Name, checkpointDocument)
}
}

if err != nil {
_ = s.createEmptyDocument(id)
_ = s.upsertXattrs(id, helpers.Name, checkpointDocument)
log.Printf("error while saving checkpoint document: %v", err)
return
}
}
}

func (s *cbMetadata) Load(vbIds []uint16, bucketUuid string) map[uint16]CheckpointDocument {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

state := map[uint16]CheckpointDocument{}

for _, vbId := range vbIds {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)

data, err := s.getXattrs(id, helpers.Name, bucketUuid)
data, err := s.getXattrs(ctx, id, helpers.Name, bucketUuid)

var kvErr *gocbcore.KeyValueError
if err == nil || errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
Expand All @@ -154,14 +205,17 @@ func (s *cbMetadata) Load(vbIds []uint16, bucketUuid string) map[uint16]Checkpoi
}

func (s *cbMetadata) Clear(vbIds []uint16) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

for _, vbId := range vbIds {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)

s.deleteDocument(id)
s.deleteDocument(ctx, id)
}
}

func NewCBMetadata(agent *gocbcore.Agent, config Config) Metadata {
func NewCBMetadata(agent *gocbcore.Agent, config helpers.Config) Metadata {
return &cbMetadata{
agent: agent,
config: config,
Expand Down
Loading

0 comments on commit 7427d27

Please sign in to comment.