Skip to content

Commit 387c17a

Browse files
committed
feat: implement Write-Ahead Log pattern for metrics buffering and enhance buffer handling
1 parent b27a38c commit 387c17a

File tree

9 files changed

+293
-228
lines changed

9 files changed

+293
-228
lines changed

cmd/setup.go

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,19 @@ var (
1616
quickMode bool
1717

1818
// Config flags for quick mode
19-
flagEndpointURL string
20-
flagServerID string
21-
flagInterval string
22-
flagTimeout string
23-
flagBufferEnabled bool
24-
flagBufferDir string
25-
flagBufferRetention int
26-
flagLogLevel string
27-
flagLogOutput string
28-
flagLogFile string
29-
flagLogMaxSize int
30-
flagLogMaxBackups int
31-
flagLogMaxAge int
32-
flagLogCompress bool
19+
flagEndpointURL string
20+
flagServerID string
21+
flagInterval string
22+
flagTimeout string
23+
flagBufferDir string
24+
flagBufferRetention int
25+
flagLogLevel string
26+
flagLogOutput string
27+
flagLogFile string
28+
flagLogMaxSize int
29+
flagLogMaxBackups int
30+
flagLogMaxAge int
31+
flagLogCompress bool
3332
)
3433

3534
// setupCmd represents the setup command
@@ -56,8 +55,7 @@ func init() {
5655
setupCmd.Flags().StringVar(&flagServerID, "server-id", "", "Server ID (auto-generated UUID if not provided)")
5756
setupCmd.Flags().StringVar(&flagInterval, "interval", "5s", "Metric collection interval (5s, 10s, 30s, or 1m)")
5857

59-
// Buffer configuration flags
60-
setupCmd.Flags().BoolVar(&flagBufferEnabled, "buffer-enabled", true, "Enable offline buffering")
58+
// Buffer configuration flags (buffer is always enabled)
6159
setupCmd.Flags().StringVar(&flagBufferDir, "buffer-dir", "/var/lib/node-pulse/buffer", "Buffer directory path")
6260
setupCmd.Flags().IntVar(&flagBufferRetention, "buffer-retention", 48, "Buffer retention in hours")
6361

@@ -161,8 +159,7 @@ func runQuickMode(existing *installer.ExistingInstall) error {
161159
ServerID: finalServerID,
162160
Interval: flagInterval,
163161

164-
// Buffer options
165-
BufferEnabled: flagBufferEnabled,
162+
// Buffer options (always enabled)
166163
BufferPath: flagBufferDir,
167164
BufferRetentionHours: flagBufferRetention,
168165

@@ -178,12 +175,12 @@ func runQuickMode(existing *installer.ExistingInstall) error {
178175

179176
fmt.Println()
180177
fmt.Printf("Configuration summary:\n")
181-
fmt.Printf(" Endpoint: %s\n", opts.Endpoint)
182-
fmt.Printf(" Server ID: %s\n", opts.ServerID)
183-
fmt.Printf(" Interval: %s\n", opts.Interval)
184-
fmt.Printf(" Timeout: %s\n", opts.Timeout)
185-
fmt.Printf(" Buffer enabled: %v\n", opts.BufferEnabled)
186-
fmt.Printf(" Log level: %s\n", opts.LogLevel)
178+
fmt.Printf(" Endpoint: %s\n", opts.Endpoint)
179+
fmt.Printf(" Server ID: %s\n", opts.ServerID)
180+
fmt.Printf(" Interval: %s\n", opts.Interval)
181+
fmt.Printf(" Timeout: %s\n", opts.Timeout)
182+
fmt.Printf(" Buffer path: %s\n", opts.BufferPath)
183+
fmt.Printf(" Log level: %s\n", opts.LogLevel)
187184
fmt.Println()
188185

189186
// Perform installation

cmd/setup_tui.go

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -666,10 +666,7 @@ func (m setupTUIModel) viewReview() string {
666666
summary.WriteString(labelStyle.Render("Server ID:") + " " + valueStyle.Render(m.config.ServerID) + "\n")
667667
summary.WriteString(labelStyle.Render("Interval:") + " " + valueStyle.Render(m.config.Interval) + "\n")
668668
summary.WriteString(labelStyle.Render("Timeout:") + " " + valueStyle.Render(m.config.Timeout) + "\n")
669-
bufferStatus := "Disabled"
670-
if m.config.BufferEnabled {
671-
bufferStatus = fmt.Sprintf("Enabled (%dh retention)", m.config.BufferRetentionHours)
672-
}
669+
bufferStatus := fmt.Sprintf("%s (%dh retention)", m.config.BufferPath, m.config.BufferRetentionHours)
673670
summary.WriteString(labelStyle.Render("Buffer:") + " " + valueStyle.Render(bufferStatus) + "\n")
674671
summary.WriteString(labelStyle.Render("Logging:") + " " + valueStyle.Render(fmt.Sprintf("%s → %s", m.config.LogLevel, m.config.LogOutput)) + "\n")
675672
summary.WriteString(labelStyle.Render("Config Path:") + " " + valueStyle.Render("/etc/node-pulse/nodepulse.yml"))
@@ -902,29 +899,16 @@ func (m setupTUIModel) handleEnter() (tea.Model, tea.Cmd) {
902899
m.config.Timeout = timeout
903900
m.err = nil
904901

905-
// Move to buffer screen
906-
m.screen = ScreenBuffer
907-
if m.config.BufferEnabled {
908-
m.textInput.SetValue("yes")
909-
} else {
910-
m.textInput.SetValue("no")
911-
}
912-
m.textInput.Placeholder = "yes/no"
902+
// Skip buffer screen (buffer is always enabled now) and go to logging
903+
m.screen = ScreenLogging
904+
m.textInput.SetValue(m.config.LogLevel)
905+
m.textInput.Placeholder = "debug, info, warn, error"
913906
m.textInput.Focus()
914907
return m, textinput.Blink
915908

916909
case ScreenBuffer:
917-
// Parse buffer settings
918-
input := strings.ToLower(strings.TrimSpace(m.textInput.Value()))
919-
if input != "yes" && input != "no" {
920-
m.err = fmt.Errorf("enter 'yes' or 'no'")
921-
return m, nil
922-
}
923-
924-
m.config.BufferEnabled = (input == "yes")
925-
m.err = nil
926-
927-
// Move to logging screen
910+
// Buffer screen is deprecated (buffer is always enabled)
911+
// Skip to logging screen
928912
m.screen = ScreenLogging
929913
m.textInput.SetValue(m.config.LogLevel)
930914
m.textInput.Placeholder = "debug, info, warn, error"
@@ -1005,14 +989,10 @@ func (m setupTUIModel) handleBack() (tea.Model, tea.Cmd) {
1005989
return m, textinput.Blink
1006990

1007991
case ScreenLogging:
1008-
// Go back to buffer
1009-
m.screen = ScreenBuffer
1010-
m.textInput.Placeholder = "yes/no"
1011-
if m.config.BufferEnabled {
1012-
m.textInput.SetValue("yes")
1013-
} else {
1014-
m.textInput.SetValue("no")
1015-
}
992+
// Go back to timeout (buffer screen is skipped)
993+
m.screen = ScreenTimeout
994+
m.textInput.Placeholder = "3s"
995+
m.textInput.SetValue(m.config.Timeout)
1016996
m.textInput.Focus()
1017997
return m, textinput.Blink
1018998

cmd/start.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ func runAgent(cmd *cobra.Command, args []string) error {
8888
}
8989
defer sender.Close()
9090

91+
// Start background draining goroutine (WAL pattern)
92+
// This continuously sends buffered reports with random jitter
93+
sender.StartDraining()
94+
9195
// Setup signal handling for graceful shutdown
9296
ctx, cancel := context.WithCancel(context.Background())
9397
defer cancel()
@@ -110,19 +114,20 @@ func runAgent(cmd *cobra.Command, args []string) error {
110114
logger.Duration("interval", cfg.Agent.Interval),
111115
logger.String("endpoint", cfg.Server.Endpoint))
112116

113-
// Collect and send immediately on start
117+
// Collect and save to buffer immediately on start
114118
if err := collectAndSend(sender, cfg.Agent.ServerID); err != nil {
115-
logger.Error("Collection and send failed", logger.Err(err))
119+
logger.Error("Collection failed", logger.Err(err))
116120
}
117121

118-
// Then continue with ticker
122+
// Then continue with ticker (collect and save to buffer at interval)
123+
// Background goroutine will drain buffer with random jitter
119124
for {
120125
select {
121126
case <-ctx.Done():
122127
return nil
123128
case <-ticker.C:
124129
if err := collectAndSend(sender, cfg.Agent.ServerID); err != nil {
125-
logger.Error("Collection and send failed", logger.Err(err))
130+
logger.Error("Collection failed", logger.Err(err))
126131
}
127132
}
128133
}
@@ -139,16 +144,16 @@ func collectAndSend(sender *report.Sender, serverID string) error {
139144
stats := metrics.GetGlobalStats()
140145
stats.RecordCollection(metricsReport)
141146

142-
// Send report
147+
// Save to buffer (WAL pattern - actual sending happens in background with jitter)
143148
if err := sender.Send(metricsReport); err != nil {
144-
// Record failure
149+
// Record failure (failed to save to buffer)
145150
stats.RecordFailure()
146-
return fmt.Errorf("failed to send report: %w", err)
151+
return fmt.Errorf("failed to save to buffer: %w", err)
147152
}
148153

149-
// Record success
154+
// Record success (successfully buffered)
150155
stats.RecordSuccess()
151-
logger.Info("Report sent successfully")
156+
logger.Info("Report collected and buffered")
152157
return nil
153158
}
154159

cmd/status.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,14 @@ func runStatus(cmd *cobra.Command, args []string) error {
5959
fmt.Printf("Agent: %s\n", serviceStatus)
6060
fmt.Println()
6161

62-
// Buffer Status
63-
if cfg.Buffer.Enabled {
64-
bufferCount, err := countBufferFiles(cfg.Buffer.Path)
65-
if err != nil {
66-
fmt.Printf("Buffer: enabled (error checking: %v)\n", err)
67-
} else if bufferCount > 0 {
68-
fmt.Printf("Buffer: %d report(s) pending in %s\n", bufferCount, cfg.Buffer.Path)
69-
} else {
70-
fmt.Printf("Buffer: enabled, no pending reports\n")
71-
}
62+
// Buffer Status (always enabled in new architecture)
63+
bufferCount, err := countBufferFiles(cfg.Buffer.Path)
64+
if err != nil {
65+
fmt.Printf("Buffer: error checking: %v\n", err)
66+
} else if bufferCount > 0 {
67+
fmt.Printf("Buffer: %d report(s) pending in %s\n", bufferCount, cfg.Buffer.Path)
7268
} else {
73-
fmt.Printf("Buffer: disabled\n")
69+
fmt.Printf("Buffer: no pending reports\n")
7470
}
7571
fmt.Println()
7672

internal/config/config.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ type AgentConfig struct {
3434
}
3535

3636
// BufferConfig represents buffer settings
37+
// Note: Buffer is always enabled in the new architecture (write-ahead log pattern)
3738
type BufferConfig struct {
38-
Enabled bool `mapstructure:"enabled"`
3939
Path string `mapstructure:"path"`
4040
RetentionHours int `mapstructure:"retention_hours"`
4141
}
@@ -50,7 +50,6 @@ var (
5050
Interval: 5 * time.Second,
5151
},
5252
Buffer: BufferConfig{
53-
Enabled: true,
5453
Path: "/var/lib/node-pulse/buffer",
5554
RetentionHours: 48,
5655
},
@@ -121,7 +120,6 @@ func setDefaults(v *viper.Viper) {
121120
v.SetDefault("server.endpoint", defaultConfig.Server.Endpoint)
122121
v.SetDefault("server.timeout", defaultConfig.Server.Timeout)
123122
v.SetDefault("agent.interval", defaultConfig.Agent.Interval)
124-
v.SetDefault("buffer.enabled", defaultConfig.Buffer.Enabled)
125123
v.SetDefault("buffer.path", defaultConfig.Buffer.Path)
126124
v.SetDefault("buffer.retention_hours", defaultConfig.Buffer.RetentionHours)
127125
v.SetDefault("logging.level", defaultConfig.Logging.Level)
@@ -175,13 +173,12 @@ func validate(cfg *Config) error {
175173
return fmt.Errorf("agent.interval must be one of: 5s, 10s, 30s, 1m")
176174
}
177175

178-
if cfg.Buffer.Enabled {
179-
if cfg.Buffer.Path == "" {
180-
return fmt.Errorf("buffer.path is required when buffer is enabled")
181-
}
182-
if cfg.Buffer.RetentionHours <= 0 {
183-
return fmt.Errorf("buffer.retention_hours must be positive")
184-
}
176+
// Buffer is always enabled now
177+
if cfg.Buffer.Path == "" {
178+
return fmt.Errorf("buffer.path is required")
179+
}
180+
if cfg.Buffer.RetentionHours <= 0 {
181+
return fmt.Errorf("buffer.retention_hours must be positive")
185182
}
186183

187184
return nil
@@ -227,23 +224,13 @@ func isAlphanumeric(c rune) bool {
227224

228225
// EnsureBufferDir creates the buffer directory if it doesn't exist
229226
func (c *Config) EnsureBufferDir() error {
230-
if !c.Buffer.Enabled {
231-
return nil
232-
}
233-
234227
if err := os.MkdirAll(c.Buffer.Path, 0755); err != nil {
235228
return fmt.Errorf("failed to create buffer directory: %w", err)
236229
}
237230

238231
return nil
239232
}
240233

241-
// GetBufferFilePath returns the path for the current hour's buffer file
242-
func (c *Config) GetBufferFilePath(t time.Time) string {
243-
filename := fmt.Sprintf("%s.jsonl", t.Format("2006-01-02-15"))
244-
return filepath.Join(c.Buffer.Path, filename)
245-
}
246-
247234
// ConfigExists checks if a configuration file exists in any of the standard locations
248235
func ConfigExists(configPath string) bool {
249236
// If explicit config path provided, check only that

internal/installer/installer.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ type ConfigOptions struct {
3333
ServerID string
3434
Interval string
3535

36-
// Buffer options
37-
BufferEnabled bool
36+
// Buffer options (buffer is always enabled in new architecture)
3837
BufferPath string
3938
BufferRetentionHours int
4039

@@ -207,8 +206,7 @@ func DefaultConfigOptions() ConfigOptions {
207206
// Agent defaults
208207
Interval: "5s",
209208

210-
// Buffer defaults
211-
BufferEnabled: true,
209+
// Buffer defaults (always enabled)
212210
BufferPath: DefaultBufferPath,
213211
BufferRetentionHours: 48,
214212

@@ -236,7 +234,6 @@ func WriteConfigFile(opts ConfigOptions) error {
236234
"interval": opts.Interval,
237235
},
238236
"buffer": map[string]interface{}{
239-
"enabled": opts.BufferEnabled,
240237
"path": opts.BufferPath,
241238
"retention_hours": opts.BufferRetentionHours,
242239
},

0 commit comments

Comments
 (0)