Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ linters:
- gocognit
- gosec
- maintidx
- unparam
path: _test\.go
- linters:
- goconst
Expand Down
139 changes: 34 additions & 105 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,27 @@ import (
"github.com/rs/zerolog"
)

// Run is the entry-point for a standalone processor. It handles all communication
// with Conduit. It will block forever, or until an error occurs. If an error
// occurs, it will be printed to stderr and the process will exit with a non-zero
// exit code. Otherwise, it will exit with a zero exit code.
// Run is the entry-point for a standalone processor. It will set up the handler
// for the exported functions and wrap the provided Processor with middleware.
//
// A processor plugin needs to call this function in its main function. The
// A processor plugin needs to call this function in its init function. The
// entrypoint file should look like this:
//
// //go:build wasm
// //go:build wasm
//
// package main
// package main
//
// import (
// sdk "github.com/conduitio/conduit-processor-sdk"
// )
// import (
// sdk "github.com/conduitio/conduit-processor-sdk"
// )
//
// func main() {
// processor := NewMyProcessor()
// sdk.Run(processor)
// }
// func init() {
// processor := NewMyProcessor()
// sdk.Run(processor)
// }
//
// // Main is required by the Go compiler, but it is not executed.
// func main() {}
func Run(p Processor) {
checkMagicCookie()

Expand All @@ -64,7 +65,6 @@ func Run(p Processor) {
}

ctx = context.Background()
cmd processorv1.CommandRequest
)

wasm.InitUtils(env.logLevel)
Expand All @@ -84,27 +84,10 @@ func Run(p Processor) {
executor := commandExecutor{
protoconv: protoConverter{},
logger: logger,
p: p,
}

for {
logger.Trace().Msg("retrieving next command")
cmd.Reset()
err := wasm.NextCommand(&cmd)
if err != nil {
if errors.Is(err, pprocutils.ErrNoMoreCommands) {
os.Exit(0)
}
_, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err)
os.Exit(1)
}

resp := executor.Execute(ctx, p, &cmd)
err = wasm.Reply(resp)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "failed writing reply: %v\n", err)
os.Exit(1)
}
}
wasm.Handler = executor
}

func checkMagicCookie() {
Expand All @@ -129,109 +112,55 @@ func checkMagicCookie() {
type commandExecutor struct {
protoconv protoConverter
logger *zerolog.Logger
p Processor
}

// Execute executes the given command request. It returns a command response
// that will be sent back to Conduit.
func (e commandExecutor) Execute(ctx context.Context, p Processor, cmdReq *processorv1.CommandRequest) *processorv1.CommandResponse {
e.logger.Trace().Type("command", cmdReq.GetRequest()).Msg("executing command")

var resp *processorv1.CommandResponse
var err error

switch req := cmdReq.GetRequest().(type) {
case *processorv1.CommandRequest_Specify:
resp, err = e.executeSpecify(ctx, p, req.Specify)
case *processorv1.CommandRequest_Configure:
resp, err = e.executeConfigure(ctx, p, req.Configure)
case *processorv1.CommandRequest_Open:
resp, err = e.executeOpen(ctx, p, req.Open)
case *processorv1.CommandRequest_Process:
resp, err = e.executeProcess(ctx, p, req.Process)
case *processorv1.CommandRequest_Teardown:
resp, err = e.executeTeardown(ctx, p, req.Teardown)
default:
err = pprocutils.ErrUnknownCommandRequest
}

if err != nil {
e.logger.Trace().Err(err).Msg("command returned an error")
resp = &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Error{
Error: e.protoconv.error(err),
},
}
}

return resp
}

func (e commandExecutor) executeSpecify(_ context.Context, p Processor, _ *processorv1.Specify_Request) (*processorv1.CommandResponse, error) {
spec, err := p.Specification()
func (e commandExecutor) Specification(_ *processorv1.Specify_Request) (*processorv1.Specify_Response, error) {
spec, err := e.p.Specification()
if err != nil {
return nil, err
}
return &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Specify{
Specify: e.protoconv.specifyResponse(spec),
},
}, nil
return e.protoconv.specifyResponse(spec), nil
}

func (e commandExecutor) executeConfigure(ctx context.Context, p Processor, req *processorv1.Configure_Request) (*processorv1.CommandResponse, error) {
err := p.Configure(ctx, req.Parameters)
func (e commandExecutor) Configure(_ *processorv1.Configure_Request) (*processorv1.Configure_Response, error) {
err := e.p.Configure(context.Background(), nil)
if err != nil {
return nil, err
}
return &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Configure{
Configure: &processorv1.Configure_Response{},
},
}, nil
return &processorv1.Configure_Response{}, nil
}

func (e commandExecutor) executeOpen(ctx context.Context, p Processor, _ *processorv1.Open_Request) (*processorv1.CommandResponse, error) {
err := p.Open(ctx)
func (e commandExecutor) Open(_ *processorv1.Open_Request) (*processorv1.Open_Response, error) {
err := e.p.Open(context.Background())
if err != nil {
return nil, err
}
return &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Open{
Open: &processorv1.Open_Response{},
},
}, nil
return &processorv1.Open_Response{}, nil
}

func (e commandExecutor) executeProcess(ctx context.Context, p Processor, req *processorv1.Process_Request) (*processorv1.CommandResponse, error) {
func (e commandExecutor) Process(req *processorv1.Process_Request) (*processorv1.Process_Response, error) {
records, err := e.protoconv.records(req.Records)
if err != nil {
return nil, fmt.Errorf("failed to convert proto opencdc records: %w", err)
}
processedRecords := p.Process(ctx, records)
processedRecords := e.p.Process(context.Background(), records)
protoRecords, err := e.protoconv.processedRecords(processedRecords)
if err != nil {
return nil, fmt.Errorf("failed to convert processed records: %w", err)
}

return &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Process{
Process: &processorv1.Process_Response{
Records: protoRecords,
},
},
return &processorv1.Process_Response{
Records: protoRecords,
}, nil
}

func (e commandExecutor) executeTeardown(ctx context.Context, p Processor, _ *processorv1.Teardown_Request) (*processorv1.CommandResponse, error) {
err := p.Teardown(ctx)
func (e commandExecutor) Teardown(_ *processorv1.Teardown_Request) (*processorv1.Teardown_Response, error) {
err := e.p.Teardown(context.Background())
if err != nil {
return nil, err
}
return &processorv1.CommandResponse{
Response: &processorv1.CommandResponse_Teardown{
Teardown: &processorv1.Teardown_Response{},
},
}, nil
return &processorv1.Teardown_Response{}, nil
}

// protoConverter converts between the SDK and protobuf types.
Expand Down
57 changes: 0 additions & 57 deletions wasm/caller.go

This file was deleted.

82 changes: 0 additions & 82 deletions wasm/caller_test.go

This file was deleted.

Loading