Skip to content

Commit

Permalink
feat(admin): manually retry an event (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
webhookx-x authored Sep 22, 2024
1 parent dbde461 commit be0e502
Show file tree
Hide file tree
Showing 15 changed files with 325 additions and 35 deletions.
15 changes: 9 additions & 6 deletions admin/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type API struct {
cfg *config.Config
log *zap.SugaredLogger
DB *db.DB
dispatcher dispatcher.Dispatcher
dispatcher *dispatcher.Dispatcher
}

func NewAPI(cfg *config.Config, db *db.DB, dispatcher dispatcher.Dispatcher) *API {
func NewAPI(cfg *config.Config, db *db.DB, dispatcher *dispatcher.Dispatcher) *API {
return &API{
cfg: cfg,
log: zap.S(),
Expand All @@ -47,11 +47,13 @@ func (api *API) json(code int, w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", ApplicationJsonType)
w.WriteHeader(code)

bytes, err := json.Marshal(data)
if err != nil {
panic(err)
if data != nil {
bytes, err := json.Marshal(data)
if err != nil {
panic(err)
}
_, _ = w.Write(bytes)
}
_, _ = w.Write(bytes)
}

func (api *API) bindQuery(r *http.Request, q *query.Query) {
Expand Down Expand Up @@ -124,6 +126,7 @@ func (api *API) Handler() http.Handler {
r.HandleFunc(prefix+"/events", api.PageEvent).Methods("GET")
r.HandleFunc(prefix+"/events", api.CreateEvent).Methods("POST")
r.HandleFunc(prefix+"/events/{id}", api.GetEvent).Methods("GET")
r.HandleFunc(prefix+"/events/{id}/retry", api.RetryEvent).Methods("POST")
}

for _, prefix := range []string{"", "/workspaces/{workspace}"} {
Expand Down
23 changes: 23 additions & 0 deletions admin/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,26 @@ func (api *API) CreateEvent(w http.ResponseWriter, r *http.Request) {

api.json(201, w, event)
}

func (api *API) RetryEvent(w http.ResponseWriter, r *http.Request) {
id := api.param(r, "id")
event, err := api.DB.EventsWS.Get(r.Context(), id)
api.assert(err)
if event == nil {
api.json(404, w, ErrorResponse{Message: MsgNotFound})
return
}

endpointId := r.URL.Query().Get("endpoint_id")
endpoint, err := api.DB.EndpointsWS.Get(r.Context(), endpointId)
api.assert(err)
if endpoint == nil {
api.json(400, w, ErrorResponse{Message: "endpoint not found"})
return
}

err = api.dispatcher.DispatchEndpoint(r.Context(), id, []*entities.Endpoint{endpoint})
api.assert(err)

api.json(200, w, nil)
}
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Application struct {
log *zap.SugaredLogger
db *db.DB
queue queue.TaskQueue
dispatcher dispatcher.Dispatcher
dispatcher *dispatcher.Dispatcher
cache cache.Cache

admin *admin.Admin
Expand Down
2 changes: 2 additions & 0 deletions db/dao/attempt_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type AttemptResult struct {
AttemptedAt types.Time
Status entities.AttemptStatus
ErrorCode *entities.AttemptErrorCode
Exhausted bool
}

func NewAttemptDao(db *sqlx.DB, workspace bool) AttemptDAO {
Expand All @@ -32,6 +33,7 @@ func (dao *attemptDao) UpdateDelivery(ctx context.Context, id string, result *At
"attempted_at": result.AttemptedAt,
"status": result.Status,
"error_code": result.ErrorCode,
"exhausted": result.Exhausted,
})
return err
}
Expand Down
12 changes: 10 additions & 2 deletions db/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ func (dao *DAO[T]) Insert(ctx context.Context, entity *T) error {
case "created_at", "updated_at": // ignore
default:
columns = append(columns, column)
values = append(values, v.Interface())
value := v.Interface()
if column == "ws_id" && dao.workspace {
value = ucontext.GetWorkspaceID(ctx)
}
values = append(values, value)
}
})
statement, args := psql.Insert(dao.table).Columns(columns...).Values(values...).
Expand Down Expand Up @@ -236,7 +240,11 @@ func (dao *DAO[T]) BatchInsert(ctx context.Context, entities []*T) error {
case "created_at", "updated_at":
// ignore
default:
values = append(values, v.Interface())
value := v.Interface()
if column == "ws_id" && dao.workspace {
value = ucontext.GetWorkspaceID(ctx)
}
values = append(values, value)
}
})
builder = builder.Values(values...)
Expand Down
24 changes: 17 additions & 7 deletions db/entities/attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
)

type Attempt struct {
ID string `json:"id" db:"id"`
EventId string `json:"event_id" db:"event_id"`
EndpointId string `json:"endpoint_id" db:"endpoint_id"`
Status AttemptStatus `json:"status" db:"status"`
AttemptNumber int `json:"attempt_number" db:"attempt_number"`
ScheduledAt types.Time `json:"scheduled_at" db:"scheduled_at"`
AttemptedAt *types.Time `json:"attempted_at" db:"attempted_at"`
ID string `json:"id" db:"id"`
EventId string `json:"event_id" db:"event_id"`
EndpointId string `json:"endpoint_id" db:"endpoint_id"`
Status AttemptStatus `json:"status" db:"status"`
AttemptNumber int `json:"attempt_number" db:"attempt_number"`
ScheduledAt types.Time `json:"scheduled_at" db:"scheduled_at"`
AttemptedAt *types.Time `json:"attempted_at" db:"attempted_at"`
TriggerMode AttemptTriggerMode `json:"trigger_mode" db:"trigger_mode"`
Exhausted bool `json:"exhausted" db:"exhausted"`

ErrorCode *AttemptErrorCode `json:"error_code" db:"error_code"`
Request *AttemptRequest `json:"request" db:"request"`
Expand Down Expand Up @@ -41,6 +43,14 @@ const (
AttemptErrorCodeEndpointNotFound AttemptErrorCode = "ENDPOINT_NOT_FOUND"
)

type AttemptTriggerMode = string

const (
AttemptTriggerModeInitial AttemptTriggerMode = "INITIAL"
AttemptTriggerModeManual AttemptTriggerMode = "MANUAL"
AttemptTriggerModeAutomatic AttemptTriggerMode = "AUTOMATIC"
)

type AttemptRequest struct {
Method string `json:"method"`
URL string `json:"url"`
Expand Down
2 changes: 2 additions & 0 deletions db/migrations/2_attempts.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE IF EXISTS ONLY "attempts" DROP COLUMN IF EXISTS "trigger_mode";
ALTER TABLE IF EXISTS ONLY "attempts" DROP COLUMN IF EXISTS "exhausted";
5 changes: 5 additions & 0 deletions db/migrations/2_attempts.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE IF EXISTS ONLY "attempts" ADD COLUMN IF NOT EXISTS "trigger_mode" VARCHAR(10) NOT NULL;
ALTER TABLE IF EXISTS ONLY "attempts" ADD COLUMN IF NOT EXISTS "exhausted" BOOLEAN NOT NULL DEFAULT false;

UPDATE "attempts" SET trigger_mode = 'INITIAL' where attempt_number = 1;
UPDATE "attempts" SET trigger_mode = 'AUTOMATIC' where attempt_number != 1;
72 changes: 60 additions & 12 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,86 @@ import (
)

// Dispatcher is Event Dispatcher
type Dispatcher interface {
Dispatch(ctx context.Context, event *entities.Event) error
}

type DBDispatcher struct {
type Dispatcher struct {
log *zap.SugaredLogger
queue queue.TaskQueue
db *db.DB
}

func NewDispatcher(log *zap.SugaredLogger, queue queue.TaskQueue, db *db.DB) Dispatcher {

dispatcher := &DBDispatcher{
func NewDispatcher(log *zap.SugaredLogger, queue queue.TaskQueue, db *db.DB) *Dispatcher {
dispatcher := &Dispatcher{
log: log,
queue: queue,
db: db,
}
return dispatcher
}

func (d *DBDispatcher) Dispatch(ctx context.Context, event *entities.Event) error {
func (d *Dispatcher) Dispatch(ctx context.Context, event *entities.Event) error {
endpoints, err := listSubscribedEndpoints(ctx, d.db, event.EventType)
if err != nil {
return err
}

return d.dispatch(ctx, event, endpoints)
}

func (d *Dispatcher) DispatchEndpoint(ctx context.Context, eventId string, endpoints []*entities.Endpoint) error {
attempts := make([]*entities.Attempt, 0, len(endpoints))
tasks := make([]*queue.TaskMessage, 0, len(endpoints))

now := time.Now()
for _, endpoint := range endpoints {
delay := endpoint.Retry.Config.Attempts[0]
attempt := &entities.Attempt{
ID: utils.KSUID(),
EventId: eventId,
EndpointId: endpoint.ID,
Status: entities.AttemptStatusInit,
AttemptNumber: 1,
ScheduledAt: types.NewTime(now.Add(time.Second * time.Duration(delay))),
TriggerMode: entities.AttemptTriggerModeManual,
}

task := &queue.TaskMessage{
ID: attempt.ID,
Data: &model.MessageData{
EventID: eventId,
EndpointId: endpoint.ID,
Attempt: 1,
},
}
attempts = append(attempts, attempt)
tasks = append(tasks, task)
}

err := d.db.AttemptsWS.BatchInsert(ctx, attempts)
if err != nil {
return err
}

for i, task := range tasks {
err := d.queue.Add(task, attempts[i].ScheduledAt.Time)
if err != nil {
d.log.Warnf("failed to add task to queue: %v", err)
continue
}
err = d.db.AttemptsWS.UpdateStatus(ctx, task.ID, entities.AttemptStatusQueued)
if err != nil {
d.log.Warnf("failed to update attempt status: %v", err)
}
}

return nil
}

func (d *Dispatcher) dispatch(ctx context.Context, event *entities.Event, endpoints []*entities.Endpoint) error {
attempts := make([]*entities.Attempt, 0, len(endpoints))
tasks := make([]*queue.TaskMessage, 0, len(endpoints))

err = d.db.TX(ctx, func(ctx context.Context) error {
err := d.db.TX(ctx, func(ctx context.Context) error {
now := time.Now()
err := d.db.Events.Insert(ctx, event)
err := d.db.EventsWS.Insert(ctx, event)
if err != nil {
return err
}
Expand All @@ -59,8 +107,8 @@ func (d *DBDispatcher) Dispatch(ctx context.Context, event *entities.Event) erro
Status: entities.AttemptStatusInit,
AttemptNumber: 1,
ScheduledAt: types.NewTime(now.Add(time.Second * time.Duration(delay))),
TriggerMode: entities.AttemptTriggerModeInitial,
}
attempt.WorkspaceId = endpoint.WorkspaceId

task := &queue.TaskMessage{
ID: attempt.ID,
Expand Down
26 changes: 24 additions & 2 deletions openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ paths:
items:
$ref: "#/components/schemas/Attempt"


/workspaces/{ws_id}/attempts/{id}:
parameters:
- $ref: '#/components/parameters/workspace_id'
Expand Down Expand Up @@ -324,7 +323,6 @@ paths:
schema:
$ref: '#/components/schemas/Event'


/workspaces/{ws_id}/events/{id}:
parameters:
- $ref: '#/components/parameters/workspace_id'
Expand All @@ -347,6 +345,30 @@ paths:
schema:
$ref: '#/components/schemas/Event'

/workspaces/{ws_id}/events/{id}/retry:
parameters:
- $ref: '#/components/parameters/workspace_id'

post:
summary: Manually retry an event
tags:
- Event
parameters:
- in: path
name: id
required: true
schema:
type: string
- in: query
name: endpoint_id
required: true
schema:
type: string

responses:
'200':
description: OK


/workspaces/{ws_id}/sources:
parameters:
Expand Down
6 changes: 3 additions & 3 deletions proxy/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type Gateway struct {
router *router.Router // TODO: happens-before
db *db.DB

dispatcher dispatcher.Dispatcher
dispatcher *dispatcher.Dispatcher
}

func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher dispatcher.Dispatcher) *Gateway {
func NewGateway(cfg *config.ProxyConfig, db *db.DB, dispatcher *dispatcher.Dispatcher) *Gateway {
gw := &Gateway{
cfg: cfg,
log: zap.S(),
Expand Down Expand Up @@ -111,7 +111,7 @@ func (gw *Gateway) Handle(w http.ResponseWriter, r *http.Request) {
})
return
}
event.WorkspaceId = source.WorkspaceId

err := gw.dispatcher.Dispatch(r.Context(), &event)
if err != nil {
w.Header().Set("Content-Type", "application/json")
Expand Down
4 changes: 4 additions & 0 deletions test/admin/attempts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ var _ = Describe("/attempts", Ordered, func() {
AttemptNumber: 1,
ScheduledAt: types.Time{Time: time.Now()},
AttemptedAt: &types.Time{Time: time.Now()},
TriggerMode: entities.AttemptTriggerModeInitial,
Exhausted: false,
}
entitiesConfig.Attempts = []*entities.Attempt{entity}

Expand All @@ -192,6 +194,8 @@ var _ = Describe("/attempts", Ordered, func() {
assert.Equal(GinkgoT(), entity.EventId, result.EventId)
assert.Equal(GinkgoT(), entity.EndpointId, result.EndpointId)
assert.Equal(GinkgoT(), entities.AttemptStatusSuccess, result.Status)
assert.Equal(GinkgoT(), entities.AttemptTriggerModeInitial, result.TriggerMode)
assert.Equal(GinkgoT(), false, result.Exhausted)
assert.EqualValues(GinkgoT(), 1, result.AttemptNumber)
})

Expand Down
Loading

0 comments on commit be0e502

Please sign in to comment.