Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 63 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"database/sql"
"encoding/json"
"errors"
"log/slog"
"sync"
"time"
Expand Down Expand Up @@ -70,11 +71,11 @@ type Client interface {
Stop()

// Insert add a job into the queue to be processed.
Insert(queue string, params JobArgs) (*int64, error)
Insert(queue string, params JobArgs, opts ...*types.InsertOptions) (*int64, error)

// InsertTx adds a job into the specified queue within the context of the provided
// transaction, allowing the operation to be part of an atomic database transaction.
InsertTx(tx *sql.Tx, queue string, params JobArgs) (*int64, error)
InsertTx(tx *sql.Tx, queue string, params JobArgs, opts ...*types.InsertOptions) (*int64, error)
}

// NewClient creates a new instance of worker with the provided context and options.
Expand Down Expand Up @@ -122,6 +123,7 @@ func NewClient(ctx context.Context, opts ...Option) Client {
for queue, config := range clt.cfg.queues {
logger := clt.cfg.logger.WithGroup("producer").With(slog.String("queue", queue))
clt.producers[queue] = &producer{
clientID: &clt.id,
logger: logger,
workers: clt.cfg.workers,
storage: clt.cfg.storage,
Expand Down Expand Up @@ -157,58 +159,87 @@ func (c *IClient) Stop() {
}

// Insert add a job into the queue to be processed.
func (c *IClient) Insert(queue string, params JobArgs) (*int64, error) {
func (c *IClient) Insert(queue string, params JobArgs, options ...*types.InsertOptions) (*int64, error) {
args, err := json.Marshal(params)
if err != nil {
return nil, err
}

id, err := c.cfg.storage.Insert(queue, params.Kind(), args)
if err != nil {
c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue",
slog.String("error", err.Error()),
slog.String("queue", queue),
slog.String("kind", params.Kind()),
slog.Any("args", params),
)
opts := &types.InsertOptions{}
if len(options) > 0 {
opts = options[0]
}

if opts.Priority == 0 {
opts.Priority = 4
}

if opts.Priority > 4 {
return nil, errors.New("priority must be between 1 and 4")
}

state := types.JobStateAvailable
if !opts.ScheduledAt.IsZero() {
state = types.JobStateScheduled
}

if opts.ScheduledAt.IsZero() || opts.ScheduledAt.Before(time.Now()) {
opts.ScheduledAt = time.Now().UTC()
}

if opts.MaxRetries == 0 {
opts.MaxRetries = 7
}

if opts.Pending {
state = types.JobStatePending
}

job := &types.JobRow{
Kind: params.Kind(),
Queue: queue,
Args: args,
State: state,
Options: opts,
}

var jobID *int64
if jobID, err = c.cfg.storage.Insert(job); err != nil {
c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue", slog.String("error", err.Error()),
slog.String("queue", queue), slog.String("kind", params.Kind()), slog.Any("args", params))
return nil, err
}

c.cfg.logger.DebugContext(c.ctx, "job inserted into queue",
slog.String("queue", queue),
slog.Int64("job_id", *id),
slog.String("kind", params.Kind()),
slog.Any("args", params),
)
c.cfg.logger.DebugContext(c.ctx, "job inserted into queue", slog.String("queue", queue),
slog.Int64("job_id", *jobID), slog.String("kind", params.Kind()), slog.Any("args", params))

return id, nil
return jobID, nil
}

// InsertTx adds a job into the specified queue within the context of the provided
// transaction, allowing the operation to be part of an atomic database transaction.
func (c *IClient) InsertTx(tx *sql.Tx, queue string, params JobArgs) (*int64, error) {
func (c *IClient) InsertTx(tx *sql.Tx, queue string, params JobArgs, options ...*types.InsertOptions) (*int64, error) {
args, err := json.Marshal(params)
if err != nil {
return nil, err
}

id, err := c.cfg.storage.InsertTx(tx, queue, params.Kind(), args)
jobRow := &types.JobRow{
Kind: params.Kind(),
Queue: queue,
Args: args,
State: types.JobStateAvailable,
}

id, err := c.cfg.storage.InsertTx(tx, jobRow)
if err != nil {
c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue within transaction",
slog.String("error", err.Error()),
slog.String("queue", queue),
slog.String("kind", params.Kind()),
slog.Any("args", params),
)
c.cfg.logger.ErrorContext(c.ctx, "failed to insert job into queue within transaction", slog.String("error", err.Error()),
slog.String("queue", queue), slog.String("kind", params.Kind()), slog.Any("args", params))
return nil, err
}

c.cfg.logger.DebugContext(c.ctx, "job inserted into queue within transaction",
slog.String("queue", queue),
slog.Int64("job_id", *id),
slog.String("kind", params.Kind()),
slog.Any("args", params),
)
c.cfg.logger.DebugContext(c.ctx, "job inserted into queue within transaction", slog.String("queue", queue),
slog.Int64("job_id", *id), slog.String("kind", params.Kind()), slog.Any("args", params))

return id, nil
}
Expand Down
9 changes: 6 additions & 3 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (

"github.com/isaqueveras/synk/storage"
"github.com/isaqueveras/synk/types"
"github.com/oklog/ulid/v2"
)

type producer struct {
clientID *ulid.ULID

logger *slog.Logger
jobsChannel chan *types.JobRow
config *producerConfig
Expand All @@ -37,7 +40,7 @@ type producerConfig struct {

func (p *producer) process(ctx context.Context, jobs chan []*types.JobRow) {
limit := int32(p.config.maxWorkerCount) - p.numJobsActive.Load()
go p.getJobAvailable(jobs, limit)
go p.getJobAvailable(jobs, limit, p.clientID)
for {
select {
case jobs := <-jobs:
Expand Down Expand Up @@ -156,8 +159,8 @@ func (p *producer) handleWorkerDone(job *types.JobRow) {
p.jobsChannel <- job
}

func (p *producer) getJobAvailable(jobs chan<- []*types.JobRow, limit int32) {
items, err := p.storage.GetJobAvailable(p.config.queueName, limit)
func (p *producer) getJobAvailable(jobs chan<- []*types.JobRow, limit int32, clientID *ulid.ULID) {
items, err := p.storage.GetJobAvailable(p.config.queueName, limit, clientID)
if err != nil {
panic(err)
}
Expand Down
14 changes: 8 additions & 6 deletions storage/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/isaqueveras/synk/storage"
"github.com/isaqueveras/synk/storage/postgresql/queries"
"github.com/isaqueveras/synk/types"

"github.com/oklog/ulid/v2"
)

// New creates a new instance of the storage repository using the provided
Expand Down Expand Up @@ -40,7 +42,7 @@ func (pg *postgres) Ping() error {
}

// GetJobAvailable retrieves a list of available jobs from the specified queue with a limit on the number of jobs.
func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.JobRow, err error) {
func (pg *postgres) GetJobAvailable(queue string, limit int32, clientID *ulid.ULID) (items []*types.JobRow, err error) {
ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout)
defer cancel()

Expand All @@ -50,7 +52,7 @@ func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.J
}
defer tx.Rollback() // nolint

if items, err = pg.queries.GetJobAvailable(ctx, tx, queue, limit); err != nil {
if items, err = pg.queries.GetJobAvailable(ctx, tx, queue, limit, clientID); err != nil {
return nil, err
}

Expand All @@ -62,7 +64,7 @@ func (pg *postgres) GetJobAvailable(queue string, limit int32) (items []*types.J
}

// Insert inserts a new job into the specified queue with the given kind and arguments.
func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) {
func (pg *postgres) Insert(params *types.JobRow) (*int64, error) {
ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout)
defer cancel()

Expand All @@ -73,7 +75,7 @@ func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) {
defer tx.Rollback()

var id *int64
if id, err = pg.queries.Insert(ctx, tx, queue, kind, args); err != nil {
if id, err = pg.queries.Insert(ctx, tx, params); err != nil {
return nil, err
}

Expand All @@ -87,10 +89,10 @@ func (pg *postgres) Insert(queue, kind string, args []byte) (*int64, error) {
// InsertTx inserts a new job into the specified queue with the given kind and arguments
// within the context of the provided transaction.
// This allows the operation to be part of an atomic database transaction.
func (pg *postgres) InsertTx(tx *sql.Tx, queue, kind string, args []byte) (*int64, error) {
func (pg *postgres) InsertTx(tx *sql.Tx, params *types.JobRow) (*int64, error) {
ctx, cancel := context.WithTimeout(pg.ctx, pg.timeout)
defer cancel()
return pg.queries.Insert(ctx, tx, queue, kind, args)
return pg.queries.Insert(ctx, tx, params)
}

// UpdateJobState updates the state, finalized_at, and error message of a job.
Expand Down
24 changes: 17 additions & 7 deletions storage/postgresql/queries/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/isaqueveras/synk/types"

"github.com/oklog/ulid/v2"
)

// Queries represents a collection of methods to interact with the PostgreSQL database.
Expand All @@ -23,8 +25,9 @@ WITH jobs AS (
SELECT id, args, kind
FROM synk.job
WHERE state = 'available' AND queue = $1::TEXT
AND scheduled_at <= COALESCE($4::TIMESTAMPTZ, NOW())
ORDER BY priority ASC, scheduled_at ASC, id ASC
LIMIT $2::integer
LIMIT $2::INTEGER
FOR UPDATE SKIP LOCKED
) UPDATE synk.job SET
state = 'running',
Expand All @@ -36,8 +39,8 @@ WHERE job.id = jobs.id
RETURNING job.id, job.args, job.kind`

// GetJobAvailable retrieves available jobs from the database and updates their state to 'running'.
func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string, limit int32) ([]*types.JobRow, error) {
rows, err := tx.QueryContext(ctx, getJobAvailableSQL, queue, limit, "01JK7753BK0C8PY75K0JVXFYY0")
func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string, limit int32, clientID *ulid.ULID) ([]*types.JobRow, error) {
rows, err := tx.QueryContext(ctx, getJobAvailableSQL, queue, limit, clientID.String(), nil)
if err != nil {
return nil, err
}
Expand All @@ -63,12 +66,19 @@ func (q *Queries) GetJobAvailable(ctx context.Context, tx *sql.Tx, queue string,
return jobs, nil
}

const insertSQL = "INSERT INTO synk.job (queue, kind, args, max_attempts) VALUES ($1, $2, $3::jsonb, 3) RETURNING id"
const insertSQL = `
INSERT INTO synk.job (queue, kind, args, max_attempts, state, scheduled_at)
VALUES ($1, $2, $3::jsonb, $4, $5, $6) RETURNING id`

// Insert inserts a new job into the database with the specified queue, kind, and arguments.
func (q *Queries) Insert(ctx context.Context, tx *sql.Tx, queue, kind string, args []byte) (id *int64, err error) {
err = tx.QueryRowContext(ctx, insertSQL, queue, kind, args).Scan(&id)
return id, err
func (q *Queries) Insert(ctx context.Context, tx *sql.Tx, params *types.JobRow) (id *int64, err error) {
if err = tx.
QueryRowContext(ctx, insertSQL, params.Queue, params.Kind, params.Args, params.Options.MaxRetries,
params.State, params.Options.ScheduledAt).
Scan(&id); err != nil {
return nil, err
}
return id, nil
}

const updateJobStateSQLNoError = `UPDATE synk.job SET state = $1, finalized_at = $2 WHERE id = $3`
Expand Down
6 changes: 4 additions & 2 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/isaqueveras/synk/types"

"github.com/oklog/ulid/v2"
)

// Storage is an interface that defines methods for interacting with job storage.
Expand All @@ -22,11 +24,11 @@ type Storage interface {
// Insert adds a new job to the specified queue with the given kind and arguments.
// It takes the name of the queue, the kind of job, and the arguments as a byte slice.
// It returns a pointer to the job ID and an error if the insertion fails.
Insert(queue, kind string, args []byte) (*int64, error)
Insert(params *types.JobRow) (*int64, error)

// InsertTx adds a new job to the specified queue with the given kind and arguments
// within the context of the provided transaction.
InsertTx(tx *sql.Tx, queue, kind string, args []byte) (*int64, error)
InsertTx(tx *sql.Tx, params *types.JobRow) (*int64, error)

// UpdateJobState updates the state of a job identified by its ID.
// It takes the job ID, the new state, an optional finalized time, and an
Expand Down
23 changes: 23 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@ type JobRow struct {
Args []byte
State JobState
Errors []AttemptError
Options *InsertOptions
}

// InsertOptions represents options for inserting a job into the queue.
type InsertOptions struct {
// ScheduledAt is the time at which the job should be scheduled to run.
ScheduledAt time.Time

// Priority is the priority of the job, which can be used to determine the order
// in which jobs are processed. Higher values indicate higher priority.
Priority int

// Pending indicates whether the job is pending execution.
// If true, the job is considered pending and will not be executed until it is marked
// as ready. If false, the job is ready to be executed.
Pending bool

// Retryable indicates whether the job can be retried if it fails.
Retryable bool

// MaxRetries is the maximum number of times the job can be retried if it fails.
MaxRetries int
}

// JobState represents the status of a job.
Expand All @@ -28,6 +50,7 @@ const (
JobStateRetryable JobState = "retryable"
JobStateRunning JobState = "running"
JobStateScheduled JobState = "scheduled"
JobStatePending JobState = "pending"
)

// AttemptError represents an error that occurred during a job attempt.
Expand Down