Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix couchbase member index query and field alignment #22

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type checkpoint struct {
metadata Metadata
schedule *time.Ticker
bucketUUID string
config helpers.Config
vbIds []uint16
config helpers.Config
saveLock sync.Mutex
loadLock sync.Mutex
}
Expand Down
2 changes: 1 addition & 1 deletion dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type dcp struct {
vBucketDiscovery VBucketDiscovery
serviceDiscovery servicediscovery.ServiceDiscovery
listener models.Listener
config helpers.Config
apiShutdown chan struct{}
cancelCh chan os.Signal
stopCh chan struct{}
healCheckFailedCh chan struct{}
config helpers.Config
}

func (s *dcp) getCollectionIDs() map[uint32]string {
Expand Down
16 changes: 6 additions & 10 deletions dcp/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,13 @@ type ObserverMetric struct {

type observer struct {
currentSnapshots map[uint16]*models.SnapshotMarker
uuIDs map[uint16]gocbcore.VbUUID
metrics map[uint16]ObserverMetric
collectionIDs map[uint32]string
listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
currentSnapshotsLock sync.Mutex

uuIDs map[uint16]gocbcore.VbUUID

metrics map[uint16]ObserverMetric
metricsLock sync.Mutex

collectionIDs map[uint32]string

listenerCh models.ListenerCh
endCh chan models.DcpStreamEnd
metricsLock sync.Mutex
}

func (so *observer) convertToCollectionName(collectionID uint32) *string {
Expand Down
2 changes: 1 addition & 1 deletion helpers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type ConfigDCPListener struct {
}

type ConfigDCP struct {
BufferSizeKb int `yaml:"bufferSizeKb" default:"16384"` // 16MB
Group ConfigDCPGroup `yaml:"group"`
BufferSizeKb int `yaml:"bufferSizeKb" default:"16384"`
Listener ConfigDCPListener `yaml:"listener"`
}

Expand Down
8 changes: 4 additions & 4 deletions membership/cb_membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
)

type cbMembership struct {
client dcp.Client
handler info.Handler
info *info.Model
infoChan chan *info.Model
client dcp.Client
id []byte
clusterJoinTime int64
lastActiveInstances []Instance
handler info.Handler
monitorQuery []byte
indexQuery []byte
clusterJoinTime int64
}

type Instance struct {
Expand Down Expand Up @@ -181,7 +181,7 @@ func getIndexQuery(metadataBucket string) []byte {
var query []byte

query = append(query, []byte("CREATE INDEX ")...)
query = append(query, []byte("ids_metadata_instance IF NOT EXISTS on ")...)
query = append(query, []byte("ids_metadata_instance on ")...)
query = append(query, []byte("`")...)
query = append(query, []byte(metadataBucket)...)
query = append(query, []byte("`(`type`)")...)
Expand Down
6 changes: 3 additions & 3 deletions models/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ type SnapshotMarker struct {
}

type InternalDcpMutation struct {
CollectionName *string
gocbcore.DcpMutation
Offset
CollectionName *string
}

type InternalDcpDeletion struct {
CollectionName *string
gocbcore.DcpDeletion
Offset
CollectionName *string
}

type InternalDcpExpiration struct {
CollectionName *string
gocbcore.DcpExpiration
Offset
CollectionName *string
}

func (i *InternalDcpMutation) IsCreated() bool {
Expand Down
12 changes: 6 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ type stream struct {
client gDcp.Client
metadata Metadata
checkpoint Checkpoint
offsetsLock sync.Mutex
metric StreamMetric
observer gDcp.Observer
vBucketDiscovery VBucketDiscovery
collectionIDs map[uint32]string
listener models.Listener
offsets map[uint16]models.Offset
dirty bool
stopCh chan struct{}
config helpers.Config
listener models.Listener
activeStreams sync.WaitGroup
streamsLock sync.Mutex
collectionIDs map[uint32]string
metric StreamMetric
offsetsLock sync.Mutex
rebalanceLock sync.Mutex
dirty bool
balancing bool
stopCh chan struct{}
}

func (s *stream) setOffset(vbID uint16, offset models.Offset) {
Expand Down