Skip to content

Commit cd97991

Browse files
committed
feat: add batch size configuration for buffer and implement batch processing in report sender
1 parent 387c17a commit cd97991

File tree

4 files changed

+137
-8
lines changed

4 files changed

+137
-8
lines changed

internal/config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type AgentConfig struct {
3838
// BufferConfig holds the settings mapped from the "buffer" section of the configuration.
type BufferConfig struct {
3939
// Path is read from the "path" key (default: "/var/lib/node-pulse/buffer").
Path string `mapstructure:"path"`
4040
// RetentionHours is read from "retention_hours" (default: 48); validation rejects values <= 0.
RetentionHours int `mapstructure:"retention_hours"`
41+
// BatchSize is read from "batch_size"; validation rejects values <= 0.
BatchSize int `mapstructure:"batch_size"` // Maximum number of buffer files drained per batch request (default: 5)
4142
}
4243

4344
var (
@@ -52,6 +53,7 @@ var (
5253
Buffer: BufferConfig{
5354
Path: "/var/lib/node-pulse/buffer",
5455
RetentionHours: 48,
56+
BatchSize: 5,
5557
},
5658
Logging: logger.Config{
5759
Level: "info",
@@ -122,6 +124,7 @@ func setDefaults(v *viper.Viper) {
122124
v.SetDefault("agent.interval", defaultConfig.Agent.Interval)
123125
v.SetDefault("buffer.path", defaultConfig.Buffer.Path)
124126
v.SetDefault("buffer.retention_hours", defaultConfig.Buffer.RetentionHours)
127+
v.SetDefault("buffer.batch_size", defaultConfig.Buffer.BatchSize)
125128
v.SetDefault("logging.level", defaultConfig.Logging.Level)
126129
v.SetDefault("logging.output", defaultConfig.Logging.Output)
127130
v.SetDefault("logging.file.path", defaultConfig.Logging.File.Path)
@@ -180,6 +183,9 @@ func validate(cfg *Config) error {
180183
if cfg.Buffer.RetentionHours <= 0 {
181184
return fmt.Errorf("buffer.retention_hours must be positive")
182185
}
186+
if cfg.Buffer.BatchSize <= 0 {
187+
return fmt.Errorf("buffer.batch_size must be positive")
188+
}
183189

184190
return nil
185191
}

internal/installer/installer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ type ConfigOptions struct {
3434
Interval string
3535

3636
// Buffer options (buffer is always enabled in new architecture)
37-
BufferPath string
37+
BufferPath string
3838
BufferRetentionHours int
39+
BufferBatchSize int
3940

4041
// Logging options
4142
LogLevel string
@@ -207,8 +208,9 @@ func DefaultConfigOptions() ConfigOptions {
207208
Interval: "5s",
208209

209210
// Buffer defaults (always enabled)
210-
BufferPath: DefaultBufferPath,
211+
BufferPath: DefaultBufferPath,
211212
BufferRetentionHours: 48,
213+
BufferBatchSize: 5,
212214

213215
// Logging defaults
214216
LogLevel: "info",
@@ -236,6 +238,7 @@ func WriteConfigFile(opts ConfigOptions) error {
236238
"buffer": map[string]interface{}{
237239
"path": opts.BufferPath,
238240
"retention_hours": opts.BufferRetentionHours,
241+
"batch_size": opts.BufferBatchSize,
239242
},
240243
"logging": map[string]interface{}{
241244
"level": opts.LogLevel,

internal/report/sender.go

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package report
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"fmt"
78
"io"
89
"math/rand"
@@ -104,6 +105,7 @@ func (s *Sender) StartDraining() {
104105
}
105106

106107
// drainLoop continuously drains the buffer with random delays
108+
// Batches up to buffer.batch_size buffer files (default: 5) per request for efficiency
107109
func (s *Sender) drainLoop() {
108110
for {
109111
// Check if context is cancelled
@@ -128,19 +130,95 @@ func (s *Sender) drainLoop() {
128130
continue
129131
}
130132

131-
// Process oldest file
132-
filePath := files[0]
133-
if err := s.processBufferFile(filePath); err != nil {
134-
// Failed to send - keep file and retry after delay
135-
logger.Debug("Failed to process buffer file, will retry", logger.String("file", filePath), logger.Err(err))
133+
// Determine batch size: up to configured batch_size
134+
batchSize := len(files)
135+
if batchSize > s.config.Buffer.BatchSize {
136+
batchSize = s.config.Buffer.BatchSize
137+
}
138+
139+
// Process batch of files (oldest first)
140+
batch := files[:batchSize]
141+
if err := s.processBatch(batch); err != nil {
142+
// Failed to send - keep files and retry after delay
143+
logger.Debug("Failed to process batch, will retry", logger.Int("batch_size", batchSize), logger.Err(err))
136144
}
137145

138146
// Wait random delay before next attempt
139147
s.randomDelay()
140148
}
141149
}
142150

143-
// processBufferFile attempts to send all reports from a buffer file
151+
// processBatch loads and sends a batch of buffer files as a single array request
152+
// Returns error if send fails (files are kept for retry)
153+
// Handles corrupted files by sending N/A markers
154+
func (s *Sender) processBatch(filePaths []string) error {
155+
var reports []*metrics.Report
156+
var validFiles []string
157+
158+
// Load all reports from the batch
159+
for _, filePath := range filePaths {
160+
fileReports, err := s.buffer.LoadFile(filePath)
161+
if err != nil {
162+
// File is corrupted - send N/A marker and delete
163+
logger.Warn("Corrupted buffer file detected in batch, sending N/A metrics",
164+
logger.String("file", filePath),
165+
logger.Err(err))
166+
167+
// Create N/A report for this corrupted file
168+
naReport := s.createNAReport()
169+
reports = append(reports, naReport)
170+
171+
// Delete corrupted file immediately
172+
if delErr := s.buffer.DeleteFile(filePath); delErr != nil {
173+
logger.Error("Failed to delete corrupted buffer file",
174+
logger.String("file", filePath),
175+
logger.Err(delErr))
176+
} else {
177+
logger.Info("Deleted corrupted buffer file", logger.String("file", filePath))
178+
}
179+
continue
180+
}
181+
182+
// File loaded successfully - add to batch
183+
if len(fileReports) > 0 {
184+
reports = append(reports, fileReports...)
185+
validFiles = append(validFiles, filePath)
186+
}
187+
}
188+
189+
// If no reports to send, we're done
190+
if len(reports) == 0 {
191+
return nil
192+
}
193+
194+
// Send batch as array
195+
if err := s.sendBatch(reports); err != nil {
196+
// Send failed - keep valid files for retry
197+
return fmt.Errorf("failed to send batch of %d reports: %w", len(reports), err)
198+
}
199+
200+
// Send succeeded - delete all valid files
201+
for _, filePath := range validFiles {
202+
if err := s.buffer.DeleteFile(filePath); err != nil {
203+
logger.Error("Failed to delete buffer file after successful send",
204+
logger.String("file", filePath),
205+
logger.Err(err))
206+
}
207+
}
208+
209+
logger.Info("Successfully sent batch",
210+
logger.Int("reports", len(reports)),
211+
logger.Int("files", len(validFiles)))
212+
213+
// Periodically clean up old buffer files
214+
if err := s.buffer.Cleanup(); err != nil {
215+
logger.Warn("Failed to cleanup old buffer files", logger.Err(err))
216+
}
217+
218+
return nil
219+
}
220+
221+
// processBufferFile attempts to send all reports from a buffer file (DEPRECATED - kept for compatibility)
144222
// Returns error if any report fails to send (file is kept for retry)
145223
// If file is corrupted, sends N/A metrics and deletes the corrupted file
146224
func (s *Sender) processBufferFile(filePath string) error {
@@ -212,6 +290,43 @@ func (s *Sender) processBufferFile(filePath string) error {
212290
return nil
213291
}
214292

293+
// sendBatch sends an array of reports to the server
294+
func (s *Sender) sendBatch(reports []*metrics.Report) error {
295+
// Marshal array of reports to JSON
296+
data, err := json.Marshal(reports)
297+
if err != nil {
298+
return fmt.Errorf("failed to marshal batch: %w", err)
299+
}
300+
301+
// Send via HTTP
302+
if err := s.sendHTTP(data); err != nil {
303+
return fmt.Errorf("failed to send batch: %w", err)
304+
}
305+
306+
return nil
307+
}
308+
309+
// createNAReport creates a report with all metrics set to null (N/A)
310+
func (s *Sender) createNAReport() *metrics.Report {
311+
hostname, err := os.Hostname()
312+
if err != nil {
313+
hostname = "unknown"
314+
}
315+
316+
return &metrics.Report{
317+
ServerID: s.config.Agent.ServerID,
318+
Timestamp: time.Now().UTC().Format(time.RFC3339),
319+
Hostname: hostname,
320+
SystemInfo: nil,
321+
CPU: nil,
322+
Memory: nil,
323+
Disk: nil,
324+
Network: nil,
325+
Uptime: nil,
326+
Processes: nil,
327+
}
328+
}
329+
215330
// sendCorruptedFileMarker sends a report with all metrics set to null (N/A)
216331
// This keeps the timeline intact when a corrupted buffer file is encountered
217332
func (s *Sender) sendCorruptedFileMarker(filePath string) error {

nodepulse.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ buffer:
3131
# Files older than this are automatically deleted
3232
retention_hours: 48
3333

34+
# Maximum number of buffered report files to send per batch request
35+
# Higher values = fewer HTTP requests, larger payloads
36+
# Default: 5
37+
batch_size: 5
38+
3439
logging:
3540
# Log level: debug, info, warn, error
3641
# debug: Verbose diagnostic information for troubleshooting

0 commit comments

Comments
 (0)