Skip to content

Commit

Permalink
ENH - Set message max bytes (#27)
Browse files Browse the repository at this point in the history
* Add messages.max.bytes proprety

* update tests

* Update readme
  • Loading branch information
ldechoux authored Aug 28, 2024
1 parent a069602 commit dc96a59
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ Configuration variables with prefix are first loaded and then without prefix. Fo

A big value here can increase the heap memory of the application as all the payload that have to be sent to Kafka will be maintained in channel.

#### KAFKA_MESSAGE_MAX_BYTES
*Type*: integer

*Description*: The maximum message size in bytes at the producer level (default: 1024*1024)

#### LOG_CLI_VERBOSE
*Type*: boolean

Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Kafka struct {
Topic string `config:"KAFKA_TOPIC"`
ProduceChannelSize int `config:"KAFKA_PRODUCE_CHANNEL_SIZE"`
WithDecorators bool `config:"KAFKA_WITH_DECORATORS"`
MessageMaxBytes int `config:"KAFKA_MESSAGE_MAX_BYTES"`
}

// NewBase returns a new base configuration
Expand Down Expand Up @@ -107,6 +108,7 @@ func NewBase(ctx context.Context, configPrefix string) *Base {
Topic: "kafka-mongo-watcher",
ProduceChannelSize: 10000,
WithDecorators: true,
MessageMaxBytes: 1024 * 1024,
},
}

Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var cfg = &Base{
Topic: "kafka-mongo-watcher",
ProduceChannelSize: 10000,
WithDecorators: true,
MessageMaxBytes: 1024 * 1024,
},
}

Expand Down
1 change: 1 addition & 0 deletions internal/service/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func (container *Container) GetKafkaProducer() *kafkaconfluent.Producer {
producer, err := kafkaconfluent.NewProducer(&kafkaconfluent.ConfigMap{
"bootstrap.servers": container.Cfg.Kafka.BootstrapServers,
"go.produce.channel.size": container.Cfg.Kafka.ProduceChannelSize,
"message.max.bytes": container.Cfg.Kafka.MessageMaxBytes,
})
if err != nil {
panic(err)
Expand Down

0 comments on commit dc96a59

Please sign in to comment.