Skip to content

Commit

Permalink
Merge pull request weaviate#4943 from weaviate/deprecate-old-schema-ops
Browse files Browse the repository at this point in the history
deprecate old schema impl. before RAFT
  • Loading branch information
moogacs authored May 22, 2024
2 parents c4685e3 + 215de88 commit 9e924cc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 319 deletions.
116 changes: 3 additions & 113 deletions adapters/repos/schema/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func initBoltDB(filePath string, version int, cfg *config) (*bolt.DB, error) {
}

// Open the underlying DB.
//
// Deprecated: the schema is now persisted via RAFT (see the cluster package).
// Load and save are kept only to support backward compatibility.
func (r *store) Open() (err error) {
if err := os.MkdirAll(r.homeDir, 0o777); err != nil {
return fmt.Errorf("create root directory %q: %w", r.homeDir, err)
Expand Down Expand Up @@ -224,32 +227,6 @@ func (r *store) loadSchemaV1() (*ucs.State, error) {
return &state, nil
}

// UpdateClass applies data to the bucket of an already stored class.
//
// The work happens inside a single r.db.Update call. When no bucket is found
// for data.Name a "class not found" error is returned; otherwise the result of
// r.updateClass is returned as is. The context argument is unused.
func (r *store) UpdateClass(_ context.Context, data ucs.ClassPayload) error {
	key := encodeClassName(data.Name)
	return r.db.Update(func(tx *bolt.Tx) error {
		if bucket := tx.Bucket(schemaBucket).Bucket(key); bucket != nil {
			return r.updateClass(bucket, data)
		}
		return fmt.Errorf("class not found")
	})
}

// NewClass stores a class that does not exist yet.
//
// A bucket keyed by the encoded class name is created under the schema bucket
// and then filled through r.updateClass. Any error from CreateBucket (for
// instance because the class is already present) is returned unchanged.
// The context argument is unused.
func (r *store) NewClass(_ context.Context, data ucs.ClassPayload) error {
	key := encodeClassName(data.Name)
	return r.db.Update(func(tx *bolt.Tx) error {
		bucket, err := tx.Bucket(schemaBucket).CreateBucket(key)
		if err == nil {
			err = r.updateClass(bucket, data)
		}
		return err
	})
}

func (r *store) updateClass(b *bolt.Bucket, data ucs.ClassPayload) error {
// remove old shards
if data.ReplaceShards {
Expand All @@ -276,65 +253,6 @@ func (r *store) updateClass(b *bolt.Bucket, data ucs.ClassPayload) error {
return appendShards(b, data.Shards, make([]byte, 1, 68))
}

// DeleteClass removes the bucket of the given class from the schema bucket.
//
// A missing bucket (bolt.ErrBucketNotFound) is not treated as a failure, so
// deleting the same class repeatedly yields nil. Every other error from
// DeleteBucket is returned. The context argument is unused.
func (r *store) DeleteClass(_ context.Context, class string) error {
	key := encodeClassName(class)
	return r.db.Update(func(tx *bolt.Tx) error {
		err := tx.Bucket(schemaBucket).DeleteBucket(key)
		if err == nil || errors.Is(err, bolt.ErrBucketNotFound) {
			return nil
		}
		return err
	})
}

// NewShards writes the given shards into the bucket of an existing class.
//
// A "class not found" error is returned when the class has no bucket;
// otherwise the result of appendShards is returned. The context argument is
// unused.
func (r *store) NewShards(_ context.Context, class string, shards []ucs.KeyValuePair) error {
	key := encodeClassName(class)
	return r.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(schemaBucket).Bucket(key)
		if bucket == nil {
			return fmt.Errorf("class not found")
		}
		// scratch buffer used by appendShards to build shard keys
		scratch := make([]byte, 1, 68)
		return appendShards(bucket, shards, scratch)
	})
}

// UpdateShards replaces the stored values of shards that already exist in
// the given class.
//
// It returns "class not found" when the class has no bucket and
// "shard not found" when existShards reports that at least one of the shards
// is missing; in both cases nothing is written. The context argument is
// unused.
func (r *store) UpdateShards(_ context.Context, class string, shards []ucs.KeyValuePair) error {
	key := encodeClassName(class)
	return r.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(schemaBucket).Bucket(key)
		if bucket == nil {
			return fmt.Errorf("class not found")
		}
		// the same scratch buffer is shared by the check and the write
		scratch := make([]byte, 1, 68)
		if existShards(bucket, shards, scratch) {
			return appendShards(bucket, shards, scratch)
		}
		return fmt.Errorf("shard not found")
	})
}

// DeleteShards removes the named shards from the bucket of a class.
//
// If the class or a shard does not exist then nothing is done and a nil error
// is returned. The context argument is unused.
func (r *store) DeleteShards(_ context.Context, class string, shards []string) error {
	key := encodeClassName(class)
	return r.db.Update(func(tx *bolt.Tx) error {
		if bucket := tx.Bucket(schemaBucket).Bucket(key); bucket != nil {
			return deleteShards(bucket, shards, make([]byte, 1, 68))
		}
		// unknown class: nothing to delete
		return nil
	})
}

// Load loads the complete schema from the persistent storage
func (r *store) Load(ctx context.Context) (ucs.State, error) {
state := ucs.NewState(32)
Expand Down Expand Up @@ -513,19 +431,6 @@ func saveConfig(root *bolt.Bucket, cfg config) error {
return nil
}

// existShards reports whether every shard in shards has an entry in b.
//
// keyBuf is scratch space for building lookup keys: its first byte is set to
// eTypeShard and each shard key is appended after it. The buffer is cut back
// to one byte after every lookup.
// NOTE(review): callers appear to always pass a buffer of length 1 — confirm.
func existShards(b *bolt.Bucket, shards []ucs.KeyValuePair, keyBuf []byte) bool {
	keyBuf[0] = eTypeShard
	for i := range shards {
		name := shards[i].Key
		keyBuf = append(keyBuf, name...)
		if b.Get(keyBuf[:len(name)+1]) == nil {
			return false
		}
		// keep only the type prefix for the next shard
		keyBuf = keyBuf[:1]
	}
	return true
}

func appendShards(b *bolt.Bucket, shards []ucs.KeyValuePair, key []byte) error {
key[0] = eTypeShard
for _, pair := range shards {
Expand All @@ -539,19 +444,6 @@ func appendShards(b *bolt.Bucket, shards []ucs.KeyValuePair, key []byte) error {
return nil
}

// deleteShards deletes the entries of the named shards from b.
//
// keyBuf is scratch space for building keys: its first byte is set to
// eTypeShard and each shard name is appended after it. The first error
// returned by b.Delete aborts the loop and is returned to the caller.
// NOTE(review): callers appear to always pass a buffer of length 1 — confirm.
func deleteShards(b *bolt.Bucket, shards []string, keyBuf []byte) error {
	keyBuf[0] = eTypeShard
	for i := range shards {
		keyBuf = append(keyBuf, shards[i]...)
		err := b.Delete(keyBuf[:len(shards[i])+1])
		if err != nil {
			return err
		}
		// keep only the type prefix for the next shard
		keyBuf = keyBuf[:1]
	}
	return nil
}

func encodeClassName(name string) []byte {
len := len(name) + 1
buf := make([]byte, 1, len)
Expand All @@ -568,8 +460,6 @@ func copyFile(dst, src string) error {
return os.WriteFile(dst, data, 0o644)
}

// var _ = schemauc.Repo(&Repo{})

func createClassPayload(class *models.Class,
shardingState *sharding.State,
) (pl schema.ClassPayload, err error) {
Expand Down
186 changes: 0 additions & 186 deletions adapters/repos/schema/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ package schema

import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clusterStore "github.com/weaviate/weaviate/cluster/store"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
Expand Down Expand Up @@ -163,141 +161,6 @@ func TestRepositorySaveLoad(t *testing.T) {
t.Fatalf("load schema: %v", err)
}
assert.Equal(t, schema, res)

// delete class
deleteClass(&schema, "C2")
repo.DeleteClass(ctx, "C2") // second call to test impotency
if err := repo.DeleteClass(ctx, "C2"); err != nil {
t.Errorf("delete bucket: %v", err)
}
repo.asserEqualSchema(t, schema, "delete class")
}

// TestRepositoryUpdateClass exercises NewClass, UpdateClass and DeleteClass
// against a store created in a temporary directory. After every step the
// schema loaded from the store is compared with an in-memory expected schema.
func TestRepositoryUpdateClass(t *testing.T) {
var (
ctx = context.Background()
logger, _ = test.NewNullLogger()
dirName = t.TempDir()
)
repo, err := newRepo(dirName, -1, logger)
if err != nil {
t.Fatalf("create new repo: %v", err)
}

// create class C1; creating the same class a second time must fail
schema := ucs.NewState(3)
cls, ss := addClass(&schema, "C1", 0, 1, 0)
payload, err := createClassPayload(cls, ss)
assert.Nil(t, err)
if err := repo.NewClass(ctx, payload); err != nil {
t.Fatalf("create new class: %v", err)
}
if err := repo.NewClass(ctx, payload); err == nil {
t.Fatal("create new class: must fail since class already exists")
}
repo.asserEqualSchema(t, schema, "create class")

// update class: rebuild C1 in the expected schema with different settings
deleteClass(&schema, "C1")
cls, ss = addClass(&schema, "C1", 0, 2, 1)

payload, err = createClassPayload(cls, ss)
assert.Nil(t, err)
// updating under a name that was never created must be rejected
payload.Name = "C3"
if err := repo.UpdateClass(ctx, payload); err == nil {
t.Fatal("updating class by adding shards to non existing class must fail")
}
payload.Name = "C1"
if err := repo.UpdateClass(ctx, payload); err != nil {
t.Errorf("update class: %v", err)
}
repo.asserEqualSchema(t, schema, "update class")

// overwrite class: same update path, but with ReplaceShards set
deleteClass(&schema, "C1")
cls, ss = addClass(&schema, "C1", 2, 2, 3)
payload, err = createClassPayload(cls, ss)
assert.Nil(t, err)
payload.ReplaceShards = true
if err := repo.UpdateClass(ctx, payload); err != nil {
t.Errorf("update class: %v", err)
}
repo.asserEqualSchema(t, schema, "overwrite class")

// delete class
deleteClass(&schema, "C1")
repo.DeleteClass(ctx, "C1") // first call; it is repeated below to test idempotency
if err := repo.DeleteClass(ctx, "C1"); err != nil {
t.Errorf("delete bucket: %v", err)
}
repo.asserEqualSchema(t, schema, "delete class")
}

// TestRepositoryUpdateShards exercises NewShards, UpdateShards and
// DeleteShards against a store created in a temporary directory, covering
// both existing and unknown classes and shards.
func TestRepositoryUpdateShards(t *testing.T) {
var (
ctx = context.Background()
logger, _ = test.NewNullLogger()
dirName = t.TempDir()
)
repo, err := newRepo(dirName, -1, logger)
if err != nil {
t.Fatalf("create new repo: %v", err)
}

// create class C1 as the starting point
schema := ucs.NewState(2)
cls, ss := addClass(&schema, "C1", 0, 2, 1)
payload, err := createClassPayload(cls, ss)
assert.Nil(t, err)
if err := repo.NewClass(ctx, payload); err != nil {
t.Errorf("update class: %v", err)
}
repo.asserEqualSchema(t, schema, "update class")

// add shards: rebuild C1 in the expected schema with 5 shards and persist
// them via NewShards; the same call on unknown class C3 must fail
deleteClass(&schema, "C1")
_, ss = addClass(&schema, "C1", 0, 2, 5)
shards := serializeShards(ss.Physical)
if err := repo.NewShards(ctx, "C1", shards); err != nil {
t.Fatalf("add new shards: %v", err)
}
if err := repo.NewShards(ctx, "C3", shards); err == nil {
t.Fatal("add new shards to a non existing class must fail")
}
repo.asserEqualSchema(t, schema, "adding new shards")

t.Run("fails updating non existent shards", func(t *testing.T) {
// presumably createShards(4, 2, ...) includes a shard that C1 does not have — verify against createShards
nonExistentShards := createShards(4, 2, models.TenantActivityStatusCOLD)
nonExistentShardPairs := serializeShards(nonExistentShards)
err := repo.UpdateShards(ctx, "C1", nonExistentShardPairs)
require.NotNil(t, err)
assert.ErrorContains(t, err, "shard not found")
})

// shards expected to already exist in C1, with a COLD activity status
existentShards := createShards(3, 2, models.TenantActivityStatusCOLD)
existentShardPairs := serializeShards(existentShards)

t.Run("fails updating shards of non existent class", func(t *testing.T) {
err := repo.UpdateShards(ctx, "ClassNonExistent", existentShardPairs)
require.NotNil(t, err)
assert.ErrorContains(t, err, "class not found")
})
t.Run("succeeds updating shards", func(t *testing.T) {
err := repo.UpdateShards(ctx, "C1", existentShardPairs)
require.Nil(t, err)

// mirror the update in the expected sharding state before comparing
replaceShards(ss, existentShards)
repo.asserEqualSchema(t, schema, "update shards")
})

// remove shards 0, 3 and 4 from both the expected state and the store
xset := removeShards(ss, []int{0, 3, 4})
if err := repo.DeleteShards(ctx, "C1", xset); err != nil {
t.Fatalf("delete shards: %v", err)
}
repo.asserEqualSchema(t, schema, "remove shards")

// deleting shards of an unknown class is expected to succeed silently
if err := repo.DeleteShards(ctx, "C3", xset); err != nil {
t.Fatalf("delete shards from unknown class: %v", err)
}
}

func createClass(name string, start, nProps, nShards int) (models.Class, sharding.State) {
Expand Down Expand Up @@ -334,22 +197,6 @@ func createShards(start, nShards int, activityStatus string) map[string]sharding
return shards
}

// replaceShards copies every entry of shards into ss.Physical, overwriting
// entries that share a name.
func replaceShards(ss *sharding.State, shards map[string]sharding.Physical) {
	for key := range shards {
		ss.Physical[key] = shards[key]
	}
}

// removeShards deletes the shards named "shard-<n>" from ss.Physical for
// every n in shards and returns those names in the same order.
func removeShards(ss *sharding.State, shards []int) []string {
	names := make([]string, 0, len(shards))
	for _, n := range shards {
		shardName := fmt.Sprintf("shard-%d", n)
		delete(ss.Physical, shardName)
		names = append(names, shardName)
	}
	return names
}

func addClass(schema *ucs.State, name string, start, nProps, nShards int) (*models.Class, *sharding.State) {
cls, ss := createClass(name, start, nProps, nShards)
if schema.ObjectSchema == nil {
Expand All @@ -363,39 +210,6 @@ func addClass(schema *ucs.State, name string, start, nProps, nShards int) (*mode
return &cls, &ss
}

// deleteClass removes the first class called name from the in-memory schema
// together with its sharding state. It does nothing when no class matches.
func deleteClass(schema *ucs.State, name string) {
	classes := schema.ObjectSchema.Classes
	for i := range classes {
		if classes[i].Class != name {
			continue
		}
		schema.ObjectSchema.Classes = append(classes[:i], classes[i+1:]...)
		delete(schema.ShardingState, name)
		return
	}
}

// asserEqualSchema loads the schema from the store and asserts that it equals
// expected. A load failure aborts the test; msg is included in that failure
// message to identify the calling step.
func (r *store) asserEqualSchema(t *testing.T, expected ucs.State, msg string) {
	t.Helper()
	loaded, loadErr := r.Load(context.Background())
	if loadErr != nil {
		t.Fatalf("load schema: %s: %v", msg, loadErr)
	}
	assert.Equal(t, expected, loaded)
}

// serializeShards converts a shard map into key/value pairs, using the shard
// name as key and the JSON encoding of the shard as value. The order of the
// result follows map iteration and is therefore not fixed.
func serializeShards(shards map[string]sharding.Physical) []ucs.KeyValuePair {
	pairs := make([]ucs.KeyValuePair, 0, len(shards))
	for name, physical := range shards {
		// the marshal error is discarded in this test helper
		encoded, _ := json.Marshal(&physical)
		pairs = append(pairs, ucs.KeyValuePair{Key: name, Value: encoded})
	}
	return pairs
}

func newRepo(homeDir string, version int, logger logrus.FieldLogger) (*store, error) {
r := NewStore(homeDir, logger)
if version > -1 {
Expand Down
Loading

0 comments on commit 9e924cc

Please sign in to comment.