Skip to content

Commit

Permalink
Merge pull request moov-io#200 from moov-io/expose-producer-errors
Browse files Browse the repository at this point in the history
stream: unwrap underlying error from sarama.ProducerError
  • Loading branch information
adamdecaf authored Sep 11, 2023
2 parents 321e81a + 501058e commit 4222be0
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 20 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ endif
.PHONY: teardown
teardown:
-docker-compose down --remove-orphans
-docker-compose rm -sfv

docker: update
docker build --pull --build-arg VERSION=${VERSION} -t moov/achgateway:${VERSION} -f Dockerfile .
Expand Down
59 changes: 46 additions & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,53 @@ services:
- "2181:2181"

kafka1:
image: wurstmeister/kafka:2.13-2.6.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: test1:1:1
PORT_COMMAND: "docker port $$(hostname) 9092/tcp | cut -d: -f2"
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
image: docker.redpanda.com/vectorized/redpanda:v22.3.21
container_name: kafka1
healthcheck:
{
test: curl -f localhost:9644/v1/status/ready,
interval: 1s,
start_period: 30s,
}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
tmpfs: # Run this mysql in memory as its used for testing
- /tmp/kafka-logs
- redpanda-0:/var/lib/redpanda/data
networks:
- intranet
ports:
- 18081:18081
- 18082:18082
- 19092:19092
- 19644:9644
command:
- redpanda
- start
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
- --advertise-kafka-addr internal://kafka1:9092,external://localhost:19092
- --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
- --advertise-pandaproxy-addr internal://kafka1:8082,external://localhost:18082
- --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
- --rpc-addr kafka1:33145
- --advertise-rpc-addr kafka1:33145
- --smp 1
- --memory 1G
- --mode dev-container
- --default-log-level=info

topics:
image: docker.redpanda.com/vectorized/redpanda:v22.3.21
depends_on:
kafka1:
condition: service_healthy
networks:
- intranet
command:
- topic
- --brokers kafka1:9092
- create
- ach.outgoing-files

networks:
intranet: {}

volumes:
redpanda-0: null
7 changes: 5 additions & 2 deletions internal/incoming/stream/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error {
if err != nil {
var producerError sarama.ProducerError
if kp.topic.ErrorAs(err, &producerError) {
return fmt.Errorf("producer error sending message: %w", producerError)
return fmt.Errorf("producer error: %w", producerError.Err)
}
var producerErrors sarama.ProducerErrors
if kp.topic.ErrorAs(err, &producerErrors) {
return fmt.Errorf("producer errors sending message: %w", producerErrors)
if len(producerErrors) > 0 {
return fmt.Errorf("first producer error (of %d) - %w", len(producerErrors), producerErrors[0].Err)
}
return fmt.Errorf("producer errors: %w", producerErrors)
}
return fmt.Errorf("error sending message: %w", err)
}
Expand Down
57 changes: 52 additions & 5 deletions internal/incoming/stream/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package stream

import (
"context"
"errors"
"strings"
"testing"

"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/base/docker"
"github.com/moov-io/base/log"

"github.com/stretchr/testify/require"
Expand All @@ -34,7 +37,7 @@ func TestStream(t *testing.T) {
defer sub.Shutdown(ctx)

// quick send and receive
send(ctx, topic, "hello, world")
send(t, ctx, topic, "hello, world")
if msg, err := receive(ctx, sub); err == nil {
if msg != "hello, world" {
t.Errorf("got %q", msg)
Expand All @@ -44,19 +47,63 @@ func TestStream(t *testing.T) {
}
}

func send(ctx context.Context, t Publisher, body string) *pubsub.Message {
func TestStreamErrors(t *testing.T) {
if testing.Short() {
t.Skip("-short flag enabled")
}
if !docker.Enabled() {
t.Skip("Docker not enabled")
}

cfg := &service.Config{
Inbound: service.Inbound{
Kafka: &service.KafkaConfig{
Brokers: []string{"localhost:19092"},
Key: "",
Secret: "",
Topic: "test1",
TLS: false,
},
},
}
ctx := context.Background()
logger := log.NewTestLogger()

topic, err := Topic(logger, cfg)
require.NoError(t, err)
defer topic.Shutdown(ctx)

// Produce a message that's too big
msg := &pubsub.Message{
Body: []byte(strings.Repeat("A", 1e9)),
Metadata: make(map[string]string),
}
err = topic.Send(ctx, msg)
require.ErrorContains(t, err, "kafka server: Message was too large, server rejected it to avoid allocation error")
}

func send(t *testing.T, ctx context.Context, topic Publisher, body string) *pubsub.Message {
t.Helper()

msg := &pubsub.Message{
Body: []byte(body),
Metadata: make(map[string]string),
}
t.Send(ctx, msg)
err := topic.Send(ctx, msg)
if err != nil {
t.Error(err)
}
return msg
}

func receive(ctx context.Context, t Subscription) (string, error) {
msg, err := t.Receive(ctx)
func receive(ctx context.Context, sub Subscription) (string, error) {
msg, err := sub.Receive(ctx)
if err != nil {
return "", err
}
if msg == nil {
return "", errors.New("nil Message received")
}
msg.Ack()
return string(msg.Body), nil
}

0 comments on commit 4222be0

Please sign in to comment.