Skip to content

Commit

Permalink
RAFT: allow to rollback by saving the RAFT schema in the old version (w…
Browse files Browse the repository at this point in the history
…eaviate#4679)

RAFT: allow to rollback by saving the RAFT schema in the old version
  • Loading branch information
moogacs authored Apr 16, 2024
1 parent 60590f7 commit 143425f
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 92 deletions.
1 change: 1 addition & 0 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
LogLevel: logLevel(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
LoadLegacySchema: schemaRepo.LoadLegacySchema,
SaveLegacySchema: schemaRepo.SaveLegacySchema,
}
for _, name := range appState.ServerConfig.Config.Raft.Join[:rConfig.BootstrapExpect] {
if strings.Contains(name, rConfig.NodeID) {
Expand Down
17 changes: 17 additions & 0 deletions adapters/handlers/rest/raft/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ func (h RaftHandler) Statistics(w http.ResponseWriter, r *http.Request) {
}
}

// StoreSchemaV1 migrate from v2 (RAFT) to v1 (Non-RAFT)
func (h RaftHandler) StoreSchemaV1(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
restore := h.schemaHandler.StoreSchemaV1()
w.Header().Set("Content-Type", "application/json")

err := json.NewEncoder(w).Encode(restore)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

// ClusterRouter returns a *mux.Router that will requests starting with "/v1/cluster".
// The schemaHandler is kept in memory internally to forward request to once parsed.
func ClusterRouter(schemaHandler schema.Handler) *http.ServeMux {
Expand All @@ -164,6 +180,7 @@ func ClusterRouter(schemaHandler schema.Handler) *http.ServeMux {
r.HandleFunc(root+"/join", raftHandler.JoinNode)
r.HandleFunc(root+"/remove", raftHandler.RemoveNode)
r.HandleFunc(root+"/statistics", raftHandler.Statistics)
r.HandleFunc(root+"/schema-v1", raftHandler.StoreSchemaV1)

return r
}
12 changes: 12 additions & 0 deletions adapters/repos/schema/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,3 +608,15 @@ func (r *store) LoadLegacySchema() (map[string]clusterStore.ClassState, error) {
}
return res, nil
}

func (r *store) SaveLegacySchema(cluster map[string]clusterStore.ClassState) error {
states := ucs.NewState(len(cluster))

for _, s := range cluster {
currState := s // new var to avoid passing pointer to s
states.ObjectSchema.Classes = append(states.ObjectSchema.Classes, &currState.Class)
states.ShardingState[s.Class.Class] = &currState.Shards
}

return r.Save(context.Background(), states)
}
45 changes: 45 additions & 0 deletions adapters/repos/schema/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,64 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clusterStore "github.com/weaviate/weaviate/cluster/store"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"

ucs "github.com/weaviate/weaviate/usecases/schema"
"github.com/weaviate/weaviate/usecases/sharding"
)

func TestSaveAndLoadSchema(t *testing.T) {
var (
ctx = context.Background()
logger, _ = test.NewNullLogger()
dirName = t.TempDir()
)

schema := ucs.NewState(2)
addClass(&schema, "C1", 0, 1, 0)
addClass(&schema, "C2", 0, 3, 3)

// Save the schema
repo, _ := newRepo(dirName, 0, logger)
defer repo.Close()

cs := map[string]clusterStore.ClassState{}
for _, s := range schema.ObjectSchema.Classes {
cs[s.Class] = clusterStore.ClassState{
Class: *s,
Shards: *schema.ShardingState[s.Class],
}
}

if err := repo.SaveLegacySchema(cs); err != nil {
t.Fatalf("save all schema: %v", err)
}

// Load the schema
loadedSchema, err := repo.Load(ctx)
if err != nil {
t.Fatalf("load schema: %v", err)
}

// Assert that the loaded schema is the same as the saved schema

if !reflect.DeepEqual(schema.ObjectSchema, loadedSchema.ObjectSchema) {
t.Errorf("loaded schema does not match saved schema")
}
if !reflect.DeepEqual(schema.ShardingState, loadedSchema.ShardingState) {
t.Errorf("loaded sharding state does not match saved sharding state")
}
}

func TestRepositoryMigrate(t *testing.T) {
var (
ctx = context.Background()
Expand Down
184 changes: 94 additions & 90 deletions cluster/proto/api/message.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions cluster/proto/api/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ message ApplyRequest {
TYPE_ADD_TENANT = 16;
TYPE_UPDATE_TENANT = 17;
TYPE_DELETE_TENANT = 18;

TYPE_STORE_SCHEMA_V1 = 99;
}
Type type = 1;
string class = 2;
Expand Down
15 changes: 15 additions & 0 deletions cluster/store/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,21 @@ func (s *schema) getTenants(class string) ([]*models.Tenant, error) {
return res, meta.RLockGuard(f)
}

func (s *schema) States() map[string]ClassState {
s.RLock()
defer s.RUnlock()

cs := make(map[string]ClassState, len(s.Classes))
for _, c := range s.Classes {
cs[c.Class.Class] = ClassState{
Class: c.Class,
Shards: c.Sharding,
}
}

return cs
}

func (s *schema) clear() {
s.Lock()
defer s.Unlock()
Expand Down
8 changes: 8 additions & 0 deletions cluster/store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ func (s *Service) DeleteTenants(class string, req *cmd.DeleteTenantsRequest) (ui
return s.Execute(command)
}

func (s *Service) StoreSchemaV1() error {
command := &cmd.ApplyRequest{
Type: cmd.ApplyRequest_TYPE_STORE_SCHEMA_V1,
}
_, err := s.Execute(command)
return err
}

func (s *Service) Execute(req *cmd.ApplyRequest) (uint64, error) {
if s.store.IsLeader() {
return s.store.Execute(req)
Expand Down
5 changes: 4 additions & 1 deletion cluster/store/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,7 @@ func LegacySnapshot(nodeID string, m map[string]ClassState) (*raft.SnapshotMeta,
return store.Open(sink.ID())
}

type LoadLegacySchema func() (map[string]ClassState, error)
type (
LoadLegacySchema func() (map[string]ClassState, error)
SaveLegacySchema func(map[string]ClassState) error
)
13 changes: 13 additions & 0 deletions cluster/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type Config struct {

// LoadLegacySchema is responsible for loading old schema from boltDB
LoadLegacySchema LoadLegacySchema
// SaveLegacySchema is responsible for loading new schema into boltDB
SaveLegacySchema SaveLegacySchema
// IsLocalHost only required when running Weaviate from the console in localhost
IsLocalHost bool
}
Expand Down Expand Up @@ -170,6 +172,7 @@ type Store struct {

// LoadLegacySchema is responsible for loading old schema from boltDB
loadLegacySchema LoadLegacySchema
saveLegacySchema SaveLegacySchema
}

func New(cfg Config) Store {
Expand All @@ -195,6 +198,7 @@ func New(cfg Config) Store {

// loadLegacySchema is responsible for loading old schema from boltDB
loadLegacySchema: cfg.LoadLegacySchema,
saveLegacySchema: cfg.SaveLegacySchema,
}
}

Expand Down Expand Up @@ -332,6 +336,11 @@ func (st *Store) onLeaderFound(timeout time.Duration) {
}
}

// StoreSchemaV1() is responsible for saving new schema (RAFT) to boltDB
func (st *Store) StoreSchemaV1() error {
return st.saveLegacySchema(st.db.Schema.States())
}

func (st *Store) Close(ctx context.Context) (err error) {
if !st.open.Load() {
return nil
Expand Down Expand Up @@ -517,6 +526,10 @@ func (st *Store) Apply(l *raft.Log) interface{} {

case api.ApplyRequest_TYPE_DELETE_TENANT:
ret.Error = st.db.DeleteTenants(&cmd, schemaOnly)

case api.ApplyRequest_TYPE_STORE_SCHEMA_V1:
ret.Error = st.StoreSchemaV1()

default:
// This could occur when a new command has been introduced in a later app version
// At this point, we need to panic so that the app undergo an upgrade during restart
Expand Down
4 changes: 3 additions & 1 deletion usecases/schema/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func Test_Schema_Authorization(t *testing.T) {
// internal methods to indicate readiness state
"StartServing", "Shutdown", "Statistics",
// Cluster/nodes related endpoint
"JoinNode", "RemoveNode", "Nodes", "NodeName", "ClusterHealthScore", "ClusterStatus", "ResolveParentNodes":
"JoinNode", "RemoveNode", "Nodes", "NodeName", "ClusterHealthScore", "ClusterStatus", "ResolveParentNodes",
// revert to schema v0 (non raft)
"StoreSchemaV1":
// don't require auth on methods which are exported because other
// packages need to call them for maintenance and other regular jobs,
// but aren't user facing
Expand Down
4 changes: 4 additions & 0 deletions usecases/schema/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (f *fakeMetaHandler) Stats() map[string]string {
return map[string]string{}
}

func (f *fakeMetaHandler) StoreSchemaV1() error {
return nil
}

func (f *fakeMetaHandler) ClassEqual(name string) string {
if f.countClassEqual {
args := f.Called(name)
Expand Down
5 changes: 5 additions & 0 deletions usecases/schema/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type metaWriter interface {
Join(_ context.Context, nodeID, raftAddr string, voter bool) error
Remove(_ context.Context, nodeID string) error
Stats() map[string]string
StoreSchemaV1() error
}

type metaReader interface {
Expand Down Expand Up @@ -241,3 +242,7 @@ func (h *Handler) RemoveNode(ctx context.Context, node string) error {
func (h *Handler) Statistics() map[string]string {
return h.metaWriter.Stats()
}

func (h *Handler) StoreSchemaV1() error {
return h.metaWriter.StoreSchemaV1()
}

0 comments on commit 143425f

Please sign in to comment.