-
Notifications
You must be signed in to change notification settings - Fork 93
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Here, follow up #429 to add a basic plugin system for River clients which allows a driver to add maintenance and non-maintenance services to a client before it starts up. The plugin interface is implemented by the drivers themselves, and looks like this: type driverPlugin[TTx any] interface { // PluginInit initializes a plugin with an archetype and client. It's // invoked on Client.NewClient. PluginInit(archetype *baseservice.Archetype, client *Client[TTx]) // PluginMaintenanceServices returns additional maintenance services (will // only run on an elected leader) for a River client. PluginMaintenanceServices() []startstop.Service // PluginServices returns additional non-maintenance services (will run on // all clients) for a River client. PluginServices() []startstop.Service } The change is fairly straightforward, and we make sure to bring in some test cases verifying the plugin services were indeed added correctly.
- Loading branch information
Showing
7 changed files
with
182 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,25 @@ | ||
package river | ||
|
||
import ( | ||
"log/slog" | ||
|
||
"github.com/riverqueue/river/rivertype" | ||
"github.com/riverqueue/river/rivershared/baseservice" | ||
"github.com/riverqueue/river/rivershared/startstop" | ||
) | ||
|
||
type Pluginable[TTx any] interface { | ||
Plugin(client *Client[TTx], logger *slog.Logger) rivertype.Plugin | ||
// A plugin API that drivers may implemented to extend a River client. Driver | ||
// plugins may, for example, add additional maintenance services. | ||
// | ||
// This should be considered a River internal API and its stability is not | ||
// guaranteed. DO NOT USE. | ||
type driverPlugin[TTx any] interface { | ||
// PluginInit initializes a plugin with an archetype and client. It's | ||
// invoked on Client.NewClient. | ||
PluginInit(archetype *baseservice.Archetype, client *Client[TTx]) | ||
|
||
// PluginMaintenanceServices returns additional maintenance services (will | ||
// only run on an elected leader) for a River client. | ||
PluginMaintenanceServices() []startstop.Service | ||
|
||
// PluginServices returns additional non-maintenance services (will run on | ||
// all clients) for a River client. | ||
PluginServices() []startstop.Service | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package river | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/jackc/pgx/v5" | ||
"github.com/jackc/pgx/v5/pgxpool" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/riverqueue/river/internal/riverinternaltest" | ||
"github.com/riverqueue/river/riverdriver/riverpgxv5" | ||
"github.com/riverqueue/river/rivershared/baseservice" | ||
"github.com/riverqueue/river/rivershared/riversharedtest" | ||
"github.com/riverqueue/river/rivershared/startstop" | ||
) | ||
|
||
func TestClientDriverPlugin(t *testing.T) { | ||
t.Parallel() | ||
|
||
ctx := context.Background() | ||
|
||
type testBundle struct { | ||
pluginDriver *TestDriverWithPlugin | ||
} | ||
|
||
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { | ||
t.Helper() | ||
|
||
pluginDriver := newDriverWithPlugin(t, riverinternaltest.TestDB(ctx, t)) | ||
|
||
client, err := NewClient(pluginDriver, newTestConfig(t, nil)) | ||
require.NoError(t, err) | ||
|
||
return client, &testBundle{ | ||
pluginDriver: pluginDriver, | ||
} | ||
} | ||
|
||
t.Run("ServicesStart", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
client, bundle := setup(t) | ||
|
||
startClient(ctx, t, client) | ||
|
||
riversharedtest.WaitOrTimeout(t, startstop.WaitAllStartedC( | ||
bundle.pluginDriver.maintenanceService, | ||
bundle.pluginDriver.service, | ||
)) | ||
}) | ||
} | ||
|
||
var _ driverPlugin[pgx.Tx] = &TestDriverWithPlugin{} | ||
|
||
type TestDriverWithPlugin struct { | ||
*riverpgxv5.Driver | ||
maintenanceService startstop.Service | ||
service startstop.Service | ||
} | ||
|
||
func newDriverWithPlugin(t *testing.T, dbPool *pgxpool.Pool) *TestDriverWithPlugin { | ||
t.Helper() | ||
|
||
newService := func(name string) startstop.Service { | ||
return startstop.StartStopFunc(func(ctx context.Context, shouldStart bool, started, stopped func()) error { | ||
if !shouldStart { | ||
return nil | ||
} | ||
|
||
go func() { | ||
started() | ||
defer stopped() // this defer should come first so it's last out | ||
|
||
t.Logf("Test service started: %s", name) | ||
|
||
<-ctx.Done() | ||
}() | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
return &TestDriverWithPlugin{ | ||
Driver: riverpgxv5.New(dbPool), | ||
maintenanceService: newService("maintenance service"), | ||
service: newService("other service"), | ||
} | ||
} | ||
|
||
func (d *TestDriverWithPlugin) PluginInit(archetype *baseservice.Archetype, client *Client[pgx.Tx]) {} | ||
|
||
func (d *TestDriverWithPlugin) PluginMaintenanceServices() []startstop.Service { | ||
return []startstop.Service{d.maintenanceService} | ||
} | ||
|
||
func (d *TestDriverWithPlugin) PluginServices() []startstop.Service { | ||
return []startstop.Service{d.service} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package startstop | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/riverqueue/river/rivershared/riversharedtest" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
riversharedtest.WrapTestMain(m) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters