Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cmd/stroppy/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/stroppy-io/stroppy/internal/runner"
)
Expand All @@ -15,7 +16,7 @@ const (
)

var Cmd = &cobra.Command{
Use: "run <script.ts> [sql_file.sql]",
Use: "run <script.ts> [sql_file.sql] [-- <k6 run direct args>]",
Short: "Run benchmark script with k6",
Long: `Run a TypeScript benchmark script with k6.

Expand All @@ -34,7 +35,13 @@ Examples:
sqlPath = args[1]
}

r, err := runner.NewScriptRunner(scriptPath, sqlPath)
var afterDash []string
if dashIdx := pflag.CommandLine.ArgsLenAtDash(); dashIdx != -1 {
// Everything after --
afterDash = pflag.Args()[dashIdx:]
}

r, err := runner.NewScriptRunner(scriptPath, sqlPath, afterDash)
if err != nil {
return fmt.Errorf("failed to create runner: %w", err)
}
Expand Down
16 changes: 11 additions & 5 deletions cmd/xk6air/driver_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ type DriverWrapper struct {
// https://github.com/grafana/k6/issues?q=is%3Aopen+is%3Aissue+label%3Anew-http
// https://github.com/grafana/k6/issues/2293
func (d *DriverWrapper) configure() {
d.configureOnce.Do(
func() {
d.drv.Configure(d.vu.Context(), driver.Options{
if rootModule.sharedDrv != nil {
rootModule.once.Do(func() {
rootModule.sharedDrv.Configure(rootModule.ctx, driver.Options{
DialFunc: d.vu.State().Dialer.DialContext,
})
},
)
})
return
}
d.configureOnce.Do(func() {
d.drv.Configure(d.vu.Context(), driver.Options{
DialFunc: d.vu.State().Dialer.DialContext,
})
})
}

func (d *DriverWrapper) RunQuery(sql string, args map[string]any) any {
Expand Down
57 changes: 40 additions & 17 deletions cmd/xk6air/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ func NewInstance(vu modules.VU) modules.Instance {
if state := vu.State(); state != nil {
VUID = state.VUID
}
return &Instance{
i := &Instance{
vu: vu,
lg: logger.
NewFromEnv().
Named("k6-vu").
With(zap.Uint64("VUID", uint64(VUID))).
WithOptions(zap.AddStacktrace(zap.FatalLevel)),
}
rootModule.addVuTeardown(i)
return i
}

func (i *Instance) Exports() modules.Exports {
Expand All @@ -49,14 +51,14 @@ func (i *Instance) Exports() modules.Exports {
Named: map[string]any{
"NotifyStep": rootModule.NotifyStep,
"NewDriverByConfigBin": i.NewDriverByConfigBin,
"Teardown": i.Teardown,
"Teardown": rootModule.Teardown,
"NewGeneratorByRuleBin": NewGeneratorByRuleBin,
"NewGroupGeneratorByRulesBin": NewGroupGeneratorByRulesBin,
},
}
}

var onceDefineConfig sync.Once
var onceGetConfig sync.Once

// NewDriverByConfigBin initializes the driver from GlobalConfig.
// This is called by scripts using defineConfig(globalConfig) at the top level.
Expand All @@ -65,28 +67,24 @@ var onceDefineConfig sync.Once
// i.vu.State() is nil
func (i *Instance) NewDriverByConfigBin(configBin []byte) *DriverWrapper {
var globalCfg stroppy.GlobalConfig
err := proto.Unmarshal(configBin, &globalCfg)
if err != nil {
if err := proto.Unmarshal(configBin, &globalCfg); err != nil {
i.lg.Fatal("error unmarshalling GlobalConfig", zap.Error(err))
}
drvCfg := globalCfg.GetDriver()
if drvCfg == nil {
i.lg.Fatal("GlobalConfig.driver is required")
}

drv, err := driver.Dispatch(rootModule.ctx, i.lg, drvCfg)
if err != nil {
i.lg.Fatal("can't initialize driver", zap.Error(err))
}

onceDefineConfig.Do(func() {
onceGetConfig.Do(func() {
rootModule.cloudClient.NotifyRun(rootModule.ctx, &stroppy.StroppyRun{
Id: &stroppy.Ulid{Value: rootModule.runULID.String()},
Status: stroppy.Status_STATUS_RUNNING,
Cmd: "",
})
})

drv := i.getOrCreateDriver(&globalCfg)

i.dw = &DriverWrapper{
vu: i.vu,
lg: i.lg,
Expand All @@ -95,14 +93,39 @@ func (i *Instance) NewDriverByConfigBin(configBin []byte) *DriverWrapper {
return i.dw
}

func (i *Instance) getOrCreateDriver(cfg *stroppy.GlobalConfig) (drv driver.Driver) {
var err error
if cfg.GetDriver().GetConnectionType().GetSingleConnPerVu() != nil {
if drv, err = driver.Dispatch(rootModule.ctx, i.lg, cfg.GetDriver()); err != nil {
i.lg.Fatal("can't initialize driver", zap.Error(err))
}
return drv
}

if rootModule.sharedDrv != nil {
return rootModule.sharedDrv
}

if cfg.GetDriver().GetConnectionType() == nil {
// NOTE: unfortunately we have no good suggestion on which amount of connections we may use.
// Nice idea to use i.State().Options.VUs, but it's not available at pre-init state.
cfg.GetDriver().ConnectionType = &stroppy.DriverConfig_ConnectionType{
Is: &stroppy.DriverConfig_ConnectionType_SharedPool{},
}
}

rootModule.sharedDrv, err = driver.Dispatch(rootModule.ctx, i.lg, cfg.GetDriver())
if err != nil {
i.lg.Fatal("can't initialize shared driver", zap.Error(err))
}
return rootModule.sharedDrv
}

// Teardown mirrors k6 "function teardown()".
func (i *Instance) Teardown() error {
i.dw.drv.Teardown(i.vu.Context())
if rootModule.sharedDrv == nil {
i.dw.drv.Teardown(i.vu.Context())
}

rootModule.cloudClient.NotifyRun(rootModule.ctx, &stroppy.StroppyRun{
Id: &stroppy.Ulid{Value: rootModule.runULID.String()},
Status: stroppy.Status_STATUS_COMPLETED,
Cmd: "",
})
return nil
}
39 changes: 37 additions & 2 deletions cmd/xk6air/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ package xk6air

import (
"context"
"errors"
"sync"

"github.com/oklog/ulid/v2"
"github.com/stroppy-io/stroppy/cmd/stroppy/commands"
"github.com/stroppy-io/stroppy/pkg/common/logger"
"github.com/stroppy-io/stroppy/pkg/common/proto/stroppy"
"github.com/stroppy-io/stroppy/pkg/common/proto/stroppy/stroppyconnect"
"github.com/stroppy-io/stroppy/pkg/driver"

"go.k6.io/k6/js/modules"
"go.k6.io/k6/subcommand"
Expand All @@ -29,8 +32,9 @@ func init() {
WithOptions(zap.AddStacktrace(zap.FatalLevel))

rootModule = &RootModule{
lg: lg,
ctx: context.Background(),
lg: lg,
ctx: context.Background(),
vuTeardown: make(map[*Instance]func() error),
}

rootModule.runULID, rootModule.cloudClient = NewCloudClient(lg)
Expand All @@ -46,6 +50,12 @@ type RootModule struct {
cloudClient stroppyconnect.CloudStatusServiceClient
runULID ulid.ULID
ctx context.Context

sharedDrv driver.Driver
once sync.Once

vuMutex sync.Mutex
vuTeardown map[*Instance]func() error
}

// NewModuleInstance factory method for Instances.
Expand All @@ -64,3 +74,28 @@ func (r *RootModule) NotifyStep(name string, status int32) {
Name: name,
})
}

func (r *RootModule) addVuTeardown(instance *Instance) {
r.vuMutex.Lock()
r.vuTeardown[instance] = instance.Teardown
r.vuMutex.Unlock()
}

func (r *RootModule) Teardown() error {

var err error
r.vuMutex.Lock()
for _, teardown := range r.vuTeardown {
err = errors.Join(err, teardown())
}
r.vuMutex.Unlock()

r.sharedDrv.Teardown(r.ctx)

_, errCloud := r.cloudClient.NotifyRun(rootModule.ctx, &stroppy.StroppyRun{
Id: &stroppy.Ulid{Value: rootModule.runULID.String()},
Status: stroppy.Status_STATUS_COMPLETED,
Cmd: "",
})
return errors.Join(err, errCloud)
}
34 changes: 34 additions & 0 deletions docs/proto.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

- [proto/stroppy/config.proto](#proto_stroppy_config-proto)
- [DriverConfig](#stroppy-DriverConfig)
- [DriverConfig.ConnectionType](#stroppy-DriverConfig-ConnectionType)
- [DriverConfig.ConnectionType.Pool](#stroppy-DriverConfig-ConnectionType-Pool)
- [ExporterConfig](#stroppy-ExporterConfig)
- [GlobalConfig](#stroppy-GlobalConfig)
- [GlobalConfig.MetadataEntry](#stroppy-GlobalConfig-MetadataEntry)
Expand Down Expand Up @@ -656,6 +658,38 @@ settings.
| url | [string](#string) | | Database connection URL |
| db_specific | [Value.Struct](#stroppy-Value-Struct) | optional | Database-specific configuration options |
| driver_type | [DriverConfig.DriverType](#stroppy-DriverConfig-DriverType) | | Name/Type of chosen driver |
| connection_type | [DriverConfig.ConnectionType](#stroppy-DriverConfig-ConnectionType) | | Shared connection pool vs own connection for each VU. |






<a name="stroppy-DriverConfig-ConnectionType"></a>

### DriverConfig.ConnectionType



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| single_conn_per_vu | [google.protobuf.Empty](#google-protobuf-Empty) | | |
| shared_pool | [DriverConfig.ConnectionType.Pool](#stroppy-DriverConfig-ConnectionType-Pool) | | |






<a name="stroppy-DriverConfig-ConnectionType-Pool"></a>

### DriverConfig.ConnectionType.Pool
If shared_connections not set use drivers pool default.


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| shared_connections | [int32](#int32) | | |



Expand Down
27 changes: 23 additions & 4 deletions internal/runner/script_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ func prepareVMEnvironment(
return fmt.Errorf("failed to set __ENV: %w", err)
}

if err := vm.Set("__SQL_FILE", ""); err != nil {
return fmt.Errorf("failed to set __SQL_FILE: %w", err)
}

if err := vm.Set("NewGeneratorByRuleBin", func() {}); err != nil {
return fmt.Errorf("failed to set NewGeneratorByRuleBin: %w", err)
}
Expand All @@ -203,9 +199,32 @@ func prepareVMEnvironment(
return fmt.Errorf("failed to set NewDriverByConfigBin: %w", err)
}

if err := vm.Set("Trend", newDummyWithNoopConstructor(vm)); err != nil {
return fmt.Errorf("failed to set Trend: %w", err)
}

if err := vm.Set("Rate", newDummyWithNoopConstructor(vm)); err != nil {
return fmt.Errorf("failed to set Rate: %w", err)
}

if err := vm.Set("Counter", newDummyWithNoopConstructor(vm)); err != nil {
return fmt.Errorf("failed to set Counter: %w", err)
}

return nil
}

func newDummyWithNoopConstructor(rt *sobek.Runtime) *sobek.Object {
src := `
function MyDummy() {}
MyDummy.prototype.constructor = MyDummy;
MyDummy;
`
val, _ := rt.RunString(src)

return val.ToObject(rt)
}

// setupConfigExtraction registers the config extraction callbacks.
func setupConfigExtraction(vm *sobek.Runtime, extractor *configExtractor) error {
stub := stroppyStub{}
Expand Down
Loading