Skip to content

Commit

Permalink
Merge pull request weaviate#4878 from weaviate/refactor-db-open-3
Browse files Browse the repository at this point in the history
raft: open db on service.Open instead on apply
  • Loading branch information
reyreaud-l authored May 10, 2024
2 parents df50575 + 7c9b231 commit 9a018c7
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 45 deletions.
6 changes: 2 additions & 4 deletions cluster/store/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,8 @@ func (db *localDB) apply(op applyOp) error {
return fmt.Errorf("%w: %s: %w", errSchema, op.op, err)
}

if !op.schemaOnly {
if err := op.updateStore(); err != nil {
return fmt.Errorf("%w: %s: %w", errDB, op.op, err)
}
if err := op.updateStore(); err != nil {
return fmt.Errorf("%w: %s: %w", errDB, op.op, err)
}

// Always trigger the schema callback last
Expand Down
17 changes: 6 additions & 11 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ func (st *Store) Open(ctx context.Context) (err error) {
return fmt.Errorf("read log last command: %w", err)
}

if st.initialLastAppliedIndex == 0 { // empty node
st.openDatabase(ctx)
}

st.log.WithFields(logrus.Fields{
"name": st.nodeID,
"metadata_only_voters": st.metadataOnlyVoters,
Expand All @@ -256,7 +252,13 @@ func (st *Store) Open(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("raft.NewRaft %v %w", st.transport.LocalAddr(), err)
}
if st.initialLastAppliedIndex <= st.raft.LastIndex() {
// this should include empty and non empty node
st.openDatabase(ctx)
}

st.lastAppliedIndex.Store(st.raft.AppliedIndex())

st.log.WithFields(logrus.Fields{
"raft_applied_index": st.raft.AppliedIndex(),
"raft_last_index": st.raft.LastIndex(),
Expand Down Expand Up @@ -640,12 +642,6 @@ func (st *Store) Apply(l *raft.Log) interface{} {
schemaOnly := l.Index <= st.initialLastAppliedIndex
defer func() {
st.lastAppliedIndex.Store(l.Index)
// If the local db has not been loaded, wait until we reach the state
// from the local raft log before loading the db.
// This is necessary because the database operations are not idempotent
if !st.dbLoaded.Load() && l.Index >= st.initialLastAppliedIndex {
st.openDatabase(context.Background())
}
if ret.Error != nil {
st.log.WithFields(logrus.Fields{
"type": l.Type,
Expand Down Expand Up @@ -868,7 +864,6 @@ func (st *Store) openDatabase(ctx context.Context) {
// reloadDBFromSnapshot reloads the node's local db. If the db is already loaded, it will be reloaded.
// If a snapshot exists and its is up to date with the log, it will be loaded.
// Otherwise, the database will be loaded when the node synchronizes its state with the leader.
// For more details, see apply() -> loadDatabase().
//
// In specific scenarios where the follower's state is too far behind the leader's log,
// the leader may decide to send a snapshot. Consequently, the follower must update its state accordingly.
Expand Down
71 changes: 41 additions & 30 deletions cluster/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ func TestServicePanics(t *testing.T) {

func TestStoreApply(t *testing.T) {
doFirst := func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.parser.On("ParseClass", mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
}
Expand Down Expand Up @@ -449,8 +448,12 @@ func TestStoreApply(t *testing.T) {
cmd.ApplyRequest_TYPE_ADD_CLASS,
cmd.AddClassRequest{Class: cls, State: ss},
nil)},
resp: Response{Error: nil},
doBefore: doFirst,
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("AddClass", mock.Anything).Return(nil)
m.parser.On("ParseClass", mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
doAfter: func(ms *MockStore) error {
_, ok := ms.store.db.Schema.Classes["C1"]
if !ok {
Expand Down Expand Up @@ -495,9 +498,9 @@ func TestStoreApply(t *testing.T) {
nil)},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.parser.On("ParseClass", mock.Anything).Return(nil)
m.indexer.On("RestoreClassDir", cls.Class).Return(nil)
m.indexer.On("AddClass", mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
doAfter: func(ms *MockStore) error {
Expand Down Expand Up @@ -550,6 +553,7 @@ func TestStoreApply(t *testing.T) {
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.parser.On("ParseClassUpdate", mock.Anything, mock.Anything).Return(mock.Anything, nil)
m.indexer.On("UpdateClass", mock.Anything).Return(nil)
m.store.db.Schema.addClass(cls, ss, 1)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
Expand All @@ -561,7 +565,7 @@ func TestStoreApply(t *testing.T) {
nil)},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.indexer.On("DeleteClass", mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
doAfter: func(ms *MockStore) error {
Expand Down Expand Up @@ -605,8 +609,8 @@ func TestStoreApply(t *testing.T) {
},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.store.db.Schema.addClass(cls, ss, 1)
m.indexer.On("AddProperty", mock.Anything, mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
doAfter: func(ms *MockStore) error {
Expand Down Expand Up @@ -634,8 +638,12 @@ func TestStoreApply(t *testing.T) {
name: "UpdateShard/Success",
req: raft.Log{Data: cmdAsBytes("C1", cmd.ApplyRequest_TYPE_UPDATE_SHARD_STATUS,
cmd.UpdateShardStatusRequest{Class: "C1"}, nil)},
resp: Response{Error: nil},
doBefore: doFirst,
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.parser.On("ParseClass", mock.Anything).Return(nil)
m.indexer.On("UpdateShardStatus", mock.Anything).Return(nil)
m.indexer.On("TriggerSchemaUpdateCallbacks").Return()
},
},
{
name: "AddTenant/Unmarshal",
Expand All @@ -659,10 +667,11 @@ func TestStoreApply(t *testing.T) {
})},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.store.db.Schema.addClass(cls, &sharding.State{
Physical: map[string]sharding.Physical{"T1": {}},
}, 1)

m.indexer.On("AddTenants", mock.Anything, mock.Anything).Return(nil)
},
doAfter: func(ms *MockStore) error {
if _, ok := ms.store.db.Schema.Classes["C1"].Sharding.Physical["T1"]; !ok {
Expand Down Expand Up @@ -720,8 +729,8 @@ func TestStoreApply(t *testing.T) {
BelongsToNodes: []string{"NODE-2"},
Status: models.TenantActivityStatusHOT,
}}}
m.indexer.On("Open", mock.Anything).Return(nil)
m.store.db.Schema.addClass(cls, ss, 1)
m.indexer.On("UpdateTenants", mock.Anything, mock.Anything).Return(nil)
},
doAfter: func(ms *MockStore) error {
want := map[string]sharding.Physical{"T1": {
Expand Down Expand Up @@ -763,8 +772,8 @@ func TestStoreApply(t *testing.T) {
nil, &cmd.DeleteTenantsRequest{Tenants: []string{"T1", "T2"}})},
resp: Response{Error: nil},
doBefore: func(m *MockStore) {
m.indexer.On("Open", mock.Anything).Return(nil)
m.store.db.Schema.addClass(cls, &sharding.State{Physical: map[string]sharding.Physical{"T1": {}}}, 1)
m.indexer.On("DeleteTenants", mock.Anything, mock.Anything).Return(nil)
},
doAfter: func(ms *MockStore) error {
if len(ms.store.db.Schema.Classes["C1"].Sharding.Physical) != 0 {
Expand All @@ -776,27 +785,29 @@ func TestStoreApply(t *testing.T) {
}

for _, tc := range tests {
m := NewMockStore(t, "Node-1", 9091)
store := m.Store(tc.doBefore)
ret := store.Apply(&tc.req)
resp, ok := ret.(Response)
if !ok {
t.Errorf("%s: response has wrong type", tc.name)
}
if got, want := resp.Error, tc.resp.Error; want != nil {
if !errors.Is(resp.Error, tc.resp.Error) {
t.Errorf("%s: error want: %v got: %v", tc.name, want, got)
t.Run(tc.name, func(t *testing.T) {
m := NewMockStore(t, "Node-1", 9091)
store := m.Store(tc.doBefore)
ret := store.Apply(&tc.req)
resp, ok := ret.(Response)
if !ok {
t.Errorf("%s: response has wrong type", tc.name)
}
} else if got != nil {
t.Errorf("%s: error want: nil got: %v", tc.name, got)
}
if tc.doAfter != nil {
if err := tc.doAfter(&m); err != nil {
t.Errorf("%s check updates: %v", tc.name, err)
if got, want := resp.Error, tc.resp.Error; want != nil {
if !errors.Is(resp.Error, tc.resp.Error) {
t.Errorf("%s: error want: %v got: %v", tc.name, want, got)
}
} else if got != nil {
t.Errorf("%s: error want: nil got: %v", tc.name, got)
}
m.indexer.AssertExpectations(t)
m.parser.AssertExpectations(t)
}
if tc.doAfter != nil {
if err := tc.doAfter(&m); err != nil {
t.Errorf("%s check updates: %v", tc.name, err)
}
m.indexer.AssertExpectations(t)
m.parser.AssertExpectations(t)
}
})
}
}

Expand Down

0 comments on commit 9a018c7

Please sign in to comment.