From ce1cea895958d4247b382dc400e8e034d0e8c3e2 Mon Sep 17 00:00:00 2001 From: Douglas-Lee Date: Sun, 6 Oct 2024 13:07:40 +0800 Subject: [PATCH] feat: support ingesting events via queue --- Makefile | 3 + README.md | 2 +- admin/api/events.go | 7 +- app/app.go | 9 +- config.yml | 3 +- config/config_test.go | 73 +++++++++ config/proxy.go | 29 +++- constants/constants.go | 38 +++++ db/dao/attempt_dao.go | 4 +- db/dao/daos.go | 4 +- db/dao/event_dao.go | 26 +++ db/entities/event.go | 8 +- db/entities/source.go | 13 +- db/migrations/6_async_ingestion.down.sql | 2 + db/migrations/6_async_ingestion.up.sql | 4 + db/query/querys.go | 12 +- dispatcher/dispatcher.go | 152 +++++++++--------- docker-compose.yml | 2 +- go.mod | 1 + go.sum | 2 + openapi.yml | 6 + pkg/log/zap.go | 9 +- pkg/queue/queue.go | 25 +++ pkg/queue/redis/redis.go | 195 +++++++++++++++++++++++ pkg/taskqueue/queue.go | 7 +- pkg/taskqueue/redis.go | 70 ++++---- proxy/gateway.go | 136 ++++++++++++++-- test/admin/events_test.go | 17 +- test/delivery/delivery_test.go | 14 +- test/generate.go | 3 + test/helper/helper.go | 23 +++ test/mocks/queue.go | 85 ++++++++++ test/proxy/ginkgo_test.go | 12 ++ test/proxy/ingest_test.go | 112 +++++++++++++ test/proxy/listen_test.go | 12 +- test/worker/requeue_test.go | 93 +++++++++++ worker/deliverer/http.go | 8 +- worker/worker.go | 43 +++-- 38 files changed, 1080 insertions(+), 184 deletions(-) create mode 100644 constants/constants.go create mode 100644 db/migrations/6_async_ingestion.down.sql create mode 100644 db/migrations/6_async_ingestion.up.sql create mode 100644 pkg/queue/queue.go create mode 100644 pkg/queue/redis/redis.go create mode 100644 test/generate.go create mode 100644 test/mocks/queue.go create mode 100644 test/proxy/ginkgo_test.go create mode 100644 test/proxy/ingest_test.go create mode 100644 test/worker/requeue_test.go diff --git a/Makefile b/Makefile index d42fa1a..68a095d 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,9 @@ build: install: go install ${LDFLAGS} +generate: + go generate ./... + .PHONY: test test: clean go test $$(go list ./... | grep -v /test/) diff --git a/README.md b/README.md index 158b691..d0fb33c 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,7 @@ Explore more API at [openapi.yml](/openapi.yml). The gateway requires the following runtime dependencies to work: - PostgreSQL(>=13): Lower versions of PostgreSQL may work, but have not been fully tested. -- Redis(>=4): Lower versions of Redis may work, but have not been fully tested. +- Redis(>=6.2): Lower versions of Redis may work, but have not been fully tested. ## Status and Compatibility diff --git a/admin/api/events.go b/admin/api/events.go index c766c58..d4fff9e 100644 --- a/admin/api/events.go +++ b/admin/api/events.go @@ -4,9 +4,11 @@ import ( "encoding/json" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/pkg/ucontext" "github.com/webhookx-io/webhookx/utils" "net/http" + "time" ) func (api *API) PageEvent(w http.ResponseWriter, r *http.Request) { @@ -35,7 +37,6 @@ func (api *API) GetEvent(w http.ResponseWriter, r *http.Request) { func (api *API) CreateEvent(w http.ResponseWriter, r *http.Request) { var event entities.Event - event.ID = utils.KSUID() if err := json.NewDecoder(r.Body).Decode(&event); err != nil { api.error(400, w, err) @@ -47,6 +48,8 @@ func (api *API) CreateEvent(w http.ResponseWriter, r *http.Request) { return } + event.ID = utils.KSUID() + event.IngestedAt = types.Time{Time: time.Now()} event.WorkspaceId = ucontext.GetWorkspaceID(r.Context()) err := api.dispatcher.Dispatch(r.Context(), &event) api.assert(err) @@ -71,7 +74,7 @@ func (api *API) RetryEvent(w http.ResponseWriter, r *http.Request) { return } - err = api.dispatcher.DispatchEndpoint(r.Context(), id, []*entities.Endpoint{endpoint}) + err = api.dispatcher.DispatchEndpoint(r.Context(), event, []*entities.Endpoint{endpoint}) api.assert(err) api.json(200, w, nil) diff --git a/app/app.go b/app/app.go index f5b261c..ed2b0ae 100644 --- a/app/app.go +++ b/app/app.go @@ -12,6 +12,7 @@ import ( "github.com/webhookx-io/webhookx/pkg/taskqueue" "github.com/webhookx-io/webhookx/proxy" "github.com/webhookx-io/webhookx/worker" + "github.com/webhookx-io/webhookx/worker/deliverer" "go.uber.org/zap" "sync" ) @@ -74,7 +75,9 @@ func (app *Application) initialize() error { client := cfg.RedisConfig.GetClient() // queue - queue := taskqueue.NewRedisQueue(client) + queue := taskqueue.NewRedisQueue(taskqueue.RedisTaskQueueOptions{ + Client: client, + }, app.log) app.queue = queue // cache @@ -84,7 +87,9 @@ func (app *Application) initialize() error { // worker if cfg.WorkerConfig.Enabled { - app.worker = worker.NewWorker(&cfg.WorkerConfig, db, queue) + opts := worker.WorkerOptions{} + deliverer := deliverer.NewHTTPDeliverer(&cfg.WorkerConfig.Deliverer) + app.worker = worker.NewWorker(opts, db, deliverer, queue) } // admin diff --git a/config.yml b/config.yml index e8e0a91..7182e03 100644 --- a/config.yml +++ b/config.yml @@ -52,8 +52,9 @@ proxy: body: '{"message": "OK"}' queue: + type: redis # supported values are redis, off redis: - host: 127.0.0.1 + host: localhost port: 6379 password: database: 0 diff --git a/config/config_test.go b/config/config_test.go index d5279c8..5f92912 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -90,6 +90,79 @@ func TestLogConfig(t *testing.T) { } } +func TestProxyConfig(t *testing.T) { + tests := []struct { + desc string + cfg ProxyConfig + expectedValidateErr error + }{ + { + desc: "sanity", + cfg: ProxyConfig{ + Queue: Queue{ + Type: "redis", + }, + }, + expectedValidateErr: nil, + }, + { + desc: "max_request_body_size cannot be negative value", + cfg: ProxyConfig{ + MaxRequestBodySize: -1, + Queue: Queue{ + Type: "redis", + }, + }, + expectedValidateErr: errors.New("max_request_body_size cannot be negative value"), + }, + { + desc: "timeout_read cannot be negative value", + cfg: ProxyConfig{ + TimeoutRead: -1, + Queue: Queue{ + Type: "redis", + }, + }, + expectedValidateErr: errors.New("timeout_read cannot be negative value"), + }, + { + desc: "timeout_write cannot be negative value", + cfg: ProxyConfig{ + TimeoutWrite: -1, + Queue: Queue{ + Type: "redis", + }, + }, + expectedValidateErr: errors.New("timeout_write cannot be negative value"), + }, + { + desc: "invalid type: unknown", + cfg: ProxyConfig{ + Queue: Queue{ + Type: "unknown", + }, + }, + expectedValidateErr: errors.New("invalid queue: unknown type: unknown"), + }, + { + desc: "invalid queue", + cfg: ProxyConfig{ + Queue: Queue{ + Type: "redis", + Redis: RedisConfig{ + Port: 65536, + }, + }, + }, + expectedValidateErr: errors.New("invalid queue: port must be in the range [0, 65535]"), + }, + } + for _, test := range tests { + actualValidateErr := test.cfg.Validate() + assert.Equal(t, test.expectedValidateErr, actualValidateErr, "expected %v got %v", test.expectedValidateErr, actualValidateErr) + } +} + func TestConfig(t *testing.T) { cfg, err := Init() assert.Nil(t, err) diff --git a/config/proxy.go b/config/proxy.go index 4d1c12c..e0e62a9 100644 --- a/config/proxy.go +++ b/config/proxy.go @@ -1,6 +1,10 @@ package config -import "errors" +import ( + "errors" + "fmt" + "slices" +) type ProxyResponse struct { Code uint `yaml:"code" default:"200"` @@ -8,10 +12,30 @@ type ProxyResponse struct { Body string `yaml:"body" default:"{\"message\": \"OK\"}"` } +type QueueType string + +const ( + QueueTypeOff QueueType = "off" + QueueTypeRedis QueueType = "redis" +) + type Queue struct { + Type QueueType `yaml:"type" default:"redis"` Redis RedisConfig `yaml:"redis"` } +func (cfg Queue) Validate() error { + if !slices.Contains([]QueueType{QueueTypeRedis, QueueTypeOff}, cfg.Type) { + return fmt.Errorf("unknown type: %s", cfg.Type) + } + if cfg.Type == QueueTypeRedis { + if err := cfg.Redis.Validate(); err != nil { + return err + } + } + return nil +} + type ProxyConfig struct { Listen string `yaml:"listen"` TimeoutRead int64 `yaml:"timeout_read" default:"10"` @@ -31,6 +55,9 @@ func (cfg ProxyConfig) Validate() error { if cfg.TimeoutWrite < 0 { return errors.New("timeout_write cannot be negative value") } + if err := cfg.Queue.Validate(); err != nil { + return errors.New("invalid queue: " + err.Error()) + } return nil } diff --git a/constants/constants.go b/constants/constants.go new file mode 100644 index 0000000..9971f68 --- /dev/null +++ b/constants/constants.go @@ -0,0 +1,38 @@ +package constants + +import ( + "github.com/webhookx-io/webhookx/config" + "time" +) + +// Task Queue +const ( + TaskQueueName = "webhookx:queue" + TaskQueueInvisibleQueueName = "webhookx:queue_invisible" + TaskQueueDataName = "webhookx:queue_data" + TaskQueueVisibilityTimeout = time.Second * 60 +) + +// Redis Queue +const ( + QueueRedisQueueName = "webhookx:proxy_queue" + QueueRedisGroupName = "group_default" + QueueRedisConsumerName = "consumer_default" + QueueRedisVisibilityTimeout = time.Second * 60 +) + +const ( + RequeueBatch = 100 + RequeueInterval = time.Second * 60 +) + +var ( + DefaultResponseHeaders = map[string]string{ + "Content-Type": "application/json", + "Server": "WebhookX/" + config.VERSION, + } + DefaultDelivererRequestHeaders = map[string]string{ + "User-Agent": "WebhookX/" + config.VERSION, + "Content-Type": "application/json; charset=utf-8", + } +) diff --git a/db/dao/attempt_dao.go b/db/dao/attempt_dao.go index 4552f45..bc7e6fb 100644 --- a/db/dao/attempt_dao.go +++ b/db/dao/attempt_dao.go @@ -53,8 +53,8 @@ func (dao *attemptDao) UpdateErrorCode(ctx context.Context, id string, status en return err } -func (dao *attemptDao) ListUnqueued(ctx context.Context, limit int64) (list []*entities.Attempt, err error) { - sql := "SELECT * FROM attempts WHERE status = 'INIT' and created_at <= now() - INTERVAL '60 SECOND' limit $1" +func (dao *attemptDao) ListUnqueued(ctx context.Context, limit int) (list []*entities.Attempt, err error) { + sql := "SELECT * FROM attempts WHERE status = 'INIT' and created_at <= now() AT TIME ZONE 'UTC' - INTERVAL '60 SECOND' limit $1" err = dao.UnsafeDB(ctx).SelectContext(ctx, &list, sql, limit) return } diff --git a/db/dao/daos.go b/db/dao/daos.go index 9cf944c..aa626f2 100644 --- a/db/dao/daos.go +++ b/db/dao/daos.go @@ -14,6 +14,7 @@ type BaseDAO[T any] interface { Delete(ctx context.Context, id string) (bool, error) Page(ctx context.Context, q query.Queryer) ([]*T, int64, error) List(ctx context.Context, q query.Queryer) ([]*T, error) + Count(ctx context.Context, conditions map[string]interface{}) (int64, error) BatchInsert(ctx context.Context, entities []*T) error } @@ -28,6 +29,7 @@ type EndpointDAO interface { type EventDAO interface { BaseDAO[entities.Event] + BatchInsertIgnoreConflict(ctx context.Context, events []*entities.Event) ([]string, error) } type AttemptDAO interface { @@ -35,7 +37,7 @@ type AttemptDAO interface { UpdateStatus(ctx context.Context, id string, status entities.AttemptStatus) error UpdateErrorCode(ctx context.Context, id string, status entities.AttemptStatus, code entities.AttemptErrorCode) error UpdateDelivery(ctx context.Context, id string, result *AttemptResult) error - ListUnqueued(ctx context.Context, limit int64) (list []*entities.Attempt, err error) + ListUnqueued(ctx context.Context, limit int) (list []*entities.Attempt, err error) } type SourceDAO interface { diff --git a/db/dao/event_dao.go b/db/dao/event_dao.go index 4e12a7e..a158316 100644 --- a/db/dao/event_dao.go +++ b/db/dao/event_dao.go @@ -1,6 +1,7 @@ package dao import ( + "context" "github.com/jmoiron/sqlx" "github.com/webhookx-io/webhookx/db/entities" ) @@ -14,3 +15,28 @@ func NewEventDao(db *sqlx.DB, workspace bool) EventDAO { DAO: NewDAO[entities.Event]("events", db, workspace), } } + +func (dao *eventDao) BatchInsertIgnoreConflict(ctx context.Context, events []*entities.Event) (inserteds []string, err error) { + if len(events) == 0 { + return + } + + builder := psql.Insert(dao.table).Columns("id", "data", "event_type", "ingested_at", "ws_id") + for _, event := range events { + builder = builder.Values(event.ID, event.Data, event.EventType, event.IngestedAt, event.WorkspaceId) + } + statement, args := builder.Suffix("ON CONFLICT(id) DO NOTHING RETURNING id").MustSql() + var rows *sqlx.Rows + rows, err = dao.DB(ctx).QueryxContext(ctx, statement, args...) + if err != nil { + return + } + for rows.Next() { + var id string + if err = rows.Scan(&id); err != nil { + return + } + inserteds = append(inserteds, id) + } + return inserteds, rows.Err() +} diff --git a/db/entities/event.go b/db/entities/event.go index 8160338..f92b139 100644 --- a/db/entities/event.go +++ b/db/entities/event.go @@ -2,13 +2,15 @@ package entities import ( "encoding/json" + "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" ) type Event struct { - ID string `json:"id" validate:"required"` - EventType string `json:"event_type" db:"event_type" validate:"required"` - Data json.RawMessage `json:"data" validate:"required"` + ID string `json:"id" validate:"required"` + EventType string `json:"event_type" db:"event_type" validate:"required"` + Data json.RawMessage `json:"data" validate:"required"` + IngestedAt types.Time `json:"ingested_at" db:"ingested_at"` BaseModel } diff --git a/db/entities/source.go b/db/entities/source.go index d2ced15..671ab05 100644 --- a/db/entities/source.go +++ b/db/entities/source.go @@ -22,13 +22,12 @@ func (m CustomResponse) Value() (driver.Value, error) { } type Source struct { - ID string `json:"id" db:"id"` - Name *string `json:"name" db:"name"` - - Enabled bool `json:"enabled" db:"enabled"` - Path string `json:"path" db:"path"` - Methods pq.StringArray `json:"methods" db:"methods"` - + ID string `json:"id" db:"id"` + Name *string `json:"name" db:"name"` + Enabled bool `json:"enabled" db:"enabled"` + Path string `json:"path" db:"path"` + Methods pq.StringArray `json:"methods" db:"methods"` + Async bool `json:"async" db:"async"` Response *CustomResponse `json:"response" db:"response"` BaseModel diff --git a/db/migrations/6_async_ingestion.down.sql b/db/migrations/6_async_ingestion.down.sql new file mode 100644 index 0000000..8a1f4c1 --- /dev/null +++ b/db/migrations/6_async_ingestion.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE IF EXISTS ONLY "sources" DROP COLUMN IF EXISTS "async"; +ALTER TABLE IF EXISTS ONLY "events" DROP COLUMN IF EXISTS "ingested_at"; diff --git a/db/migrations/6_async_ingestion.up.sql b/db/migrations/6_async_ingestion.up.sql new file mode 100644 index 0000000..3fdb3be --- /dev/null +++ b/db/migrations/6_async_ingestion.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE IF EXISTS ONLY "sources" ADD COLUMN IF NOT EXISTS "async" BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE IF EXISTS ONLY "events" ADD COLUMN IF NOT EXISTS "ingested_at" TIMESTAMPTZ(3) DEFAULT (CURRENT_TIMESTAMP(3) AT TIME ZONE 'UTC'); + +UPDATE "events" SET ingested_at = created_at; diff --git a/db/query/querys.go b/db/query/querys.go index d1dd56a..ef47bdd 100644 --- a/db/query/querys.go +++ b/db/query/querys.go @@ -2,10 +2,16 @@ package query type EndpointQuery struct { Query + + WorkspaceId *string } func (q *EndpointQuery) WhereMap() map[string]interface{} { - return map[string]interface{}{} + maps := make(map[string]interface{}) + if q.WorkspaceId != nil { + maps["ws_id"] = *q.WorkspaceId + } + return maps } type EventQuery struct { @@ -29,6 +35,7 @@ type AttemptQuery struct { EventId *string EndpointId *string + Status *string } func (q *AttemptQuery) WhereMap() map[string]interface{} { @@ -39,6 +46,9 @@ func (q *AttemptQuery) WhereMap() map[string]interface{} { if q.EndpointId != nil { maps["endpoint_id"] = *q.EndpointId } + if q.Status != nil { + maps["status"] = *q.Status + } return maps } diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 24b4118..50afda1 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -30,122 +30,122 @@ func NewDispatcher(log *zap.SugaredLogger, queue taskqueue.TaskQueue, db *db.DB) } func (d *Dispatcher) Dispatch(ctx context.Context, event *entities.Event) error { - endpoints, err := listSubscribedEndpoints(ctx, d.db, event.EventType) + endpoints, err := d.listSubscribedEndpoint(ctx, event.WorkspaceId, event.EventType) if err != nil { return err } - return d.dispatch(ctx, event, endpoints) + attempts := fanout(event, endpoints, entities.AttemptTriggerModeInitial) + if len(attempts) == 0 { + return d.db.Events.Insert(ctx, event) + } + + err = d.db.TX(ctx, func(ctx context.Context) error { + err := d.db.Events.Insert(ctx, event) + if err != nil { + return err + } + return d.db.Attempts.BatchInsert(ctx, attempts) + }) + if err != nil { + return err + } + + go d.sendToQueue(context.TODO(), attempts) + + return nil } -func (d *Dispatcher) DispatchEndpoint(ctx context.Context, eventId string, endpoints []*entities.Endpoint) error { - attempts := make([]*entities.Attempt, 0, len(endpoints)) - tasks := make([]*taskqueue.TaskMessage, 0, len(endpoints)) +func (d *Dispatcher) DispatchBatch(ctx context.Context, events []*entities.Event) error { + if len(events) == 0 { + return nil + } + + eventAttempts := make(map[string][]*entities.Attempt) + for _, event := range events { + endpoints, err := d.listSubscribedEndpoint(ctx, event.WorkspaceId, event.EventType) + if err != nil { + return err + } + eventAttempts[event.ID] = fanout(event, endpoints, entities.AttemptTriggerModeInitial) + } + + attempts := make([]*entities.Attempt, 0) + err := d.db.TX(ctx, func(ctx context.Context) error { + ids, err := d.db.Events.BatchInsertIgnoreConflict(ctx, events) + if err != nil { + return err + } + for _, id := range ids { + attempts = append(attempts, eventAttempts[id]...) + } + return d.db.Attempts.BatchInsert(ctx, attempts) + }) + go d.sendToQueue(context.TODO(), attempts) + + return err +} + +func fanout(event *entities.Event, endpoints []*entities.Endpoint, mode entities.AttemptTriggerMode) []*entities.Attempt { + attempts := make([]*entities.Attempt, 0, len(endpoints)) now := time.Now() for _, endpoint := range endpoints { delay := endpoint.Retry.Config.Attempts[0] attempt := &entities.Attempt{ ID: utils.KSUID(), - EventId: eventId, + EventId: event.ID, EndpointId: endpoint.ID, Status: entities.AttemptStatusInit, AttemptNumber: 1, ScheduledAt: types.NewTime(now.Add(time.Second * time.Duration(delay))), - TriggerMode: entities.AttemptTriggerModeManual, - } - - task := &taskqueue.TaskMessage{ - ID: attempt.ID, - Data: &model.MessageData{ - EventID: eventId, - EndpointId: endpoint.ID, - Attempt: 1, - }, + TriggerMode: mode, } + attempt.WorkspaceId = event.WorkspaceId attempts = append(attempts, attempt) - tasks = append(tasks, task) } + return attempts +} - err := d.db.AttemptsWS.BatchInsert(ctx, attempts) +func (d *Dispatcher) DispatchEndpoint(ctx context.Context, event *entities.Event, endpoints []*entities.Endpoint) error { + attempts := fanout(event, endpoints, entities.AttemptTriggerModeManual) + + err := d.db.Attempts.BatchInsert(ctx, attempts) if err != nil { return err } - for i, task := range tasks { - err := d.queue.Add(task, attempts[i].ScheduledAt.Time) - if err != nil { - d.log.Warnf("failed to add task to queue: %v", err) - continue - } - err = d.db.AttemptsWS.UpdateStatus(ctx, task.ID, entities.AttemptStatusQueued) - if err != nil { - d.log.Warnf("failed to update attempt status: %v", err) - } - } + d.sendToQueue(context.TODO(), attempts) return nil } -func (d *Dispatcher) dispatch(ctx context.Context, event *entities.Event, endpoints []*entities.Endpoint) error { - attempts := make([]*entities.Attempt, 0, len(endpoints)) - tasks := make([]*taskqueue.TaskMessage, 0, len(endpoints)) - - err := d.db.TX(ctx, func(ctx context.Context) error { - now := time.Now() - err := d.db.EventsWS.Insert(ctx, event) - if err != nil { - return err - } - - for _, endpoint := range endpoints { - delay := endpoint.Retry.Config.Attempts[0] - attempt := &entities.Attempt{ - ID: utils.KSUID(), - EventId: event.ID, - EndpointId: endpoint.ID, - Status: entities.AttemptStatusInit, - AttemptNumber: 1, - ScheduledAt: types.NewTime(now.Add(time.Second * time.Duration(delay))), - TriggerMode: entities.AttemptTriggerModeInitial, - } - - task := &taskqueue.TaskMessage{ - ID: attempt.ID, - Data: &model.MessageData{ - EventID: event.ID, - EndpointId: endpoint.ID, - Attempt: 1, - }, - } - attempts = append(attempts, attempt) - tasks = append(tasks, task) +func (d *Dispatcher) sendToQueue(ctx context.Context, attempts []*entities.Attempt) { + for _, attempt := range attempts { + task := &taskqueue.TaskMessage{ + ID: attempt.ID, + Data: &model.MessageData{ + EventID: attempt.EventId, + EndpointId: attempt.EndpointId, + Attempt: attempt.AttemptNumber, + }, } - - return d.db.AttemptsWS.BatchInsert(ctx, attempts) - }) - if err != nil { - return err - } - - for i, task := range tasks { - err := d.queue.Add(task, attempts[i].ScheduledAt.Time) + err := d.queue.Add(ctx, task, attempt.ScheduledAt.Time) if err != nil { d.log.Warnf("failed to add task to queue: %v", err) continue } - err = d.db.AttemptsWS.UpdateStatus(ctx, task.ID, entities.AttemptStatusQueued) + err = d.db.Attempts.UpdateStatus(ctx, task.ID, entities.AttemptStatusQueued) if err != nil { d.log.Warnf("failed to update attempt status: %v", err) } } - - return nil } -func listSubscribedEndpoints(ctx context.Context, db *db.DB, eventType string) (list []*entities.Endpoint, err error) { +func (d *Dispatcher) listSubscribedEndpoint(ctx context.Context, wid, eventType string) (list []*entities.Endpoint, err error) { var q query.EndpointQuery - endpoints, err := db.EndpointsWS.List(ctx, &q) + q.WorkspaceId = &wid + endpoints, err := d.db.Endpoints.List(ctx, &q) if err != nil { return nil, err } diff --git a/docker-compose.yml b/docker-compose.yml index 16de958..e820ec9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,7 +51,7 @@ services: - "postgres_data:/var/lib/postgresql/data/" redis: - image: redis:6 + image: redis:6.2 command: "--appendonly yes --appendfsync everysec" volumes: - "redis_data:/data" diff --git a/go.mod b/go.mod index 852f0f0..c0fd737 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 + go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index bcd17f0..85ca631 100644 --- a/go.sum +++ b/go.sum @@ -136,6 +136,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= diff --git a/openapi.yml b/openapi.yml index 74cb0a7..5ec2f72 100644 --- a/openapi.yml +++ b/openapi.yml @@ -747,6 +747,9 @@ components: type: string data: type: object + ingested_at: + type: integer + description: "The time the event was ingested" created_at: type: integer updated_at: @@ -767,6 +770,9 @@ components: type: array items: type: string + async: + type: boolean + description: "Whether to ingest events asynchronously through the queue" response: type: object properties: diff --git a/pkg/log/zap.go b/pkg/log/zap.go index 77c3da2..9b12209 100644 --- a/pkg/log/zap.go +++ b/pkg/log/zap.go @@ -22,10 +22,11 @@ func NewZapLogger(cfg *config.LogConfig) (*zap.Logger, error) { } zap.NewProductionConfig() zapConfig := zap.Config{ - Level: zap.NewAtomicLevelAt(level), - Development: false, - Encoding: encodingMap[string(cfg.Format)], - EncoderConfig: encoderMap[string(cfg.Format)], + Level: zap.NewAtomicLevelAt(level), + Development: false, + Encoding: encodingMap[string(cfg.Format)], + EncoderConfig: encoderMap[string(cfg.Format)], + DisableStacktrace: true, } if len(cfg.File) > 0 { zapConfig.OutputPaths = []string{cfg.File} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go new file mode 100644 index 0000000..2e47d62 --- /dev/null +++ b/pkg/queue/queue.go @@ -0,0 +1,25 @@ +package queue + +import ( + "context" + "time" +) + +type Message struct { + ID string + Data []byte + Time time.Time + WorkspaceID string +} + +type Options struct { + Count int64 + Block bool + Timeout time.Duration +} + +type Queue interface { + Enqueue(ctx context.Context, message *Message) error + Dequeue(ctx context.Context, opts *Options) ([]*Message, error) + Delete(ctx context.Context, message []*Message) error +} diff --git a/pkg/queue/redis/redis.go b/pkg/queue/redis/redis.go new file mode 100644 index 0000000..ddffe12 --- /dev/null +++ b/pkg/queue/redis/redis.go @@ -0,0 +1,195 @@ +package redis + +import ( + "context" + "errors" + "github.com/redis/go-redis/v9" + "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/pkg/queue" + "github.com/webhookx-io/webhookx/utils" + "go.uber.org/zap" + "strconv" + "strings" + "time" +) + +type RedisQueue struct { + stream string + group string + consumer string + visibilityTimeout time.Duration + + c *redis.Client + log *zap.SugaredLogger +} + +type RedisQueueOptions struct { + StreamName string + GroupName string + ConsumerName string + VisibilityTimeout time.Duration + + Client *redis.Client +} + +func NewRedisQueue(opts RedisQueueOptions, logger *zap.SugaredLogger) (queue.Queue, error) { + q := &RedisQueue{ + stream: utils.DefaultIfZero(opts.StreamName, constants.QueueRedisQueueName), + group: utils.DefaultIfZero(opts.GroupName, constants.QueueRedisGroupName), + consumer: utils.DefaultIfZero(opts.ConsumerName, constants.QueueRedisConsumerName), + visibilityTimeout: utils.DefaultIfZero(opts.VisibilityTimeout, constants.QueueRedisVisibilityTimeout), + c: opts.Client, + log: logger, + } + + go q.process() + return q, nil +} + +func (q *RedisQueue) Enqueue(ctx context.Context, message *queue.Message) error { + args := &redis.XAddArgs{ + Stream: q.stream, + ID: "*", + Values: map[string]interface{}{ + "data": message.Data, + "time": message.Time.UnixMilli(), + "ws_id": message.WorkspaceID, + }, + } + res := q.c.XAdd(ctx, args) + if res.Err() != nil { + return res.Err() + } + message.ID = res.Val() + return nil +} + +func toMessage(values map[string]interface{}) *queue.Message { + message := &queue.Message{} + + if data, ok := values["data"].(string); ok { + message.Data = []byte(data) + } + + if timestr, ok := values["time"].(string); ok { + t, _ := strconv.ParseInt(timestr, 10, 64) + message.Time = time.UnixMilli(t) + } + + if wsid, ok := values["ws_id"].(string); ok { + message.WorkspaceID = wsid + } + + return message +} + +func (q *RedisQueue) Dequeue(ctx context.Context, opt *queue.Options) ([]*queue.Message, error) { + var count int64 = 1 + if opt != nil && opt.Count != 0 { + count = opt.Count + } + var block time.Duration = -1 + if opt != nil && opt.Block { + block = opt.Timeout + } + + args := &redis.XReadGroupArgs{ + Group: q.group, + Consumer: q.consumer, + Streams: []string{q.stream, ">"}, + Count: count, + Block: block, + } + res := q.c.XReadGroup(ctx, args) + if res.Err() != nil { + err := res.Err() + if errors.Is(err, redis.Nil) { + err = nil + } else if strings.HasPrefix(err.Error(), "NOGROUP") { + go q.createConsumerGroup() + err = nil + } + return nil, err + } + + messages := make([]*queue.Message, 0) + for _, stream := range res.Val() { + for _, xmessage := range stream.Messages { + message := toMessage(xmessage.Values) + message.ID = xmessage.ID + messages = append(messages, message) + } + } + + return messages, nil +} + +func (q *RedisQueue) Delete(ctx context.Context, messages []*queue.Message) error { + ids := make([]string, 0, len(messages)) + for _, message := range messages { + ids = append(ids, message.ID) + } + pipeline := q.c.Pipeline() + pipeline.XAck(ctx, q.stream, q.group, ids...) + pipeline.XDel(ctx, q.stream, ids...) + _, err := pipeline.Exec(ctx) + if err != nil { + return err + } + return nil +} + +func (q *RedisQueue) createConsumerGroup() { + res := q.c.XGroupCreateMkStream(context.TODO(), q.stream, q.group, "0") + if res.Err() == nil { + q.log.Debugf("created default consumer group: %s", q.group) + return + } + + if res.Err().Error() != "BUSYGROUP Consumer Group name already exists" { + q.log.Errorf("failed to create the default consumer group: %s", res.Err().Error()) + } +} + +// process re-enqueue invisible messages that reach the visibility timeout +func (q *RedisQueue) process() { + var reenqueueScript = redis.NewScript(` + local entries = redis.call('XPENDING', KEYS[1], KEYS[2], 'IDLE', ARGV[1], '-', '+', 1000) + local ids = {} + if entries then + for i, entry in ipairs(entries) do + local id = entry[1] + local res = redis.call('XRANGE', KEYS[1], id, id) + local items = res[1][2] + local new_id = redis.call('XADD', KEYS[1], '*', unpack(items)) + ids[i] = new_id + redis.call('XACK', KEYS[1], KEYS[2], id) + redis.call('XDEL', KEYS[1], id) + end + end + return ids + `) + + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + keys := []string{q.stream, q.group} + argv := []interface{}{q.visibilityTimeout.Milliseconds()} + res, err := reenqueueScript.Run(context.Background(), q.c, keys, argv...).Result() + if err != nil { + q.log.Errorf("failed to reenqueue: %v", err) + continue + } + + if ids, ok := res.([]interface{}); ok && len(ids) > 0 { + q.log.Debugf("enqueued invisible messages: %d", len(ids)) + } + } + } + + }() +} diff --git a/pkg/taskqueue/queue.go b/pkg/taskqueue/queue.go index 8501bd4..c99fb79 100644 --- a/pkg/taskqueue/queue.go +++ b/pkg/taskqueue/queue.go @@ -1,6 +1,7 @@ package taskqueue import ( + "context" "encoding/json" "github.com/webhookx-io/webhookx/utils" "time" @@ -30,7 +31,7 @@ func (t *TaskMessage) UnmarshalData(v interface{}) error { } type TaskQueue interface { - Add(task *TaskMessage, scheduleAt time.Time) error - Get() (task *TaskMessage, err error) - Delete(task *TaskMessage) error + Add(ctx context.Context, task *TaskMessage, scheduleAt time.Time) error + Get(ctx context.Context) (task *TaskMessage, err error) + Delete(ctx context.Context, task *TaskMessage) error } diff --git a/pkg/taskqueue/redis.go b/pkg/taskqueue/redis.go index d0afe21..3047ffc 100644 --- a/pkg/taskqueue/redis.go +++ b/pkg/taskqueue/redis.go @@ -5,17 +5,12 @@ import ( "encoding/json" "fmt" "github.com/redis/go-redis/v9" + "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/utils" "go.uber.org/zap" "time" ) -const ( - DefaultQueueName = "webhookx:queue" - InvisibleQueueName = "webhookx:queue_invisible" - QueueDataHashName = "webhookx:queue_data" - VisibilityTimeout = 60 -) - var ( addScript = redis.NewScript(` local score = ARGV[1] @@ -51,7 +46,7 @@ var ( return 1 `) - reenqueueScript = redis.NewScript(` + requeueScript = redis.NewScript(` redis.replicate_commands() local now = redis.call('TIME')[1] local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, now) @@ -69,22 +64,39 @@ var ( // RedisTaskQueue use redis as queue implementation type RedisTaskQueue struct { - queue string - c *redis.Client + queue string + invisibleQueue string + queueData string + visibilityTimeout time.Duration + + c *redis.Client + log *zap.SugaredLogger +} + +type RedisTaskQueueOptions struct { + QueueName string + InvisibleQueueName string + QueueDataName string + VisibilityTimeout time.Duration + Client *redis.Client } -func NewRedisQueue(client *redis.Client) *RedisTaskQueue { +func NewRedisQueue(opts RedisTaskQueueOptions, logger *zap.SugaredLogger) *RedisTaskQueue { q := &RedisTaskQueue{ - c: client, - queue: "webhookx:queue", + queue: utils.DefaultIfZero(opts.QueueName, constants.TaskQueueName), + invisibleQueue: utils.DefaultIfZero(opts.InvisibleQueueName, constants.TaskQueueInvisibleQueueName), + visibilityTimeout: utils.DefaultIfZero(opts.VisibilityTimeout, constants.TaskQueueVisibilityTimeout), + queueData: utils.DefaultIfZero(opts.QueueDataName, constants.TaskQueueDataName), + c: opts.Client, + log: logger, } q.process() return q } -func (q *RedisTaskQueue) Add(task *TaskMessage, scheduleAt time.Time) error { - zap.S().Debugf("[redis-queue]: add task %s schedule at: %s", task.ID, scheduleAt.Format("2006-01-02T15:04:05.000")) - keys := []string{DefaultQueueName, QueueDataHashName} +func (q *RedisTaskQueue) Add(ctx context.Context, task *TaskMessage, scheduleAt time.Time) error { + q.log.Debugf("[redis-queue]: add task %s schedule at: %s", task.ID, scheduleAt.Format("2006-01-02T15:04:05.000")) + keys := []string{q.queue, q.queueData} data, err := json.Marshal(task.Data) if err != nil { return err @@ -94,7 +106,7 @@ func (q *RedisTaskQueue) Add(task *TaskMessage, scheduleAt time.Time) error { task.ID, data, } - res, err := addScript.Run(context.Background(), q.c, keys, argv...).Result() + res, err := addScript.Run(ctx, q.c, keys, argv...).Result() if err != nil { return err } @@ -104,12 +116,12 @@ func (q *RedisTaskQueue) Add(task *TaskMessage, scheduleAt time.Time) error { return nil } -func (q *RedisTaskQueue) Get() (*TaskMessage, error) { - keys := []string{DefaultQueueName, QueueDataHashName, InvisibleQueueName} +func (q *RedisTaskQueue) Get(ctx context.Context) (*TaskMessage, error) { + keys := []string{q.queue, q.queueData, q.invisibleQueue} argv := []interface{}{ - VisibilityTimeout, + q.visibilityTimeout.Milliseconds() / 1000, } - res, err := getScript.Run(context.Background(), q.c, keys, argv...).Result() + res, err := getScript.Run(ctx, q.c, keys, argv...).Result() if err != nil { return nil, err } @@ -128,13 +140,13 @@ func (q *RedisTaskQueue) Get() (*TaskMessage, error) { } } -func (q *RedisTaskQueue) Delete(task *TaskMessage) error { - zap.S().Debugf("[redis-queue]: delete task %s", task.ID) - keys := []string{InvisibleQueueName, DefaultQueueName, QueueDataHashName} +func (q *RedisTaskQueue) Delete(ctx context.Context, task *TaskMessage) error { + q.log.Debugf("[redis-queue]: delete task %s", task.ID) + keys := []string{q.invisibleQueue, q.queue, q.queueData} argv := []interface{}{ task.ID, } - res, err := deleteScript.Run(context.Background(), q.c, keys, argv...).Result() + res, err := deleteScript.Run(ctx, q.c, keys, argv...).Result() if err != nil { return err } @@ -152,14 +164,14 @@ func (q *RedisTaskQueue) process() { for { select { case <-ticker.C: - keys := []string{InvisibleQueueName, DefaultQueueName} - res, err := reenqueueScript.Run(context.Background(), q.c, keys).Result() + keys := []string{q.invisibleQueue, q.queue} + res, err := requeueScript.Run(context.Background(), q.c, keys).Result() if err != nil { - zap.S().Errorf("failed to : %s", err) + q.log.Errorf("failed to run requeue script: %s", err) continue } if ids, ok := res.([]interface{}); ok && len(ids) > 0 { - zap.S().Debugf("enqueued invisible tasks: %v", ids) + q.log.Debugf("enqueued invisible tasks: %v", ids) } } } diff --git a/proxy/gateway.go b/proxy/gateway.go index 3df8760..7c5e2af 100644 --- a/proxy/gateway.go +++ b/proxy/gateway.go @@ -3,22 +3,32 @@ package proxy import ( "context" "encoding/json" + "errors" "github.com/gorilla/mux" "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/constants" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" "github.com/webhookx-io/webhookx/dispatcher" + "github.com/webhookx-io/webhookx/pkg/queue" + "github.com/webhookx-io/webhookx/pkg/queue/redis" "github.com/webhookx-io/webhookx/pkg/schedule" + "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/pkg/ucontext" "github.com/webhookx-io/webhookx/proxy/router" "github.com/webhookx-io/webhookx/utils" "go.uber.org/zap" "net/http" "os" + "runtime" "time" ) +var ( + ErrQueueDisabled = errors.New("queue is disabled") +) + type Gateway struct { ctx context.Context cancel context.CancelFunc @@ -31,15 +41,26 @@ type Gateway struct { db *db.DB dispatcher *dispatcher.Dispatcher + + queue queue.Queue } func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher) *Gateway { + var q queue.Queue + switch cfg.Queue.Type { + case "redis": + q, _ = redis.NewRedisQueue(redis.RedisQueueOptions{ + Client: cfg.Queue.Redis.GetClient(), + }, zap.S()) + } + gw := &Gateway{ cfg: cfg, log: zap.S(), router: router.NewRouter(nil), db: db, dispatcher: dispatcher, + queue: q, } r := mux.NewRouter() @@ -78,9 +99,7 @@ func (gw *Gateway) buildRouter() { func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { source, _ := gw.router.Execute(r).(*entities.Source) if source == nil { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(404) - w.Write([]byte(`{"message": "not found"}`)) + exit(w, 404, `{"message": "not found"}`, nil) return } @@ -90,7 +109,6 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { r = r.WithContext(ctx) var event entities.Event - event.ID = utils.KSUID() r.Body = http.MaxBytesReader(w, r.Body, gw.cfg.MaxRequestBodySize) if err := json.NewDecoder(r.Body).Decode(&event); err != nil { if _, ok := err.(*http.MaxBytesError); ok { @@ -104,6 +122,9 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { return } + event.ID = utils.KSUID() + event.IngestedAt = types.Time{Time: time.Now()} + event.WorkspaceId = source.WorkspaceId if err := event.Validate(); err != nil { utils.JsonResponse(400, w, ErrorResponse{ Message: "Request Validation", @@ -112,24 +133,42 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) { return } - err := gw.dispatcher.Dispatch(r.Context(), &event) + err := gw.ingestEvent(r.Context(), source.Async, &event) if err != nil { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(500) - w.Write([]byte(`{"message": "internal error"}`)) + gw.log.Errorf("[proxy] failed to ingest event: %v", err) + exit(w, 500, `{"message": "internal error"}`, nil) return } if source.Response != nil { - w.Header().Set("Content-Type", source.Response.ContentType) - w.WriteHeader(source.Response.Code) - w.Write([]byte(source.Response.Body)) + exit(w, source.Response.Code, source.Response.Body, headers{"Content-Type": source.Response.ContentType}) return } - w.Header().Set("Content-Type", gw.cfg.Response.ContentType) - w.WriteHeader(int(gw.cfg.Response.Code)) - w.Write([]byte(gw.cfg.Response.Body)) + // default response + exit(w, int(gw.cfg.Response.Code), gw.cfg.Response.Body, headers{"Content-Type": gw.cfg.Response.ContentType}) +} + +func (gw *Gateway) ingestEvent(ctx context.Context, async bool, event *entities.Event) error { + if async { + if gw.queue == nil { + return ErrQueueDisabled + } + + bytes, err := json.Marshal(event) + if err != nil { + return err + } + + msg := queue.Message{ + Data: bytes, + Time: time.Now(), + WorkspaceID: event.WorkspaceId, + } + return gw.queue.Enqueue(ctx, &msg) + } + + return gw.dispatcher.Dispatch(ctx, event) } // Start starts an HTTP server @@ -145,6 +184,14 @@ func (gw *Gateway) Start() { schedule.Schedule(gw.ctx, gw.buildRouter, time.Second) + if gw.queue != nil { + listeners := runtime.GOMAXPROCS(0) + gw.log.Infof("[proxy] starting %d queue listener", listeners) + for i := 0; i < listeners; i++ { + go gw.listenQueue() + } + } + gw.log.Info("[proxy] started") } @@ -158,3 +205,64 @@ func (gw *Gateway) Stop() error { } return nil } + +func (gw *Gateway) listenQueue() { + opts := &queue.Options{ + Count: 20, + Block: true, + Timeout: time.Second, + } + for { + select { + case <-gw.ctx.Done(): + return + default: + ctx := context.Background() + messages, err := gw.queue.Dequeue(ctx, opts) + if err != nil { + gw.log.Warnf("[proxy] [queue] failed to dequeue: %v", err) + time.Sleep(time.Second) + continue + } + if len(messages) == 0 { + continue + } + + events := make([]*entities.Event, 0, len(messages)) + for _, message := range messages { + var event entities.Event + err = json.Unmarshal(message.Data, &event) + if err != nil { + gw.log.Warnf("[proxy] [queue] faield to unmarshal message: %v", err) + continue + } + event.WorkspaceId = message.WorkspaceID + events = append(events, &event) + } + + err = gw.dispatcher.DispatchBatch(ctx, events) + if err != nil { + gw.log.Warnf("[proxy] [queue] failed to dispatch event in batch: %v", err) + continue + } + _ = gw.queue.Delete(ctx, messages) + } + } +} + +type headers map[string]string + +func exit(w http.ResponseWriter, status int, body string, headers headers) { + for header, value := range constants.DefaultResponseHeaders { + w.Header().Set(header, value) + } + + if len(headers) > 0 { + for header, value := range headers { + w.Header().Set(header, value) + } + } + + w.WriteHeader(status) + _, _ = w.Write([]byte(body)) +} diff --git a/test/admin/events_test.go b/test/admin/events_test.go index e259601..593cd1b 100644 --- a/test/admin/events_test.go +++ b/test/admin/events_test.go @@ -10,8 +10,10 @@ import ( "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/utils" + "time" ) var _ = Describe("/events", Ordered, func() { @@ -43,9 +45,10 @@ var _ = Describe("/events", Ordered, func() { assert.NoError(GinkgoT(), db.Truncate("events")) for i := 1; i <= 21; i++ { event := &entities.Event{ - ID: utils.KSUID(), - EventType: "foo.bar", - Data: []byte("{}"), + ID: utils.KSUID(), + EventType: "foo.bar", + Data: []byte("{}"), + IngestedAt: types.Time{Time: time.Now()}, } event.WorkspaceId = ws.ID assert.NoError(GinkgoT(), db.Events.Insert(context.TODO(), event)) @@ -82,9 +85,10 @@ var _ = Describe("/events", Ordered, func() { entitiesConfig := helper.EntitiesConfig{ Events: []*entities.Event{ { - ID: utils.KSUID(), - EventType: "foo.bar", - Data: []byte("{}"), + ID: utils.KSUID(), + EventType: "foo.bar", + Data: []byte("{}"), + IngestedAt: types.Time{Time: time.Now()}, }, }, } @@ -101,6 +105,7 @@ var _ = Describe("/events", Ordered, func() { result := resp.Result().(*entities.Event) assert.Equal(GinkgoT(), entity.ID, result.ID) assert.Equal(GinkgoT(), "foo.bar", result.EventType) + assert.Equal(GinkgoT(), entity.IngestedAt.UnixMilli(), result.IngestedAt.UnixMilli()) }) Context("errors", func() { diff --git a/test/delivery/delivery_test.go b/test/delivery/delivery_test.go index 5d31530..daa8777 100644 --- a/test/delivery/delivery_test.go +++ b/test/delivery/delivery_test.go @@ -54,6 +54,7 @@ var _ = Describe("delivery", Ordered, func() { }) It("sanity", func() { + now := time.Now() assert.Eventually(GinkgoT(), func() bool { resp, err := proxyClient.R(). SetBody(`{ @@ -66,6 +67,17 @@ var _ = Describe("delivery", Ordered, func() { return err == nil && resp.StatusCode() == 200 }, time.Second*5, time.Second) + var event *entities.Event + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Events.List(context.TODO(), &query.EventQuery{}) + if err != nil || len(list) != 1 { + return false + } + event = list[0] + return true + }, time.Second*5, time.Second) + assert.True(GinkgoT(), event.IngestedAt.UnixMilli() >= now.UnixMilli()) + var attempt *entities.Attempt assert.Eventually(GinkgoT(), func() bool { list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) @@ -74,7 +86,7 @@ var _ = Describe("delivery", Ordered, func() { } attempt = list[0] return attempt.Status == entities.AttemptStatusSuccess - }, time.Second*15, time.Second) + }, time.Second*5, time.Second) assert.Equal(GinkgoT(), entitiesConfig.Endpoints[0].ID, attempt.EndpointId) diff --git a/test/generate.go b/test/generate.go new file mode 100644 index 0000000..ec13960 --- /dev/null +++ b/test/generate.go @@ -0,0 +1,3 @@ +package test + +//go:generate mockgen --source ../pkg/taskqueue/queue.go --destination mocks/queue.go -package mocks diff --git a/test/helper/helper.go b/test/helper/helper.go index 5cecbfa..f92f2e4 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "os" + "regexp" "time" "github.com/creasty/defaults" @@ -217,6 +218,28 @@ func FileLine(filename string, n int) (string, error) { return "", nil } +func FileHasLine(filename string, regex string) (bool, error) { + file, err := os.Open(filename) + if err != nil { + return false, err + } + defer file.Close() + + r, err := regexp.Compile(regex) + if err != nil { + return false, err + } + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if r.MatchString(line) { + return true, nil + } + } + + return false, nil +} + func DefaultEndpoint() *entities.Endpoint { var entity entities.Endpoint entity.Init() diff --git a/test/mocks/queue.go b/test/mocks/queue.go new file mode 100644 index 0000000..b18b843 --- /dev/null +++ b/test/mocks/queue.go @@ -0,0 +1,85 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ../pkg/taskqueue/queue.go +// +// Generated by this command: +// +// mockgen --source ../pkg/taskqueue/queue.go --destination mocks/queue.go -package mocks +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + taskqueue "github.com/webhookx-io/webhookx/pkg/taskqueue" + gomock "go.uber.org/mock/gomock" +) + +// MockTaskQueue is a mock of TaskQueue interface. +type MockTaskQueue struct { + ctrl *gomock.Controller + recorder *MockTaskQueueMockRecorder +} + +// MockTaskQueueMockRecorder is the mock recorder for MockTaskQueue. +type MockTaskQueueMockRecorder struct { + mock *MockTaskQueue +} + +// NewMockTaskQueue creates a new mock instance. +func NewMockTaskQueue(ctrl *gomock.Controller) *MockTaskQueue { + mock := &MockTaskQueue{ctrl: ctrl} + mock.recorder = &MockTaskQueueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskQueue) EXPECT() *MockTaskQueueMockRecorder { + return m.recorder +} + +// Add mocks base method. +func (m *MockTaskQueue) Add(ctx context.Context, task *taskqueue.TaskMessage, scheduleAt time.Time) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Add", ctx, task, scheduleAt) + ret0, _ := ret[0].(error) + return ret0 +} + +// Add indicates an expected call of Add. +func (mr *MockTaskQueueMockRecorder) Add(ctx, task, scheduleAt any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockTaskQueue)(nil).Add), ctx, task, scheduleAt) +} + +// Delete mocks base method. +func (m *MockTaskQueue) Delete(ctx context.Context, task *taskqueue.TaskMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", ctx, task) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete. +func (mr *MockTaskQueueMockRecorder) Delete(ctx, task any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockTaskQueue)(nil).Delete), ctx, task) +} + +// Get mocks base method. +func (m *MockTaskQueue) Get(ctx context.Context) (*taskqueue.TaskMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx) + ret0, _ := ret[0].(*taskqueue.TaskMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockTaskQueueMockRecorder) Get(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockTaskQueue)(nil).Get), ctx) +} diff --git a/test/proxy/ginkgo_test.go b/test/proxy/ginkgo_test.go new file mode 100644 index 0000000..98d68f4 --- /dev/null +++ b/test/proxy/ginkgo_test.go @@ -0,0 +1,12 @@ +package proxy + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "testing" +) + +func TestProxy(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Proxy Suite") +} diff --git a/test/proxy/ingest_test.go b/test/proxy/ingest_test.go new file mode 100644 index 0000000..e2b0784 --- /dev/null +++ b/test/proxy/ingest_test.go @@ -0,0 +1,112 @@ +package proxy + +import ( + "context" + "github.com/go-resty/resty/v2" + . "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/utils" + "time" +) + +var _ = Describe("ingest", Ordered, func() { + + Context("sanity", func() { + + var proxyClient *resty.Client + var app *app.Application + var db *db.DB + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Sources[0].Async = true + + BeforeAll(func() { + db = helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + })) + }) + + AfterAll(func() { + app.Stop() + }) + + It("sanity", func() { + assert.Eventually(GinkgoT(), func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`). + Post("/") + return err == nil && resp.StatusCode() == 200 + }, time.Second*5, time.Second) + + var attempt *entities.Attempt + assert.Eventually(GinkgoT(), func() bool { + list, err := db.Attempts.List(context.TODO(), &query.AttemptQuery{}) + if err != nil || len(list) == 0 { + return false + } + attempt = list[0] + return attempt.Status == entities.AttemptStatusQueued + }, time.Second*15, time.Second) + }) + }) + + Context("queue disabled", func() { + var proxyClient *resty.Client + var app *app.Application + + entitiesConfig := helper.EntitiesConfig{ + Endpoints: []*entities.Endpoint{helper.DefaultEndpoint()}, + Sources: []*entities.Source{helper.DefaultSource()}, + } + entitiesConfig.Sources[0].Async = true + + BeforeAll(func() { + helper.InitDB(true, &entitiesConfig) + proxyClient = helper.ProxyClient() + app = utils.Must(helper.Start(map[string]string{ + "WEBHOOKX_PROXY_LISTEN": "0.0.0.0:8081", + "WEBHOOKX_PROXY_QUEUE_TYPE": "off", + "WEBHOOKX_LOG_FILE": "webhookx.log", + })) + }) + + AfterAll(func() { + app.Stop() + }) + + It("returns HTTP 500", func() { + helper.TruncateFile("webhookx.log") + assert.Eventually(GinkgoT(), func() bool { + resp, err := proxyClient.R(). + SetBody(`{ + "event_type": "foo.bar", + "data": { + "key": "value" + } + }`). + Post("/") + return err == nil && resp.StatusCode() == 500 + }, time.Second*5, time.Second) + matched, err := helper.FileHasLine("webhookx.log", "^.*failed to ingest event: queue is disabled$") + assert.Nil(GinkgoT(), err) + assert.Equal(GinkgoT(), true, matched) + }) + + }) +}) diff --git a/test/proxy/listen_test.go b/test/proxy/listen_test.go index 3a32cd4..53dd6af 100644 --- a/test/proxy/listen_test.go +++ b/test/proxy/listen_test.go @@ -3,17 +3,17 @@ package proxy import ( "github.com/go-resty/resty/v2" . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/webhookx-io/webhookx/app" + "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/test/helper" "github.com/webhookx-io/webhookx/utils" - "testing" ) var _ = Describe("proxy", Ordered, func() { var app *app.Application var proxyClient *resty.Client + BeforeAll(func() { helper.InitDB(true, nil) app = utils.Must(helper.Start(map[string]string{ @@ -21,6 +21,7 @@ var _ = Describe("proxy", Ordered, func() { })) proxyClient = helper.ProxyClient() }) + AfterAll(func() { app.Stop() }) @@ -29,11 +30,8 @@ var _ = Describe("proxy", Ordered, func() { resp, err := proxyClient.R().Get("/") assert.Nil(GinkgoT(), err) assert.Equal(GinkgoT(), 404, resp.StatusCode()) + assert.Equal(GinkgoT(), "application/json", resp.Header().Get("Content-Type")) + assert.Equal(GinkgoT(), "WebhookX/"+config.VERSION, resp.Header().Get("Server")) }) }) - -func TestProxyListen(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Proxy Listen Suite") -} diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go new file mode 100644 index 0000000..784c144 --- /dev/null +++ b/test/worker/requeue_test.go @@ -0,0 +1,93 @@ +package worker + +import ( + "context" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/db" + "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/db/query" + "github.com/webhookx-io/webhookx/test/helper" + "github.com/webhookx-io/webhookx/test/mocks" + "github.com/webhookx-io/webhookx/utils" + "github.com/webhookx-io/webhookx/worker" + "github.com/webhookx-io/webhookx/worker/deliverer" + "go.uber.org/mock/gomock" + "testing" + "time" +) + +var _ = Describe("processRequeue", Ordered, func() { + + var db *db.DB + var w *worker.Worker + var ctrl *gomock.Controller + var queue *mocks.MockTaskQueue + endpoint := helper.DefaultEndpoint() + + BeforeAll(func() { + db = helper.InitDB(true, nil) + + // setup MockTaskQueue + ctrl = gomock.NewController(GinkgoT()) + queue = mocks.NewMockTaskQueue(ctrl) + queue.EXPECT().Get(gomock.Any()).AnyTimes() + queue.EXPECT().Delete(gomock.Any(), gomock.Any()).AnyTimes() + queue.EXPECT().Add(gomock.Any(), gomock.Any(), gomock.Any()).Times(10) + + w = worker.NewWorker(worker.WorkerOptions{ + RequeueJobInterval: time.Second, + }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue) + + // data + ws := utils.Must(db.Workspaces.GetDefault(context.TODO())) + endpoint.WorkspaceId = ws.ID + assert.NoError(GinkgoT(), db.Endpoints.Insert(context.TODO(), endpoint)) + + for i := 1; i <= 10; i++ { + event := helper.DefaultEvent() + event.WorkspaceId = ws.ID + assert.NoError(GinkgoT(), db.Events.Insert(context.TODO(), event)) + + attempt := entities.Attempt{ + ID: utils.KSUID(), + EventId: event.ID, + EndpointId: endpoint.ID, + Status: entities.AttemptStatusInit, + AttemptNumber: 1, + } + attempt.WorkspaceId = ws.ID + assert.NoError(GinkgoT(), db.Attempts.Insert(context.TODO(), &attempt)) + } + db.DB.MustExec("update attempts set created_at = created_at - INTERVAL '60 SECOND'") + + w.Start() + }) + + AfterAll(func() { + w.Stop() + ctrl.Finish() + }) + + It("all attempts should become QUEUED", func() { + time.Sleep(time.Second * 3) // wait for timer to be executed + var q query.AttemptQuery + q.EndpointId = utils.Pointer(endpoint.ID) + q.Status = utils.Pointer(entities.AttemptStatusInit) + count, err := db.Attempts.Count(context.TODO(), q.WhereMap()) + assert.NoError(GinkgoT(), err) + assert.EqualValues(GinkgoT(), 0, count) + + q.Status = utils.Pointer(entities.AttemptStatusQueued) + count, err = db.Attempts.Count(context.TODO(), q.WhereMap()) + assert.NoError(GinkgoT(), err) + assert.EqualValues(GinkgoT(), 10, count) + }) +}) + +func TestWorker(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Worker Suite") +} diff --git a/worker/deliverer/http.go b/worker/deliverer/http.go index 7b5c388..a2126c0 100644 --- a/worker/deliverer/http.go +++ b/worker/deliverer/http.go @@ -4,16 +4,12 @@ import ( "bytes" "context" "github.com/webhookx-io/webhookx/config" + "github.com/webhookx-io/webhookx/constants" "io" "net/http" "time" ) -var defaultHeaders = map[string]string{ - "User-Agent": "WebhookX/" + config.VERSION, - "Content-Type": "application/json; charset=utf-8", -} - // HTTPDeliverer delivers via HTTP type HTTPDeliverer struct { defaultTimeout time.Duration @@ -56,7 +52,7 @@ func (d *HTTPDeliverer) Deliver(req *Request) (res *Response) { } req.Request = request - for name, value := range defaultHeaders { + for name, value := range constants.DefaultDelivererRequestHeaders { request.Header.Add(name, value) } for name, value := range req.Headers { diff --git a/worker/worker.go b/worker/worker.go index 404af2f..ca4c1fb 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -3,17 +3,17 @@ package worker import ( "context" "errors" + "github.com/webhookx-io/webhookx/constants" + "github.com/webhookx-io/webhookx/pkg/plugin" + plugintypes "github.com/webhookx-io/webhookx/pkg/plugin/types" + "github.com/webhookx-io/webhookx/pkg/safe" "github.com/webhookx-io/webhookx/pkg/taskqueue" "time" - "github.com/webhookx-io/webhookx/config" "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" "github.com/webhookx-io/webhookx/model" - "github.com/webhookx-io/webhookx/pkg/plugin" - plugintypes "github.com/webhookx-io/webhookx/pkg/plugin/types" - "github.com/webhookx-io/webhookx/pkg/safe" "github.com/webhookx-io/webhookx/pkg/schedule" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" @@ -25,6 +25,8 @@ type Worker struct { ctx context.Context cancel context.CancelFunc + opts WorkerOptions + stop chan struct{} log *zap.SugaredLogger @@ -33,12 +35,21 @@ type Worker struct { DB *db.DB } -func NewWorker(cfg *config.WorkerConfig, db *db.DB, queue taskqueue.TaskQueue) *Worker { +type WorkerOptions struct { + RequeueJobBatch int + RequeueJobInterval time.Duration +} + +func NewWorker(opts WorkerOptions, db *db.DB, deliverer deliverer.Deliverer, queue taskqueue.TaskQueue) *Worker { + opts.RequeueJobBatch = utils.DefaultIfZero(opts.RequeueJobBatch, constants.RequeueBatch) + opts.RequeueJobInterval = utils.DefaultIfZero(opts.RequeueJobInterval, constants.RequeueInterval) + worker := &Worker{ + opts: opts, stop: make(chan struct{}), queue: queue, log: zap.S(), - deliverer: deliverer.NewHTTPDeliverer(&cfg.Deliverer), + deliverer: deliverer, DB: db, } @@ -56,7 +67,7 @@ func (w *Worker) run() { return case <-ticker.C: for { - task, err := w.queue.Get() + task, err := w.queue.Get(context.TODO()) if err != nil { w.log.Errorf("[worker] failed to get task from queue: %v", err) break @@ -70,18 +81,18 @@ func (w *Worker) run() { err = task.UnmarshalData(task.Data) if err != nil { w.log.Errorf("[worker] failed to unmarshal task: %v", err) - w.queue.Delete(task) + _ = w.queue.Delete(context.TODO(), task) return } - err = w.handleTask(context.Background(), task) + err = w.handleTask(context.TODO(), task) if err != nil { // TODO: delete task when causes error too many times (maxReceiveCount) w.log.Errorf("[worker] failed to handle task: %v", err) return } - w.queue.Delete(task) + _ = w.queue.Delete(context.TODO(), task) }) } } @@ -93,7 +104,7 @@ func (w *Worker) Start() error { go w.run() w.ctx, w.cancel = context.WithCancel(context.Background()) - schedule.Schedule(w.ctx, w.processUnqueued, time.Second*60) + schedule.Schedule(w.ctx, w.processRequeue, w.opts.RequeueJobInterval) w.log.Info("[worker] started") return nil @@ -111,11 +122,11 @@ func (w *Worker) Stop() error { return nil } -func (w *Worker) processUnqueued() { - batch := 100 +func (w *Worker) processRequeue() { + batch := w.opts.RequeueJobBatch ctx := context.Background() for { - attempts, err := w.DB.Attempts.ListUnqueued(ctx, int64(batch)) + attempts, err := w.DB.Attempts.ListUnqueued(ctx, batch) if err != nil { w.log.Errorf("failed to query unqueued attempts: %v", err) break @@ -138,7 +149,7 @@ func (w *Worker) processUnqueued() { } for i, task := range tasks { - err := w.queue.Add(task, attempts[i].ScheduledAt.Time) + err := w.queue.Add(ctx, task, attempts[i].ScheduledAt.Time) if err != nil { w.log.Warnf("failed to add task to queue: %v", err) continue @@ -289,7 +300,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er }, } - err = w.queue.Add(task, nextAttempt.ScheduledAt.Time) + err = w.queue.Add(ctx, task, nextAttempt.ScheduledAt.Time) if err != nil { w.log.Warnf("[worker] failed to add task to queue: %v", err) }