Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 29 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ curl -X POST 'http://localhost:8000/v1/topics' \
{
"id": "topic",
"name": "Topic",
"created_at": "2020-05-15T12:46:25.789143Z",
"updated_at": "2020-05-15T12:46:25.789143Z"
"created_at": "2020-05-17T18:04:49.949875Z",
"updated_at": "2020-05-17T18:04:49.949875Z"
}
```

Expand Down Expand Up @@ -99,8 +99,8 @@ curl -X POST 'http://localhost:8000/v1/subscriptions' \
"max_delivery_attempts": 5,
"delivery_attempt_delay": 60,
"delivery_attempt_timeout": 5,
"created_at": "2020-05-15T12:46:46.469638Z",
"updated_at": "2020-05-15T12:46:46.469638Z"
"created_at": "2020-05-17T18:05:54.102493Z",
"updated_at": "2020-05-17T18:05:54.102493Z"
}
```

Expand All @@ -120,11 +120,11 @@ curl -X POST 'http://localhost:8000/v1/messages' \

```javascript
{
"id": "01E8C5ZKFFKHEAMSM0WRKCH7FQ",
"id": "01E8HX1CYHKN2R4TQVG507NYVS",
"topic_id": "topic",
"content_type": "application/json",
"data": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
"created_at": "2020-05-15T12:47:11.343464Z"
"created_at": "2020-05-17T18:06:19.601962Z"
}
```

Expand All @@ -136,15 +136,17 @@ The system will send a post request and the server must respond with the followi

```bash
docker run --env HAMMER_DATABASE_URL='postgres://user:pass@localhost:5432/hammer?sslmode=disable' allisson/hammer worker
{"level":"info","ts":1589738659.759326,"caller":"hammer/main.go:266","msg":"worker-started"}
{"level":"info","ts":1589738780.93929,"caller":"service/worker.go:77","msg":"delivery-made","delivery_id":"01E8HX1CYM0RFZDMKJHSPFF50J","delivery_attempt_id":"01E8HX1D6PYFB1HJFG0S7WEKBK","response_status_code":200,"execution_duration":1061}
```

#### Local

```bash
make run-worker
go run cmd/worker/main.go
{"level":"info","ts":1589546853.6263108,"caller":"worker/main.go:182","msg":"worker-started"}
{"level":"info","ts":1589546854.666185,"caller":"worker/main.go:93","msg":"delivery-made","id":"01E8C5ZKFHGFR4ZBZ17SGN4FAX","topic_id":"topic","subscription_id":"httpbin-post","message_id":"01E8C5ZKFFKHEAMSM0WRKCH7FQ","status":"completed","attempts":1,"max_delivery_attempts":5}
{"level":"info","ts":1589738659.759326,"caller":"hammer/main.go:266","msg":"worker-started"}
{"level":"info","ts":1589738780.93929,"caller":"service/worker.go:77","msg":"delivery-made","delivery_id":"01E8HX1CYM0RFZDMKJHSPFF50J","delivery_attempt_id":"01E8HX1D6PYFB1HJFG0S7WEKBK","response_status_code":200,"execution_duration":1061}
```

Submitted payload (Compatible with JSON Event Format for CloudEvents - Version 1.0):
Expand All @@ -153,13 +155,13 @@ Submitted payload (Compatible with JSON Event Format for CloudEvents - Version 1
{
"data_base64": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
"datacontenttype": "application/json",
"id": "01E8C5ZKFHGFR4ZBZ17SGN4FAX",
"messageid": "01E8C5ZKFFKHEAMSM0WRKCH7FQ",
"id": "01E8HX1CYM0RFZDMKJHSPFF50J",
"messageid": "01E8HX1CYHKN2R4TQVG507NYVS",
"secrettoken": "my-super-secret-token",
"source": "/v1/messages/01E8C5ZKFFKHEAMSM0WRKCH7FQ",
"source": "/v1/messages/01E8HX1CYHKN2R4TQVG507NYVS",
"specversion": "1.0",
"subscriptionid": "httpbin-post",
"time": "2020-05-15T09:47:11.345912-03:00",
"time": "2020-05-17T15:06:19.604225-03:00",
"topicid": "topic",
"type": "hammer.message.created"
}
Expand All @@ -168,27 +170,27 @@ Submitted payload (Compatible with JSON Event Format for CloudEvents - Version 1
### Get delivery data

```bash
curl -X GET http://localhost:8000/v1/deliveries/01E8C5ZKFHGFR4ZBZ17SGN4FAX
curl -X GET http://localhost:8000/v1/deliveries/01E8HX1CYM0RFZDMKJHSPFF50J
```

```javascript
{
"id": "01E8C5ZKFHGFR4ZBZ17SGN4FAX",
"id": "01E8HX1CYM0RFZDMKJHSPFF50J",
"topic_id": "topic",
"subscription_id": "httpbin-post",
"message_id": "01E8C5ZKFFKHEAMSM0WRKCH7FQ",
"message_id": "01E8HX1CYHKN2R4TQVG507NYVS",
"content_type": "application/json",
"data": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
"url": "https://httpbin.org/post",
"secret_token": "my-super-secret-token",
"max_delivery_attempts": 5,
"delivery_attempt_delay": 60,
"delivery_attempt_timeout": 5,
"scheduled_at": "2020-05-15T12:47:11.345912Z",
"scheduled_at": "2020-05-17T18:06:19.604225Z",
"delivery_attempts": 1,
"status": "completed",
"created_at": "2020-05-15T12:47:11.345912Z",
"updated_at": "2020-05-15T12:47:34.662483Z"
"created_at": "2020-05-17T18:06:19.604225Z",
"updated_at": "2020-05-17T18:06:20.935601Z"
}
```

Expand All @@ -197,23 +199,19 @@ curl -X GET http://localhost:8000/v1/deliveries/01E8C5ZKFHGFR4ZBZ17SGN4FAX
The execution_duration are in milliseconds.

```bash
curl -X GET 'http://localhost:8000/v1/delivery-attempts?delivery_id=01E8C5ZKFHGFR4ZBZ17SGN4FAX'
curl -X GET http://localhost:8000/v1/delivery-attempts/01E8HX1D6PYFB1HJFG0S7WEKBK
```

```javascript
{
"delivery_attempts":[
{
"id": "01E8C60987C47VZ14RBA48M5EB",
"delivery_id": "01E8C5ZKFHGFR4ZBZ17SGN4FAX",
"request": "POST /post HTTP/1.1\r\nHost: httpbin.org\r\nContent-Type: application/json\r\n\r\n{\"specversion\":\"1.0\",\"type\":\"hammer.message.created\",\"source\":\"/v1/messages/01E8C5ZKFFKHEAMSM0WRKCH7FQ\",\"id\":\"01E8C5ZKFHGFR4ZBZ17SGN4FAX\",\"time\":\"2020-05-15T09:47:11.345912-03:00\",\"secrettoken\":\"my-super-secret-token\",\"messageid\":\"01E8C5ZKFFKHEAMSM0WRKCH7FQ\",\"subscriptionid\":\"httpbin-post\",\"topicid\":\"topic\",\"datacontenttype\":\"application/json\",\"data_base64\":\"eyJuYW1lIjogIkFsbGlzc29uIn0=\"}",
"response": "HTTP/2.0 200 OK\r\nContent-Length: 1308\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\nDate: Fri, 15 May 2020 12:47:34 GMT\r\nServer: gunicorn/19.9.0\r\n\r\n{\n \"args\": {}, \n \"data\": \"{\\\"specversion\\\":\\\"1.0\\\",\\\"type\\\":\\\"hammer.message.created\\\",\\\"source\\\":\\\"/v1/messages/01E8C5ZKFFKHEAMSM0WRKCH7FQ\\\",\\\"id\\\":\\\"01E8C5ZKFHGFR4ZBZ17SGN4FAX\\\",\\\"time\\\":\\\"2020-05-15T09:47:11.345912-03:00\\\",\\\"secrettoken\\\":\\\"my-super-secret-token\\\",\\\"messageid\\\":\\\"01E8C5ZKFFKHEAMSM0WRKCH7FQ\\\",\\\"subscriptionid\\\":\\\"httpbin-post\\\",\\\"topicid\\\":\\\"topic\\\",\\\"datacontenttype\\\":\\\"application/json\\\",\\\"data_base64\\\":\\\"eyJuYW1lIjogIkFsbGlzc29uIn0=\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Accept-Encoding\": \"gzip\", \n \"Content-Length\": \"391\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"User-Agent\": \"Go-http-client/2.0\", \n \"X-Amzn-Trace-Id\": \"Root=1-5ebe8f66-7cd7a3a08196283065db25d0\"\n }, \n \"json\": {\n \"data_base64\": \"eyJuYW1lIjogIkFsbGlzc29uIn0=\", \n \"datacontenttype\": \"application/json\", \n \"id\": \"01E8C5ZKFHGFR4ZBZ17SGN4FAX\", \n \"messageid\": \"01E8C5ZKFFKHEAMSM0WRKCH7FQ\", \n \"secrettoken\": \"my-super-secret-token\", \n \"source\": \"/v1/messages/01E8C5ZKFFKHEAMSM0WRKCH7FQ\", \n \"specversion\": \"1.0\", \n \"subscriptionid\": \"httpbin-post\", \n \"time\": \"2020-05-15T09:47:11.345912-03:00\", \n \"topicid\": \"topic\", \n \"type\": \"hammer.message.created\"\n }, \n \"origin\": \"177.37.153.46\", \n \"url\": \"https://httpbin.org/post\"\n}\n",
"response_status_code": 200,
"execution_duration": 1010,
"success": true,
"created_at": "2020-05-15T12:47:34.651052Z"
}
]
"id": "01E8HX1D6PYFB1HJFG0S7WEKBK",
"delivery_id": "01E8HX1CYM0RFZDMKJHSPFF50J",
"request": "POST /post HTTP/1.1\r\nHost: httpbin.org\r\nContent-Type: application/json\r\n\r\n{\"specversion\":\"1.0\",\"type\":\"hammer.message.created\",\"source\":\"/v1/messages/01E8HX1CYHKN2R4TQVG507NYVS\",\"id\":\"01E8HX1CYM0RFZDMKJHSPFF50J\",\"time\":\"2020-05-17T15:06:19.604225-03:00\",\"secrettoken\":\"my-super-secret-token\",\"messageid\":\"01E8HX1CYHKN2R4TQVG507NYVS\",\"subscriptionid\":\"httpbin-post\",\"topicid\":\"topic\",\"datacontenttype\":\"application/json\",\"data_base64\":\"eyJuYW1lIjogIkFsbGlzc29uIn0=\"}",
"response": "HTTP/2.0 200 OK\r\nContent-Length: 1308\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\nDate: Sun, 17 May 2020 18:06:20 GMT\r\nServer: gunicorn/19.9.0\r\n\r\n{\n \"args\": {}, \n \"data\": \"{\\\"specversion\\\":\\\"1.0\\\",\\\"type\\\":\\\"hammer.message.created\\\",\\\"source\\\":\\\"/v1/messages/01E8HX1CYHKN2R4TQVG507NYVS\\\",\\\"id\\\":\\\"01E8HX1CYM0RFZDMKJHSPFF50J\\\",\\\"time\\\":\\\"2020-05-17T15:06:19.604225-03:00\\\",\\\"secrettoken\\\":\\\"my-super-secret-token\\\",\\\"messageid\\\":\\\"01E8HX1CYHKN2R4TQVG507NYVS\\\",\\\"subscriptionid\\\":\\\"httpbin-post\\\",\\\"topicid\\\":\\\"topic\\\",\\\"datacontenttype\\\":\\\"application/json\\\",\\\"data_base64\\\":\\\"eyJuYW1lIjogIkFsbGlzc29uIn0=\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Accept-Encoding\": \"gzip\", \n \"Content-Length\": \"391\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"User-Agent\": \"Go-http-client/2.0\", \n \"X-Amzn-Trace-Id\": \"Root=1-5ec17d1c-2614cd69fd899c64176e4e01\"\n }, \n \"json\": {\n \"data_base64\": \"eyJuYW1lIjogIkFsbGlzc29uIn0=\", \n \"datacontenttype\": \"application/json\", \n \"id\": \"01E8HX1CYM0RFZDMKJHSPFF50J\", \n \"messageid\": \"01E8HX1CYHKN2R4TQVG507NYVS\", \n \"secrettoken\": \"my-super-secret-token\", \n \"source\": \"/v1/messages/01E8HX1CYHKN2R4TQVG507NYVS\", \n \"specversion\": \"1.0\", \n \"subscriptionid\": \"httpbin-post\", \n \"time\": \"2020-05-17T15:06:19.604225-03:00\", \n \"topicid\": \"topic\", \n \"type\": \"hammer.message.created\"\n }, \n \"origin\": \"177.37.153.46\", \n \"url\": \"https://httpbin.org/post\"\n}\n",
"response_status_code": 200,
"execution_duration": 1061,
"success": true,
"created_at": "2020-05-17T18:06:20.925086Z"
}
```

Expand Down
8 changes: 5 additions & 3 deletions fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func MakeTestDelivery() Delivery {
func MakeTestDeliveryAttempt() DeliveryAttempt {
id := fmt.Sprintf("%d", randonInt())
return DeliveryAttempt{
ID: fmt.Sprintf("DeliveryAttempt_%s", id),
Success: false,
CreatedAt: time.Now().UTC(),
ID: fmt.Sprintf("DeliveryAttempt_%s", id),
Success: true,
ResponseStatusCode: 201,
ExecutionDuration: 1000,
CreatedAt: time.Now().UTC(),
}
}
17 changes: 12 additions & 5 deletions mocks/DeliveryService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type DeliveryService interface {
Find(id string) (Delivery, error)
FindAll(findOptions FindOptions) ([]Delivery, error)
FindToDispatch(limit, offset int) ([]string, error)
Dispatch(delivery *Delivery, httpClient *http.Client) error
Dispatch(delivery *Delivery, httpClient *http.Client) (DeliveryAttempt, error)
}

// DeliveryAttemptService interface
Expand Down
14 changes: 7 additions & 7 deletions service/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,19 @@ func (d *Delivery) FindToDispatch(limit, offset int) ([]string, error) {
}

// Dispatch message to destination
func (d *Delivery) Dispatch(delivery *hammer.Delivery, httpClient *http.Client) error {
func (d *Delivery) Dispatch(delivery *hammer.Delivery, httpClient *http.Client) (hammer.DeliveryAttempt, error) {
// Generate delivery attempt id
id, err := generateULID()
if err != nil {
return err
return hammer.DeliveryAttempt{}, err
}

dr := makeRequest(delivery, httpClient)

// Start tx
tx, err := d.txFactoryRepo.New()
if err != nil {
return err
return hammer.DeliveryAttempt{}, err
}

// Create delivery attempt
Expand All @@ -138,7 +138,7 @@ func (d *Delivery) Dispatch(delivery *hammer.Delivery, httpClient *http.Client)
err = d.deliveryAttemptRepo.Store(tx, &deliveryAttempt)
if err != nil {
rollback(tx, "delivery-dispatch-delivery-attempt-store")
return err
return hammer.DeliveryAttempt{}, err
}

// Update delivery
Expand All @@ -156,17 +156,17 @@ func (d *Delivery) Dispatch(delivery *hammer.Delivery, httpClient *http.Client)
err = d.deliveryRepo.Store(tx, delivery)
if err != nil {
rollback(tx, "delivery-dispatch-delivery-store")
return err
return hammer.DeliveryAttempt{}, err
}

// Commit tx
err = tx.Commit()
if err != nil {
rollback(tx, "delivery-dispatch-commit")
return err
return hammer.DeliveryAttempt{}, err
}

return nil
return deliveryAttempt, nil
}

// NewDelivery returns a new Delivery with DeliveryRepo
Expand Down
12 changes: 9 additions & 3 deletions service/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ func TestDelivery(t *testing.T) {
txRepo.On("Commit").Return(nil)

deliveryAttemps := delivery.DeliveryAttempts
err := deliveryService.Dispatch(&delivery, httpServer.Client())
deliveryAttempt, err := deliveryService.Dispatch(&delivery, httpServer.Client())
assert.Nil(t, err)
assert.Equal(t, delivery.DeliveryAttempts, deliveryAttemps+1)
assert.Equal(t, hammer.DeliveryStatusCompleted, delivery.Status)
assert.Equal(t, true, deliveryAttempt.Success)
assert.Equal(t, http.StatusOK, deliveryAttempt.ResponseStatusCode)
})

t.Run("Test Dispatch Error", func(t *testing.T) {
Expand All @@ -101,11 +103,13 @@ func TestDelivery(t *testing.T) {

deliveryScheduledAt := delivery.ScheduledAt
deliveryAttemps := delivery.DeliveryAttempts
err := deliveryService.Dispatch(&delivery, httpServer.Client())
deliveryAttempt, err := deliveryService.Dispatch(&delivery, httpServer.Client())
assert.Nil(t, err)
assert.Equal(t, delivery.DeliveryAttempts, deliveryAttemps+1)
assert.Equal(t, hammer.DeliveryStatusPending, delivery.Status)
assert.True(t, delivery.ScheduledAt.After(deliveryScheduledAt))
assert.Equal(t, false, deliveryAttempt.Success)
assert.Equal(t, http.StatusNotFound, deliveryAttempt.ResponseStatusCode)
})

t.Run("Test Dispatch MaxDeliveryAttempts Error", func(t *testing.T) {
Expand All @@ -128,10 +132,12 @@ func TestDelivery(t *testing.T) {

deliveryScheduledAt := delivery.ScheduledAt
deliveryAttemps := delivery.DeliveryAttempts
err := deliveryService.Dispatch(&delivery, httpServer.Client())
deliveryAttempt, err := deliveryService.Dispatch(&delivery, httpServer.Client())
assert.Nil(t, err)
assert.Equal(t, delivery.DeliveryAttempts, deliveryAttemps+1)
assert.Equal(t, hammer.DeliveryStatusFailed, delivery.Status)
assert.Equal(t, deliveryScheduledAt, delivery.ScheduledAt)
assert.Equal(t, false, deliveryAttempt.Success)
assert.Equal(t, http.StatusNotFound, deliveryAttempt.ResponseStatusCode)
})
}
33 changes: 22 additions & 11 deletions service/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,32 @@ func (w *Worker) dispatch(deliveryID string) {
httpClient := &http.Client{Timeout: time.Duration(delivery.DeliveryAttemptTimeout) * time.Second}

// Dispatch
err = w.deliveryService.Dispatch(&delivery, httpClient)
deliveryAttempt, err := w.deliveryService.Dispatch(&delivery, httpClient)
if err != nil {
logger.Error("delivery-service-dispatch", zap.Error(err))
return
}
logger.Info(
"delivery-made",
zap.String("id", delivery.ID),
zap.String("topic_id", delivery.TopicID),
zap.String("subscription_id", delivery.SubscriptionID),
zap.String("message_id", delivery.MessageID),
zap.String("status", delivery.Status),
zap.Int("attempts", delivery.DeliveryAttempts),
zap.Int("max_delivery_attempts", delivery.MaxDeliveryAttempts),
)

if delivery.Status == hammer.DeliveryStatusCompleted {
logger.Info(
"delivery-made",
zap.String("delivery_id", delivery.ID),
zap.String("delivery_attempt_id", deliveryAttempt.ID),
zap.Int("response_status_code", deliveryAttempt.ResponseStatusCode),
zap.Int("execution_duration", deliveryAttempt.ExecutionDuration),
)
} else {
logger.Info(
"delivery-fail",
zap.String("delivery_id", delivery.ID),
zap.String("delivery_attempt_id", deliveryAttempt.ID),
zap.Int("response_status_code", deliveryAttempt.ResponseStatusCode),
zap.Int("execution_duration", deliveryAttempt.ExecutionDuration),
zap.String("error", deliveryAttempt.Error),
zap.Int("attempts", delivery.DeliveryAttempts),
zap.Int("max_delivery_attempts", delivery.MaxDeliveryAttempts),
)
}
}

// Run worker flow
Expand Down
8 changes: 7 additions & 1 deletion service/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ import (

func TestWorker(t *testing.T) {
delivery := hammer.MakeTestDelivery()
delivery.TopicID = "topic"
delivery.SubscriptionID = "subscription"
delivery.MessageID = "message"
delivery.Status = hammer.DeliveryStatusCompleted
deliveryAttempt := hammer.MakeTestDeliveryAttempt()
deliveryAttempt.DeliveryID = delivery.ID
deliveryService := &mocks.DeliveryService{}
lock := &lockmock.Locker{}
workerService := NewWorker(lock, deliveryService)
deliveryService.On("FindToDispatch", hammer.WorkerDefaultFetchLimit, 0).Return([]string{delivery.ID}, nil)
lock.On("Lock", mock.Anything).Return(true, nil)
deliveryService.On("Find", delivery.ID).Return(delivery, nil)
deliveryService.On("Dispatch", &delivery, mock.Anything).Return(nil)
deliveryService.On("Dispatch", &delivery, mock.Anything).Return(deliveryAttempt, nil)
lock.On("Unlock", mock.Anything).Return(nil)

// Execute Run method in goroutine
Expand Down