# Buffer Mechanism

The Node Pulse Agent uses a **Write-Ahead Log (WAL)** pattern for reliable metrics delivery. All scraped metrics are buffered to disk before being sent to the dashboard.

## Architecture

```
node_exporter → Agent scrapes → Save to buffer → Background drain loop → Send to dashboard
                                      ↓
                              Always persisted first
```

## How It Works

### 1. Write-Ahead Log Pattern

**Every scrape is saved to the buffer first:**

```go
// From internal/report/sender.go
func (s *Sender) SendPrometheus(data []byte, serverID string) error {
    // Always save to buffer first (WAL pattern)
    if err := s.buffer.SavePrometheus(data, serverID); err != nil {
        return fmt.Errorf("failed to save prometheus data to buffer: %w", err)
    }

    logger.Debug("Prometheus data saved to buffer")
    return nil
}
```

**Key benefit:** Data is persisted to disk before any network send is attempted. If the agent crashes, metrics are not lost.

### 2. Background Drain Loop

A separate goroutine continuously drains the buffer:

```go
// From internal/report/sender.go
func (s *Sender) StartDraining() {
    go s.drainLoop()
    logger.Info("Started buffer drain goroutine with random jitter")
}

func (s *Sender) drainLoop() {
    for {
        // Get buffered files (oldest first)
        files, err := s.buffer.GetBufferFiles()
        if err != nil {
            s.randomDelay()
            continue
        }

        // Batch processing: up to batch_size (default: 5)
        batchSize := len(files)
        if batchSize > s.config.Buffer.BatchSize {
            batchSize = s.config.Buffer.BatchSize
        }

        // Process batch
        s.processBatch(files[:batchSize])

        // Random delay before next attempt
        s.randomDelay()
    }
}
```

**Process:**
1. Check the buffer directory for `.prom` files
2. Select the oldest files (up to batch size)
3. Attempt to send each file
4. Delete each file on successful send
5. Keep the file on failed send (retry later)
6. Wait a random delay before the next iteration

### 3. Random Jitter

Random jitter prevents a thundering-herd problem when many agents retry at once:

```go
// From internal/report/sender.go
func (s *Sender) randomDelay() {
    // Generate random delay: 0 to full interval
    maxDelay := s.config.Agent.Interval // Default: 15s
    delay := time.Duration(s.rng.Int63n(int64(maxDelay)))

    logger.Debug("Waiting random delay before next drain attempt",
        logger.Duration("delay", delay))

    time.Sleep(delay)
}
```

**Why random jitter?**
- With 1000 agents, if all retry at the same time, the dashboard sees a traffic spike
- A random delay (0–15s) spreads retries across the interval window
- Smooths load on the dashboard server

### 4. Batch Processing

Process up to 5 files per batch (configurable):

```go
// From internal/report/sender.go
func (s *Sender) processBatch(filePaths []string) error {
    successCount := 0

    for _, filePath := range filePaths {
        // Load Prometheus data from file
        entry, err := s.buffer.LoadPrometheusFile(filePath)
        if err != nil {
            // Corrupted file - delete it
            logger.Warn("Corrupted buffer file detected, deleting")
            s.buffer.DeleteFile(filePath)
            continue
        }

        // Send Prometheus data
        if err := s.sendPrometheusHTTP(entry.Data, entry.ServerID); err != nil {
            // Send failed - keep file for retry
            logger.Debug("Failed to send, will retry later")
            break // Stop on first failure
        }

        // Send succeeded - delete file
        s.buffer.DeleteFile(filePath)
        successCount++
    }

    return nil
}
```

**Behavior:**
- Process files oldest-first (chronological order)
- Stop the batch on the first failure (preserves ordering)
- Delete a file only after a successful HTTP 2xx response

## Buffer File Format

**File naming:**
```
YYYYMMDD-HHMMSS-<server_id>.prom
```

**Examples:**
```
/var/lib/node-pulse/buffer/20251027-140000-550e8400-e29b-41d4-a716-446655440000.prom
/var/lib/node-pulse/buffer/20251027-140015-550e8400-e29b-41d4-a716-446655440000.prom
/var/lib/node-pulse/buffer/20251027-140030-550e8400-e29b-41d4-a716-446655440000.prom
```

**Content:**
Each file contains Prometheus text format from a single scrape:

```
# HELP node_cpu_seconds_total Seconds the CPUs spent in each mode.
# TYPE node_cpu_seconds_total counter
node_cpu_seconds_total{cpu="0",mode="idle"} 123456.78
node_cpu_seconds_total{cpu="0",mode="system"} 1234.56

# HELP node_memory_MemTotal_bytes Memory information field MemTotal_bytes.
# TYPE node_memory_MemTotal_bytes gauge
node_memory_MemTotal_bytes 8.589934592e+09
```

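For illustration, a minimal reader for this format can skip `#` comment lines and split each sample into a metric part and a value. This is a sketch only; real consumers should use the official Prometheus `expfmt` parser from `prometheus/common`.

```go
package main

import (
    "bufio"
    "fmt"
    "strconv"
    "strings"
)

// parseSamples maps each sample line ("metric{labels} value") to its value,
// skipping blank lines and # HELP / # TYPE comments.
func parseSamples(text string) map[string]float64 {
    samples := make(map[string]float64)
    sc := bufio.NewScanner(strings.NewReader(text))
    for sc.Scan() {
        line := strings.TrimSpace(sc.Text())
        if line == "" || strings.HasPrefix(line, "#") {
            continue
        }
        // The value is the last whitespace-separated field.
        i := strings.LastIndexByte(line, ' ')
        if i < 0 {
            continue
        }
        v, err := strconv.ParseFloat(line[i+1:], 64)
        if err != nil {
            continue
        }
        samples[line[:i]] = v
    }
    return samples
}

func main() {
    text := `# TYPE node_memory_MemTotal_bytes gauge
node_memory_MemTotal_bytes 8.589934592e+09
node_cpu_seconds_total{cpu="0",mode="idle"} 123456.78`
    for k, v := range parseSamples(text) {
        fmt.Printf("%s = %g\n", k, v)
    }
}
```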
## Configuration

```yaml
buffer:
  path: "/var/lib/node-pulse/buffer"
  retention_hours: 48
  batch_size: 5
```

**Settings:**
- `path`: Buffer directory location (default: `/var/lib/node-pulse/buffer`)
- `retention_hours`: Auto-delete files older than this (default: 48 hours)
- `batch_size`: Maximum files to process per batch (default: 5)

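In Go, the `buffer:` section could map onto a struct like the following. Field names, tags, and the defaulting helper are illustrative assumptions, not the agent's actual types:

```go
package main

import "fmt"

// BufferConfig mirrors the buffer section of the YAML config above.
type BufferConfig struct {
    Path           string `yaml:"path"`
    RetentionHours int    `yaml:"retention_hours"`
    BatchSize      int    `yaml:"batch_size"`
}

// withDefaults fills unset fields with the documented defaults.
func withDefaults(c BufferConfig) BufferConfig {
    if c.Path == "" {
        c.Path = "/var/lib/node-pulse/buffer"
    }
    if c.RetentionHours == 0 {
        c.RetentionHours = 48
    }
    if c.BatchSize == 0 {
        c.BatchSize = 5
    }
    return c
}

func main() {
    // An empty config resolves to the documented defaults.
    c := withDefaults(BufferConfig{})
    fmt.Printf("%+v\n", c)
}
```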
## Example Scenarios

### Scenario 1: Normal Operation

1. Agent scrapes node_exporter every 15s
2. Saves to buffer: `20251027-140000-uuid.prom`
3. Drain loop picks it up almost immediately (no backlog to work through)
4. Sends to dashboard successfully
5. Deletes the file
6. Buffer stays empty

**Result:** Near real-time delivery; the buffer acts as a safety net

### Scenario 2: Dashboard Temporarily Down

1. Agent scrapes every 15s, saves to buffer
2. Drain loop tries to send, fails (connection refused)
3. Keeps the file, waits a random delay (0–15s)
4. Tries again, fails again
5. Buffer accumulates files: 240 files after 1 hour (3600s / 15s = 240)
6. Dashboard comes back online
7. Drain loop sends the oldest 5 files (one batch)
8. Deletes those 5 files
9. Random delay (0–15s)
10. Sends the next 5 files
11. Continues until the buffer is empty

**Result:** All metrics eventually delivered, in order, without data loss

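A back-of-envelope estimate of how long this backlog takes to drain, assuming batches of 5 and an average jitter delay of interval/2 (7.5s) between batches:

```go
package main

import "fmt"

func main() {
    const (
        backlog     = 240 // files after a 1h outage (3600s / 15s)
        batchSize   = 5
        intervalSec = 15
        avgDelaySec = intervalSec / 2.0 // mean of uniform(0, interval)
    )

    // Ceiling division: number of batches needed to clear the backlog.
    batches := (backlog + batchSize - 1) / batchSize
    drainSec := float64(batches) * avgDelaySec

    fmt.Printf("batches needed: %d\n", batches)
    fmt.Printf("approx drain time: %.0f s (~%.1f min)\n", drainSec, drainSec/60)
}
```

New scrapes keep arriving (one every 15s) while the backlog drains, so the real figure is somewhat higher, but since the drain rate (5 files per ~7.5s) far exceeds the arrival rate, the buffer still clears within minutes.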
### Scenario 3: Network Instability

1. Dashboard is intermittently reachable
2. Drain loop succeeds sometimes, fails sometimes
3. Buffer grows during failures and shrinks during successes
4. Oldest-first processing maintains chronological order

**Result:** Metrics delivered in order as the network allows

### Scenario 4: Agent Crash

1. Agent scrapes and saves to buffer: `20251027-140000-uuid.prom`
2. Agent crashes before the drain loop sends
3. File remains on disk
4. Agent restarts
5. Drain loop finds the existing file
6. Sends it successfully

**Result:** No data loss; persisted metrics survive crashes

## Cleanup

**Automatic cleanup runs periodically:**

```go
// From internal/report/buffer.go
func (b *Buffer) Cleanup() error {
    files, err := b.getBufferFiles()
    if err != nil {
        return err
    }

    cutoffTime := time.Now().Add(-time.Duration(b.config.Buffer.RetentionHours) * time.Hour)

    for _, filePath := range files {
        // Parse timestamp from filename
        // Format: YYYYMMDD-HHMMSS-<server_id>.prom
        filename := filepath.Base(filePath)
        parts := strings.SplitN(strings.TrimSuffix(filename, ".prom"), "-", 3)
        if len(parts) < 2 {
            continue // unexpected filename, skip
        }
        timeStr := parts[0] + "-" + parts[1]

        fileTime, err := time.Parse("20060102-150405", timeStr)
        if err != nil {
            continue
        }

        // If file is older than cutoff, delete it
        if fileTime.Before(cutoffTime) {
            os.Remove(filePath)
            logger.Debug("Removed old buffer file", logger.String("file", filePath))
        }
    }

    return nil
}
```

**When it runs:**
- After each successful batch send

This prevents indefinite buffer growth if the dashboard is permanently unreachable.

**Retention:** 48 hours (configurable)

## Monitoring Buffer Status

Check buffer status with `pulse status`:

```bash
$ pulse status

Node Pulse Agent Status
=======================

Server ID: 550e8400-e29b-41d4-a716-446655440000
Config File: /etc/node-pulse/nodepulse.yml
Endpoint: https://dashboard.nodepulse.io/metrics/prometheus
Interval: 15s

Agent: running (via systemd)

Buffer: 12 report(s) pending in /var/lib/node-pulse/buffer
  Oldest: 2025-10-27 14:00:00
  Total size: 156 KB

Log File: /var/log/node-pulse/agent.log
```

**Buffer metrics:**
- File count: Number of `.prom` files in the buffer
- Report count: Same as file count (1 file = 1 scrape)
- Oldest file: Timestamp of the oldest buffered scrape
- Total size: Disk space used by the buffer

## Key Benefits

1. **No data loss:** Metrics are persisted before sending
2. **Crash recovery:** Files survive agent restarts
3. **Ordered delivery:** Oldest-first processing
4. **Load distribution:** Random jitter prevents spikes
5. **Efficient batching:** Multiple files processed per iteration
6. **Automatic cleanup:** Old files deleted after the retention period
7. **Resilient:** Continues working through network issues

## Technical Details

**Thread safety:**
- Buffer operations use a `sync.Mutex` for concurrent access
- A single drain goroutine avoids races between senders

**Error handling:**
- Corrupted files are deleted (logged as warnings)
- Network errors keep files for retry
- HTTP 4xx/5xx responses keep files for retry

**Performance:**
- Minimal disk I/O (small files, sequential writes)
- Batch processing reduces HTTP overhead
- Random jitter smooths load over time

## Configuration Recommendations

**Default settings (recommended for most deployments):**
```yaml
buffer:
  path: "/var/lib/node-pulse/buffer"
  retention_hours: 48
  batch_size: 5
```

**High-traffic servers (>1000 agents):**
```yaml
buffer:
  batch_size: 10 # Drain faster when a backlog builds up
```

**Unstable networks:**
```yaml
buffer:
  retention_hours: 168 # 7 days (keep metrics longer)
```

**Storage-constrained systems:**
```yaml
buffer:
  retention_hours: 24 # 1 day (clean up more aggressively)
```
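When sizing `retention_hours` for storage-constrained systems, the worst case is the dashboard being unreachable for the entire retention window. Assuming roughly 13 KB per scrape (the status example above shows 156 KB for 12 reports), a quick estimate:

```go
package main

import "fmt"

func main() {
    const (
        intervalSec    = 15
        retentionHours = 48
        avgFileKB      = 13 // assumption; depends on node_exporter's metric set
    )

    // One buffered file per scrape, retained for the full window.
    files := retentionHours * 3600 / intervalSec
    fmt.Printf("files retained: %d\n", files)
    fmt.Printf("approx disk usage: %.1f MB\n", float64(files*avgFileKB)/1024)
}
```

Under these assumptions the default 48-hour retention tops out around 146 MB, so even the worst case is modest on most systems.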