Utilize any parameter of DuckDB CSV/JSON/Parquet "read" function (rilldata#2194)

* Pass all properties (located in the "duckdb" section) of a source config to the corresponding DuckDB functions on ingestion

* Reformatted go sources

* Backward compatible ingestion properties: csv.delimiter and hive_partitioning

* Added some comments

* Reformatted go sources

* Updated csv delimiter property name

* Renamed a function

* Included "duckdb.*" props into ser/de

* Fixed unit tests

* Reformatted go sources

* Simplified a comparison of properties

* duckdb prefix + backward compatibility for csv.delimiter and hive_partitioning

* Added unit tests of backward compatibility

* Removed the use of csv.delimiter and hive_partitioning

---------

Co-authored-by: e.sevastyanov <eugene.sevastianov@rilldata.com>
Co-authored-by: Benjamin Egelund-Müller <b@egelund-muller.com>
3 people authored May 4, 2023
1 parent 7e8fe84 commit db468f4
Showing 16 changed files with 553 additions and 142 deletions.
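
Before the per-file diffs, a minimal sketch of what this change enables, assuming the `connectors` package from this repo; the exact source shown is illustrative (though `delim` and `header` are real `read_csv_auto` parameters):

```go
// Illustrative fragment (hypothetical values): a source whose nested
// "duckdb" properties are forwarded verbatim to DuckDB's read function.
// "delim" replaces the removed csv.delimiter property; string values
// carry their own SQL quotes because parameters are rendered with %v.
source := &connectors.Source{
	Name: "my_source",
	Properties: map[string]any{
		"path": "/path/to/data.csv",
		"duckdb": map[string]any{
			"delim":  "'|'",
			"header": true,
		},
	},
}
_ = source
```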
5 changes: 0 additions & 5 deletions cli/cmd/source/add.go
@@ -21,7 +21,6 @@ func AddCmd(cfg *config.Config) *cobra.Command {
var olapDSN string
var projectPath string
var sourceName string
var delimiter string
var force bool
var verbose bool

@@ -55,9 +54,6 @@ func AddCmd(cfg *config.Config) *cobra.Command {
}

props := map[string]any{"path": dataPath}
if delimiter != "" {
props["csv.delimiter"] = delimiter
}

propsPB, err := structpb.NewStruct(props)
if err != nil {
@@ -99,7 +95,6 @@ func AddCmd(cfg *config.Config) *cobra.Command {
addCmd.Flags().StringVar(&projectPath, "path", ".", "Project directory")
addCmd.Flags().StringVar(&olapDSN, "db", local.DefaultOLAPDSN, "Database DSN")
addCmd.Flags().StringVar(&olapDriver, "db-driver", local.DefaultOLAPDriver, "Database driver")
addCmd.Flags().StringVar(&delimiter, "delimiter", "", "CSV delimiter override (defaults to autodetect)")
addCmd.Flags().BoolVar(&verbose, "verbose", false, "Sets the log level to debug")

return addCmd
8 changes: 0 additions & 8 deletions docs/docs/reference/cli-cheat-sheet.md
@@ -89,14 +89,6 @@ By default the source name will be a sanitized version of the dataset file name.
rill source add /path/to/data.parquet --name my_source
```

### Custom CSV delimiters

If you have a CSV file that is delimited by a character other than a comma or tab, you can use the `--delimiter` option. If a delimiter is not set, Rill automatically tries to detect the delimiter, so this is not strictly necessary:

```
rill source add /path/to/data.csv --delimiter "|"
```

## Dropping a source

If you have added a source to Rill that you want to drop, run:
1 change: 0 additions & 1 deletion docs/docs/reference/cli/source/add.md
@@ -21,7 +21,6 @@ rill source add <file> [flags]
--path string Project directory (default ".")
--db string Database DSN (default "stage.db")
--db-driver string Database driver (default "duckdb")
--delimiter string CSV delimiter override (defaults to autodetect)
--verbose Sets the log level to debug
```

4 changes: 0 additions & 4 deletions docs/docs/reference/project-files/sources.md
@@ -48,10 +48,6 @@ In your Rill project directory, create a `<source_name>.yaml` file in the `sourc
**`timeout`**
— The maximum time to wait for source ingestion.

**`hive_partitioning`**
— If set to true, Hive-style partitioning is transformed into column values in the data source on ingestion.
- _`true`_ by default

**`extract`** - Optionally limit the data ingested from remote sources (S3/GCS only)
- **`rows`** - limits the size of data fetched
- **`strategy`** - strategy to fetch data (**head** or **tail**)
4 changes: 0 additions & 4 deletions runtime/compilers/rillv1beta/project.go
@@ -84,10 +84,6 @@ func (c *Codec) PutSource(ctx context.Context, repo drivers.RepoStore, instanceI
out.Region = val
}

if val, ok := props["csv.delimiter"].(string); ok {
out.CSVDelimiter = val
}

blob, err := yaml.Marshal(out)
if err != nil {
return "", err
9 changes: 4 additions & 5 deletions runtime/compilers/rillv1beta/yaml.go
@@ -8,11 +8,10 @@ import (
var alphaNumericRegex = regexp.MustCompile("[^A-Za-z0-9]+")

type Source struct {
Type string
URI string `yaml:"uri,omitempty"`
Path string `yaml:"path,omitempty"`
Region string `yaml:"region,omitempty"`
CSVDelimiter string `yaml:"csv.delimiter,omitempty"`
Type string
URI string `yaml:"uri,omitempty"`
Path string `yaml:"path,omitempty"`
Region string `yaml:"region,omitempty"`
}

type ProjectConfig struct {
14 changes: 2 additions & 12 deletions runtime/connectors/connectors.go
@@ -3,6 +3,7 @@ package connectors
import (
"context"
"fmt"
"reflect"

runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
)
@@ -176,16 +177,5 @@ func ConsumeAsIterator(ctx context.Context, env *Env, source *Source) (FileItera
}

func (s *Source) PropertiesEquals(o *Source) bool {
if len(s.Properties) != len(o.Properties) {
return false
}

for k1, v1 := range s.Properties {
v2, ok := o.Properties[k1]
if !ok || v1 != v2 {
return false
}
}

return true
return reflect.DeepEqual(s.Properties, o.Properties)
}
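
A side note on the simplification above: `reflect.DeepEqual` recurses into nested property maps, whereas the removed loop compared values with `!=`, which panics at runtime once a value is itself a map (exactly the case for nested `duckdb` properties). A minimal standalone sketch:

```go
package main

import (
	"fmt"
	"reflect"
)

func main() {
	a := map[string]any{"duckdb": map[string]any{"delim": "'|'"}}
	b := map[string]any{"duckdb": map[string]any{"delim": "'|'"}}

	// DeepEqual recurses into the nested maps and reports equality.
	fmt.Println(reflect.DeepEqual(a, b)) // true

	// The removed implementation effectively did this, which panics at
	// runtime: "comparing uncomparable type map[string]interface {}".
	// fmt.Println(a["duckdb"] != b["duckdb"])
}
```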
50 changes: 50 additions & 0 deletions runtime/connectors/connectors_test.go
@@ -27,6 +27,50 @@ func TestPropertiesEquals(t *testing.T) {
Properties: map[string]any{"a": 100, "c": "hello world"},
}

s5 := &Source{
Name: "s5",
Properties: map[string]any{
"number": 0,
"string": "hello world",
"nestedMap": map[string]any{
"nestedMap": map[string]any{
"string": "value",
"number": 2,
},
"string": "value",
"number": 1,
},
},
}

s6 := &Source{
Name: "s6",
Properties: map[string]any{
"number": 0,
"string": "hello world",
"nestedMap": map[string]any{
"number": 1,
"string": "value",
"nestedMap": map[string]any{
"number": 2,
"string": "value",
},
},
},
}

s7 := &Source{
Name: "s7",
Properties: map[string]any{
"number": 0,
"string": "hello world",
"nestedMap": map[string]any{
"number": 1,
"string": "value",
},
},
}

// s1 and s2 should be equal
require.True(t, s1.PropertiesEquals(s2) && s2.PropertiesEquals(s1))

@@ -37,4 +81,10 @@
// s2 should not equal s3 or s4
require.False(t, s2.PropertiesEquals(s3) || s3.PropertiesEquals(s2))
require.False(t, s2.PropertiesEquals(s4) || s4.PropertiesEquals(s2))

// s5 and s6 should be equal
require.True(t, s5.PropertiesEquals(s6) && s6.PropertiesEquals(s5))

// s6 and s7 should not be equal
require.False(t, s6.PropertiesEquals(s7) || s7.PropertiesEquals(s6))
}
14 changes: 2 additions & 12 deletions runtime/connectors/localfile/file.go
@@ -32,22 +32,12 @@ var spec = connectors.Spec{
Description: "Either CSV or Parquet. Inferred if not set.",
Placeholder: "csv",
},
{
Key: "csv.delimiter",
Type: connectors.StringPropertyType,
Required: false,
DisplayName: "CSV Delimiter",
Description: "Force delimiter for a CSV file.",
Placeholder: ",",
},
},
}

type Config struct {
Path string `mapstructure:"path"`
Format string `mapstructure:"format"`
CSVDelimiter string `mapstructure:"csv.delimiter"`
HivePartition *bool `mapstructure:"hive_partitioning"`
Path string `mapstructure:"path"`
Format string `mapstructure:"format"`
}

func ParseConfig(props map[string]any) (*Config, error) {
137 changes: 84 additions & 53 deletions runtime/drivers/duckdb/connectors.go
@@ -77,24 +77,7 @@ func (c *connection) ingest(ctx context.Context, env *connectors.Env, source *co

// for files downloaded locally from remote sources
func (c *connection) ingestIteratorFiles(ctx context.Context, source *connectors.Source, filenames []string, appendToTable bool) error {
format := ""
if value, ok := source.Properties["format"]; ok {
format = value.(string)
}

delimiter := ""
if value, ok := source.Properties["csv.delimiter"]; ok {
delimiter = value.(string)
}

hivePartition := 1
if value, ok := source.Properties["hive_partitioning"]; ok {
if !value.(bool) {
hivePartition = 0
}
}

from, err := sourceReader(filenames, delimiter, format, hivePartition)
from, err := sourceReader(filenames, source.Properties)
if err != nil {
return err
}
@@ -129,12 +112,7 @@ func (c *connection) ingestLocalFiles(ctx context.Context, env *connectors.Env,
return fmt.Errorf("file does not exist at %s", conf.Path)
}

hivePartition := 1
if conf.HivePartition != nil && !*conf.HivePartition {
hivePartition = 0
}

from, err := sourceReader(localPaths, conf.CSVDelimiter, conf.Format, hivePartition)
from, err := sourceReader(localPaths, source.Properties)
if err != nil {
return err
}
@@ -144,35 +122,6 @@
return c.Exec(ctx, &drivers.Statement{Query: qry, Priority: 1})
}

func sourceReader(paths []string, csvDelimiter, format string, hivePartition int) (string, error) {
if format == "" {
format = fileutil.FullExt(paths[0])
} else {
// users will set format like csv, tsv, parquet
// while inferring format from file name extensions it's better to rely on .csv, .parquet
format = fmt.Sprintf(".%s", format)
}

if format == "" {
return "", fmt.Errorf("invalid file")
} else if strings.Contains(format, ".csv") || strings.Contains(format, ".tsv") || strings.Contains(format, ".txt") {
return sourceReaderWithDelimiter(paths, csvDelimiter), nil
} else if strings.Contains(format, ".parquet") {
return fmt.Sprintf("read_parquet(['%s'], HIVE_PARTITIONING=%v)", strings.Join(paths, "','"), hivePartition), nil
} else if strings.Contains(format, ".json") || strings.Contains(format, ".ndjson") {
return fmt.Sprintf("read_json_auto(['%s'], sample_size=-1)", strings.Join(paths, "','")), nil
} else {
return "", fmt.Errorf("file type not supported : %s", format)
}
}

func sourceReaderWithDelimiter(paths []string, delimiter string) string {
if delimiter == "" {
return fmt.Sprintf("read_csv_auto(['%s'], sample_size=-1)", strings.Join(paths, "','"))
}
return fmt.Sprintf("read_csv_auto(['%s'], delim='%s', sample_size=-1)", strings.Join(paths, "','"), delimiter)
}

func fileSize(paths []string) int64 {
var size int64
for _, path := range paths {
@@ -201,3 +150,85 @@ func resolveLocalPath(env *connectors.Env, path, sourceName string) (string, err
}
return finalPath, nil
}

func sourceReader(paths []string, properties map[string]any) (string, error) {
format, formatDefined := properties["format"].(string)
if formatDefined {
format = fmt.Sprintf(".%s", format)
} else {
format = fileutil.FullExt(paths[0])
}

var ingestionProps map[string]any
if duckDBProps, ok := properties["duckdb"].(map[string]any); ok {
ingestionProps = duckDBProps
} else {
ingestionProps = map[string]any{}
}

// Generate a "read" statement
if containsAny(format, []string{".csv", ".tsv", ".txt"}) {
// CSV reader
return generateReadCsvStatement(paths, ingestionProps)
} else if strings.Contains(format, ".parquet") {
// Parquet reader
return generateReadParquetStatement(paths, ingestionProps)
} else if containsAny(format, []string{".json", ".ndjson"}) {
// JSON reader
return generateReadJSONStatement(paths, ingestionProps)
} else {
return "", fmt.Errorf("file type not supported : %s", format)
}
}

func containsAny(s string, targets []string) bool {
source := strings.ToLower(s)
for _, target := range targets {
if strings.Contains(source, target) {
return true
}
}
return false
}

func generateReadCsvStatement(paths []string, properties map[string]any) (string, error) {
// auto_detect (enables auto-detection of parameters) is true by default, it takes care of params/schema
return fmt.Sprintf("read_csv_auto(%s)", convertToStatementParamsStr(paths, properties)), nil
}

func generateReadParquetStatement(paths []string, properties map[string]any) (string, error) {
ingestionProps := copyMap(properties)
// set hive_partitioning to true by default
if _, hivePartitioningDefined := ingestionProps["hive_partitioning"]; !hivePartitioningDefined {
ingestionProps["hive_partitioning"] = true
}
return fmt.Sprintf("read_parquet(%s)", convertToStatementParamsStr(paths, ingestionProps)), nil
}

func generateReadJSONStatement(paths []string, properties map[string]any) (string, error) {
ingestionProps := copyMap(properties)
// auto_detect is false by default so setting it to true simplifies the ingestion
// if columns are defined then DuckDB turns the auto-detection off so no need to check this case here
if _, autoDetectDefined := ingestionProps["auto_detect"]; !autoDetectDefined {
ingestionProps["auto_detect"] = true
}
return fmt.Sprintf("read_json(%s)", convertToStatementParamsStr(paths, ingestionProps)), nil
}

func copyMap(originalMap map[string]any) map[string]any {
newMap := make(map[string]any, len(originalMap))
for key, value := range originalMap {
newMap[key] = value
}
return newMap
}

func convertToStatementParamsStr(paths []string, properties map[string]any) string {
ingestionParamsStr := make([]string, 0, len(properties)+1)
// The first parameter is a source path
ingestionParamsStr = append(ingestionParamsStr, fmt.Sprintf("['%s']", strings.Join(paths, "','")))
for key, value := range properties {
ingestionParamsStr = append(ingestionParamsStr, fmt.Sprintf("%s=%v", key, value))
}
return strings.Join(ingestionParamsStr, ",")
}
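
To make the pass-through concrete, a sketch of the statements `sourceReader` generates, written as a hypothetical test in this package (assumes `testing` and testify's `require`, as used elsewhere in the repo). When several properties are set, parameters after the path list may appear in any order since Go map iteration is unordered, and string values must carry their own SQL quotes because values are rendered with `%v`:

```go
// Hypothetical test, illustrative paths only.
func TestSourceReaderSketch(t *testing.T) {
	// CSV: "duckdb" properties are forwarded verbatim to read_csv_auto.
	stmt, err := sourceReader(
		[]string{"/data/orders.csv"},
		map[string]any{"duckdb": map[string]any{"delim": "'|'"}},
	)
	require.NoError(t, err)
	require.Equal(t, "read_csv_auto(['/data/orders.csv'],delim='|')", stmt)

	// Parquet: hive_partitioning defaults to true unless overridden.
	stmt, err = sourceReader([]string{"/data/p1.parquet"}, map[string]any{})
	require.NoError(t, err)
	require.Equal(t, "read_parquet(['/data/p1.parquet'],hive_partitioning=true)", stmt)

	// JSON / NDJSON: auto_detect defaults to true unless overridden.
	stmt, err = sourceReader([]string{"/data/events.ndjson"}, map[string]any{})
	require.NoError(t, err)
	require.Equal(t, "read_json(['/data/events.ndjson'],auto_detect=true)", stmt)
}
```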