Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/allisson/go-env v0.3.0
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/golang-migrate/migrate/v4 v4.14.1
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.1
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
Expand All @@ -16,12 +16,12 @@ require (
github.com/joho/godotenv v1.3.0
github.com/lib/pq v1.10.0
github.com/oklog/ulid/v2 v2.0.2
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_golang v1.10.0
github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4
google.golang.org/genproto v0.0.0-20210317182105-75c7a8546eb9
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.25.0
google.golang.org/protobuf v1.26.0
)
288 changes: 278 additions & 10 deletions go.sum

Large diffs are not rendered by default.

20 changes: 7 additions & 13 deletions grpc/delivery_attempt_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,18 @@ import (

"github.com/allisson/hammer"
pb "github.com/allisson/hammer/api/v1"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// DeliveryAttemptHandler implements methods for DeliveryAttempt get/list
type DeliveryAttemptHandler struct {
deliveryAttemptService hammer.DeliveryAttemptService
}

func (d DeliveryAttemptHandler) buildResponse(deliveryAttempt *hammer.DeliveryAttempt) (*pb.DeliveryAttempt, error) {
func (d DeliveryAttemptHandler) buildResponse(deliveryAttempt *hammer.DeliveryAttempt) *pb.DeliveryAttempt {
response := &pb.DeliveryAttempt{}
createdAt, err := ptypes.TimestampProto(deliveryAttempt.CreatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
response.Id = deliveryAttempt.ID
response.DeliveryId = deliveryAttempt.DeliveryID
response.Request = deliveryAttempt.Request
Expand All @@ -30,9 +26,9 @@ func (d DeliveryAttemptHandler) buildResponse(deliveryAttempt *hammer.DeliveryAt
response.ExecutionDuration = uint32(deliveryAttempt.ExecutionDuration)
response.Success = deliveryAttempt.Success
response.Error = deliveryAttempt.Error
response.CreatedAt = createdAt
response.CreatedAt = timestamppb.New(deliveryAttempt.CreatedAt)

return response, nil
return response
}

// GetDeliveryAttempt gets the DeliveryAttempt
Expand All @@ -48,7 +44,8 @@ func (d DeliveryAttemptHandler) GetDeliveryAttempt(ctx context.Context, request
}
}

return d.buildResponse(deliveryAttempt)
response := d.buildResponse(deliveryAttempt)
return response, nil
}

// ListDeliveryAttempts get a list of DeliveryAttempts
Expand Down Expand Up @@ -84,10 +81,7 @@ func (d DeliveryAttemptHandler) ListDeliveryAttempts(ctx context.Context, reques
// Update response
for i := range deliveryAttempts {
deliveryAttempt := deliveryAttempts[i]
deliveryAttemptResponse, err := d.buildResponse(deliveryAttempt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
deliveryAttemptResponse := d.buildResponse(deliveryAttempt)
response.DeliveryAttempts = append(response.DeliveryAttempts, deliveryAttemptResponse)
}

Expand Down
33 changes: 9 additions & 24 deletions grpc/delivery_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,18 @@ import (

"github.com/allisson/hammer"
pb "github.com/allisson/hammer/api/v1"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// DeliveryHandler implements methods for Delivery get/list
type DeliveryHandler struct {
deliveryService hammer.DeliveryService
}

func (d DeliveryHandler) buildResponse(delivery *hammer.Delivery) (*pb.Delivery, error) {
func (d DeliveryHandler) buildResponse(delivery *hammer.Delivery) *pb.Delivery {
response := &pb.Delivery{}
createdAt, err := ptypes.TimestampProto(delivery.CreatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
updatedAt, err := ptypes.TimestampProto(delivery.UpdatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
scheduledAt, err := ptypes.TimestampProto(delivery.ScheduledAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
response.Id = delivery.ID
response.TopicId = delivery.TopicID
response.SubscriptionId = delivery.SubscriptionID
Expand All @@ -41,13 +29,13 @@ func (d DeliveryHandler) buildResponse(delivery *hammer.Delivery) (*pb.Delivery,
response.MaxDeliveryAttempts = uint32(delivery.MaxDeliveryAttempts)
response.DeliveryAttemptDelay = uint32(delivery.DeliveryAttemptDelay)
response.DeliveryAttemptTimeout = uint32(delivery.DeliveryAttemptTimeout)
response.ScheduledAt = scheduledAt
response.ScheduledAt = timestamppb.New(delivery.ScheduledAt)
response.DeliveryAttempts = uint32(delivery.DeliveryAttempts)
response.Status = delivery.Status
response.CreatedAt = createdAt
response.UpdatedAt = updatedAt
response.CreatedAt = timestamppb.New(delivery.CreatedAt)
response.UpdatedAt = timestamppb.New(delivery.UpdatedAt)

return response, nil
return response
}

// GetDelivery gets the Delivery
Expand All @@ -62,8 +50,8 @@ func (d DeliveryHandler) GetDelivery(ctx context.Context, request *pb.GetDeliver
return &pb.Delivery{}, status.Error(codes.Internal, err.Error())
}
}

return d.buildResponse(delivery)
response := d.buildResponse(delivery)
return response, nil
}

// ListDeliveries get a list of deliveries
Expand Down Expand Up @@ -123,10 +111,7 @@ func (d DeliveryHandler) ListDeliveries(ctx context.Context, request *pb.ListDel
// Update response
for i := range deliveries {
delivery := deliveries[i]
deliveryResponse, err := d.buildResponse(delivery)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
deliveryResponse := d.buildResponse(delivery)
response.Deliveries = append(response.Deliveries, deliveryResponse)
}

Expand Down
25 changes: 9 additions & 16 deletions grpc/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,26 @@ import (

"github.com/allisson/hammer"
pb "github.com/allisson/hammer/api/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// MessageHandler implements methods for message create/update
type MessageHandler struct {
messageService hammer.MessageService
}

func (m MessageHandler) buildResponse(message *hammer.Message) (*pb.Message, error) {
func (m MessageHandler) buildResponse(message *hammer.Message) *pb.Message {
response := &pb.Message{}
createdAt, err := ptypes.TimestampProto(message.CreatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
response.Id = message.ID
response.TopicId = message.TopicID
response.ContentType = message.ContentType
response.Data = message.Data
response.CreatedAt = createdAt
response.CreatedAt = timestamppb.New(message.CreatedAt)

return response, nil
return response
}

// CreateMessage creates a new Message
Expand Down Expand Up @@ -58,8 +54,8 @@ func (m MessageHandler) CreateMessage(ctx context.Context, request *pb.CreateMes
if err != nil {
return &pb.Message{}, status.Error(codes.Internal, err.Error())
}

return m.buildResponse(message)
response := m.buildResponse(message)
return response, nil
}

// GetMessage gets the message
Expand All @@ -74,8 +70,8 @@ func (m MessageHandler) GetMessage(ctx context.Context, request *pb.GetMessageRe
return &pb.Message{}, status.Error(codes.Internal, err.Error())
}
}

return m.buildResponse(message)
response := m.buildResponse(message)
return response, nil
}

// ListMessages get a list of messages
Expand Down Expand Up @@ -111,10 +107,7 @@ func (m MessageHandler) ListMessages(ctx context.Context, request *pb.ListMessag
// Update response
for i := range messages {
message := messages[i]
messageResponse, err := m.buildResponse(message)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
messageResponse := m.buildResponse(message)
response.Messages = append(response.Messages, messageResponse)
}

Expand Down
35 changes: 12 additions & 23 deletions grpc/subscription_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,19 @@ import (

"github.com/allisson/hammer"
pb "github.com/allisson/hammer/api/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// SubscriptionHandler implements methods for topic create/update
type SubscriptionHandler struct {
subscriptionService hammer.SubscriptionService
}

func (s SubscriptionHandler) buildResponse(subscription *hammer.Subscription) (*pb.Subscription, error) {
func (s SubscriptionHandler) buildResponse(subscription *hammer.Subscription) *pb.Subscription {
response := &pb.Subscription{}
createdAt, err := ptypes.TimestampProto(subscription.CreatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
updatedAt, err := ptypes.TimestampProto(subscription.UpdatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
response.Id = subscription.ID
response.TopicId = subscription.TopicID
response.Name = subscription.Name
Expand All @@ -35,10 +27,10 @@ func (s SubscriptionHandler) buildResponse(subscription *hammer.Subscription) (*
response.MaxDeliveryAttempts = uint32(subscription.MaxDeliveryAttempts)
response.DeliveryAttemptDelay = uint32(subscription.DeliveryAttemptDelay)
response.DeliveryAttemptTimeout = uint32(subscription.DeliveryAttemptTimeout)
response.CreatedAt = createdAt
response.UpdatedAt = updatedAt
response.CreatedAt = timestamppb.New(subscription.CreatedAt)
response.UpdatedAt = timestamppb.New(subscription.UpdatedAt)

return response, nil
return response
}

// CreateSubscription creates a new subscription
Expand Down Expand Up @@ -71,8 +63,8 @@ func (s SubscriptionHandler) CreateSubscription(ctx context.Context, request *pb
if err != nil {
return &pb.Subscription{}, status.Error(codes.Internal, err.Error())
}

return s.buildResponse(subscription)
response := s.buildResponse(subscription)
return response, nil
}

// UpdateSubscription update the subscription
Expand Down Expand Up @@ -104,8 +96,8 @@ func (s SubscriptionHandler) UpdateSubscription(ctx context.Context, request *pb
if err != nil {
return &pb.Subscription{}, status.Error(codes.Internal, err.Error())
}

return s.buildResponse(subscription)
response := s.buildResponse(subscription)
return response, nil
}

// GetSubscription gets the subscription
Expand All @@ -120,8 +112,8 @@ func (s SubscriptionHandler) GetSubscription(ctx context.Context, request *pb.Ge
return &pb.Subscription{}, status.Error(codes.Internal, err.Error())
}
}

return s.buildResponse(subscription)
response := s.buildResponse(subscription)
return response, nil
}

// ListSubscriptions get a list of topics
Expand Down Expand Up @@ -149,10 +141,7 @@ func (s SubscriptionHandler) ListSubscriptions(ctx context.Context, request *pb.
// Update response
for i := range subscriptions {
subscription := subscriptions[i]
subscriptionResponse, err := s.buildResponse(subscription)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
subscriptionResponse := s.buildResponse(subscription)
response.Subscriptions = append(response.Subscriptions, subscriptionResponse)
}

Expand Down
35 changes: 12 additions & 23 deletions grpc/topic_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,25 @@ import (

"github.com/allisson/hammer"
pb "github.com/allisson/hammer/api/v1"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// TopicHandler implements methods for topic create/update
type TopicHandler struct {
topicService hammer.TopicService
}

func (t TopicHandler) buildResponse(topic *hammer.Topic) (*pb.Topic, error) {
func (t TopicHandler) buildResponse(topic *hammer.Topic) *pb.Topic {
response := &pb.Topic{}
createdAt, err := ptypes.TimestampProto(topic.CreatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
updatedAt, err := ptypes.TimestampProto(topic.UpdatedAt)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
response.Id = topic.ID
response.Name = topic.Name
response.CreatedAt = createdAt
response.UpdatedAt = updatedAt
response.CreatedAt = timestamppb.New(topic.CreatedAt)
response.UpdatedAt = timestamppb.New(topic.UpdatedAt)

return response, nil
return response
}

// CreateTopic creates a new topic
Expand All @@ -59,8 +51,8 @@ func (t TopicHandler) CreateTopic(ctx context.Context, request *pb.CreateTopicRe
if err != nil {
return &pb.Topic{}, status.Error(codes.Internal, err.Error())
}

return t.buildResponse(topic)
response := t.buildResponse(topic)
return response, nil
}

// UpdateTopic update the topic
Expand All @@ -86,8 +78,8 @@ func (t TopicHandler) UpdateTopic(ctx context.Context, request *pb.UpdateTopicRe
if err != nil {
return &pb.Topic{}, status.Error(codes.Internal, err.Error())
}

return t.buildResponse(topic)
response := t.buildResponse(topic)
return response, nil
}

// GetTopic gets the topic
Expand All @@ -102,8 +94,8 @@ func (t TopicHandler) GetTopic(ctx context.Context, request *pb.GetTopicRequest)
return &pb.Topic{}, status.Error(codes.Internal, err.Error())
}
}

return t.buildResponse(topic)
response := t.buildResponse(topic)
return response, nil
}

// ListTopics get a list of topics
Expand Down Expand Up @@ -131,10 +123,7 @@ func (t TopicHandler) ListTopics(ctx context.Context, request *pb.ListTopicsRequ
// Update response
for i := range topics {
topic := topics[i]
topicResponse, err := t.buildResponse(topic)
if err != nil {
return response, status.Error(codes.Internal, err.Error())
}
topicResponse := t.buildResponse(topic)
response.Topics = append(response.Topics, topicResponse)
}

Expand Down