Skip to content

Commit fd2de5e

Browse files
authored
consumer group (#15)
- add new consumer package: consumer group - improve tests and coverage - improve rake tasks
1 parent c054656 commit fd2de5e

File tree

27 files changed

+1810
-573
lines changed

27 files changed

+1810
-573
lines changed

.github/workflows/build-push-cauldron-github-comsumer.yml renamed to .github/workflows/build-push-cauldron-github-group-consumer.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Build and push Cauldron GitHub Consumer
1+
name: Build and push Cauldron GitHub Group Consumer
22

33
on:
44
workflow_dispatch:
@@ -24,7 +24,7 @@ jobs:
2424
uses: docker/build-push-action@v6
2525
with:
2626
context: .
27-
file: Dockerfile.github-consumer
27+
file: Dockerfile.github-consumer-group
2828
platforms: linux/amd64,linux/arm64
2929
push: true
3030
provenance: false

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ linters-settings:
3535
- h
3636
- d
3737
- p
38+
- l
3839
# ---------------------------------------------------------------------------
3940
errcheck:
4041
check-type-assertions: true

DEVELOPMENT.md

Lines changed: 34 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ bundle
7171
|:---------|:------------|---------|
7272
| `LOG_LEVEL` | Logging level, Valid values are: `"DEBUG"`, `"INFO"`, `"WARN"`, `"ERROR"` | `"INFO"` |
7373
| `KCP_BROKERS` | Kafka consumer/producer brokers list, comma separated | `"127.0.0.1:9094"` |
74-
| `KC_TOPIC_GITHUB` | Topic name for GitHub webhook consumer | `github` |
74+
| `KC_TOPIC_GITHUB` | Topic name for GitHub webhook consumer | `""` |
75+
| `KCG_NAME` | Kafka consumer group name | `""` |
7576
| `KC_PARTITION` | Consumer partition number | `0` |
7677
| `KC_DIAL_TIMEOUT` | Initial connection timeout used by broker (shared with consumer) | "`30s`" (seconds) |
7778
| `KC_READ_TIMEOUT` | Response timeout used by broker (shared with consumer) | "`30s`" (seconds) |
@@ -154,8 +155,11 @@ export KP_GITHUB_MESSAGE_QUEUE_SIZE=100
154155
# export KP_BACKOFF="2s"
155156
# export KP_MAX_RETRIES="10"
156157
157-
# kafka github consumer optional values.
158-
# export KC_TOPIC_GITHUB="github"
158+
# kafka github consumer group values.
159+
export KC_TOPIC_GITHUB="github"
160+
export KCG_NAME="github-group"
161+
162+
# kafka github consumer group optional values.
159163
# export KC_PARTITION="0"
160164
# export KC_DIAL_TIMEOUT="30s"
161165
# export KC_READ_TIMEOUT="30s"
@@ -212,31 +216,27 @@ of `rake tasks`:
212216
```bash
213217
rake -T
214218
215-
rake db:init # init database
216-
rake db:migrate # runs rake db:migrate up (shortcut)
217-
rake db:migrate:down # run migrate down
218-
rake db:migrate:goto[index] # go to migration
219-
rake db:migrate:up # run migrate up
220-
rake db:psql # connect local db with psql
221-
rake db:reset # reset database (drop and create)
222-
rake default # default task, runs server
223-
rake docker:build:github_consumer # build github consumer
224-
rake docker:build:migrator # build migrator
225-
rake docker:build:server # build server
226-
rake docker:compose:infra:down # stop the infra with all components
227-
rake docker:compose:infra:up # run the infra with all components
228-
rake docker:compose:kafka:down # stop the kafka and kafka-ui only
229-
rake docker:compose:kafka:up # run the kafka and kafka-ui only
230-
rake docker:run:github_consumer # run github consumer
231-
rake docker:run:migrator # run migrator
232-
rake docker:run:server # run server
233-
rake lint # run golang-ci linter
234-
rake rubocop:autofix # lint ruby and autofix
235-
rake rubocop:lint # lint ruby
236-
rake run:kafka:github:consumer # run kafka github consumer
237-
rake run:server # run server
238-
rake test # runs tests (shortcut)
239-
rake test:coverage # run tests and show coverage
219+
rake db:init # init database
220+
rake db:migrate # runs rake db:migrate up (shortcut)
221+
rake db:migrate:down # run migrate down
222+
rake db:migrate:goto[index] # go to migration
223+
rake db:migrate:up # run migrate up
224+
rake db:psql # connect local db with psql
225+
rake db:reset # reset database (drop and create)
226+
rake default # default task, runs server
227+
rake docker:compose:infra:down # stop the infra with all components
228+
rake docker:compose:infra:up # run the infra with all components
229+
rake docker:compose:kafka:down # stop the kafka and kafka-ui only
230+
rake docker:compose:kafka:up # run the kafka and kafka-ui only
231+
rake lint # run golang-ci linter
232+
rake psql:infra # connect to infra database with psql
233+
rake rubocop:autofix # lint ruby and autofix
234+
rake rubocop:lint # lint ruby
235+
rake run:kafka:github:consumer # run kafka github consumer
236+
rake run:kafka:github:consumer_group # run kafka github consumer group
237+
rake run:server # run server
238+
rake test # runs tests (shortcut)
239+
rake test:coverage # run tests and show coverage
240240
```
241241

242242
You can run tests:
@@ -347,24 +347,12 @@ rake rubocop:autofix # lints ruby code and auto fixes.
347347
```bash
348348
rake -T "docker:"
349349

350-
rake docker:build:github_consumer # build github consumer
351-
rake docker:build:migrator # build migrator
352-
rake docker:build:server # build server
353-
354-
rake docker:compose:infra:down # stop the infra with all components
355-
rake docker:compose:infra:up # run the infra with all components
356-
rake docker:compose:kafka:down # stop the kafka and kafka-ui only
357-
rake docker:compose:kafka:up # run the kafka and kafka-ui only
358-
359-
rake docker:run:github_consumer # run github consumer
360-
rake docker:run:migrator # run migrator
361-
rake docker:run:server # run server
350+
rake docker:compose:infra:down # stop the infra with all components
351+
rake docker:compose:infra:up # run the infra with all components
352+
rake docker:compose:kafka:down # stop the kafka and kafka-ui only
353+
rake docker:compose:kafka:up # run the kafka and kafka-ui only
362354
```
363355

364-
- `docker:build:*`: builds images locally, testing purposes.
365-
- `docker:run:*`: runs containers locally, testing purposes.
366-
- `docker:compose:*`: ups or downs whole infrastructure with services.
367-
368356
---
369357

370358
## Infrastructure Diagram
@@ -406,10 +394,10 @@ Now you can access:
406394

407395
- Kafka UI: `http://127.0.0.1:8080/`
408396
- Ngrok: `http://127.0.0.1:4040`
409-
- PostgreSQL: `PGPASSWORD="${POSTGRES_PASSWORD}" psql -h localhost -p 5433 -U postgres -d devchain_webhook`
397+
- PostgreSQL: `PGOPTIONS="--search_path=cauldron,public" PGPASSWORD="${POSTGRES_PASSWORD}" psql -h localhost -p 5433 -U postgres -d devchain_webhook`
410398

411399
For PostgreSQL, `5433` is exposed in container to avoid conflicts with the
412-
local PostgreSQL instance.
400+
local PostgreSQL instance. Use `rake psql:infra` to connect your infra database.
413401

414402
Logging for **kafka** and **kafka-ui** is set to `error` only. Due to development
415403
purposes, both were producing too much information, little clean up required.

Dockerfile.github-consumer-group

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
FROM golang:1.23-alpine AS builder
2+
3+
WORKDIR /build
4+
COPY . .
5+
6+
ARG GOOS
7+
ARG GOARCH
8+
RUN CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build -o consumer cmd/githubconsumergroup/main.go
9+
10+
FROM alpine:latest AS certs
11+
RUN apk add --update --no-cache ca-certificates
12+
13+
FROM busybox:latest
14+
ARG UID=10001
15+
RUN adduser \
16+
--disabled-password \
17+
--gecos "" \
18+
--home "/nonexistent" \
19+
--shell "/sbin/nologin" \
20+
--no-create-home \
21+
--uid "${UID}" \
22+
appuser
23+
USER appuser
24+
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
25+
COPY --from=builder /build/consumer /consumer
26+
27+
ENTRYPOINT ["/consumer"]
28+
29+
LABEL org.opencontainers.image.authors="Uğur vigo Özyılmazel <vigo@devchain.network>"
30+
LABEL org.opencontainers.image.licenses="MIT"
31+
LABEL org.opencontainers.image.source="https://github.com/devchain-network/cauldron"

Rakefile

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ namespace :run do
3636

3737
namespace :kafka do
3838
namespace :github do
39+
3940
desc 'run kafka github consumer'
4041
task :consumer do
4142
run = %{ go run -race cmd/githubconsumer/main.go }
@@ -47,77 +48,25 @@ namespace :run do
4748
Process.kill('KILL', pid)
4849
0
4950
end
50-
end
51-
end
52-
end
53-
54-
namespace :docker do
55-
namespace :run do
56-
desc 'run server'
57-
task :server do
58-
system %{
59-
docker run \
60-
--env GITHUB_HMAC_SECRET=${GITHUB_HMAC_SECRET} \
61-
-p 8000:8000 \
62-
devchain-server:latest
63-
}
64-
$CHILD_STATUS&.exitstatus || 1
65-
rescue Interrupt
66-
0
67-
end
68-
69-
desc 'run github consumer'
70-
task :github_consumer do
71-
system %{
72-
docker run \
73-
--env KC_TOPIC=${KC_TOPIC} \
74-
devchain-gh-consumer:latest
75-
}
76-
$CHILD_STATUS&.exitstatus || 1
77-
rescue Interrupt
78-
0
79-
end
8051

81-
desc 'run migrator'
82-
task :migrator do
83-
system %{
84-
docker run \
85-
--env DATABASE_URL=${DATABASE_URL_DOCKER_TO_HOST} \
86-
devchain-migrator:latest
87-
}
88-
$CHILD_STATUS&.exitstatus || 1
89-
rescue Interrupt
90-
0
91-
end
92-
end
93-
namespace :build do
94-
desc 'build server'
95-
task :server do
96-
system %{ docker build -f Dockerfile.server -t devchain-server:latest . }
97-
$CHILD_STATUS&.exitstatus || 1
98-
rescue Interrupt
99-
0
100-
end
101-
102-
desc 'build github consumer'
103-
task :github_consumer do
104-
system %{ docker build -f Dockerfile.github-consumer -t devchain-gh-consumer:latest . }
105-
$CHILD_STATUS&.exitstatus || 1
106-
rescue Interrupt
107-
0
108-
end
52+
desc 'run kafka github consumer group'
53+
task :consumer_group do
54+
run = %{ go run -race cmd/githubconsumergroup/main.go }
55+
pid = Process.spawn(run)
56+
Process.wait(pid)
57+
$CHILD_STATUS&.exitstatus || 1
58+
rescue Interrupt
59+
Process.getpgid(pid)
60+
Process.kill('KILL', pid)
61+
0
62+
end
10963

110-
desc 'build migrator'
111-
task :migrator do
112-
system %{ docker build -f Dockerfile.migrator -t devchain-migrator:latest . }
113-
$CHILD_STATUS&.exitstatus || 1
114-
rescue Interrupt
115-
0
11664
end
11765
end
66+
end
11867

68+
namespace :docker do
11969
namespace :compose do
120-
12170
namespace :kafka do
12271
desc 'run the kafka and kafka-ui only'
12372
task :up do
@@ -295,3 +244,19 @@ end
295244

296245
desc 'runs tests (shortcut)'
297246
task test: 'test:test_all'
247+
248+
INFRA_POSTGRES_PASSWORD = ENV['POSTGRES_PASSWORD'] || nil
249+
namespace :psql do
250+
desc 'connect to infra database with psql'
251+
task :infra do
252+
abort 'infra POSTGRES_PASSWORD environment variable is not set' if INFRA_POSTGRES_PASSWORD.nil?
253+
system %{
254+
PGPASSWORD="#{INFRA_POSTGRES_PASSWORD}" \
255+
PGOPTIONS="--search_path=cauldron,public" \
256+
psql -h localhost -p 5433 -U postgres -d #{DATABASE_NAME}
257+
}
258+
$CHILD_STATUS&.exitstatus || 1
259+
rescue Interrupt
260+
0
261+
end
262+
end

cmd/githubconsumer/main.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log"
77

8+
"github.com/IBM/sarama"
89
"github.com/devchain-network/cauldron/internal/kafkacp"
910
"github.com/devchain-network/cauldron/internal/kafkacp/kafkaconsumer"
1011
"github.com/devchain-network/cauldron/internal/slogger"
@@ -13,16 +14,22 @@ import (
1314
"github.com/vigo/getenv"
1415
)
1516

16-
const (
17-
defaultKafkaConsumerTopic = "github"
18-
)
17+
func storeMessage(strg storage.PingStorer) kafkaconsumer.ProcessMessageFunc {
18+
return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
19+
if err := strg.MessageStore(ctx, msg); err != nil {
20+
return fmt.Errorf("message store error: [%w]", err)
21+
}
22+
23+
return nil
24+
}
25+
}
1926

2027
// Run runs kafa github consumer.
2128
func Run() error {
2229
logLevel := getenv.String("LOG_LEVEL", slogger.DefaultLogLevel)
2330
brokersList := getenv.String("KCP_BROKERS", kafkacp.DefaultKafkaBrokers)
2431

25-
kafkaTopic := getenv.String("KC_TOPIC_GITHUB", defaultKafkaConsumerTopic)
32+
kafkaTopic := getenv.String("KC_TOPIC_GITHUB", "")
2633
kafkaPartition := getenv.Int("KC_PARTITION", kafkaconsumer.DefaultPartition)
2734
kafkaDialTimeout := getenv.Duration("KC_DIAL_TIMEOUT", kafkaconsumer.DefaultDialTimeout)
2835
kafkaReadTimeout := getenv.Duration("KC_READ_TIMEOUT", kafkaconsumer.DefaultReadTimeout)
@@ -64,7 +71,7 @@ func Run() error {
6471

6572
kafkaGitHubConsumer, err := kafkaconsumer.New(
6673
kafkaconsumer.WithLogger(logger),
67-
kafkaconsumer.WithStorage(db),
74+
kafkaconsumer.WithProcessMessageFunc(storeMessage(db)),
6875
kafkaconsumer.WithKafkaBrokers(*brokersList),
6976
kafkaconsumer.WithDialTimeout(*kafkaDialTimeout),
7077
kafkaconsumer.WithReadTimeout(*kafkaReadTimeout),

0 commit comments

Comments
 (0)