Skip to content

Commit

Permalink
Merge branch 'main' into lovro/markdownlint
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Feb 23, 2023
2 parents 0f5895a + de00f46 commit 464f735
Show file tree
Hide file tree
Showing 26 changed files with 874 additions and 1,925 deletions.
1,095 changes: 0 additions & 1,095 deletions .github/actions/package-lock.json

This file was deleted.

16 changes: 0 additions & 16 deletions .github/actions/package.json

This file was deleted.

72 changes: 0 additions & 72 deletions .github/actions/public_roadmap.js

This file was deleted.

20 changes: 20 additions & 0 deletions docs/health_check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Health check

Conduit’s health check can be used to determine if Conduit is running correctly. What it does is to check if Conduit
can successfully connect to the database which it was configured with (which can be BadgerDB, PostgreSQL or the
in-memory one). The health check is available at the `/healthz` path. Here’s an example:

```bash
$ curl "http://localhost:8080/healthz"
{"status":"SERVING"}
```

You can also check individual services within Conduit. The following example checks if the PipelineService is running:

```bash
$ curl "http://localhost:8080/healthz?service=PipelineService"
{"status":"SERVING"}
```

The services which can be checked for health are: `PipelineService`, `ConnectorService`, `ProcessorService`, and
`PluginService`.
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/conduitio/conduit-connector-postgres v0.3.0
github.com/conduitio/conduit-connector-protocol v0.4.0
github.com/conduitio/conduit-connector-s3 v0.3.0
github.com/conduitio/conduit-connector-sdk v0.5.0
github.com/conduitio/conduit-connector-sdk v0.5.1
github.com/conduitio/yaml/v3 v3.2.0
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0
Expand All @@ -23,7 +23,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.1
github.com/hashicorp/go-hclog v1.4.0
github.com/hashicorp/go-plugin v1.4.8
github.com/jackc/pgtype v1.14.0
Expand All @@ -34,15 +34,15 @@ require (
github.com/piotrkowalczuk/promgrpc/v4 v4.0.4
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
github.com/prometheus/common v0.40.0
github.com/rs/zerolog v1.29.0
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.5
go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50
go.uber.org/goleak v1.2.1
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201
golang.org/x/tools v0.6.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
google.golang.org/genproto v0.0.0-20230127162408-596548ed4efa
google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148
google.golang.org/grpc v1.53.0
google.golang.org/protobuf v1.28.2-0.20220831092852-f930b1dc76e8
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
Expand Down Expand Up @@ -121,6 +121,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jdxcode/netrc v0.0.0-20221124155335-4616370d1a84 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
Expand All @@ -134,6 +135,8 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oklog/run v1.1.0 // indirect
Expand Down
21 changes: 13 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ github.com/conduitio/conduit-connector-protocol v0.4.0 h1:hO6mB4Ft34+PbF1G2pDGHz
github.com/conduitio/conduit-connector-protocol v0.4.0/go.mod h1:xU4dsoPPCiW+1rJFwxkMUjhYs+HMte+XB/aFL6x94iM=
github.com/conduitio/conduit-connector-s3 v0.3.0 h1:Zy+TYRnZKw09mks4qsXW43qnp0PuO/SsgvYbbuMUhss=
github.com/conduitio/conduit-connector-s3 v0.3.0/go.mod h1:ims2iWZJcOSjEPY1nal/DQrxiqG+xHPYFV5nN1ia/Pk=
github.com/conduitio/conduit-connector-sdk v0.5.0 h1:RC4t7ymcK4uhVnQ5r6l3dgcISlYiL3StezUlinrByNk=
github.com/conduitio/conduit-connector-sdk v0.5.0/go.mod h1:xq7v2QFhTUb26Y+8WB0FNNCJL0MObuHIJ5ikYYaG+1Q=
github.com/conduitio/conduit-connector-sdk v0.5.1 h1:mWPSjYNy5Eq1+irhxOHKdbPYQkNw010/j58olhjum2c=
github.com/conduitio/conduit-connector-sdk v0.5.1/go.mod h1:fvgavA7aNgysmC4dPaozRnejOSq3lB3LeEgJPiRSe1Y=
github.com/conduitio/yaml/v3 v3.2.0 h1:itolFiK0dzUYG6PdlOpmCi1fcMYFoaWdZ1wbNNTEnlI=
github.com/conduitio/yaml/v3 v3.2.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/stargz-snapshotter/estargz v0.12.1 h1:+7nYmHJb0tEkcRaAW+MHqoKaJYZmkikupxCqVtmPuY0=
Expand Down Expand Up @@ -336,8 +336,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 h1:1JYBfzqrWPcCclBwxFCPAou9n+q86mfnu7NAeHfte7A=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0/go.mod h1:YDZoGHuwE+ov0c8smSH49WLF3F2LaWnYYuDVd+EWrc0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.1 h1:I6ITHEanAwjB0FvaxmGm8pKqmCLR7QIe05ZmO4QAXMw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.1/go.mod h1:gYC+WX4YJFarA2ie73G2epzt7TBWpo9pzcBnK1g0MSw=
github.com/hashicorp/go-hclog v1.4.0 h1:ctuWFGrhFha8BnnzxqeRGidlEcQkDyL5u8J8t5eA11I=
github.com/hashicorp/go-hclog v1.4.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.4.8 h1:CHGwpxYDOttQOY7HOWgETU9dyVjOXzniXDqJcYJE1zM=
Expand Down Expand Up @@ -426,6 +426,8 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down Expand Up @@ -496,9 +498,12 @@ github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 h1:HfkjXDfhgVaN5rmueG8cL8KKeFNecRCXFhaJ2qZ5SKA=
github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
Expand Down Expand Up @@ -547,8 +552,8 @@ github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvq
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI=
github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/common v0.40.0 h1:Afz7EVRqGg2Mqqf4JuF9vdvp1pi220m55Pi9T2JnO4Q=
github.com/prometheus/common v0.40.0/go.mod h1:L65ZJPSmfn/UBWLQIHV7dBrKFidB/wPlF1y5TlSt9OE=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
Expand Down Expand Up @@ -957,8 +962,8 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
google.golang.org/genproto v0.0.0-20230127162408-596548ed4efa h1:GZXdWYIKckxQE2EcLHLvF+KLF+bIwoxGdMUxTZizueg=
google.golang.org/genproto v0.0.0-20230127162408-596548ed4efa/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148 h1:muK+gVBJBfFb4SejshDBlN2/UgxCCOKH9Y34ljqEGOc=
google.golang.org/genproto v0.0.0-20230221151758-ace64dc21148/go.mod h1:3Dl5ZL0q0isWJt+FVcfpQyirqemEuLAK/iFvg1UP1Hw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
8 changes: 4 additions & 4 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ type base struct {

type PipelineService interface {
Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procFetcher pipeline.ProcessorFetcher, pluginFetcher pipeline.PluginDispenserFetcher, pipelineID string) error
// Stop initiates a graceful shutdown of the given pipeline.
// The method does not wait for the pipeline (and its nodes) to actually
// stop, because there still might be some in-flight messages.
Stop(ctx context.Context, pipelineID string) error
// Stop initiates a stop of the given pipeline. The method does not wait for
// the pipeline (and its nodes) to actually stop.
// When force is false the pipeline will try to stop gracefully and drain
// any in-flight messages that have not yet reached the destination. When
// force is true the pipeline will stop without draining in-flight messages.
// It is allowed to execute a force stop even after a graceful stop was
// requested.
Stop(ctx context.Context, pipelineID string, force bool) error

List(ctx context.Context) map[string]*pipeline.Instance
Get(ctx context.Context, id string) (*pipeline.Instance, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestPipelineSimple(t *testing.T) {
time.Sleep(time.Second)

t.Log("stopping pipeline")
err = orc.Pipelines.Stop(ctx, pl.ID)
err = orc.Pipelines.Stop(ctx, pl.ID, false)
is.NoErr(err)
t.Log("waiting")
err = pl.Wait()
Expand Down
4 changes: 2 additions & 2 deletions pkg/orchestrator/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func (po *PipelineOrchestrator) Start(ctx context.Context, id string) error {
return po.pipelines.Start(ctx, po.connectors, po.processors, po.plugins, id)
}

func (po *PipelineOrchestrator) Stop(ctx context.Context, id string) error {
func (po *PipelineOrchestrator) Stop(ctx context.Context, id string, force bool) error {
// TODO lock pipeline
return po.pipelines.Stop(ctx, id)
return po.pipelines.Stop(ctx, id, force)
}

func (po *PipelineOrchestrator) List(ctx context.Context) map[string]*pipeline.Instance {
Expand Down
8 changes: 4 additions & 4 deletions pkg/orchestrator/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func TestPipelineOrchestrator_Stop_Success(t *testing.T) {

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
plsMock.EXPECT().
Stop(gomock.AssignableToTypeOf(ctxType), plBefore.ID).
Stop(gomock.AssignableToTypeOf(ctxType), plBefore.ID, false).
Return(nil)

err := orc.Pipelines.Stop(ctx, plBefore.ID)
err := orc.Pipelines.Stop(ctx, plBefore.ID, false)
is.NoErr(err)
}

Expand All @@ -102,11 +102,11 @@ func TestPipelineOrchestrator_Stop_Fail(t *testing.T) {

wantErr := cerrors.New("pipeline doesn't exist")
plsMock.EXPECT().
Stop(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf("")).
Stop(gomock.AssignableToTypeOf(ctxType), gomock.AssignableToTypeOf(""), true).
Return(wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
err := orc.Pipelines.Stop(ctx, plBefore.ID)
err := orc.Pipelines.Stop(ctx, plBefore.ID, true)
is.True(cerrors.Is(err, wantErr))
}

Expand Down
Loading

0 comments on commit 464f735

Please sign in to comment.