Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model executor for s3/gcs/azure to duckdb #6353

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
model executor for object storage to duckdb
  • Loading branch information
k-anshul committed Jan 6, 2025
commit e2c80c0ad40fa40b06c57548f13ac38b5d644a17
8 changes: 4 additions & 4 deletions runtime/drivers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var spec = drivers.Spec{

type driver struct{}

type configProperties struct {
type ConfigProperties struct {
Account string `mapstructure:"azure_storage_account"`
Key string `mapstructure:"azure_storage_key"`
SASToken string `mapstructure:"azure_storage_sas_token"`
Expand All @@ -87,7 +87,7 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
return nil, errors.New("azure driver can't be shared")
}

conf := &configProperties{}
conf := &ConfigProperties{}
err := mapstructure.WeakDecode(config, conf)
if err != nil {
return nil, err
Expand All @@ -112,7 +112,7 @@ func (d driver) HasAnonymousSourceAccess(ctx context.Context, props map[string]a
}

conn := &Connection{
config: &configProperties{},
config: &ConfigProperties{},
logger: logger,
}

Expand All @@ -130,7 +130,7 @@ func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any
}

type Connection struct {
config *configProperties
config *ConfigProperties
storage *storage.Client
logger *zap.Logger
}
Expand Down
3 changes: 3 additions & 0 deletions runtime/drivers/duckdb/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ func (c *connection) AsModelExecutor(instanceID string, opts *drivers.ModelExecu
if opts.InputHandle.Driver() == "mysql" || opts.InputHandle.Driver() == "postgres" {
return &sqlStoreToSelfExecutor{c}, true
}
if _, ok := opts.InputHandle.AsObjectStore(); ok {
return &objectStoreToSelfExecutor{c}, true
}
}
if opts.InputHandle == c {
if opts.OutputHandle.Driver() == "file" {
Expand Down
180 changes: 180 additions & 0 deletions runtime/drivers/duckdb/model_executor_objectstore_self.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package duckdb

import (
"context"
"fmt"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/drivers/azure"
"github.com/rilldata/rill/runtime/drivers/s3"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"go.uber.org/zap"
)

type s3InputProps struct {
Path string `mapstructure:"path"`
Format drivers.FileFormat `mapstructure:"format"`
Endpoint string `mapstructure:"endpoint"`
Region string `mapstructure:"region"`
DuckDB map[string]any `mapstructure:"duckdb"`
}

func (p *s3InputProps) Validate() error {
if p.Path == "" {
return fmt.Errorf("path is mandatory for s3 input connector")
}
return nil
}

type objectStoreToSelfExecutor struct {
c *connection
}

var _ drivers.ModelExecutor = &objectStoreToSelfExecutor{}

func (e *objectStoreToSelfExecutor) Concurrency(desired int) (int, bool) {
if desired > 1 {
return 0, false
}
return 1, true
}

func (e *objectStoreToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExecuteOptions) (*drivers.ModelResult, error) {
inputProps := &s3InputProps{}
if err := mapstructure.WeakDecode(opts.InputProperties, inputProps); err != nil {
return nil, fmt.Errorf("failed to parse input properties: %w", err)
}
if err := inputProps.Validate(); err != nil {
return nil, fmt.Errorf("invalid input properties: %w", err)
}

outputProps := &ModelOutputProperties{}
if err := mapstructure.WeakDecode(opts.OutputProperties, outputProps); err != nil {
return nil, fmt.Errorf("failed to parse output properties: %w", err)
}
if err := outputProps.Validate(opts); err != nil {
return nil, fmt.Errorf("invalid output properties: %w", err)
}

// Build the model executor options with updated input and output properties
clone := *opts

newInputProps, err := e.modelInputProperties(opts.InputHandle, inputProps, opts.ModelName)
if err != nil {
return nil, err
}
clone.InputProperties = newInputProps

newOutputProps := make(map[string]any)
err = mapstructure.WeakDecode(outputProps, &newOutputProps)
if err != nil {
return nil, err
}
clone.OutputProperties = newOutputProps
newOpts := &clone

// execute
executor := &selfToSelfExecutor{c: e.c}
return executor.Execute(ctx, newOpts)
}

func (e *objectStoreToSelfExecutor) modelInputProperties(inputHandle drivers.Handle, inputProps *s3InputProps, model string) (map[string]any, error) {
m := &ModelInputProperties{}
var format string
if inputProps.Format != "" {
format = fmt.Sprintf(".%s", inputProps.Format)
} else {
format = fileutil.FullExt(inputProps.Path)
}

// Generate secret SQL to access the service and set as pre_exec_query
switch inputHandle.Driver() {
case "s3":
safeSecretName := safeName(model + "_s3_secret_")
s3Config := &s3.ConfigProperties{}
err := mapstructure.WeakDecode(inputHandle.Config(), s3Config)
if err != nil {
return nil, fmt.Errorf("failed to parse s3 config properties: %w", err)
}
var sb strings.Builder
sb.WriteString("CREATE OR REPLACE TEMPORARY SECRET ")
sb.WriteString(safeSecretName)
sb.WriteString(" (TYPE S3,")
if s3Config.AllowHostAccess {
sb.WriteString(" PROVIDER CREDENTIAL_CHAIN")
}
if s3Config.AccessKeyID != "" {
fmt.Fprintf(&sb, ", KEY_ID %s, SECRET %s, SESSION_TOKEN %s", safeSQLString(s3Config.AccessKeyID), safeSQLString(s3Config.SecretAccessKey), safeSQLString(s3Config.SessionToken))
}
if inputProps.Endpoint != "" {
sb.WriteString(", ENDPOINT ")
sb.WriteString(safeSQLString(inputProps.Endpoint))
}
if inputProps.Region != "" {
sb.WriteString(", REGION ")
sb.WriteString(safeSQLString(inputProps.Region))
}
sb.WriteRune(')')
m.PreExec = sb.String()
case "gcs":
safeSecretName := safeName(model + "_gcs_secret_")
// GCS works via S3 compatibility mode
s3Config := &s3.ConfigProperties{}
err := mapstructure.WeakDecode(inputHandle.Config(), s3Config)
if err != nil {
return nil, fmt.Errorf("failed to parse s3 config properties: %w", err)
}
var sb strings.Builder
sb.WriteString("CREATE OR REPLACE TEMPORARY SECRET ")
sb.WriteString(safeSecretName)
sb.WriteString(" (TYPE GCS,")
if s3Config.AllowHostAccess {
sb.WriteString(" PROVIDER CREDENTIAL_CHAIN")
}
if s3Config.AccessKeyID != "" {
fmt.Fprintf(&sb, ", KEY_ID %s, SECRET %s", safeSQLString(s3Config.AccessKeyID), safeSQLString(s3Config.SecretAccessKey))
}
sb.WriteRune(')')
m.PreExec = sb.String()
case "azure":
safeSecretName := safeName(model + "_azure_secret_")
azureConfig := &azure.ConfigProperties{}
err := mapstructure.WeakDecode(inputHandle.Config(), azureConfig)
if err != nil {
return nil, fmt.Errorf("failed to parse s3 config properties: %w", err)
}
var sb strings.Builder
sb.WriteString("CREATE OR REPLACE TEMPORARY SECRET ")
sb.WriteString(safeSecretName)
sb.WriteString(" (TYPE AZURE,")
if azureConfig.AllowHostAccess {
sb.WriteString(" PROVIDER CREDENTIAL_CHAIN")
}
if azureConfig.ConnectionString != "" {
fmt.Fprintf(&sb, ", CONNECTION_STRING %s", safeSQLString(azureConfig.ConnectionString))
}
if azureConfig.Account != "" {
fmt.Fprintf(&sb, ", ACCOUNT_NAME %s", safeSQLString(azureConfig.Account))
}
sb.WriteRune(')')
m.PreExec = sb.String()
default:
return nil, fmt.Errorf("internal error: unsupported object store: %s", inputHandle.Driver())
}

// Set SQL to read from the external source
from, err := sourceReader([]string{inputProps.Path}, format, inputProps.DuckDB)
if err != nil {
return nil, err
}
m.SQL = "SELECT * FROM " + from
e.c.logger.Debug("objectStoreToSelfExecutor: generated model input properties", zap.String("sql", m.SQL), zap.String("pre_exec", m.PreExec), zap.String("table", model))

propsMap := make(map[string]any)
if err := mapstructure.Decode(m, &propsMap); err != nil {
return nil, err
}
return propsMap, nil
}
5 changes: 4 additions & 1 deletion runtime/drivers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ var spec = drivers.Spec{
type driver struct{}

type configProperties struct {
SecretJSON string `mapstructure:"google_application_credentials"`
SecretJSON string `mapstructure:"google_application_credentials"`
// When working in s3 compatible mode
AccessKeyID string `mapstructure:"aws_access_key_id"`
SecretAccessKey string `mapstructure:"aws_secret_access_key"`
k-anshul marked this conversation as resolved.
Show resolved Hide resolved
AllowHostAccess bool `mapstructure:"allow_host_access"`
}

Expand Down