Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix live loader and add tests for dropall, drop namespace, live load #9063

Merged
merged 3 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chunker/rdf_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ var typeMap = map[string]types.TypeID{
"xs:float": types.FloatID,
"xs:base64Binary": types.BinaryID,
"geo:geojson": types.GeoID,
"xs:[]float32": types.VFloatID,
"http://www.w3.org/2001/XMLSchema#string": types.StringID,
"http://www.w3.org/2001/XMLSchema#dateTime": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#date": types.DateTimeID,
Expand Down
4 changes: 2 additions & 2 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ loop2:
return errors.Errorf("restore wasn't started on at least 1 alpha")
}

func (hc *HTTPClient) Export(dest string, namespace int) error {
func (hc *HTTPClient) Export(dest, format string, namespace int) error {
const exportRequest = `mutation export($dest: String!, $f: String!, $ns: Int) {
export(input: {destination: $dest, format: $f, namespace: $ns}) {
response {
Expand All @@ -540,7 +540,7 @@ func (hc *HTTPClient) Export(dest string, namespace int) error {
Query: exportRequest,
Variables: map[string]interface{}{
"dest": dest,
"f": "rdf",
"f": format,
"ns": namespace,
},
}
Expand Down
10 changes: 8 additions & 2 deletions dgraphtest/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}()

// .rdf.gz, .schema.gz,.gql_schema.gz
var rdfFiles, schemaFiles, gqlSchemaFiles []string
var rdfFiles, schemaFiles, gqlSchemaFiles, jsonFiles []string
tr := tar.NewReader(ts)
for {
header, err := tr.Next()
Expand All @@ -404,6 +404,8 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
switch {
case strings.HasSuffix(fileName, ".rdf.gz"):
rdfFiles = append(rdfFiles, hostFile)
case strings.HasSuffix(fileName, ".json.gz"):
jsonFiles = append(jsonFiles, hostFile)
case strings.HasSuffix(fileName, ".schema.gz"):
schemaFiles = append(schemaFiles, hostFile)
case strings.HasSuffix(fileName, ".gql_schema.gz"):
Expand Down Expand Up @@ -441,10 +443,14 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
}

opts := LiveOpts{
DataFiles: rdfFiles,
SchemaFiles: schemaFiles,
GqlSchemaFiles: gqlSchemaFiles,
}
if len(rdfFiles) == 0 {
opts.DataFiles = jsonFiles
} else {
opts.DataFiles = rdfFiles
}
if err := c.LiveLoad(opts); err != nil {
return errors.Wrapf(err, "error running live loader: %v", err)
}
Expand Down
22 changes: 21 additions & 1 deletion dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
}
}
// using -1 as namespace exports all the namespaces
if err := hc.Export(DefaultExportDir, -1); err != nil {
if err := hc.Export(DefaultExportDir, "rdf", -1); err != nil {
return errors.Wrap(err, "error taking export during upgrade")
}
if err := c.Stop(); err != nil {
Expand Down Expand Up @@ -747,6 +747,26 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
return &GrpcClient{Dgraph: client}, cleanup, nil
}

func (c *LocalCluster) AlphaClient(id int) (*GrpcClient, func(), error) {
alpha := c.alphas[id]
url, err := alpha.alphaURL(c)
if err != nil {
return nil, nil, errors.Wrap(err, "error getting health URL")
}
conn, err := grpc.Dial(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, errors.Wrap(err, "error connecting to alpha")
}

client := dgo.NewDgraphClient(api.NewDgraphClient(conn))
cleanup := func() {
if err := conn.Close(); err != nil {
log.Printf("[WARNING] error closing connection: %v", err)
}
}
return &GrpcClient{Dgraph: client}, cleanup, nil
}

// HTTPClient creates an HTTP client
func (c *LocalCluster) HTTPClient() (*HTTPClient, error) {
adminURL, err := c.serverURL("alpha", "/admin")
Expand Down
106 changes: 106 additions & 0 deletions dgraphtest/vector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dgraphtest

import (
"encoding/json"
"fmt"
"math/rand"
"strings"

"github.com/dgraph-io/dgo/v230/protos/api"
)

func GenerateRandomVector(size int) []float32 {
vector := make([]float32, size)
for i := 0; i < size; i++ {
vector[i] = rand.Float32() * 10
}
return vector
}

func formatVector(label string, vector []float32, index int) string {
vectorString := fmt.Sprintf(`"[%s]"`, strings.Trim(strings.Join(strings.Fields(fmt.Sprint(vector)), ", "), "[]"))
return fmt.Sprintf("<0x%x> <%s> %s . \n", index+10, label, vectorString)
}

func GenerateRandomVectors(lowerLimit, uppermLimit, vectorSize int, label string) (string, [][]float32) {
var builder strings.Builder
var vectors [][]float32
// builder.WriteString("`")
for i := lowerLimit; i < uppermLimit; i++ {
randomVector := GenerateRandomVector(vectorSize)
vectors = append(vectors, randomVector)
formattedVector := formatVector(label, randomVector, i)
builder.WriteString(formattedVector)
}

return builder.String(), vectors
}

func (gc *GrpcClient) QueryMultipleVectorsUsingSimilarTo(vector []float32, pred string, topK int) ([][]float32, error) {
vectorQuery := fmt.Sprintf(`
{
vector(func: similar_to(%v, %v, "%v")) {
uid
%v
}
}`, pred, topK, vector, pred)
resp, err := gc.Query(vectorQuery)

if err != nil {
return [][]float32{}, err
}

return UnmarshalVectorResp(resp)
}

func (gc *GrpcClient) QuerySingleVectorsUsingUid(uid, pred string) ([][]float32, error) {
vectorQuery := fmt.Sprintf(`
{
vector(func: uid(%v)) {
uid
%v
}
}`, uid[1:len(uid)-1], pred)

resp, err := gc.Query(vectorQuery)
if err != nil {
return [][]float32{}, err
}

return UnmarshalVectorResp(resp)
}

func UnmarshalVectorResp(resp *api.Response) ([][]float32, error) {
type Data struct {
Vector []struct {
UID string `json:"uid"`
ProjectDescriptionV []float32 `json:"project_discription_v"`
} `json:"vector"`
}
var data Data
if err := json.Unmarshal(resp.Json, &data); err != nil {
return nil, err
}

var vectors [][]float32
for _, item := range data.Vector {
vectors = append(vectors, item.ProjectDescriptionV)
}
return vectors, nil
}
8 changes: 4 additions & 4 deletions dql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,10 @@ func parseValue(v varInfo) (types.Val, error) {
}, nil
}
}
case "vector32float":
case "float32vector":
{
if i, err := types.ParseVFloat(v.Value); err != nil {
return types.Val{}, errors.Wrapf(err, "Expected a vfloat but got %v", v.Value)
return types.Val{}, errors.Wrapf(err, "Expected a float32vector but got %v", v.Value)
} else {
return types.Val{
Tid: types.VFloatID,
Expand Down Expand Up @@ -415,10 +415,10 @@ func checkValueType(vm varMap) error {
return errors.Wrapf(err, "Expected a bool but got %v", v.Value)
}
}
case "vfloat":
case "float32vector":
{
if _, err := types.ParseVFloat(v.Value); err != nil {
return errors.Wrapf(err, "Expected a vfloat but got %v", v.Value)
return errors.Wrapf(err, "Expected a vector32float but got %v", v.Value)
}
}
case "string": // Value is a valid string. No checks required.
Expand Down
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ message SchemaNode {
bool lang = 9;
bool no_conflict = 10;
bool unique = 11;
repeated VectorIndexSpec index_specs = 12;

}

message SchemaResult {
Expand Down
Loading