Skip to content

Commit

Permalink
chore: refactoring (#14)
Browse files Browse the repository at this point in the history
* remove chore identity method

* init array with size for performance int vbucket_discovery

* reorganize package structure

* move function to identity.go file

* add helpers/utils test

* refactor reflection operation

* move identity to root path

* removed time.sleep removed wrapper unused go routine

* rename package

* refactor leaderelector package move it under the k8s package

* remove rpc package move code under the servicediscovery package

* remove unused import

* gofumpt

* gofumpt

* gofumpt
  • Loading branch information
mstrYoda authored Dec 15, 2022
1 parent fe6ace0 commit e67336e
Show file tree
Hide file tree
Showing 26 changed files with 193 additions and 191 deletions.
28 changes: 17 additions & 11 deletions cb_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"log"
"strconv"
"time"

"github.com/Trendyol/go-dcp-client/helpers"
Expand All @@ -17,7 +18,7 @@ type cbMetadata struct {
config helpers.Config
}

func (s *cbMetadata) upsertXattrs(ctx context.Context, id string, path string, xattrs interface{}) error {
func (s *cbMetadata) upsertXattrs(ctx context.Context, id []byte, path string, xattrs interface{}) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()
Expand All @@ -27,7 +28,7 @@ func (s *cbMetadata) upsertXattrs(ctx context.Context, id string, path string, x
ch := make(chan error)

op, err := s.agent.MutateIn(gocbcore.MutateInOptions{
Key: []byte(id),
Key: id,
Ops: []gocbcore.SubDocOp{
{
Op: memd.SubDocOpDictSet,
Expand All @@ -54,15 +55,15 @@ func (s *cbMetadata) upsertXattrs(ctx context.Context, id string, path string, x
return err
}

func (s *cbMetadata) deleteDocument(ctx context.Context, id string) {
func (s *cbMetadata) deleteDocument(ctx context.Context, id []byte) {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Delete(gocbcore.DeleteOptions{
Key: []byte(id),
Key: id,
Deadline: deadline,
}, func(result *gocbcore.DeleteResult, err error) {
opm.Resolve()
Expand All @@ -83,7 +84,7 @@ func (s *cbMetadata) deleteDocument(ctx context.Context, id string) {
}
}

func (s *cbMetadata) getXattrs(ctx context.Context, id string, path string, bucketUUID string) (CheckpointDocument, error) {
func (s *cbMetadata) getXattrs(ctx context.Context, id []byte, path string, bucketUUID string) (CheckpointDocument, error) {
opm := NewAsyncOp(context.Background())

deadline, _ := ctx.Deadline()
Expand All @@ -92,7 +93,7 @@ func (s *cbMetadata) getXattrs(ctx context.Context, id string, path string, buck
documentCh := make(chan CheckpointDocument)

op, err := s.agent.LookupIn(gocbcore.LookupInOptions{
Key: []byte(id),
Key: id,
Ops: []gocbcore.SubDocOp{
{
Op: memd.SubDocOpGet,
Expand Down Expand Up @@ -132,15 +133,15 @@ func (s *cbMetadata) getXattrs(ctx context.Context, id string, path string, buck
return document, err
}

func (s *cbMetadata) createEmptyDocument(ctx context.Context, id string) error {
func (s *cbMetadata) createEmptyDocument(ctx context.Context, id []byte) error {
opm := NewAsyncOp(ctx)

deadline, _ := ctx.Deadline()

ch := make(chan error)

op, err := s.agent.Set(gocbcore.SetOptions{
Key: []byte(id),
Key: id,
Value: []byte{},
Flags: 50333696,
Deadline: deadline,
Expand All @@ -164,7 +165,7 @@ func (s *cbMetadata) Save(state map[uint16]CheckpointDocument, _ string) {
defer cancel()

for vbID, checkpointDocument := range state {
id := helpers.GetCheckpointID(vbID, s.config.Dcp.Group.Name)
id := getCheckpointID(vbID, s.config.Dcp.Group.Name)
err := s.upsertXattrs(ctx, id, helpers.Name, checkpointDocument)

var kvErr *gocbcore.KeyValueError
Expand All @@ -190,7 +191,7 @@ func (s *cbMetadata) Load(vbIds []uint16, bucketUUID string) map[uint16]Checkpoi
state := map[uint16]CheckpointDocument{}

for _, vbID := range vbIds {
id := helpers.GetCheckpointID(vbID, s.config.Dcp.Group.Name)
id := getCheckpointID(vbID, s.config.Dcp.Group.Name)

data, err := s.getXattrs(ctx, id, helpers.Name, bucketUUID)

Expand All @@ -210,7 +211,7 @@ func (s *cbMetadata) Clear(vbIds []uint16) {
defer cancel()

for _, vbID := range vbIds {
id := helpers.GetCheckpointID(vbID, s.config.Dcp.Group.Name)
id := getCheckpointID(vbID, s.config.Dcp.Group.Name)

s.deleteDocument(ctx, id)
}
Expand All @@ -222,3 +223,8 @@ func NewCBMetadata(agent *gocbcore.Agent, config helpers.Config) Metadata {
config: config,
}
}

func getCheckpointID(vbID uint16, groupName string) []byte {
// _connector:cbgo:groupName:stdout-listener:checkpoint:vbId
return []byte(helpers.Prefix + groupName + ":checkpoint:" + strconv.Itoa(int(vbID)))
}
10 changes: 4 additions & 6 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type checkpoint struct {
}

func (s *checkpoint) Save(fromSchedule bool) {
// TODO: review
s.saveLock.Lock()
defer s.saveLock.Unlock()

Expand Down Expand Up @@ -119,12 +120,9 @@ func (s *checkpoint) Clear() {
func (s *checkpoint) StartSchedule() {
go func() {
s.schedule = time.NewTicker(10 * time.Second)
go func() {
time.Sleep(10 * time.Second)
for range s.schedule.C {
s.Save(true)
}
}()
for range s.schedule.C {
s.Save(true)
}
}()
log.Printf("started checkpoint schedule")
}
Expand Down
8 changes: 4 additions & 4 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"os/signal"
"syscall"

"github.com/Trendyol/go-dcp-client/identity"

"github.com/Trendyol/go-dcp-client/helpers"
"github.com/Trendyol/go-dcp-client/kubernetes"
klem "github.com/Trendyol/go-dcp-client/kubernetes/leaderelector/model"
"github.com/Trendyol/go-dcp-client/membership/info"
"github.com/Trendyol/go-dcp-client/model"
"github.com/Trendyol/go-dcp-client/servicediscovery"
)

Expand All @@ -30,15 +30,15 @@ type dcp struct {
vBucketDiscovery VBucketDiscovery
serviceDiscovery servicediscovery.ServiceDiscovery
kubernetesClient kubernetes.Client
myIdentity *model.Identity
myIdentity *identity.Identity
}

func (s *dcp) Start() {
infoHandler := info.NewHandler()

if s.config.LeaderElection.Enabled {
if s.config.LeaderElection.Type == helpers.KubernetesLeaderElectionType {
s.myIdentity = klem.NewIdentityFromEnv()
s.myIdentity = identity.NewIdentityFromEnv()

if namespace, exist := s.config.LeaderElection.Config["leaseLockNamespace"]; exist {
s.kubernetesClient = kubernetes.NewClient(s.myIdentity, namespace)
Expand Down
6 changes: 3 additions & 3 deletions dcp_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package godcpclient

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"log"
"math"
"math/big"
"os"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -170,8 +170,8 @@ func TestDcp(t *testing.T) {
counter++
lock.Unlock()

assert.True(t, strings.HasPrefix(string(event.Key), "my_key"))
assert.True(t, strings.HasPrefix(string(event.Value), "my_value"))
assert.True(t, bytes.HasPrefix(event.Key, []byte("my_key")))
assert.True(t, bytes.HasPrefix(event.Value, []byte("my_value")))

if counter == mockDataSize {
dcp.Close()
Expand Down
4 changes: 2 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package main

import (
"github.com/Trendyol/go-dcp-client"
"log"

"github.com/Trendyol/go-dcp-client"
)

func listener(event interface{}, err error) {
Expand All @@ -23,7 +24,6 @@ func listener(event interface{}, err error) {

func main() {
dcp, err := godcpclient.NewDcp("config.yml", listener)

if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/ansrivas/fiberprometheus/v2 v2.4.1
github.com/avast/retry-go/v4 v4.3.1
github.com/couchbase/gocbcore/v10 v10.2.0
github.com/go-logr/logr v1.2.2
github.com/gofiber/fiber/v2 v2.39.0
github.com/google/uuid v1.3.0
github.com/gookit/config/v2 v2.1.7
Expand All @@ -32,7 +33,6 @@ require (
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/go-logr/logr v1.2.2 // indirect
github.com/gofiber/adaptor/v2 v2.1.25 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down
18 changes: 4 additions & 14 deletions helpers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,22 @@ import (
"bytes"
"fmt"
"reflect"
"strconv"

"github.com/google/uuid"
)

func GetCheckpointID(vbID uint16, groupName string) string {
// _connector:cbgo:groupName:stdout-listener:checkpoint:vbId
return Prefix + groupName + ":checkpoint:" + strconv.Itoa(int(vbID))
}

func GetDcpStreamName(groupName string) string {
streamName := fmt.Sprintf("%s_%s", groupName, uuid.New().String())
return streamName
}

func IsMetadata(data interface{}) bool {
t := reflect.TypeOf(data)

_, exist := t.FieldByName("Key")

if exist {
key := reflect.ValueOf(data).FieldByName("Key").Bytes()
return bytes.HasPrefix(key, []byte(Prefix))
value := reflect.ValueOf(data).FieldByName("Key")
if !value.IsValid() {
return false
}

return false
return bytes.HasPrefix(value.Bytes(), []byte(Prefix))
}

func ChunkSlice[T any](slice []T, chunks int) [][]T {
Expand Down
31 changes: 31 additions & 0 deletions helpers/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package helpers

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsMetadata_ReturnsTrue_WhenStructHasKeyPrefix(t *testing.T) {
type ts struct {
Key []byte
}

testData := ts{
Key: []byte(Prefix + "test"),
}

assert.True(t, IsMetadata(testData))
}

func TestIsMetadata_ReturnsFalse_WhenStructHasNoKeyPrefix(t *testing.T) {
type ts struct {
X []byte
}

testData := ts{
X: []byte(Prefix + "test"),
}

assert.False(t, IsMetadata(testData))
}
31 changes: 19 additions & 12 deletions model/identity.go → identity/identity.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package model
package identity

import "encoding/json"
import (
"encoding/json"
"os"
)

type Identity struct {
IP string
Expand All @@ -16,20 +19,24 @@ func (k *Identity) String() string {
return string(str)
}

func (k *Identity) LoadFromString(str string) {
err := json.Unmarshal([]byte(str), k)
if err != nil {
panic(err)
}
}

func (k *Identity) Equal(other *Identity) bool {
return k.IP == other.IP && k.Name == other.Name
}

func NewIdentityFromStr(str string) *Identity {
identity := &Identity{}
identity.LoadFromString(str)
var identity Identity

err := json.Unmarshal([]byte(str), &identity)
if err != nil {
panic(err)
}

return identity
return &identity
}

func NewIdentityFromEnv() *Identity {
return &Identity{
IP: os.Getenv("POD_IP"),
Name: os.Getenv("POD_NAME"),
}
}
3 changes: 2 additions & 1 deletion kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package kubernetes
import (
"context"

dcpModel "github.com/Trendyol/go-dcp-client/model"
dcpModel "github.com/Trendyol/go-dcp-client/identity"

"github.com/go-logr/logr"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down
Loading

0 comments on commit e67336e

Please sign in to comment.