Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kubernetes leader election and membership added #13

Merged
merged 1 commit into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,30 @@ $ go get github.com/Trendyol/go-dcp-client

### Configuration

| Variable | Type | Is Required |
|----------------------------------|-------------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `userAgent` | string | yes |
| `compression` | boolean *(true/false)* | yes |
| `metadataBucket` | string | yes |
| `connectTimeout` | integer *(second)* | yes |
| `dcp.connectTimeout` | integer *(second)* | yes |
| `dcp.flowControlBuffer` | integer | yes |
| `dcp.persistencePollingInterval` | integer *(millisecond)* | yes |
| `dcp.group.name` | string | yes |
| `dcp.membership.type` | string | yes |
| `dcp.membership.memberNumber` | integer | yes |
| `dcp.membership.totalMembers` | integer | yes |
| `api.port` | integer | yes |
| `metric.enabled` | boolean *(true/false)* | yes |
| `metric.path` | string | yes |
| Variable | Type | Is Required |
|-------------------------------------|-----------------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `metadataBucket` | string | yes |
| `dcp.group.name` | string | yes |
| `dcp.group.membership.type` | string | yes |
| `dcp.group.membership.memberNumber` | integer | no |
| `dcp.group.membership.totalMembers` | integer | no |
| `api.port` | integer | yes |
| `metric.enabled` | boolean *(true/false)* | yes |
| `metric.path` | string | yes |
| `leaderElection.enabled` | boolean *(true/false)* | yes |
| `leaderElection.type` | string | no |
| `leaderElection.config` | string/string key value map | no |
| `leaderElection.rpc.port` | integer | yes |

---

### Examples

- [main.go](example/main.go)
- [config.yml](example/config.yml)
- [example with static membership](example/main.go)
- [static membership config](example/config.yml)
- [kubernetesStatefulSet membership config](example/config_k8s_stateful_set.yml)
- [kubernetesHa membership config](example/config_k8s_leader_election.yml)
42 changes: 33 additions & 9 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package godcpclient

import (
"fmt"
"github.com/Trendyol/go-dcp-client/helpers"
"github.com/Trendyol/go-dcp-client/serviceDiscovery"
"github.com/gofiber/fiber/v2"
"log"
)
Expand All @@ -12,9 +14,11 @@ type Api interface {
}

type api struct {
app *fiber.App
config Config
observer Observer
app *fiber.App
config helpers.Config
client Client
stream Stream
serviceDiscovery serviceDiscovery.ServiceDiscovery
}

func (s *api) Listen() {
Expand All @@ -40,26 +44,46 @@ func (s *api) Shutdown() {
}

func (s *api) status(c *fiber.Ctx) error {
_, err := s.client.Ping()

if err != nil {
return err
}

return c.SendString("OK")
}

func (s *api) observerState(c *fiber.Ctx) error {
return c.JSON(s.observer.GetState())
return c.JSON(s.stream.GetObserver().GetState())
}

func (s *api) rebalance(c *fiber.Ctx) error {
s.stream.Rebalance()

return c.SendString("OK")
}

func (s *api) followers(c *fiber.Ctx) error {
return c.JSON(s.serviceDiscovery.GetAll())
}

func NewApi(config Config, observer Observer) Api {
func NewApi(config helpers.Config, client Client, stream Stream, serviceDiscovery serviceDiscovery.ServiceDiscovery) Api {
app := fiber.New(fiber.Config{DisableStartupMessage: true})

app.Use(NewMetricMiddleware(app, config, observer))
app.Use(NewMetricMiddleware(app, config, stream.GetObserver()))

api := &api{
app: app,
config: config,
observer: observer,
app: app,
config: config,
client: client,
stream: stream,
serviceDiscovery: serviceDiscovery,
}

app.Get("/status", api.status)
app.Get("/states/observer", api.observerState)
app.Get("/states/followers", api.followers)
app.Post("/rebalance", api.rebalance)

return api
}
10 changes: 8 additions & 2 deletions asyncOp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"github.com/couchbase/gocbcore/v10"
)

type AsyncOp interface {
Reject()
Resolve()
Wait(op gocbcore.PendingOp, err error) error
}

type asyncOp struct {
signal chan struct{}
wasResolved bool
Expand Down Expand Up @@ -32,10 +38,10 @@ func (m *asyncOp) Wait(op gocbcore.PendingOp, err error) error {
<-m.signal
}

return nil
return m.ctx.Err()
}

func newAsyncOp(ctx context.Context) *asyncOp {
func NewAsyncOp(ctx context.Context) AsyncOp {
if ctx == nil {
ctx = context.Background()
}
Expand Down
114 changes: 84 additions & 30 deletions cbMetadata.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package godcpclient

import (
"context"
"encoding/json"
"errors"
"github.com/Trendyol/go-dcp-client/helpers"
"github.com/couchbase/gocbcore/v10"
"github.com/couchbase/gocbcore/v10/memd"
"log"
"time"
)

type cbMetadata struct {
agent *gocbcore.Agent
config Config
config helpers.Config
}

func (s *cbMetadata) upsertXattrs(id string, path string, xattrs interface{}) error {
opm := newAsyncOp(nil)
func (s *cbMetadata) upsertXattrs(ctx context.Context, id string, path string, xattrs interface{}) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

payload, _ := json.Marshal(xattrs)

Expand All @@ -30,37 +35,57 @@ func (s *cbMetadata) upsertXattrs(id string, path string, xattrs interface{}) er
Value: payload,
},
},
Deadline: deadline,
}, func(result *gocbcore.MutateInResult, err error) {
ch <- err
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return err
}

err = <-ch

return opm.Wait(op, err)
return err
}

func (s *cbMetadata) deleteDocument(id string) {
opm := newAsyncOp(nil)
func (s *cbMetadata) deleteDocument(ctx context.Context, id string) {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Delete(gocbcore.DeleteOptions{
Key: []byte(id),
Key: []byte(id),
Deadline: deadline,
}, func(result *gocbcore.DeleteResult, err error) {
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return
}

_ = opm.Wait(op, err)
err = <-ch

if err != nil {
return
}
}

func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (CheckpointDocument, error) {
opm := newAsyncOp(nil)
func (s *cbMetadata) getXattrs(ctx context.Context, id string, path string, bucketUuid string) (CheckpointDocument, error) {
opm := NewAsyncOp(nil)

deadline, _ := ctx.Deadline()

errorCh := make(chan error)
documentCh := make(chan CheckpointDocument)
Expand All @@ -74,7 +99,10 @@ func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (Check
Path: path,
},
},
Deadline: deadline,
}, func(result *gocbcore.LookupInResult, err error) {
opm.Resolve()

if err == nil {
document := CheckpointDocument{}
err = json.Unmarshal(result.Ops[0].Value, &document)
Expand All @@ -89,58 +117,81 @@ func (s *cbMetadata) getXattrs(id string, path string, bucketUuid string) (Check
}

errorCh <- err

opm.Resolve()
})

err = opm.Wait(op, err)

if err != nil {
return NewEmptyCheckpointDocument(bucketUuid), err
}

document := <-documentCh

err = opm.Wait(op, <-errorCh)
err = <-errorCh

return document, err
}

func (s *cbMetadata) createEmptyDocument(id string) error {
opm := newAsyncOp(nil)
func (s *cbMetadata) createEmptyDocument(ctx context.Context, id string) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Set(gocbcore.SetOptions{
Key: []byte(id),
Value: []byte{},
Flags: 50333696,
Key: []byte(id),
Value: []byte{},
Flags: 50333696,
Deadline: deadline,
}, func(result *gocbcore.StoreResult, err error) {
opm.Resolve()

ch <- err
})

err = opm.Wait(op, err)

if err != nil {
return err
}

return opm.Wait(op, err)
return <-ch
}

func (s *cbMetadata) Save(state map[uint16]CheckpointDocument, _ string) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

for vbId, checkpointDocument := range state {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)
err := s.upsertXattrs(ctx, id, helpers.Name, checkpointDocument)

var kvErr *gocbcore.KeyValueError
if err != nil && errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
err = s.createEmptyDocument(ctx, id)

err := s.upsertXattrs(id, helpers.Name, checkpointDocument)
if err == nil {
err = s.upsertXattrs(ctx, id, helpers.Name, checkpointDocument)
}
}

if err != nil {
_ = s.createEmptyDocument(id)
_ = s.upsertXattrs(id, helpers.Name, checkpointDocument)
log.Printf("error while saving checkpoint document: %v", err)
return
}
}
}

func (s *cbMetadata) Load(vbIds []uint16, bucketUuid string) map[uint16]CheckpointDocument {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

state := map[uint16]CheckpointDocument{}

for _, vbId := range vbIds {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)

data, err := s.getXattrs(id, helpers.Name, bucketUuid)
data, err := s.getXattrs(ctx, id, helpers.Name, bucketUuid)

var kvErr *gocbcore.KeyValueError
if err == nil || errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
Expand All @@ -154,14 +205,17 @@ func (s *cbMetadata) Load(vbIds []uint16, bucketUuid string) map[uint16]Checkpoi
}

func (s *cbMetadata) Clear(vbIds []uint16) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

for _, vbId := range vbIds {
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name, s.config.UserAgent)
id := helpers.GetCheckpointId(vbId, s.config.Dcp.Group.Name)

s.deleteDocument(id)
s.deleteDocument(ctx, id)
}
}

func NewCBMetadata(agent *gocbcore.Agent, config Config) Metadata {
func NewCBMetadata(agent *gocbcore.Agent, config helpers.Config) Metadata {
return &cbMetadata{
agent: agent,
config: config,
Expand Down
Loading