Skip to content

Commit

Permalink
feat: support ingesting events via queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vm-001 authored and webhookx-x committed Oct 9, 2024
1 parent 5f65e6d commit ce1cea8
Show file tree
Hide file tree
Showing 38 changed files with 1,080 additions and 184 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ build:
install:
go install ${LDFLAGS}

generate:
go generate ./...

.PHONY: test
test: clean
go test $$(go list ./... | grep -v /test/)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ Explore more API at [openapi.yml](/openapi.yml).
The gateway requires the following runtime dependencies to work:

- PostgreSQL(>=13): Lower versions of PostgreSQL may work, but have not been fully tested.
- Redis(>=4): Lower versions of Redis may work, but have not been fully tested.
- Redis(>=6.2): Lower versions of Redis may work, but have not been fully tested.

## Status and Compatibility

Expand Down
7 changes: 5 additions & 2 deletions admin/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"encoding/json"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/db/query"
"github.com/webhookx-io/webhookx/pkg/types"
"github.com/webhookx-io/webhookx/pkg/ucontext"
"github.com/webhookx-io/webhookx/utils"
"net/http"
"time"
)

func (api *API) PageEvent(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -35,7 +37,6 @@ func (api *API) GetEvent(w http.ResponseWriter, r *http.Request) {

func (api *API) CreateEvent(w http.ResponseWriter, r *http.Request) {
var event entities.Event
event.ID = utils.KSUID()

if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
api.error(400, w, err)
Expand All @@ -47,6 +48,8 @@ func (api *API) CreateEvent(w http.ResponseWriter, r *http.Request) {
return
}

event.ID = utils.KSUID()
event.IngestedAt = types.Time{Time: time.Now()}
event.WorkspaceId = ucontext.GetWorkspaceID(r.Context())
err := api.dispatcher.Dispatch(r.Context(), &event)
api.assert(err)
Expand All @@ -71,7 +74,7 @@ func (api *API) RetryEvent(w http.ResponseWriter, r *http.Request) {
return
}

err = api.dispatcher.DispatchEndpoint(r.Context(), id, []*entities.Endpoint{endpoint})
err = api.dispatcher.DispatchEndpoint(r.Context(), event, []*entities.Endpoint{endpoint})
api.assert(err)

api.json(200, w, nil)
Expand Down
9 changes: 7 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/webhookx-io/webhookx/pkg/taskqueue"
"github.com/webhookx-io/webhookx/proxy"
"github.com/webhookx-io/webhookx/worker"
"github.com/webhookx-io/webhookx/worker/deliverer"
"go.uber.org/zap"
"sync"
)
Expand Down Expand Up @@ -74,7 +75,9 @@ func (app *Application) initialize() error {
client := cfg.RedisConfig.GetClient()

// queue
queue := taskqueue.NewRedisQueue(client)
queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{
Client: client,
}, app.log)
app.queue = queue

// cache
Expand All @@ -84,7 +87,9 @@ func (app *Application) initialize() error {

// worker
if cfg.WorkerConfig.Enabled {
app.worker = worker.NewWorker(&cfg.WorkerConfig, db, queue)
opts := worker.WorkerOptions{}
deliverer := deliverer.NewHTTPDeliverer(&cfg.WorkerConfig.Deliverer)
app.worker = worker.NewWorker(opts, db, deliverer, queue)
}

// admin
Expand Down
3 changes: 2 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ proxy:
body: '{"message": "OK"}'

queue:
type: redis # supported values are redis, off
redis:
host: 127.0.0.1
host: localhost
port: 6379
password:
database: 0
73 changes: 73 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,79 @@ func TestLogConfig(t *testing.T) {
}
}

func TestProxyConfig(t *testing.T) {
tests := []struct {
desc string
cfg ProxyConfig
expectedValidateErr error
}{
{
desc: "sanity",
cfg: ProxyConfig{
Queue: Queue{
Type: "redis",
},
},
expectedValidateErr: nil,
},
{
desc: "max_request_body_size cannot be negative value",
cfg: ProxyConfig{
MaxRequestBodySize: -1,
Queue: Queue{
Type: "redis",
},
},
expectedValidateErr: errors.New("max_request_body_size cannot be negative value"),
},
{
desc: "timeout_read cannot be negative value",
cfg: ProxyConfig{
TimeoutRead: -1,
Queue: Queue{
Type: "redis",
},
},
expectedValidateErr: errors.New("timeout_read cannot be negative value"),
},
{
desc: "timeout_write cannot be negative value",
cfg: ProxyConfig{
TimeoutWrite: -1,
Queue: Queue{
Type: "redis",
},
},
expectedValidateErr: errors.New("timeout_write cannot be negative value"),
},
{
desc: "invalid type: unknown",
cfg: ProxyConfig{
Queue: Queue{
Type: "unknown",
},
},
expectedValidateErr: errors.New("invalid queue: unknown type: unknown"),
},
{
desc: "invalid queue",
cfg: ProxyConfig{
Queue: Queue{
Type: "redis",
Redis: RedisConfig{
Port: 65536,
},
},
},
expectedValidateErr: errors.New("invalid queue: port must be in the range [0, 65535]"),
},
}
for _, test := range tests {
actualValidateErr := test.cfg.Validate()
assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr)
}
}

func TestConfig(t *testing.T) {
cfg, err := Init()
assert.Nil(t, err)
Expand Down
29 changes: 28 additions & 1 deletion config/proxy.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
package config

import "errors"
import (
"errors"
"fmt"
"slices"
)

type ProxyResponse struct {
Code uint `yaml:"code" default:"200"`
ContentType string `yaml:"contentType" default:"application/json"`
Body string `yaml:"body" default:"{\"message\": \"OK\"}"`
}

type QueueType string

const (
QueueTypeOff QueueType = "off"
QueueTypeRedis QueueType = "redis"
)

type Queue struct {
Type QueueType `yaml:"type" default:"redis"`
Redis RedisConfig `yaml:"redis"`
}

func (cfg Queue) Validate() error {
if !slices.Contains([]QueueType{QueueTypeRedis, QueueTypeOff}, cfg.Type) {
return fmt.Errorf("unknown type: %s", cfg.Type)
}
if cfg.Type == QueueTypeRedis {
if err := cfg.Redis.Validate(); err != nil {
return err
}
}
return nil
}

type ProxyConfig struct {
Listen string `yaml:"listen"`
TimeoutRead int64 `yaml:"timeout_read" default:"10"`
Expand All @@ -31,6 +55,9 @@ func (cfg ProxyConfig) Validate() error {
if cfg.TimeoutWrite < 0 {
return errors.New("timeout_write cannot be negative value")
}
if err := cfg.Queue.Validate(); err != nil {
return errors.New("invalid queue: " + err.Error())
}
return nil
}

Expand Down
38 changes: 38 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package constants

import (
"github.com/webhookx-io/webhookx/config"
"time"
)

// Task Queue
const (
TaskQueueName = "webhookx:queue"
TaskQueueInvisibleQueueName = "webhookx:queue_invisible"
TaskQueueDataName = "webhookx:queue_data"
TaskQueueVisibilityTimeout = time.Second * 60
)

// Redis Queue
const (
QueueRedisQueueName = "webhookx:proxy_queue"
QueueRedisGroupName = "group_default"
QueueRedisConsumerName = "consumer_default"
QueueRedisVisibilityTimeout = time.Second * 60
)

const (
RequeueBatch = 100
RequeueInterval = time.Second * 60
)

var (
DefaultResponseHeaders = map[string]string{
"Content-Type": "application/json",
"Server": "WebhookX/" + config.VERSION,
}
DefaultDelivererRequestHeaders = map[string]string{
"User-Agent": "WebhookX/" + config.VERSION,
"Content-Type": "application/json; charset=utf-8",
}
)
4 changes: 2 additions & 2 deletions db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status en
return err
}

func (dao *attemptDao) ListUnqueued(ctx context.Context, limit int64) (list []*entities.Attempt, err error) {
sql := "SELECT * FROM attempts WHERE status = 'INIT' and created_at <= now() - INTERVAL '60 SECOND' limit $1"
func (dao *attemptDao) ListUnqueued(ctx context.Context, limit int) (list []*entities.Attempt, err error) {
sql := "SELECT * FROM attempts WHERE status = 'INIT' and created_at <= now() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND' limit $1"
err = dao.UnsafeDB(ctx).SelectContext(ctx, &list, sql, limit)
return
}
4 changes: 3 additions & 1 deletion db/dao/daos.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type BaseDAO[T any] interface {
Delete(ctx context.Context, id string) (bool, error)
Page(ctx context.Context, q query.Queryer) ([]*T, int64, error)
List(ctx context.Context, q query.Queryer) ([]*T, error)
Count(ctx context.Context, conditions map[string]interface{}) (int64, error)
BatchInsert(ctx context.Context, entities []*T) error
}

Expand All @@ -28,14 +29,15 @@ type EndpointDAO interface {

type EventDAO interface {
BaseDAO[entities.Event]
BatchInsertIgnoreConflict(ctx context.Context, events []*entities.Event) ([]string, error)
}

type AttemptDAO interface {
BaseDAO[entities.Attempt]
UpdateStatus(ctx context.Context, id string, status entities.AttemptStatus) error
UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error
UpdateDelivery(ctx context.Context, id string, result *AttemptResult) error
ListUnqueued(ctx context.Context, limit int64) (list []*entities.Attempt, err error)
ListUnqueued(ctx context.Context, limit int) (list []*entities.Attempt, err error)
}

type SourceDAO interface {
Expand Down
26 changes: 26 additions & 0 deletions db/dao/event_dao.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dao

import (
"context"
"github.com/jmoiron/sqlx"
"github.com/webhookx-io/webhookx/db/entities"
)
Expand All @@ -14,3 +15,28 @@ func NewEventDao(db *sqlx.DB, workspace bool) EventDAO {
DAO: NewDAO[entities.Event]("events", db, workspace),
}
}

func (dao *eventDao) BatchInsertIgnoreConflict(ctx context.Context, events []*entities.Event) (inserteds []string, err error) {
if len(events) == 0 {
return
}

builder := psql.Insert(dao.table).Columns("id", "data", "event_type", "ingested_at", "ws_id")
for _, event := range events {
builder = builder.Values(event.ID, event.Data, event.EventType, event.IngestedAt, event.WorkspaceId)
}
statement, args := builder.Suffix("ON CONFLICT(id) DO NOTHING RETURNING id").MustSql()
var rows *sqlx.Rows
rows, err = dao.DB(ctx).QueryxContext(ctx, statement, args...)
if err != nil {
return
}
for rows.Next() {
var id string
if err = rows.Scan(&id); err != nil {
return
}
inserteds = append(inserteds, id)
}
return inserteds, rows.Err()
}
8 changes: 5 additions & 3 deletions db/entities/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package entities

import (
"encoding/json"
"github.com/webhookx-io/webhookx/pkg/types"
"github.com/webhookx-io/webhookx/utils"
)

type Event struct {
ID string `json:"id" validate:"required"`
EventType string `json:"event_type" db:"event_type" validate:"required"`
Data json.RawMessage `json:"data" validate:"required"`
ID string `json:"id" validate:"required"`
EventType string `json:"event_type" db:"event_type" validate:"required"`
Data json.RawMessage `json:"data" validate:"required"`
IngestedAt types.Time `json:"ingested_at" db:"ingested_at"`

BaseModel
}
Expand Down
13 changes: 6 additions & 7 deletions db/entities/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ func (m CustomResponse) Value() (driver.Value, error) {
}

type Source struct {
ID string `json:"id" db:"id"`
Name *string `json:"name" db:"name"`

Enabled bool `json:"enabled" db:"enabled"`
Path string `json:"path" db:"path"`
Methods pq.StringArray `json:"methods" db:"methods"`

ID string `json:"id" db:"id"`
Name *string `json:"name" db:"name"`
Enabled bool `json:"enabled" db:"enabled"`
Path string `json:"path" db:"path"`
Methods pq.StringArray `json:"methods" db:"methods"`
Async bool `json:"async" db:"async"`
Response *CustomResponse `json:"response" db:"response"`

BaseModel
Expand Down
2 changes: 2 additions & 0 deletions db/migrations/6_async_ingestion.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE IF EXISTS ONLY "sources" DROP COLUMN IF EXISTS "async";
ALTER TABLE IF EXISTS ONLY "events" DROP COLUMN IF EXISTS "ingested_at";
4 changes: 4 additions & 0 deletions db/migrations/6_async_ingestion.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE IF EXISTS ONLY "sources" ADD COLUMN IF NOT EXISTS "async" BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE IF EXISTS ONLY "events" ADD COLUMN IF NOT EXISTS "ingested_at" TIMESTAMPTZ(3) DEFAULT (CURRENT_TIMESTAMP(3) AT TIME ZONE 'UTC');

UPDATE "events" SET ingested_at = created_at;
Loading

0 comments on commit ce1cea8

Please sign in to comment.