Skip to content

Commit dfa7e79

Browse files
committed
Feature: implement grpc server for publish and subscribe
1 parent aabe308 commit dfa7e79

File tree

11 files changed

+611
-1
lines changed

11 files changed

+611
-1
lines changed

Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ WORKDIR /app
1515
COPY --from=Builder /app/application /app
1616

1717
EXPOSE 3000
18+
EXPOSE 3001
1819

1920
CMD ["./application"]
2021

@@ -26,3 +27,13 @@ RUN go install github.com/go-delve/delve/cmd/dlv@v1.22.1
2627
WORKDIR /app
2728

2829
CMD ["air"]
30+
31+
32+
FROM golang:1.22-alpine3.19 AS protobuf-grpc-build
33+
34+
WORKDIR /app
35+
36+
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2
37+
RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.4
38+
RUN apk update && apk add protobuf
39+

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ live:
77

88
down:
99
docker compose -f compose.yml -f compose.development.yml down
10+
11+
proto:
12+
docker compose -f compose.development.yml up application-proto

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ You can publish messages with http POST on /{topic}/publish message must be sent
2323
To receive messages you can http GET on /{topic}/subscribe or connect via websocket to /{topic}/ws
2424
On the websocket you need to send "next" in plaintext to get a new message
2525

26+
You can use the gRPC endpoints to publish and subscribe to messages, see internal/grpc/messagebroker.proto
27+
2628
## License
2729

2830
Simple Http Message Queue \

compose.development.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,10 @@ services:
3333
- ./go.mod:/app/go.mod
3434
- ./go.sum:/app/go.sum
3535

36+
application-proto:
37+
container_name: application-proto
38+
command: sh -c "protoc grpc/*.proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --proto_path=."
39+
build:
40+
target: protobuf-grpc-build
41+
volumes:
42+
- ./internal/grpc:/app/grpc

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ go 1.22
55
require (
66
github.com/gofiber/contrib/websocket v1.3.1
77
github.com/gofiber/fiber/v2 v2.52.4
8+
google.golang.org/grpc v1.64.0
9+
google.golang.org/protobuf v1.34.2
810
)
911

1012
require (
@@ -22,4 +24,6 @@ require (
2224
github.com/valyala/tcplisten v1.0.0 // indirect
2325
golang.org/x/net v0.24.0 // indirect
2426
golang.org/x/sys v0.19.0 // indirect
27+
golang.org/x/text v0.14.0 // indirect
28+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
2529
)

go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ github.com/gofiber/contrib/websocket v1.3.1 h1:iINEnUIT7Wi1ttGWW5fY1fnKQlIEa5KTD
88
github.com/gofiber/contrib/websocket v1.3.1/go.mod h1:oDLA6uM7x4hFq1zjy3US3HuvmrlWJKO5nrsw2ZKNSfY=
99
github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM=
1010
github.com/gofiber/fiber/v2 v2.52.4/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
11+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
12+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
1113
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1214
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1315
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
@@ -40,5 +42,13 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
4042
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4143
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
4244
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
45+
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
46+
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
47+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
48+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
49+
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
50+
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
51+
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
52+
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
4353
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
4454
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/grpc/grpc.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"github.com/ThomasBoom89/simple-http-message-queue/internal"
6+
"github.com/gofiber/fiber/v2/log"
7+
)
8+
9+
type Server struct {
10+
topicManager *internal.TopicManager
11+
}
12+
13+
func NewServer(topicManager *internal.TopicManager) *Server {
14+
return &Server{topicManager: topicManager}
15+
}
16+
17+
func (S *Server) Publish(ctx context.Context, request *PublishRequest) (*Empty, error) {
18+
log.Debug(request.GetMessage())
19+
S.topicManager.AddMessage(internal.Topic(request.GetTopic()), request.GetMessage())
20+
21+
return &Empty{}, nil
22+
}
23+
24+
func (S *Server) Subscribe(ctx context.Context, request *SubscribeRequest) (*Response, error) {
25+
message, err := S.topicManager.GetNextMessage(internal.Topic(request.GetTopic()))
26+
if err != nil {
27+
return &Response{Message: []byte{}}, nil
28+
}
29+
30+
return &Response{Message: message}, nil
31+
}
32+
33+
func (S *Server) mustEmbedUnimplementedMessageBrokerServer() {
34+
}

0 commit comments

Comments
 (0)