Skip to content
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
cc71c81
Bump dispatch-proto
chriso Jun 14, 2024
ea2bece
Use typed coroutine state
chriso Jun 14, 2024
982a39e
Factor out helpers for use in the next layer up
chriso Jun 14, 2024
3f36dcc
Integrate coroutine library, and get simple functions working
chriso Jun 14, 2024
fccfb36
Implement suspend/resume
chriso Jun 14, 2024
4c8b57a
No need for manual noinline directive
chriso Jun 14, 2024
e1510bf
coroc struggles with nested NewFunction call; avoid it for now
chriso Jun 14, 2024
4f0cfef
Setup serde for proto.Message
chriso Jun 14, 2024
92df977
Add custom serializers for Dispatch/Client
chriso Jun 17, 2024
255c1af
Ignore warning about copy of mutex
chriso Jun 17, 2024
8c2c65c
Fix Dispatch deser
chriso Jun 17, 2024
e593364
Need to store type URL alongside serialized proto.Message
chriso Jun 17, 2024
975e2ba
Tidy up the coroutine after response is generated
chriso Jun 17, 2024
9613050
Factor out a component to hold a collection of functions
chriso Jun 18, 2024
443ab9d
Rework internals in prep for test runner
chriso Jun 18, 2024
141f30e
Disallow registration of volatile coroutines for now
chriso Jun 18, 2024
0285ca0
Sketch out volatile execution mode
chriso Jun 18, 2024
78bba6b
Helper for constructing exit directives with an output value
chriso Jun 18, 2024
7f863e8
Test exit directive
chriso Jun 18, 2024
4d69531
Simplify
chriso Jun 18, 2024
0223560
Improve internal docs
chriso Jun 18, 2024
4d24e28
Remove unused interfaces
chriso Jun 18, 2024
089f2cc
Fix adding coroutine state to a response
chriso Jun 18, 2024
d3b6e7c
Status is optional when constructing responses
chriso Jun 18, 2024
bfadd59
Input/Output option funcs are optional; infer from how Any is used
chriso Jun 18, 2024
f39c38f
Clarify what's happening when creating response
chriso Jun 18, 2024
f82221a
Test poll
chriso Jun 18, 2024
162ab6b
Avoid range over int; no need to exclude older Go versions
chriso Jun 18, 2024
8562188
Remove id.go
chriso Jun 18, 2024
5e0d68e
Continue to make it easier working with proto wrappers
chriso Jun 18, 2024
4fa5ba4
Build await/gather
chriso Jun 18, 2024
578946b
Add more comments to help readers
chriso Jun 19, 2024
dd14f9d
Implement and test gather
chriso Jun 19, 2024
5050c9b
No need for pointer receiver on gRPC handler
chriso Jun 19, 2024
60e069c
Test random delivery of call results
chriso Jun 19, 2024
9cd38f8
Extract a Gather[O] helper
chriso Jun 19, 2024
e39b14c
Implement dispatchtest.Run
chriso Jun 19, 2024
5db758e
Write an integration test
chriso Jun 19, 2024
c8eb42f
Change make command name so it's easier to read
chriso Jun 19, 2024
6fb2e5d
This needs to be a coroutine
chriso Jun 19, 2024
ebbe731
Bump all deps
chriso Jun 19, 2024
dc4be01
Run the integration test in volatile mode first
chriso Jun 19, 2024
bc44ad4
Merge branch 'main' into coroutine
chriso Jun 20, 2024
f4b242f
Add a couple more helpers
chriso Jun 20, 2024
9b9a092
Move the runnable interface
chriso Jun 20, 2024
8f002a3
Downgrade the panic to a warning
chriso Jun 20, 2024
6632b20
Merge GenericFunction and GenericCoroutine
chriso Jun 20, 2024
43ae3e3
Make volatile coroutines work a little better
chriso Jun 20, 2024
aa787bb
Use shorter function constructors
chriso Jun 20, 2024
ebd1de9
Factor out a dispatchproto package
chriso Jun 20, 2024
100daba
Factor out a dispatchcoro package
chriso Jun 20, 2024
6b35737
Tidy up coroutine impl
chriso Jun 20, 2024
6008b2a
Remove the volatile coroutine warning
chriso Jun 20, 2024
e33f8f4
Tweak dispatchtest API to avoid dispatchproto
chriso Jun 20, 2024
750ec98
No need to export default API URL
chriso Jun 20, 2024
c12f841
Return an error if the response status isn't OK
chriso Jun 20, 2024
5210c6b
Update coroutine.go
chriso Jun 20, 2024
4061aa6
Update coroutine.go
chriso Jun 20, 2024
5c0638d
Drop the type aliases for ID/Call
chriso Jun 20, 2024
6e268d0
NewCall => BuildCall
chriso Jun 20, 2024
7639b4a
Remove the need to close PrimitiveFunction/Registry
chriso Jun 21, 2024
8722d65
Rework exports
chriso Jun 21, 2024
d91dd0c
Remove the need to close functions
chriso Jun 21, 2024
7ad2705
Closing an endpoint closes the registry and any registered coroutines
chriso Jun 21, 2024
aeddcee
Move registry
chriso Jun 21, 2024
2c29d74
Factor out a dispatchclient package
chriso Jun 21, 2024
ba06fc3
Don't have Function depend on PrimitiveFunction
chriso Jun 21, 2024
a4dba56
Simplify how primitive functions are registered
chriso Jun 21, 2024
d77d264
Simplify the dispatchtest package
chriso Jun 21, 2024
54d4fbf
Simplify Option interface
chriso Jun 21, 2024
1a2d935
Fix docs
chriso Jun 21, 2024
d49cb23
Isolate the set of volatile coroutine instances
chriso Jun 21, 2024
d9e5ecc
Simplify further
chriso Jun 21, 2024
7750571
Move more internal logic to dispatchcoro
chriso Jun 21, 2024
0010d75
PR feedback
chriso Jun 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*_durable.go
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: fmt lint test
.PHONY: clean coroc fmt lint test integration-test clean coroc

fmt:
go fmt ./...
Expand All @@ -8,3 +8,16 @@ lint:

test:
go test ./...

integration-test: clean coroc
go run ./dispatchtest/integration # volatile mode
coroc ./dispatchtest/integration
go run -tags durable ./dispatchtest/integration # durable mode

clean:
find . -name '*_durable.go' -delete

coroc:
@which coroc &>/dev/null \
|| echo "Installing coroc..." \
&& go install github.com/dispatchrun/coroutine/compiler/cmd/coroc@latest
29 changes: 22 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
//go:build !durable

package dispatch

import (
"context"
"fmt"
"net/http"
"os"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go/dispatchproto"
)

const defaultApiUrl = "https://api.dispatch.run"

// Call is a function call.
type Call = dispatchproto.Call

// ID is an identifier for a dispatched function call.
type ID = dispatchproto.ID

// Client is a client for the Dispatch API.
//
// The Client can be used to dispatch function calls.
Expand All @@ -21,14 +33,16 @@ type Client struct {
apiUrl string
env []string
httpClient *http.Client
opts []ClientOption

client sdkv1connect.DispatchServiceClient
}

// NewClient creates a Client.
func NewClient(opts ...ClientOption) (*Client, error) {
c := &Client{
env: os.Environ(),
env: os.Environ(),
opts: opts,
}
for _, opt := range opts {
opt.configureClient(c)
Expand All @@ -46,7 +60,7 @@ func NewClient(opts ...ClientOption) (*Client, error) {
c.apiUrl = getenv(c.env, "DISPATCH_API_URL")
}
if c.apiUrl == "" {
c.apiUrl = DefaultApiUrl
c.apiUrl = defaultApiUrl
}

if c.httpClient == nil {
Expand Down Expand Up @@ -94,14 +108,12 @@ func APIKey(apiKey string) ClientOption {
// APIUrl sets the URL of the Dispatch API.
//
// It defaults to the value of the DISPATCH_API_URL environment variable,
// or DefaultApiUrl if DISPATCH_API_URL is unset.
// or the default API URL (https://api.dispatch.run) if DISPATCH_API_URL
// is unset.
func APIUrl(apiUrl string) ClientOption {
return clientOptionFunc(func(c *Client) { c.apiUrl = apiUrl })
}

// DefaultApiUrl is the default Dispatch API URL.
const DefaultApiUrl = "https://api.dispatch.run"

// Dispatch dispatches a function call.
func (c *Client) Dispatch(ctx context.Context, call Call) (ID, error) {
batch := c.Batch()
Expand Down Expand Up @@ -138,10 +150,13 @@ func (b *Batch) Reset() {
// Add adds calls to the batch.
func (b *Batch) Add(calls ...Call) {
for i := range calls {
b.calls = append(b.calls, calls[i].proto)
b.calls = append(b.calls, callProto(calls[i]))
}
}

//go:linkname callProto github.com/dispatchrun/dispatch-go/dispatchproto.callProto
func callProto(r dispatchproto.Call) *sdkv1.Call

// Dispatch dispatches the batch of function calls.
func (b *Batch) Dispatch(ctx context.Context) ([]ID, error) {
req := connect.NewRequest(&sdkv1.DispatchRequest{Calls: b.calls})
Expand Down
21 changes: 11 additions & 10 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/dispatchrun/dispatch-go"
"github.com/dispatchrun/dispatch-go/dispatchproto"
"github.com/dispatchrun/dispatch-go/dispatchtest"
)

Expand All @@ -18,7 +19,7 @@ func TestClient(t *testing.T) {
t.Fatal(err)
}

call := dispatch.NewCall("http://example.com", "function1", dispatch.Input(dispatch.Int(11)))
call := dispatchproto.NewCall("http://example.com", "function1", dispatchproto.Int(11))

_, err = client.Dispatch(context.Background(), call)
if err != nil {
Expand All @@ -27,7 +28,7 @@ func TestClient(t *testing.T) {

recorder.Assert(t, dispatchtest.DispatchRequest{
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
Calls: []dispatchproto.Call{call},
})
}

Expand All @@ -43,7 +44,7 @@ func TestClientEnvConfig(t *testing.T) {
t.Fatal(err)
}

call := dispatch.NewCall("http://example.com", "function1", dispatch.Input(dispatch.Int(11)))
call := dispatchproto.NewCall("http://example.com", "function1", dispatchproto.Int(11))

_, err = client.Dispatch(context.Background(), call)
if err != nil {
Expand All @@ -52,7 +53,7 @@ func TestClientEnvConfig(t *testing.T) {

recorder.Assert(t, dispatchtest.DispatchRequest{
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call},
Calls: []dispatchproto.Call{call},
})
}

Expand All @@ -65,10 +66,10 @@ func TestClientBatch(t *testing.T) {
t.Fatal(err)
}

call1 := dispatch.NewCall("http://example.com", "function1", dispatch.Input(dispatch.Int(11)))
call2 := dispatch.NewCall("http://example.com", "function2", dispatch.Input(dispatch.Int(22)))
call3 := dispatch.NewCall("http://example.com", "function3", dispatch.Input(dispatch.Int(33)))
call4 := dispatch.NewCall("http://example2.com", "function4", dispatch.Input(dispatch.Int(44)))
call1 := dispatchproto.NewCall("http://example.com", "function1", dispatchproto.Int(11))
call2 := dispatchproto.NewCall("http://example.com", "function2", dispatchproto.Int(22))
call3 := dispatchproto.NewCall("http://example.com", "function3", dispatchproto.Int(33))
call4 := dispatchproto.NewCall("http://example2.com", "function4", dispatchproto.Int(44))

batch := client.Batch()
batch.Add(call1, call2)
Expand All @@ -88,11 +89,11 @@ func TestClientBatch(t *testing.T) {
recorder.Assert(t,
dispatchtest.DispatchRequest{
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call1, call2},
Calls: []dispatchproto.Call{call1, call2},
},
dispatchtest.DispatchRequest{
Header: http.Header{"Authorization": []string{"Bearer foobar"}},
Calls: []dispatch.Call{call3, call4},
Calls: []dispatchproto.Call{call3, call4},
})
}

Expand Down
Loading