A transactional outbox for PostgreSQL, based on the polling publisher pattern.
- Persist messages
- Publish messages in batches
- Publish messages in a worker pool
- Manage the delay between batch publications
- Create custom publisher
- Create custom repository
- Use custom outbox table
- Publish in partitions
- pgx
- gorm
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/vsvp21/outbox/v5"
)
// Publisher is a stateless example publisher that prints each message to
// standard output instead of sending it to a broker.
type Publisher struct{}

// Publish JSON-encodes message.Payload and prints it together with topic.
// The exchange argument is accepted but not used. If the payload cannot be
// marshalled, the marshalling error is returned and nothing is printed.
func (p Publisher) Publish(exchange, topic string, message outbox.Message) error {
	var (
		body []byte
		err  error
	)
	if body, err = json.Marshal(message.Payload); err != nil {
		return err
	}

	// %s renders a []byte exactly like the equivalent string.
	const format = "published message to topic: %s, payload: %s"
	fmt.Printf(format, topic, body)
	return nil
}
// main opens a pgx connection pool, wraps it in an outbox repository and
// runs a relay that hands messages to Publisher. The process exits through
// log.Fatal if the pool cannot be created or if Run returns an error.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	const dsn = "postgres://root:root@127.0.0.1:5432/db_name"
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatal(err)
	}

	repo := outbox.NewRepository(outbox.NewPGXAdapter(pool))
	relay := outbox.NewRelay(repo, Publisher{}, 1_000, time.Millisecond)

	// Run is started with a batch size option of 100.
	err = relay.Run(ctx, outbox.BatchSize(100))
	if err != nil {
		log.Fatal(err)
	}
}
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"github.com/vsvp21/outbox/v5"
)
// Publisher is a stateless example publisher that prints each message to
// standard output instead of sending it to a broker.
type Publisher struct{}

// Publish prints topic and the JSON encoding of message.Payload.
// The exchange argument is ignored. A marshalling failure is returned
// unchanged, in which case nothing is printed.
func (p Publisher) Publish(exchange, topic string, message outbox.Message) error {
	encoded, err := json.Marshal(message.Payload)
	if err != nil {
		return err
	}

	// %s renders a []byte exactly like the equivalent string.
	const layout = "published message to topic: %s, payload: %s"
	fmt.Printf(layout, topic, encoded)

	return nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db, err := gorm.Open(postgres.New(postgres.Config{
DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
}), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
r := outbox.NewRepository(outbox.NewGORMAdapter(db))
relay := outbox.NewRelay(r, Publisher{}, 1_000, time.Millisecond)
if err = relay.Run(ctx, outbox.BatchSize(100)); err != nil {
log.Fatal(err)
}
}
package main
import "github.com/vsvp21/outbox/v5"
// main shows where the outbox table name is overridden: the package-level
// outbox.TableName variable is set to "custom" between the application's
// own setup steps.
func main() {
	// Your code ...

	const customTable = "custom"
	outbox.TableName = customTable

	// Your code ...
}
package main
import (
"context"
"log"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/vsvp21/outbox/v5"
)
// main opens a pgx connection pool and uses a pgx persister to run a
// callback inside PersistInTx. The callback receives the pgx.Tx and returns
// the outbox messages to persist (none in this example).
//
// The pool is closed when main returns so its connections are released.
func main() {
	// context.Background is the conventional root context for main.
	ctx := context.Background()

	db, err := pgxpool.New(ctx, "postgres://root:root@127.0.0.1:5432/db_name")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	p := outbox.NewPgxPersister(db)
	// NOTE(review): the result of PersistInTx, if any, is discarded here —
	// confirm its signature and handle a returned error in real code.
	p.PersistInTx(ctx, func(tx pgx.Tx) ([]outbox.Message, error) {
		// SQL Queries
		return []outbox.Message{}, nil
	})
}
package main
import (
"log"
"github.com/vsvp21/outbox/v5"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// main opens a GORM connection to PostgreSQL and uses a GORM persister to
// run a callback inside PersistInTx. The callback receives the *gorm.DB
// handle and returns the outbox messages to persist (none in this example).
// The process exits through log.Fatal if the connection cannot be opened.
func main() {
	dialector := postgres.New(postgres.Config{
		DSN: "host=127.0.0.1 user=db_user password=secretsecret dbname=test_db port=5432 sslmode=disable",
	})

	db, err := gorm.Open(dialector)
	if err != nil {
		log.Fatal(err)
	}

	// persist is the unit of work executed by PersistInTx.
	persist := func(tx *gorm.DB) ([]outbox.Message, error) {
		// SQL Queries
		return []outbox.Message{}, nil
	}

	persister := outbox.NewGormPersister(db)
	persister.PersistInTx(persist)
}