Skip to content

Commit

Permalink
Merge branch 'main' into deprecate-old-schema-ops
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs authored May 22, 2024
2 parents 24fbc8d + c4685e3 commit 215de88
Show file tree
Hide file tree
Showing 164 changed files with 4,229 additions and 1,915 deletions.
6 changes: 3 additions & 3 deletions adapters/handlers/graphql/local/local_component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/usecases/config"
"github.com/weaviate/weaviate/usecases/modules"
usecaseModules "github.com/weaviate/weaviate/usecases/modules"
)

// These tests are component tests for the local package including all its
Expand Down Expand Up @@ -207,7 +207,7 @@ type testCases []testCase
func (tests testCases) AssertNoError(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
modules := modules.NewProvider()
modules := usecaseModules.NewProvider()
localSchema, err := Build(&test.localSchema, nil, config.Config{}, modules)
require.Nil(t, err, test.name)

Expand Down Expand Up @@ -240,7 +240,7 @@ func (tests testCases) AssertNoError(t *testing.T) {
func (tests testCases) AssertErrorLogs(t *testing.T, expectedMsg string) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
modules := modules.NewProvider()
modules := usecaseModules.NewProvider()
logger, logsHook := logrus.NewNullLogger()
localSchema, err := Build(&test.localSchema, logger, config.Config{}, modules)
require.Nil(t, err, test.name)
Expand Down
6 changes: 0 additions & 6 deletions adapters/handlers/grpc/v1/parse_search_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,6 @@ func extractTargetVectors(req *pb.SearchRequest, class *models.Class) (*[]string
var targetVectors *[]string
if hs := req.HybridSearch; hs != nil {
targetVectors = &hs.TargetVectors
if hs.NearText != nil {
targetVectors = &hs.NearText.TargetVectors
}
if hs.NearVector != nil {
targetVectors = &hs.NearVector.TargetVectors
}
}
if na := req.NearAudio; na != nil {
targetVectors = &na.TargetVectors
Expand Down
18 changes: 9 additions & 9 deletions adapters/handlers/grpc/v1/parse_search_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ func TestGRPCRequest(t *testing.T) {
Alpha: 1.0,
Query: "nearvecquery",
NearVector: &pb.NearVector{
VectorBytes: byteops.Float32ToByteVector([]float32{1, 2, 3}),
TargetVectors: []string{"custom"},
Certainty: &one,
Distance: &one,
VectorBytes: byteops.Float32ToByteVector([]float32{1, 2, 3}),
Certainty: &one,
Distance: &one,
},
TargetVectors: []string{"custom"},
},
},
out: dto.GetParams{
Expand All @@ -221,12 +221,12 @@ func TestGRPCRequest(t *testing.T) {
Query: "nearvecquery",
FusionAlgorithm: 1,
NearVectorParams: &searchparams.NearVector{
Vector: []float32{1, 2, 3},
TargetVectors: []string{"custom"},
Certainty: 1.0,
Distance: 1.0,
WithDistance: true,
Vector: []float32{1, 2, 3},
Certainty: 1.0,
Distance: 1.0,
WithDistance: true,
},
TargetVectors: []string{"custom"},
},
},
error: false,
Expand Down
4 changes: 2 additions & 2 deletions adapters/handlers/grpc/v1/tenants.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Service) tenantsGet(ctx context.Context, principal *models.Principal, r
var err error
var tenants []*models.Tenant
if req.Params == nil {
tenants, err = s.schemaManager.GetConsistentTenants(ctx, principal, req.Collection, req.IsConsistent, []string{})
tenants, err = s.schemaManager.GetConsistentTenants(ctx, principal, req.Collection, true, []string{})
if err != nil {
return nil, err
}
Expand All @@ -38,7 +38,7 @@ func (s *Service) tenantsGet(ctx context.Context, principal *models.Principal, r
if len(requestedNames) == 0 {
return nil, fmt.Errorf("must specify at least one tenant name")
}
tenants, err = s.schemaManager.GetConsistentTenants(ctx, principal, req.Collection, req.IsConsistent, requestedNames)
tenants, err = s.schemaManager.GetConsistentTenants(ctx, principal, req.Collection, true, requestedNames)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion adapters/handlers/rest/clusterapi/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ func (i *indices) postShard() http.Handler {
func (i *indices) putShardReinit() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
args := i.regexpShardReinit.FindStringSubmatch(r.URL.Path)
fmt.Println(args)

if len(args) != 3 {
http.Error(w, "invalid URI", http.StatusBadRequest)
return
Expand Down
56 changes: 28 additions & 28 deletions adapters/handlers/rest/configure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
MemtablesMaxSizeMB: appState.ServerConfig.Config.Persistence.MemtablesMaxSizeMB,
MemtablesMinActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMinActiveDurationSeconds,
MemtablesMaxActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMaxActiveDurationSeconds,
MaxSegmentSize: appState.ServerConfig.Config.Persistence.LSMMaxSegmentSize,
HNSWMaxLogSize: appState.ServerConfig.Config.Persistence.HNSWMaxLogSize,
RootPath: appState.ServerConfig.Config.Persistence.DataPath,
QueryLimit: appState.ServerConfig.Config.QueryDefaults.Limit,
QueryMaximumResults: appState.ServerConfig.Config.QueryMaximumResults,
Expand Down Expand Up @@ -251,8 +253,6 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
remoteIndexClient, appState.Logger, appState.ServerConfig.Config.Persistence.DataPath)
appState.Scaler = scaler

/// TODO-RAFT START
//
server2port, err := parseNode2Port(appState)
if len(server2port) == 0 || err != nil {
appState.Logger.
Expand All @@ -268,31 +268,31 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
dataPath := appState.ServerConfig.Config.Persistence.DataPath

rConfig := rStore.Config{
WorkDir: filepath.Join(dataPath, "raft"),
NodeID: nodeName,
Host: addrs[0],
RaftPort: appState.ServerConfig.Config.Raft.Port,
RPCPort: appState.ServerConfig.Config.Raft.InternalRPCPort,
RaftRPCMessageMaxSize: appState.ServerConfig.Config.Raft.RPCMessageMaxSize,
ServerName2PortMap: server2port,
BootstrapTimeout: appState.ServerConfig.Config.Raft.BootstrapTimeout,
BootstrapExpect: appState.ServerConfig.Config.Raft.BootstrapExpect,
HeartbeatTimeout: appState.ServerConfig.Config.Raft.HeartbeatTimeout,
RecoveryTimeout: appState.ServerConfig.Config.Raft.RecoveryTimeout,
ElectionTimeout: appState.ServerConfig.Config.Raft.ElectionTimeout,
SnapshotInterval: appState.ServerConfig.Config.Raft.SnapshotInterval,
SnapshotThreshold: appState.ServerConfig.Config.Raft.SnapshotThreshold,
UpdateWaitTimeout: time.Second * 10, // TODO-RAFT read from the flag
MetadataOnlyVoters: appState.ServerConfig.Config.Raft.MetadataOnlyVoters,
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Logger: appState.Logger,
LogLevel: logLevel(),
LogJSONFormat: !logTextFormat(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
LoadLegacySchema: schemaRepo.LoadLegacySchema,
SaveLegacySchema: schemaRepo.SaveLegacySchema,
WorkDir: filepath.Join(dataPath, config.DefaultRaftDir),
NodeID: nodeName,
Host: addrs[0],
RaftPort: appState.ServerConfig.Config.Raft.Port,
RPCPort: appState.ServerConfig.Config.Raft.InternalRPCPort,
RaftRPCMessageMaxSize: appState.ServerConfig.Config.Raft.RPCMessageMaxSize,
ServerName2PortMap: server2port,
BootstrapTimeout: appState.ServerConfig.Config.Raft.BootstrapTimeout,
BootstrapExpect: appState.ServerConfig.Config.Raft.BootstrapExpect,
HeartbeatTimeout: appState.ServerConfig.Config.Raft.HeartbeatTimeout,
RecoveryTimeout: appState.ServerConfig.Config.Raft.RecoveryTimeout,
ElectionTimeout: appState.ServerConfig.Config.Raft.ElectionTimeout,
SnapshotInterval: appState.ServerConfig.Config.Raft.SnapshotInterval,
SnapshotThreshold: appState.ServerConfig.Config.Raft.SnapshotThreshold,
ConsistencyWaitTimeout: appState.ServerConfig.Config.Raft.ConsistencyWaitTimeout,
MetadataOnlyVoters: appState.ServerConfig.Config.Raft.MetadataOnlyVoters,
DB: nil,
Parser: schema.NewParser(appState.Cluster, vectorIndex.ParseAndValidateConfig, migrator),
AddrResolver: appState.Cluster,
Logger: appState.Logger,
LogLevel: logLevel(),
LogJSONFormat: !logTextFormat(),
IsLocalHost: appState.ServerConfig.Config.Cluster.Localhost,
LoadLegacySchema: schemaRepo.LoadLegacySchema,
SaveLegacySchema: schemaRepo.SaveLegacySchema,
}
for _, name := range appState.ServerConfig.Config.Raft.Join[:rConfig.BootstrapExpect] {
if strings.Contains(name, rConfig.NodeID) {
Expand Down Expand Up @@ -322,7 +322,7 @@ func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *s
}

appState.SchemaManager = schemaManager
appState.RemoteIndexIncoming = sharding.NewRemoteIndexIncoming(repo, appState.ClusterService.SchemaReader())
appState.RemoteIndexIncoming = sharding.NewRemoteIndexIncoming(repo, appState.ClusterService.SchemaReader(), appState.Modules)
appState.RemoteNodeIncoming = sharding.NewRemoteNodeIncoming(repo)
appState.RemoteReplicaIncoming = replica.NewRemoteReplicaIncoming(repo, appState.ClusterService.SchemaReader())

Expand Down
2 changes: 1 addition & 1 deletion adapters/handlers/rest/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions adapters/handlers/rest/embedded_spec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions adapters/repos/db/aggregations_fixtures_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ var productClass = &models.Class{
},
}

func boolRef(b bool) *bool {
return &b
}

var notIndexedClass = &models.Class{
Class: "NotIndexedClass",
VectorIndexConfig: enthnsw.NewDefaultUserConfig(),
InvertedIndexConfig: invertedConfig(),
Properties: []*models.Property{
{
Name: "name",
DataType: schema.DataTypeText.PropString(),
Tokenization: models.PropertyTokenizationWhitespace,
IndexFilterable: boolRef(false),
IndexInverted: boolRef(false),
},
},
}

var companyClass = &models.Class{
Class: "AggregationsTestCompany",
VectorIndexConfig: enthnsw.NewDefaultUserConfig(),
Expand Down
17 changes: 17 additions & 0 deletions adapters/repos/db/aggregations_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func prepareCompanyTestSchemaAndData(repo *DB,
Objects: &models.Schema{
Classes: []*models.Class{
productClass,
notIndexedClass,
companyClass,
arrayTypesClass,
customerClass,
Expand All @@ -147,6 +148,8 @@ func prepareCompanyTestSchemaAndData(repo *DB,
migrator.AddClass(context.Background(), arrayTypesClass, schemaGetter.shardState))
require.Nil(t,
migrator.AddClass(context.Background(), customerClass, schemaGetter.shardState))
require.Nil(t,
migrator.AddClass(context.Background(), notIndexedClass, schemaGetter.shardState))
})

schemaGetter.schema = schema
Expand All @@ -165,6 +168,20 @@ func prepareCompanyTestSchemaAndData(repo *DB,
}
})

t.Run("import products into notIndexed class", func(t *testing.T) {
for i, schema := range products {
t.Run(fmt.Sprintf("importing product %d", i), func(t *testing.T) {
fixture := models.Object{
Class: notIndexedClass.Class,
ID: productsIds[i],
Properties: schema,
}
require.Nil(t,
repo.PutObject(context.Background(), &fixture, []float32{0.1, 0.2, 0.01, 0.2}, nil, nil, 0))
})
}
})

t.Run("import companies", func(t *testing.T) {
for j := 0; j < importFactor; j++ {
for i, schema := range companies {
Expand Down
3 changes: 3 additions & 0 deletions adapters/repos/db/aggregator/filtered.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func (fa *filteredAggregator) bm25Objects(ctx context.Context, kw *searchparams.
return nil, nil, fmt.Errorf("bm25 objects: could not find class %s in schema", fa.params.ClassName)
}
cfg := inverted.ConfigFromModel(class.InvertedIndexConfig)

kw.ChooseSearchableProperties(class)

objs, scores, err := inverted.NewBM25Searcher(cfg.BM25, fa.store, fa.getSchema.ReadOnlyClass,
propertyspecific.Indices{}, fa.classSearcher,
fa.GetPropertyLengthTracker(), fa.logger, fa.shardVersion,
Expand Down
2 changes: 2 additions & 0 deletions adapters/repos/db/aggregator/hybrid.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (a *Aggregator) bm25Objects(ctx context.Context, kw *searchparams.KeywordRa
}
cfg := inverted.ConfigFromModel(class.InvertedIndexConfig)

kw.ChooseSearchableProperties(class)

objs, dists, err := inverted.NewBM25Searcher(cfg.BM25, a.store, a.getSchema.ReadOnlyClass,
propertyspecific.Indices{}, a.classSearcher,
a.GetPropertyLengthTracker(), a.logger, a.shardVersion,
Expand Down
8 changes: 8 additions & 0 deletions adapters/repos/db/bm25f_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ func SetupClass(t require.TestingT, repo *DB, schemaGetter *fakeSchemaGetter, lo
IndexFilterable: &vFalse,
IndexSearchable: &vTrue,
},
// Test that bm25f handles this property being unsearchable
{
Name: "notSearchable",
DataType: schema.DataTypeTextArray.PropString(),
Tokenization: models.PropertyTokenizationWhitespace,
IndexFilterable: &vFalse,
IndexSearchable: &vFalse,
},
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/entities/searchparams"
"github.com/weaviate/weaviate/usecases/modules"
"github.com/weaviate/weaviate/usecases/objects"
)

Expand Down Expand Up @@ -350,7 +351,7 @@ func testDistributed(t *testing.T, dirName string, rnd *rand.Rand, batch bool) {
}

node := nodes[rnd.Intn(len(nodes))]
res, err := node.repo.Aggregate(context.Background(), params, nil)
res, err := node.repo.Aggregate(context.Background(), params, modules.NewProvider())
require.Nil(t, err)

expectedResult := &aggregation.Result{
Expand Down
3 changes: 2 additions & 1 deletion adapters/repos/db/clusterintegrationtest/fakes_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
modstgfs "github.com/weaviate/weaviate/modules/backup-filesystem"
ubak "github.com/weaviate/weaviate/usecases/backup"
"github.com/weaviate/weaviate/usecases/memwatch"
"github.com/weaviate/weaviate/usecases/modules"
"github.com/weaviate/weaviate/usecases/sharding"
)

Expand Down Expand Up @@ -99,7 +100,7 @@ func (n *node) init(dirName string, shardStateRaw []byte,

n.migrator = db.NewMigrator(n.repo, logger)

indices := clusterapi.NewIndices(sharding.NewRemoteIndexIncoming(n.repo, n.schemaManager),
indices := clusterapi.NewIndices(sharding.NewRemoteIndexIncoming(n.repo, n.schemaManager, modules.NewProvider()),
n.repo, clusterapi.NewNoopAuthHandler())
mux := http.NewServeMux()
mux.Handle("/indices/", indices.Indices())
Expand Down
Loading

0 comments on commit 215de88

Please sign in to comment.