Vendor-agnostic interfaces for pub/sub messaging systems.
Creates publishers and subscribers.
Publishes messages to topics.
type Publisher interface {
Publish(ctx context.Context, topic string, message queue.Message) error
PublishAfter(ctx context.Context, topic string, message queue.Message, delayMs int64) error
Close() error
}PublishAfter inserts a fresh message that becomes visible to subscribers only after delayMs. It is distinct from Nack(requeueAfterMillis) even though both can produce "next delivery happens at T+delay":
Nackis "this delivery failed, try again" — it bumpsretry_countand eventually trips DLQ.PublishAfteris "postpone this work" —retry_countresets to 0, DLQ stays available for true failures.
Use PublishAfter for self-driven poll loops (e.g. the orchestrator's buildsignal consumer re-publishing itself between Status calls). Use Nack for processing failures.
Consumes messages from topics with per-subscription configuration.
type Subscriber interface {
Subscribe(ctx context.Context, topic string, config SubscriptionConfig) (<-chan Delivery, error)
Close() error
}Message with acknowledgment operations.
type Delivery interface {
Message() queue.Message
Ack(ctx context.Context) error
Nack(ctx context.Context, requeueAfterMillis int64) error
Reject(ctx context.Context, reason string) error
ExtendVisibilityTimeout(ctx context.Context, durationMillis int64) error
DeliveryID() string
Attempt() int
ReceivedAt() int64
Metadata() map[string]string
}- Ack — message processed successfully, remove from queue
- Nack — processing failed, requeue for retry after delay
- Reject — poison pill, move to DLQ (or ack if DLQ disabled)
- ExtendVisibilityTimeout — extend processing window for long-running work
Per-subscription configuration for polling, batching, leasing, retries, and DLQ:
cfg := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group")
cfg.PollIntervalMs = 50
cfg.BatchSize = 20
cfg.VisibilityTimeoutMs = 60000
cfg.Retry.MaxAttempts = 3
cfg.DLQ.Enabled = trueSee subscription_config.go for all fields and defaults.
q, _ := NewQueue(config)
defer q.Close()
// Publish
pub := q.Publisher()
msg := queue.NewMessage("id", []byte("payload"), "partition-key", nil)
pub.Publish(ctx, "topic", msg)
// Subscribe
sub := q.Subscriber()
cfg := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group")
deliveries, _ := sub.Subscribe(ctx, "topic", cfg)
for delivery := range deliveries {
if err := process(delivery.Message().Payload); err != nil {
delivery.Nack(ctx, 0) // Retry
continue
}
delivery.Ack(ctx)
}- Create
extension/queue/{backend}/directory - Implement
Queue,Publisher,Subscriber,Deliveryinterfaces - Map
queue.Messageto backend format
See extension/queue/mysql/ for the reference implementation.