From db468f4f76dac4aeb93192a3e73473e17669c4bf Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Thu, 4 May 2023 21:28:58 +0400 Subject: [PATCH] Utilize any parameter of DuckDB CSV/JSON/Parquet "read" function (#2194) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Pass all properties (located in "duckdb" section) of a source config to corresponding DuckDB functions on ingestion * Reformatted go sources * Backward compatible ingestion properties: csv.delimiter and hive_partitioning * Added some comments * Reformatted go sources * Updated csv delimiter property name * Renamed a function * Included "duckdb.*" props into ser/de * Fixed unit tests * Reformatted go sources * Simplified a comparison of properties * duckdb prefix + backward compatibility for csv.delimiter and hive_partitioning * Added unit tests of backward compatibility * Added unit tests of backward compatibility * Removed the use of csv.delimiter and hive_partitioning * Removed the use of csv.delimiter and hive_partitioning --------- Co-authored-by: e.sevastyanov Co-authored-by: Benjamin Egelund-Müller --- cli/cmd/source/add.go | 5 - docs/docs/reference/cli-cheat-sheet.md | 8 - docs/docs/reference/cli/source/add.md | 1 - docs/docs/reference/project-files/sources.md | 4 - runtime/compilers/rillv1beta/project.go | 4 - runtime/compilers/rillv1beta/yaml.go | 9 +- runtime/connectors/connectors.go | 14 +- runtime/connectors/connectors_test.go | 50 ++++ runtime/connectors/localfile/file.go | 14 +- runtime/drivers/duckdb/connectors.go | 137 ++++++---- runtime/drivers/duckdb/connectors_test.go | 248 +++++++++++++++++- .../catalog/artifacts/artifacts_test.go | 162 ++++++++++-- .../catalog/artifacts/yaml/objects.go | 25 +- .../services/catalog/artifacts/yaml/yaml.go | 2 +- scripts/druid-import.sh | 2 +- web-local/test/data/Users.json | 10 + 16 files changed, 553 insertions(+), 142 deletions(-) create mode 100644 web-local/test/data/Users.json diff --git a/cli/cmd/source/add.go b/cli/cmd/source/add.go index b9883de9132..3ce11c36aab 100644 --- a/cli/cmd/source/add.go +++ b/cli/cmd/source/add.go @@ -21,7 +21,6 @@ func AddCmd(cfg *config.Config) *cobra.Command { var olapDSN string var projectPath string var sourceName string - var delimiter string var force bool var verbose bool @@ -55,9 +54,6 @@ func AddCmd(cfg *config.Config) *cobra.Command { } props := map[string]any{"path": dataPath} - if delimiter != "" { - props["csv.delimiter"] = delimiter - } propsPB, err := structpb.NewStruct(props) if err != nil { @@ -99,7 +95,6 @@ func AddCmd(cfg *config.Config) *cobra.Command { addCmd.Flags().StringVar(&projectPath, "path", ".", "Project directory") addCmd.Flags().StringVar(&olapDSN, "db", local.DefaultOLAPDSN, "Database DSN") addCmd.Flags().StringVar(&olapDriver, "db-driver", local.DefaultOLAPDriver, "Database driver") - addCmd.Flags().StringVar(&delimiter, "delimiter", "", "CSV delimiter override (defaults to autodetect)") addCmd.Flags().BoolVar(&verbose, "verbose", false, "Sets the log level to debug") return addCmd diff --git a/docs/docs/reference/cli-cheat-sheet.md b/docs/docs/reference/cli-cheat-sheet.md index 250029afd73..dd8bfbb5ed1 100644 --- a/docs/docs/reference/cli-cheat-sheet.md +++ b/docs/docs/reference/cli-cheat-sheet.md @@ -89,14 +89,6 @@ By default the source name will be a sanitized version of the dataset file name. 
rill source add /path/to/data.parquet --name my_source ``` -### Custom CSV delimiters - -If you have a CSV file that is delimited by a character other than a comma or tab, you can use the `--delimiter` option. If a delimiter is not set, Rill automatically tries to detect the delimiter, so this is not strictly necessary: - -``` -rill source add /path/to/data.csv --delimiter "|" -``` - ## Dropping a source If you have added a source to Rill that you want to drop, run: diff --git a/docs/docs/reference/cli/source/add.md b/docs/docs/reference/cli/source/add.md index 0ae0a07679f..15ba07d1966 100644 --- a/docs/docs/reference/cli/source/add.md +++ b/docs/docs/reference/cli/source/add.md @@ -21,7 +21,6 @@ rill source add [flags] --path string Project directory (default ".") --db string Database DSN (default "stage.db") --db-driver string Database driver (default "duckdb") - --delimiter string CSV delimiter override (defaults to autodetect) --verbose Sets the log level to debug ``` diff --git a/docs/docs/reference/project-files/sources.md b/docs/docs/reference/project-files/sources.md index 3dee2ec3d8f..636d27fd4b8 100644 --- a/docs/docs/reference/project-files/sources.md +++ b/docs/docs/reference/project-files/sources.md @@ -48,10 +48,6 @@ In your Rill project directory, create a `.yaml` file in the `sourc **`timeout`** — The maximum time to wait for souce ingestion. -**`hive_partitioning`** - — If set to true, hive style partitioning is transformed into column values in the data source on ingestion. - - _`true`_ by default - **`extract`** - Optionally limit the data ingested from remote sources (S3/GCS only) - **`rows`** - limits the size of data fetched - **`strategy`** - strategy to fetch data (**head** or **tail**) diff --git a/runtime/compilers/rillv1beta/project.go b/runtime/compilers/rillv1beta/project.go index 09212a41a2e..941742c1005 100644 --- a/runtime/compilers/rillv1beta/project.go +++ b/runtime/compilers/rillv1beta/project.go @@ -84,10 +84,6 @@ func (c *Codec) PutSource(ctx context.Context, repo drivers.RepoStore, instanceI out.Region = val } - if val, ok := props["csv.delimiter"].(string); ok { - out.CSVDelimiter = val - } - blob, err := yaml.Marshal(out) if err != nil { return "", err diff --git a/runtime/compilers/rillv1beta/yaml.go b/runtime/compilers/rillv1beta/yaml.go index 06f1ed9570e..9562c45fadc 100644 --- a/runtime/compilers/rillv1beta/yaml.go +++ b/runtime/compilers/rillv1beta/yaml.go @@ -8,11 +8,10 @@ import ( var alphaNumericRegex = regexp.MustCompile("[^A-Za-z0-9]+") type Source struct { - Type string - URI string `yaml:"uri,omitempty"` - Path string `yaml:"path,omitempty"` - Region string `yaml:"region,omitempty"` - CSVDelimiter string `yaml:"csv.delimiter,omitempty"` + Type string + URI string `yaml:"uri,omitempty"` + Path string `yaml:"path,omitempty"` + Region string `yaml:"region,omitempty"` } type ProjectConfig struct { diff --git a/runtime/connectors/connectors.go b/runtime/connectors/connectors.go index 631a7123841..6176f179012 100644 --- a/runtime/connectors/connectors.go +++ b/runtime/connectors/connectors.go @@ -3,6 +3,7 @@ package connectors import ( "context" "fmt" + "reflect" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" ) @@ -176,16 +177,5 @@ func ConsumeAsIterator(ctx context.Context, env *Env, source *Source) (FileItera } func (s *Source) PropertiesEquals(o *Source) bool { - if len(s.Properties) != len(o.Properties) { - return false - } - - for k1, v1 := range s.Properties { - v2, ok := o.Properties[k1] - if !ok || v1 != v2 { - return 
false - } - } - - return true + return reflect.DeepEqual(s.Properties, o.Properties) } diff --git a/runtime/connectors/connectors_test.go b/runtime/connectors/connectors_test.go index 4632a15e4e2..4071189a51e 100644 --- a/runtime/connectors/connectors_test.go +++ b/runtime/connectors/connectors_test.go @@ -27,6 +27,50 @@ func TestPropertiesEquals(t *testing.T) { Properties: map[string]any{"a": 100, "c": "hello world"}, } + s5 := &Source{ + Name: "s5", + Properties: map[string]any{ + "number": 0, + "string": "hello world", + "nestedMap": map[string]any{ + "nestedMap": map[string]any{ + "string": "value", + "number": 2, + }, + "string": "value", + "number": 1, + }, + }, + } + + s6 := &Source{ + Name: "s6", + Properties: map[string]any{ + "number": 0, + "string": "hello world", + "nestedMap": map[string]any{ + "number": 1, + "string": "value", + "nestedMap": map[string]any{ + "number": 2, + "string": "value", + }, + }, + }, + } + + s7 := &Source{ + Name: "s7", + Properties: map[string]any{ + "number": 0, + "string": "hello world", + "nestedMap": map[string]any{ + "number": 1, + "string": "value", + }, + }, + } + // s1 and s2 should be equal require.True(t, s1.PropertiesEquals(s2) && s2.PropertiesEquals(s1)) @@ -37,4 +81,10 @@ func TestPropertiesEquals(t *testing.T) { // s2 should not equal s3 or s4 require.False(t, s2.PropertiesEquals(s3) || s3.PropertiesEquals(s2)) require.False(t, s2.PropertiesEquals(s4) || s4.PropertiesEquals(s2)) + + // s5 and s6 should be equal + require.True(t, s5.PropertiesEquals(s6) && s6.PropertiesEquals(s5)) + + // s6 and s7 should not be equal + require.False(t, s6.PropertiesEquals(s7) || s7.PropertiesEquals(s6)) } diff --git a/runtime/connectors/localfile/file.go b/runtime/connectors/localfile/file.go index a95ae17d5b0..059010a6843 100644 --- a/runtime/connectors/localfile/file.go +++ b/runtime/connectors/localfile/file.go @@ -32,22 +32,12 @@ var spec = connectors.Spec{ Description: "Either CSV or Parquet. 
Inferred if not set.", Placeholder: "csv", }, - { - Key: "csv.delimiter", - Type: connectors.StringPropertyType, - Required: false, - DisplayName: "CSV Delimiter", - Description: "Force delimiter for a CSV file.", - Placeholder: ",", - }, }, } type Config struct { - Path string `mapstructure:"path"` - Format string `mapstructure:"format"` - CSVDelimiter string `mapstructure:"csv.delimiter"` - HivePartition *bool `mapstructure:"hive_partitioning"` + Path string `mapstructure:"path"` + Format string `mapstructure:"format"` } func ParseConfig(props map[string]any) (*Config, error) { diff --git a/runtime/drivers/duckdb/connectors.go b/runtime/drivers/duckdb/connectors.go index 585274ff78f..2e28c239030 100644 --- a/runtime/drivers/duckdb/connectors.go +++ b/runtime/drivers/duckdb/connectors.go @@ -77,24 +77,7 @@ func (c *connection) ingest(ctx context.Context, env *connectors.Env, source *co // for files downloaded locally from remote sources func (c *connection) ingestIteratorFiles(ctx context.Context, source *connectors.Source, filenames []string, appendToTable bool) error { - format := "" - if value, ok := source.Properties["format"]; ok { - format = value.(string) - } - - delimiter := "" - if value, ok := source.Properties["csv.delimiter"]; ok { - delimiter = value.(string) - } - - hivePartition := 1 - if value, ok := source.Properties["hive_partitioning"]; ok { - if !value.(bool) { - hivePartition = 0 - } - } - - from, err := sourceReader(filenames, delimiter, format, hivePartition) + from, err := sourceReader(filenames, source.Properties) if err != nil { return err } @@ -129,12 +112,7 @@ func (c *connection) ingestLocalFiles(ctx context.Context, env *connectors.Env, return fmt.Errorf("file does not exist at %s", conf.Path) } - hivePartition := 1 - if conf.HivePartition != nil && !*conf.HivePartition { - hivePartition = 0 - } - - from, err := sourceReader(localPaths, conf.CSVDelimiter, conf.Format, hivePartition) + from, err := sourceReader(localPaths, source.Properties) if err != nil { return err } @@ -144,35 +122,6 @@ func (c *connection) ingestLocalFiles(ctx context.Context, env *connectors.Env, return c.Exec(ctx, &drivers.Statement{Query: qry, Priority: 1}) } -func sourceReader(paths []string, csvDelimiter, format string, hivePartition int) (string, error) { - if format == "" { - format = fileutil.FullExt(paths[0]) - } else { - // users will set format like csv, tsv, parquet - // while infering format from file name extensions its better to rely on .csv, .parquet - format = fmt.Sprintf(".%s", format) - } - - if format == "" { - return "", fmt.Errorf("invalid file") - } else if strings.Contains(format, ".csv") || strings.Contains(format, ".tsv") || strings.Contains(format, ".txt") { - return sourceReaderWithDelimiter(paths, csvDelimiter), nil - } else if strings.Contains(format, ".parquet") { - return fmt.Sprintf("read_parquet(['%s'], HIVE_PARTITIONING=%v)", strings.Join(paths, "','"), hivePartition), nil - } else if strings.Contains(format, ".json") || strings.Contains(format, ".ndjson") { - return fmt.Sprintf("read_json_auto(['%s'], sample_size=-1)", strings.Join(paths, "','")), nil - } else { - return "", fmt.Errorf("file type not supported : %s", format) - } -} - -func sourceReaderWithDelimiter(paths []string, delimiter string) string { - if delimiter == "" { - return fmt.Sprintf("read_csv_auto(['%s'], sample_size=-1)", strings.Join(paths, "','")) - } - return fmt.Sprintf("read_csv_auto(['%s'], delim='%s', sample_size=-1)", strings.Join(paths, "','"), delimiter) -} - func 
fileSize(paths []string) int64 { var size int64 for _, path := range paths { @@ -201,3 +150,85 @@ func resolveLocalPath(env *connectors.Env, path, sourceName string) (string, err } return finalPath, nil } + +func sourceReader(paths []string, properties map[string]any) (string, error) { + format, formatDefined := properties["format"].(string) + if formatDefined { + format = fmt.Sprintf(".%s", format) + } else { + format = fileutil.FullExt(paths[0]) + } + + var ingestionProps map[string]any + if duckDBProps, ok := properties["duckdb"].(map[string]any); ok { + ingestionProps = duckDBProps + } else { + ingestionProps = map[string]any{} + } + + // Generate a "read" statement + if containsAny(format, []string{".csv", ".tsv", ".txt"}) { + // CSV reader + return generateReadCsvStatement(paths, ingestionProps) + } else if strings.Contains(format, ".parquet") { + // Parquet reader + return generateReadParquetStatement(paths, ingestionProps) + } else if containsAny(format, []string{".json", ".ndjson"}) { + // JSON reader + return generateReadJSONStatement(paths, ingestionProps) + } else { + return "", fmt.Errorf("file type not supported : %s", format) + } +} + +func containsAny(s string, targets []string) bool { + source := strings.ToLower(s) + for _, target := range targets { + if strings.Contains(source, target) { + return true + } + } + return false +} + +func generateReadCsvStatement(paths []string, properties map[string]any) (string, error) { + // auto_detect (enables auto-detection of parameters) is true by default, it takes care of params/schema + return fmt.Sprintf("read_csv_auto(%s)", convertToStatementParamsStr(paths, properties)), nil +} + +func generateReadParquetStatement(paths []string, properties map[string]any) (string, error) { + ingestionProps := copyMap(properties) + // set hive_partitioning to true by default + if _, hivePartitioningDefined := ingestionProps["hive_partitioning"]; !hivePartitioningDefined { + ingestionProps["hive_partitioning"] = true + } + return fmt.Sprintf("read_parquet(%s)", convertToStatementParamsStr(paths, ingestionProps)), nil +} + +func generateReadJSONStatement(paths []string, properties map[string]any) (string, error) { + ingestionProps := copyMap(properties) + // auto_detect is false by default so setting it to true simplifies the ingestion + // if columns are defined then DuckDB turns the auto-detection off so no need to check this case here + if _, autoDetectDefined := ingestionProps["auto_detect"]; !autoDetectDefined { + ingestionProps["auto_detect"] = true + } + return fmt.Sprintf("read_json(%s)", convertToStatementParamsStr(paths, ingestionProps)), nil +} + +func copyMap(originalMap map[string]any) map[string]any { + newMap := make(map[string]any, len(originalMap)) + for key, value := range originalMap { + newMap[key] = value + } + return newMap +} + +func convertToStatementParamsStr(paths []string, properties map[string]any) string { + ingestionParamsStr := make([]string, 0, len(properties)+1) + // The first parameter is a source path + ingestionParamsStr = append(ingestionParamsStr, fmt.Sprintf("['%s']", strings.Join(paths, "','"))) + for key, value := range properties { + ingestionParamsStr = append(ingestionParamsStr, fmt.Sprintf("%s=%v", key, value)) + } + return strings.Join(ingestionParamsStr, ",") +} diff --git a/runtime/drivers/duckdb/connectors_test.go b/runtime/drivers/duckdb/connectors_test.go index 7b47ec4ab59..58d911be465 100644 --- a/runtime/drivers/duckdb/connectors_test.go +++ b/runtime/drivers/duckdb/connectors_test.go @@ -28,7 
+28,7 @@ func TestConnectorWithSourceVariations(t *testing.T) { {"local_file", filepath.Join(testdataPathRel, "AdBids.csv"), nil}, {"local_file", filepath.Join(testdataPathRel, "AdBids.csv"), map[string]any{"csv.delimiter": ","}}, {"local_file", filepath.Join(testdataPathRel, "AdBids.csv.gz"), nil}, - {"local_file", filepath.Join(testdataPathRel, "AdBids.parquet"), nil}, + {"local_file", filepath.Join(testdataPathRel, "AdBids.parquet"), map[string]any{"hive_partitioning": true}}, {"local_file", filepath.Join(testdataPathAbs, "AdBids.parquet"), nil}, {"local_file", filepath.Join(testdataPathAbs, "AdBids.txt"), nil}, {"local_file", "../../../runtime/testruntime/testdata/ad_bids/data/AdBids.csv.gz", nil}, @@ -179,8 +179,8 @@ func TestCSVDelimiter(t *testing.T) { Name: "foo", Connector: "local_file", Properties: map[string]any{ - "path": testDelimiterCsvPath, - "csv.delimiter": "+", + "path": testDelimiterCsvPath, + "duckdb": map[string]any{"delim": "'+'"}, }, }) require.NoError(t, err) @@ -211,9 +211,9 @@ func TestFileFormatAndDelimiter(t *testing.T) { Name: "foo", Connector: "local_file", Properties: map[string]any{ - "path": testDelimiterCsvPath, - "format": "csv", - "csv.delimiter": " ", + "path": testDelimiterCsvPath, + "format": "csv", + "duckdb": map[string]any{"delim": "' '"}, }, }) require.NoError(t, err) @@ -234,3 +234,239 @@ func TestFileFormatAndDelimiter(t *testing.T) { require.False(t, rows.Next()) require.NoError(t, rows.Close()) } + +func TestCSVIngestionWithColumns(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.csv") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "csv_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + "duckdb": map[string]any{ + "auto_detect": false, + "header": true, + "ignore_errors": true, + "columns": "{id:'INTEGER',name:'VARCHAR',country:'VARCHAR',city:'VARCHAR'}", + }, + }, + }) + require.NoError(t, err) + + rows, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT * FROM csv_source"}) + require.NoError(t, err) + cols, err := rows.Columns() + require.NoError(t, err) + require.Len(t, cols, 4) + require.ElementsMatch(t, cols, [4]string{"id", "name", "country", "city"}) + require.NoError(t, rows.Close()) + + var count int + rows, err = olap.Execute(ctx, &drivers.Statement{Query: "SELECT count(*) FROM csv_source"}) + require.NoError(t, err) + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&count)) + require.Equal(t, count, 100) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func TestJsonIngestionDefault(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.json") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "json_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + }, + }) + require.NoError(t, err) + + rows, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT * FROM json_source"}) + require.NoError(t, err) + cols, err := rows.Columns() + require.NoError(t, err) + require.Len(t, cols, 9) + require.NoError(t, rows.Close()) + + var count int + rows, err = olap.Execute(ctx, &drivers.Statement{Query: "SELECT count(*) FROM json_source"}) + require.NoError(t, err) + 
require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&count)) + require.Equal(t, count, 10) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func TestJsonIngestionWithColumns(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.json") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "json_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + "duckdb": map[string]any{ + "columns": "{id:'INTEGER', name:'VARCHAR', isActive:'BOOLEAN', createdDate:'VARCHAR', address:'STRUCT(street VARCHAR, city VARCHAR, postalCode VARCHAR)', tags:'VARCHAR[]', projects:'STRUCT(projectId INTEGER, projectName VARCHAR, startDate VARCHAR, endDate VARCHAR)[]', scores:'INTEGER[]'}", + }, + }, + }) + require.NoError(t, err) + rows, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT * FROM json_source"}) + require.NoError(t, err) + cols, err := rows.Columns() + require.NoError(t, err) + require.Len(t, cols, 8) + require.NoError(t, rows.Close()) + + var count int + rows, err = olap.Execute(ctx, &drivers.Statement{Query: "SELECT count(*) FROM json_source"}) + require.NoError(t, err) + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&count)) + require.Equal(t, count, 10) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func TestJsonIngestionWithLessColumns(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.json") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "json_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + "duckdb": map[string]any{ + "columns": "{id:'INTEGER',name:'VARCHAR',isActive:'BOOLEAN',createdDate:'VARCHAR',}", + }, + }, + }) + require.NoError(t, err) + rows, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT * FROM json_source"}) + require.NoError(t, err) + cols, err := rows.Columns() + require.NoError(t, err) + require.Len(t, cols, 4) + require.NoError(t, rows.Close()) + + var count int + rows, err = olap.Execute(ctx, &drivers.Statement{Query: "SELECT count(*) FROM json_source"}) + require.NoError(t, err) + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&count)) + require.Equal(t, count, 10) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func TestJsonIngestionWithVariousParams(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.json") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "json_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + "duckdb": map[string]any{ + "maximum_object_size": "9999999", + "lines": true, + "ignore_errors": true, + "compression": "auto", + "columns": "{id:'INTEGER',name:'VARCHAR',isActive:'BOOLEAN',createdDate:'VARCHAR',}", + "json_format": "records", + "auto_detect": false, + "sample_size": -1, + "dateformat": "iso", + "timestampformat": "iso", + }, + }, + }) + require.NoError(t, err) + rows, err := olap.Execute(ctx, &drivers.Statement{Query: "SELECT * FROM json_source"}) + require.NoError(t, err) + 
cols, err := rows.Columns() + require.NoError(t, err) + require.Len(t, cols, 4) + require.NoError(t, rows.Close()) + + var count int + rows, err = olap.Execute(ctx, &drivers.Statement{Query: "SELECT count(*) FROM json_source"}) + require.NoError(t, err) + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&count)) + require.Equal(t, count, 10) + require.False(t, rows.Next()) + require.NoError(t, rows.Close()) +} + +func TestJsonIngestionWithInvalidParam(t *testing.T) { + olap := runOLAPStore(t) + ctx := context.Background() + filePath := createFilePath(t, "../../../web-local/test/data", "Users.json") + + _, err := olap.Ingest(ctx, &connectors.Env{ + RepoDriver: "file", + RepoRoot: ".", + AllowHostAccess: true, + }, &connectors.Source{ + Name: "json_source", + Connector: "local_file", + Properties: map[string]any{ + "path": filePath, + "duckdb": map[string]any{ + "json": map[string]any{ + "invalid_param": "auto", + }, + }, + }, + }) + require.Error(t, err, "Invalid named parameter \"invalid_param\" for function read_json") +} + +func createFilePath(t *testing.T, dirPath string, fileName string) string { + testdataPathAbs, err := filepath.Abs(dirPath) + require.NoError(t, err) + filePath := filepath.Join(testdataPathAbs, fileName) + return filePath +} + +func runOLAPStore(t *testing.T) drivers.OLAPStore { + conn, err := Driver{}.Open("?access_mode=read_write", zap.NewNop()) + require.NoError(t, err) + olap, canServe := conn.OLAPStore() + require.True(t, canServe) + return olap +} diff --git a/runtime/services/catalog/artifacts/artifacts_test.go b/runtime/services/catalog/artifacts/artifacts_test.go index 02b3ab3a346..e7be0b113e5 100644 --- a/runtime/services/catalog/artifacts/artifacts_test.go +++ b/runtime/services/catalog/artifacts/artifacts_test.go @@ -40,16 +40,17 @@ func TestSourceReadWrite(t *testing.T) { Name: "Source", Connector: "local_file", Properties: toProtoStruct(map[string]any{ - "path": "data/source.csv", - "csv.delimiter": "|", - "format": "csv", + "path": "data/source.csv", + "format": "csv", + "duckdb": map[string]any{"delim": "'|'"}, }), }, }, `type: local_file path: data/source.csv -csv.delimiter: '|' format: csv +duckdb: + delim: '''|''' `, }, { @@ -137,23 +138,23 @@ timeseries: time smallest_time_grain: day default_time_range: P1D dimensions: -- label: Dim0_L - property: dim0 - description: Dim0_D -- label: Dim1_L - property: dim1 - description: Dim1_D + - label: Dim0_L + property: dim0 + description: Dim0_D + - label: Dim1_L + property: dim1 + description: Dim1_D measures: -- label: Mea0_L - name: measure_0 - expression: count(c0) - description: Mea0_D - format_preset: humanise -- label: Mea1_L - name: avg_measure - expression: avg(c1) - description: Mea1_D - format_preset: humanise + - label: Mea0_L + name: measure_0 + expression: count(c0) + description: Mea0_D + format_preset: humanise + - label: Mea1_L + name: avg_measure + expression: avg(c1) + description: Mea1_D + format_preset: humanise `, }, } @@ -180,6 +181,116 @@ measures: } } +func TestCsvDelimiterBackwardCompatibility(t *testing.T) { + catalog := &drivers.CatalogEntry{ + Name: "Source", + Path: "sources/Source.yaml", + Type: drivers.ObjectTypeSource, + Object: &runtimev1.Source{ + Name: "Source", + Connector: "local_file", + Properties: toProtoStruct(map[string]any{ + "path": "data/source.csv", + "format": "csv", + "csv.delimiter": "|", + }), + }, + } + raw := `type: local_file +path: data/source.csv +format: csv +duckdb: + delim: '''|''' +` + dir := t.TempDir() + fileStore, err := 
drivers.Open("file", dir, zap.NewNop()) + require.NoError(t, err) + repoStore, _ := fileStore.RepoStore() + ctx := context.Background() + + err = artifacts.Write(ctx, repoStore, "test", catalog) + require.NoError(t, err) + + readCatalog, err := artifacts.Read(ctx, repoStore, registryStore(t), "test", catalog.Path) + require.Equal(t, readCatalog, &drivers.CatalogEntry{ + Name: "Source", + Path: "sources/Source.yaml", + Type: drivers.ObjectTypeSource, + Object: &runtimev1.Source{ + Name: "Source", + Connector: "local_file", + Properties: toProtoStruct(map[string]any{ + "path": "data/source.csv", + "format": "csv", + "duckdb": map[string]any{"delim": "'|'"}, + }), + }, + }) + require.NoError(t, err) + + err = artifacts.Write(ctx, repoStore, "test", readCatalog) + require.NoError(t, err) + + b, err := os.ReadFile(path.Join(dir, readCatalog.Path)) + require.NoError(t, err) + require.Equal(t, raw, string(b)) +} + +func TestHivePartitioningBackwardCompatibility(t *testing.T) { + catalog := &drivers.CatalogEntry{ + Name: "Source", + Path: "sources/Source.yaml", + Type: drivers.ObjectTypeSource, + Object: &runtimev1.Source{ + Name: "Source", + Connector: "local_file", + Properties: toProtoStruct(map[string]any{ + "path": "data/source.csv", + "format": "csv", + "hive_partitioning": true, + }), + }, + } + raw := `type: local_file +path: data/source.csv +format: csv +duckdb: + hive_partitioning: true +` + dir := t.TempDir() + fileStore, err := drivers.Open("file", dir, zap.NewNop()) + require.NoError(t, err) + repoStore, _ := fileStore.RepoStore() + ctx := context.Background() + + err = artifacts.Write(ctx, repoStore, "test", catalog) + require.NoError(t, err) + + readCatalog, err := artifacts.Read(ctx, repoStore, registryStore(t), "test", catalog.Path) + require.Equal(t, readCatalog, &drivers.CatalogEntry{ + Name: "Source", + Path: "sources/Source.yaml", + Type: drivers.ObjectTypeSource, + Object: &runtimev1.Source{ + Name: "Source", + Connector: "local_file", + Properties: toProtoStruct(map[string]any{ + "path": "data/source.csv", + "format": "csv", + "duckdb": map[string]any{"hive_partitioning": true}, + }), + }, + }) + require.NoError(t, err) + + err = artifacts.Write(ctx, repoStore, "test", readCatalog) + require.NoError(t, err) + + b, err := os.ReadFile(path.Join(dir, readCatalog.Path)) + require.NoError(t, err) + require.Equal(t, raw, string(b)) +} + func TestMetricsLabelBackwardsCompatibility(t *testing.T) { dir := t.TempDir() fileStore, err := drivers.Open("file", dir, zap.NewNop()) @@ -304,9 +415,10 @@ func TestReadWithEnvVariables(t *testing.T) { filePath: "sources/Source.yaml", content: `type: s3 uri: "s3://bucket/file" -csv.delimiter: '{{.env.delimitter}}' format: csv region: {{.env.region}} +duckdb: + delim: '''{{.env.delimitter}}''' `, want: &drivers.CatalogEntry{ Name: "Source", @@ -316,10 +428,10 @@ region: {{.env.region}} Name: "Source", Connector: "s3", Properties: toProtoStruct(map[string]any{ - "path": "s3://bucket/file", - "csv.delimiter": "|", - "format": "csv", - "region": "us-east-2", + "path": "s3://bucket/file", + "format": "csv", + "region": "us-east-2", + "duckdb": map[string]any{"delim": "'|'"}, }), }, }, diff --git a/runtime/services/catalog/artifacts/yaml/objects.go b/runtime/services/catalog/artifacts/yaml/objects.go index c1431493726..16f9a964c3f 100644 --- a/runtime/services/catalog/artifacts/yaml/objects.go +++ b/runtime/services/catalog/artifacts/yaml/objects.go @@ -33,6 +33,7 @@ type Source struct { Timeout int32 `yaml:"timeout,omitempty"` ExtractPolicy 
*ExtractPolicy `yaml:"extract,omitempty"` Format string `yaml:"format,omitempty" mapstructure:"format,omitempty"` + DuckDBProps map[string]any `yaml:"duckdb,omitempty" mapstructure:"duckdb,omitempty"` } type ExtractPolicy struct { @@ -145,9 +146,27 @@ func fromSourceArtifact(source *Source, path string) (*drivers.CatalogEntry, err if source.Region != "" { props["region"] = source.Region } + + if source.DuckDBProps != nil { + props["duckdb"] = source.DuckDBProps + } + if source.CsvDelimiter != "" { - props["csv.delimiter"] = source.CsvDelimiter + // backward compatibility + if _, defined := props["duckdb"]; !defined { + props["duckdb"] = map[string]any{} + } + props["duckdb"].(map[string]any)["delim"] = fmt.Sprintf("'%v'", source.CsvDelimiter) + } + + if source.HivePartition != nil { + // backward compatibility + if _, defined := props["duckdb"]; !defined { + props["duckdb"] = map[string]any{} + } + props["duckdb"].(map[string]any)["hive_partitioning"] = *source.HivePartition } + if source.GlobMaxTotalSize != 0 { props["glob.max_total_size"] = source.GlobMaxTotalSize } @@ -168,10 +187,6 @@ func fromSourceArtifact(source *Source, path string) (*drivers.CatalogEntry, err props["endpoint"] = source.S3Endpoint } - if source.HivePartition != nil { - props["hive_partitioning"] = *source.HivePartition - } - if source.Format != "" { props["format"] = source.Format } diff --git a/runtime/services/catalog/artifacts/yaml/yaml.go b/runtime/services/catalog/artifacts/yaml/yaml.go index 6a86d9e7cc6..614e09497cc 100644 --- a/runtime/services/catalog/artifacts/yaml/yaml.go +++ b/runtime/services/catalog/artifacts/yaml/yaml.go @@ -8,7 +8,7 @@ import ( "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/services/catalog/artifacts" - "gopkg.in/yaml.v2" + "gopkg.in/yaml.v3" ) type artifact struct{} diff --git a/scripts/druid-import.sh b/scripts/druid-import.sh index 55e1e198701..5240b1beeec 100755 --- a/scripts/druid-import.sh +++ b/scripts/druid-import.sh @@ -41,5 +41,5 @@ printf "Downloading data using query : $query \n" curl -XPOST -H'Content-Type: application/json' -u "$user:$pass" https://$druid/druid/v2/sql/ -d @/tmp/$datasource-query.json > /tmp/$datasource.csv printf "Importing to Rill Developer project: $project \n" -npm run cli --silent -- import-source /tmp/$datasource.csv --project $project --delimiter "," +npm run cli --silent -- import-source /tmp/$datasource.csv --project $project diff --git a/web-local/test/data/Users.json b/web-local/test/data/Users.json new file mode 100644 index 00000000000..cd3776fe34f --- /dev/null +++ b/web-local/test/data/Users.json @@ -0,0 +1,10 @@ +{"id":1,"name":"Alice","isActive":true,"createdDate":"2022-12-01T12:00:00Z","address":{"street":"123 Main St","city":"New York","postalCode":"10001"},"tags":["developer","python"],"projects":[{"projectId":101,"projectName":"Project Alpha","startDate":"2022-01-01","endDate":"2022-06-30"}],"scores":[85,90,88,92,78],"flag": true} +{"id":2,"name":"Bob","isActive":false,"createdDate":"2022-01-15T15:30:00Z","address":{"street":"456 Elm St","city":"San Francisco","postalCode":"94102"},"tags":["designer","javascript"],"projects":[{"projectId":102,"projectName":"Project Beta","startDate":"2022-03-01","endDate":"2022-08-31"}],"scores":[76,82,88,70,85],"flag": true} +{"id":3,"name":"Carol","isActive":true,"createdDate":"2021-11-01T11:45:00Z","address":{"street":"789 Pine St","city":"Seattle","postalCode":"98101"},"tags":["product manager","agile"],"projects":[{"projectId":103,"projectName":"Project 
Gamma","startDate":"2022-04-01","endDate":"2022-09-30"}],"scores":[93,88,90,85,95],"flag": true} +{"id":4,"name":"David","isActive":true,"createdDate":"2021-10-01T09:30:00Z","address":{"street":"12 Oak St","city":"Chicago","postalCode":"60601"},"tags":["QA engineer","automation"],"projects":[{"projectId":104,"projectName":"Project Delta","startDate":"2022-05-01","endDate":"2022-10-31"}],"scores":[89,91,85,80,87],"flag": true} +{"id":5,"name":"Eve","isActive":false,"createdDate":"2021-08-15T14:00:00Z","address":{"street":"33 Maple St","city":"Los Angeles","postalCode":"90001"},"tags":["data analyst","SQL"],"projects":[{"projectId":105,"projectName":"Project Epsilon","startDate":"2022-02-01","endDate":"2022-07-31"}],"scores":[95,88,89,92,81],"flag": false} +{"id":6,"name":"Frank","isActive":true,"createdDate":"2021-07-01T18:00:00Z","address":{"street":"66 Cherry St","city":"Boston","postalCode":"02108"},"tags":["DevOps","Kubernetes"],"projects":[{"projectId":106,"projectName":"Project Zeta","startDate":"2022-06-01","endDate":"2022-11-30"}],"scores":[88,89,91,86,84],"flag": true} +{"id":7,"name":"Grace","isActive":false,"createdDate":"2021-06-15T16:30:00Z","address":{"street":"99 Walnut St","city":"Houston","postalCode":"77002"},"tags":["network engineer","Cisco"],"projects":[{"projectId":107,"projectName":"Project Eta","startDate":"2022-07-01","endDate":"2023-01-31"}],"scores":[81,79,85,90,88],"flag": true} +{"id":8,"name":"Helen","isActive":true,"createdDate":"2021-05-01T12:30:00Z","address":{"street":"22 Birch St","city":"Phoenix","postalCode":"85001"},"tags":["UX designer","Sketch"],"projects":[{"projectId":108,"projectName":"Project Theta","startDate":"2022-08-01","endDate":"2023-02-28"}],"scores":[92,90,87,89,86],"flag": true} +{"id":9,"name":"Ivan","isActive":false,"createdDate":"2021-04-15T10:00:00Z","address":{"street":"55 Cedar St","city":"Philadelphia","postalCode":"19102"},"tags":["business analyst","Tableau"],"projects":[{"projectId":109,"projectName":"Project Iota","startDate":"2022-09-01","endDate":"2023-03-31"}],"scores":[84,86,88,90,87],"flag": false} +{"id":10,"name":"Jack","isActive":true,"createdDate":"2021-03-01T08:30:00Z","address":{"street":"88 Willow St","city":"San Diego","postalCode":"92101"},"tags":["mobile developer","React Native"],"projects":[{"projectId":110,"projectName":"Project Kappa","startDate":"2022-10-01","endDate":"2023-04-30"}],"scores":[89,93,90,88,85],"flag": true}