Skip to content

Commit

Permalink
Fix couchbase member index query and field alignment (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr authored Mar 9, 2023
1 parent 2051619 commit 5427d75
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 26 deletions.
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type checkpoint struct {
metadata Metadata
schedule *time.Ticker
bucketUUID string
config helpers.Config
vbIds []uint16
config helpers.Config
saveLock sync.Mutex
loadLock sync.Mutex
}
Expand Down
2 changes: 1 addition & 1 deletion dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type dcp struct {
vBucketDiscovery VBucketDiscovery
serviceDiscovery servicediscovery.ServiceDiscovery
listener models.Listener
config helpers.Config
apiShutdown chan struct{}
cancelCh chan os.Signal
stopCh chan struct{}
healCheckFailedCh chan struct{}
config helpers.Config
}

func (s *dcp) getCollectionIDs() map[uint32]string {
Expand Down
16 changes: 6 additions & 10 deletions dcp/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,13 @@ type ObserverMetric struct {

type observer struct {
currentSnapshots map[uint16]*models.SnapshotMarker
uuIDs map[uint16]gocbcore.VbUUID
metrics map[uint16]ObserverMetric
collectionIDs map[uint32]string
listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
currentSnapshotsLock sync.Mutex

uuIDs map[uint16]gocbcore.VbUUID

metrics map[uint16]ObserverMetric
metricsLock sync.Mutex

collectionIDs map[uint32]string

listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
metricsLock sync.Mutex
}

func (so *observer) convertToCollectionName(collectionID uint32) *string {
Expand Down
2 changes: 1 addition & 1 deletion helpers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type ConfigDCPListener struct {
}

type ConfigDCP struct {
BufferSizeKb int `yaml:"bufferSizeKb" default:"16384"` // 16MB
Group ConfigDCPGroup `yaml:"group"`
BufferSizeKb int `yaml:"bufferSizeKb" default:"16384"`
Listener ConfigDCPListener `yaml:"listener"`
}

Expand Down
8 changes: 4 additions & 4 deletions membership/cb_membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
)

type cbMembership struct {
client dcp.Client
handler info.Handler
info *info.Model
infoChan chan *info.Model
client dcp.Client
id []byte
clusterJoinTime int64
lastActiveInstances []Instance
handler info.Handler
monitorQuery []byte
indexQuery []byte
clusterJoinTime int64
}

type Instance struct {
Expand Down Expand Up @@ -181,7 +181,7 @@ func getIndexQuery(metadataBucket string) []byte {
var query []byte

query = append(query, []byte("CREATE INDEX ")...)
query = append(query, []byte("ids_metadata_instance IF NOT EXISTS on ")...)
query = append(query, []byte("ids_metadata_instance on ")...)
query = append(query, []byte("`")...)
query = append(query, []byte(metadataBucket)...)
query = append(query, []byte("`(`type`)")...)
Expand Down
6 changes: 3 additions & 3 deletions models/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ type SnapshotMarker struct {
}

type InternalDcpMutation struct {
CollectionName *string
gocbcore.DcpMutation
Offset
CollectionName *string
}

type InternalDcpDeletion struct {
CollectionName *string
gocbcore.DcpDeletion
Offset
CollectionName *string
}

type InternalDcpExpiration struct {
CollectionName *string
gocbcore.DcpExpiration
Offset
CollectionName *string
}

func (i *InternalDcpMutation) IsCreated() bool {
Expand Down
12 changes: 6 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ type stream struct {
client gDcp.Client
metadata Metadata
checkpoint Checkpoint
offsetsLock sync.Mutex
metric StreamMetric
observer gDcp.Observer
vBucketDiscovery VBucketDiscovery
collectionIDs map[uint32]string
listener models.Listener
offsets map[uint16]models.Offset
dirty bool
stopCh chan struct{}
config helpers.Config
listener models.Listener
activeStreams sync.WaitGroup
streamsLock sync.Mutex
collectionIDs map[uint32]string
metric StreamMetric
offsetsLock sync.Mutex
rebalanceLock sync.Mutex
dirty bool
balancing bool
stopCh chan struct{}
}

func (s *stream) setOffset(vbID uint16, offset models.Offset) {
Expand Down

0 comments on commit 5427d75

Please sign in to comment.