-
Notifications
You must be signed in to change notification settings - Fork 93
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add basic plugin system for clients (#430)
Add basic plugin system for clients Here, follow up #429 to add a basic plugin system for River clients which allows a driver to add maintenance and non-maintenance services to a client before it starts up. The plugin interface is implemented by the drivers themselves, and looks like this: type driverPlugin[TTx any] interface { // PluginInit initializes a plugin with an archetype and client. It's // invoked on Client.NewClient. PluginInit(archetype *baseservice.Archetype, client *Client[TTx]) // PluginMaintenanceServices returns additional maintenance services (will // only run on an elected leader) for a River client. PluginMaintenanceServices() []startstop.Service // PluginServices returns additional non-maintenance services (will run on // all clients) for a River client. PluginServices() []startstop.Service } The change is fairly straightforward, and we make sure to bring in some test cases verifying the plugin services were indeed added correctly. --------- Co-authored-by: Blake Gentry <blakesgentry@gmail.com>
- Loading branch information
Showing
6 changed files
with
207 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package river | ||
|
||
import ( | ||
"github.com/riverqueue/river/rivershared/baseservice" | ||
"github.com/riverqueue/river/rivershared/startstop" | ||
) | ||
|
||
// A plugin API that drivers may implement to extend a River client. Driver | ||
// plugins may, for example, add additional maintenance services. | ||
// | ||
// This should be considered a River internal API and its stability is not | ||
// guaranteed. DO NOT USE. | ||
type driverPlugin[TTx any] interface { | ||
// PluginInit initializes a plugin with an archetype and client. It's | ||
// invoked on Client.NewClient. | ||
PluginInit(archetype *baseservice.Archetype, client *Client[TTx]) | ||
|
||
// PluginMaintenanceServices returns additional maintenance services (will | ||
// only run on an elected leader) for a River client. | ||
PluginMaintenanceServices() []startstop.Service | ||
|
||
// PluginServices returns additional non-maintenance services (will run on | ||
// all clients) for a River client. | ||
PluginServices() []startstop.Service | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package river | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/jackc/pgx/v5" | ||
"github.com/jackc/pgx/v5/pgxpool" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/riverqueue/river/internal/riverinternaltest" | ||
"github.com/riverqueue/river/riverdriver/riverpgxv5" | ||
"github.com/riverqueue/river/rivershared/baseservice" | ||
"github.com/riverqueue/river/rivershared/riversharedtest" | ||
"github.com/riverqueue/river/rivershared/startstop" | ||
) | ||
|
||
func TestClientDriverPlugin(t *testing.T) { | ||
t.Parallel() | ||
|
||
ctx := context.Background() | ||
|
||
type testBundle struct { | ||
pluginDriver *TestDriverWithPlugin | ||
} | ||
|
||
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { | ||
t.Helper() | ||
|
||
pluginDriver := newDriverWithPlugin(t, riverinternaltest.TestDB(ctx, t)) | ||
|
||
client, err := NewClient(pluginDriver, newTestConfig(t, nil)) | ||
require.NoError(t, err) | ||
|
||
return client, &testBundle{ | ||
pluginDriver: pluginDriver, | ||
} | ||
} | ||
|
||
t.Run("ServicesStart", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
client, bundle := setup(t) | ||
|
||
startClient(ctx, t, client) | ||
|
||
riversharedtest.WaitOrTimeout(t, startstop.WaitAllStartedC( | ||
bundle.pluginDriver.maintenanceService, | ||
bundle.pluginDriver.service, | ||
)) | ||
}) | ||
} | ||
|
||
var _ driverPlugin[pgx.Tx] = &TestDriverWithPlugin{} | ||
|
||
type TestDriverWithPlugin struct { | ||
*riverpgxv5.Driver | ||
initCalled bool | ||
maintenanceService startstop.Service | ||
service startstop.Service | ||
} | ||
|
||
func newDriverWithPlugin(t *testing.T, dbPool *pgxpool.Pool) *TestDriverWithPlugin { | ||
t.Helper() | ||
|
||
newService := func(name string) startstop.Service { | ||
return startstop.StartStopFunc(func(ctx context.Context, shouldStart bool, started, stopped func()) error { | ||
if !shouldStart { | ||
return nil | ||
} | ||
|
||
go func() { | ||
started() | ||
defer stopped() // this defer should come first so it's last out | ||
|
||
t.Logf("Test service started: %s", name) | ||
|
||
<-ctx.Done() | ||
}() | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
return &TestDriverWithPlugin{ | ||
Driver: riverpgxv5.New(dbPool), | ||
maintenanceService: newService("maintenance service"), | ||
service: newService("other service"), | ||
} | ||
} | ||
|
||
func (d *TestDriverWithPlugin) PluginInit(archetype *baseservice.Archetype, client *Client[pgx.Tx]) { | ||
d.initCalled = true | ||
} | ||
|
||
func (d *TestDriverWithPlugin) PluginMaintenanceServices() []startstop.Service { | ||
if !d.initCalled { | ||
panic("expected PluginInit to be called before this function") | ||
} | ||
|
||
return []startstop.Service{d.maintenanceService} | ||
} | ||
|
||
func (d *TestDriverWithPlugin) PluginServices() []startstop.Service { | ||
if !d.initCalled { | ||
panic("expected PluginInit to be called before this function") | ||
} | ||
|
||
return []startstop.Service{d.service} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package startstop | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/riverqueue/river/rivershared/riversharedtest" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
riversharedtest.WrapTestMain(m) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters