
Commit 7934439

feat(nginx_log): implement true incremental indexing for log files
1 parent 43dba4c commit 7934439

File tree

2 files changed: +269 -35 lines changed


internal/cron/incremental_indexing.go

Lines changed: 265 additions & 33 deletions
@@ -1,8 +1,12 @@
 package cron

 import (
+	"compress/gzip"
+	"context"
 	"fmt"
+	"io"
 	"os"
+	"strings"
 	"time"

 	"github.com/0xJacky/Nginx-UI/internal/nginx_log"
@@ -84,7 +88,7 @@ func performIncrementalIndexing() {
 	for _, log := range allLogs {
 		// Check if file needs incremental indexing
 		if needsIncrementalIndexing(log, persistence) {
-			logger.Infof("Starting incremental indexing for file: %s", log.Path)
+			logger.Debugf("Starting incremental indexing for file: %s", log.Path)

 			// Set status to indexing
 			if err := setFileIndexStatus(log.Path, string(indexer.IndexStatusIndexing), logFileManager); err != nil {
@@ -110,7 +114,7 @@ func performIncrementalIndexing() {
 	}

 	if changedCount > 0 {
-		logger.Infof("Completed incremental indexing for %d log files", changedCount)
+		logger.Debugf("Completed incremental indexing for %d log files", changedCount)
 		// Update searcher shards once after all files are processed
 		nginx_log.UpdateSearcherShards()
 	} else {
@@ -200,7 +204,8 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logI
 	return false
 }

-// performSingleFileIncrementalIndexing performs incremental indexing for a single file synchronously
+// performSingleFileIncrementalIndexing performs TRUE incremental indexing for a single file synchronously
+// This implements real incremental indexing by using LastPosition to only read new content
 func performSingleFileIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error {
 	defer func() {
 		// Ensure status is always updated, even on panic
@@ -210,57 +215,284 @@ func performSingleFileIncrementalIndexing(logPath string, modernIndexer interfac
 		}
 	}()

-	// Perform incremental indexing
+	lfm, ok := logFileManager.(*indexer.LogFileManager)
+	if !ok {
+		return fmt.Errorf("invalid log file manager type")
+	}
+
+	persistence := lfm.GetPersistence()
+	if persistence == nil {
+		return fmt.Errorf("persistence not available")
+	}
+
+	// Get current file info
+	fileInfo, err := os.Stat(logPath)
+	if err != nil {
+		return fmt.Errorf("failed to stat file: %w", err)
+	}
+
+	currentSize := fileInfo.Size()
+	isGzipped := strings.HasSuffix(strings.ToLower(logPath), ".gz")
+
+	// Check existing index metadata
+	existingIndex, err := persistence.GetLogIndex(logPath)
+	if err != nil {
+		logger.Warnf("Could not get existing log index for %s: %v", logPath, err)
+	}
+
+	var startPosition int64 = 0
+	var existingDocCount uint64 = 0
+
+	if existingIndex != nil {
+		if isGzipped {
+			// For gzip files, we cannot reliably map persisted LastPosition (compressed bytes)
+			// to a position in the decompressed stream. Treat every incremental run as a
+			// full re-index when the file changes to avoid skipping or duplicating data.
+			logger.Debugf("Gzip file %s detected; ignoring LastPosition and resetting document count for full re-index (last_size=%d, current_size=%d)",
+				logPath, existingIndex.LastSize, currentSize)
+			startPosition = 0
+			existingDocCount = 0
+		} else {
+			existingDocCount = existingIndex.DocumentCount
+
+			// Detect file rotation (size decreased)
+			if currentSize < existingIndex.LastSize {
+				startPosition = 0
+				existingDocCount = 0 // Reset count for rotated file
+				logger.Debugf("Log rotation detected for %s: size %d -> %d, full re-index",
+					logPath, existingIndex.LastSize, currentSize)
+			} else if existingIndex.LastPosition > 0 && existingIndex.LastPosition < currentSize {
+				// TRUE INCREMENTAL: File grew, resume from last position
+				startPosition = existingIndex.LastPosition
+				logger.Debugf("TRUE INCREMENTAL: %s grew %d -> %d bytes, reading from position %d",
+					logPath, existingIndex.LastSize, currentSize, startPosition)
+			} else if existingIndex.LastPosition == currentSize {
+				// File unchanged
+				logger.Debugf("File %s unchanged (size=%d, position=%d), skipping",
+					logPath, currentSize, existingIndex.LastPosition)
+				return nil
+			} else if existingIndex.LastPosition == 0 && existingDocCount > 0 {
+				// Inconsistent state: we have documents but no recorded position.
+				// Treat this as a full re-index from the beginning to avoid duplicate counting.
+				logger.Debugf("Inconsistent index state for %s (docs=%d, last_position=0); resetting existing count and re-indexing from start",
+					logPath, existingDocCount)
+				startPosition = 0
+				existingDocCount = 0
+			}
+		}
+	}
+
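
The branch above carries the whole resume policy, so it is worth seeing in isolation. Below is a minimal standalone sketch of that decision table; the names (`planResume`, `resumePlan`) are hypothetical, since the commit inlines this logic rather than factoring it out:

```go
package main

import "fmt"

// resumePlan and planResume are illustrative only: the commit inlines
// this logic inside performSingleFileIncrementalIndexing.
type resumePlan struct {
	startPosition int64 // byte offset to resume reading from
	resetDocCount bool  // discard the previously persisted document count
	skip          bool  // file unchanged; nothing to do
}

func planResume(isGzipped bool, lastPos, lastSize, currentSize int64, docCount uint64) resumePlan {
	switch {
	case isGzipped:
		// Persisted offsets describe the compressed stream and cannot be
		// mapped into the decompressed one: always re-read fully.
		return resumePlan{resetDocCount: true}
	case currentSize < lastSize:
		// Rotation: the file shrank, so prior offsets are meaningless.
		return resumePlan{resetDocCount: true}
	case lastPos > 0 && lastPos < currentSize:
		// Growth: resume exactly where the previous run stopped.
		return resumePlan{startPosition: lastPos}
	case lastPos == currentSize:
		// Unchanged: skip the file entirely.
		return resumePlan{skip: true}
	case lastPos == 0 && docCount > 0:
		// Documents recorded but no position: re-index from the start
		// to avoid double counting.
		return resumePlan{resetDocCount: true}
	default:
		return resumePlan{}
	}
}

func main() {
	fmt.Printf("%+v\n", planResume(false, 4096, 4096, 8192, 120)) // grew: resume at 4096
	fmt.Printf("%+v\n", planResume(false, 8192, 8192, 1024, 120)) // rotated: restart
	fmt.Printf("%+v\n", planResume(false, 8192, 8192, 8192, 120)) // unchanged: skip
}
```

Only the plain-file growth case yields a non-zero resume offset; gzip, rotation, and the inconsistent-state case all collapse to a full re-read.
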
+	// Perform incremental indexing with position-aware reading
 	startTime := time.Now()
-	docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil)
+	newDocsIndexed, minTime, maxTime, finalPosition, err := indexFileFromPosition(
+		modernIndexer.(*indexer.ParallelIndexer),
+		logPath,
+		startPosition,
+	)

 	if err != nil {
 		return fmt.Errorf("indexing failed: %w", err)
 	}

-	// Calculate total documents indexed
-	var totalDocsIndexed uint64
-	for _, docCount := range docsCountMap {
-		totalDocsIndexed += docCount
+	duration := time.Since(startTime)
+	finalDocCount := existingDocCount + newDocsIndexed
+
+	// Save metadata with updated position
+	if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil {
+		return fmt.Errorf("failed to save metadata: %w", err)
 	}

-	// Save indexing metadata
-	duration := time.Since(startTime)
+	// CRITICAL FIX for Bug 1 & Bug 2:
+	// Re-fetch the index record after SaveIndexMetadata to ensure we have the latest data
+	// (SaveIndexMetadata internally updates LastModified and other fields)
+	// Then update LastPosition, which is critical for true incremental indexing
+	updatedIndex, err := persistence.GetLogIndex(logPath)
+	if err != nil {
+		// If we still can't get it, this is a critical error as LastPosition won't be persisted
+		return fmt.Errorf("failed to get index after save (LastPosition will be lost): %w", err)
+	}
+
+	// Get the CURRENT file info again to ensure we record the latest size and modification time
+	finalFileInfo, err := os.Stat(logPath)
+	if err != nil {
+		return fmt.Errorf("failed to stat file after indexing: %w", err)
+	}
+
+	// Update position to the end of the data we actually read.
+	// For non-gzip files, this is the byte offset returned by indexFileFromPosition.
+	// For gzip files, LastPosition is not used for incremental seeks and is kept for diagnostics only.
+	updatedIndex.LastPosition = finalPosition
+	updatedIndex.LastSize = finalFileInfo.Size()
+	updatedIndex.LastModified = finalFileInfo.ModTime()

-	if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
-		persistence := lfm.GetPersistence()
-		var existingDocCount uint64
+	if err := persistence.SaveLogIndex(updatedIndex); err != nil {
+		return fmt.Errorf("failed to update LastPosition (incremental will fail next time): %w", err)
+	}
+
+	logger.Debugf("TRUE INCREMENTAL completed: %s, new_docs=%d, total_docs=%d, position=%d->%d",
+		logPath, newDocsIndexed, finalDocCount, startPosition, finalFileInfo.Size())
+	return nil
+}
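
Persisting the resume offset is deliberately split into two writes: `SaveIndexMetadata` rewrites parts of the record, so `LastPosition` must be stamped onto a freshly fetched copy rather than one held from before the save. A rough sketch of that read-modify-write sequence, using a hypothetical in-memory map in place of the real persistence layer:

```go
package main

import (
	"fmt"
	"time"
)

// record mirrors the fields the commit touches; the real type lives in
// the indexer package and is fetched via persistence.GetLogIndex.
type record struct {
	DocumentCount uint64
	LastPosition  int64
	LastSize      int64
	LastModified  time.Time
}

var store = map[string]*record{} // stand-in for the persistence layer

// saveIndexMetadata mimics the side effects that make re-fetching
// necessary: it rewrites fields beyond the ones the caller passed in.
func saveIndexMetadata(path string, docs uint64) {
	rec, ok := store[path]
	if !ok {
		rec = &record{}
		store[path] = rec
	}
	rec.DocumentCount = docs
	rec.LastModified = time.Now()
}

func main() {
	const path = "/var/log/nginx/access.log"

	saveIndexMetadata(path, 1200) // phase 1: counts, duration, time range

	rec := store[path]        // phase 2: re-fetch the freshest record...
	rec.LastPosition = 524288 // ...then stamp the resume offset onto it
	rec.LastSize = 524288

	fmt.Printf("docs=%d position=%d\n", rec.DocumentCount, rec.LastPosition)
}
```
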

-	existingIndex, err := persistence.GetLogIndex(logPath)
+// indexFileFromPosition reads and indexes only the new content from a file starting at the given position.
+// This is the core implementation of TRUE incremental indexing.
+// It returns the number of successfully indexed documents, the time range, and the final byte position read.
+func indexFileFromPosition(pi *indexer.ParallelIndexer, filePath string, startPosition int64) (uint64, *time.Time, *time.Time, int64, error) {
+	file, err := os.Open(filePath)
+	if err != nil {
+		return 0, nil, nil, 0, fmt.Errorf("failed to open file: %w", err)
+	}
+	defer file.Close()
+
+	fileInfo, err := file.Stat()
+	if err != nil {
+		return 0, nil, nil, 0, fmt.Errorf("failed to stat file: %w", err)
+	}
+
+	fileSize := fileInfo.Size()
+	isGzipped := strings.HasSuffix(strings.ToLower(filePath), ".gz")
+	var reader io.Reader
+
+	if isGzipped {
+		// Gzip files: must read from beginning and discard up to startPosition
+		gzReader, err := gzip.NewReader(file)
 		if err != nil {
-			logger.Warnf("Could not get existing log index for %s: %v", logPath, err)
+			return 0, nil, nil, 0, fmt.Errorf("failed to create gzip reader: %w", err)
+		}
+		defer gzReader.Close()
+
+		if startPosition > 0 {
+			// WARNING: For large gzip files, this is still slow as we must decompress from start
+			// Consider skipping gzip files that were recently indexed
+			logger.Debugf("Gzip %s: reading %d bytes to skip to position %d", filePath, startPosition, startPosition)
+			if _, err := io.CopyN(io.Discard, gzReader, startPosition); err != nil && err != io.EOF {
+				return 0, nil, nil, 0, fmt.Errorf("failed to skip to position: %w", err)
+			}
+		}
+		reader = gzReader
+	} else {
+		// Regular files: direct seek (fast!)
+		if startPosition > 0 {
+			if _, err := file.Seek(startPosition, io.SeekStart); err != nil {
+				return 0, nil, nil, 0, fmt.Errorf("failed to seek: %w", err)
+			}
+			logger.Debugf("Seeked to position %d in %s (file size: %d, reading %d new bytes)",
+				startPosition, filePath, fileSize, fileSize-startPosition)
+		}
+		reader = file
+	}
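
The two positioning strategies above differ sharply in cost: a regular file supports an O(1) seek, while a gzip member cannot be entered mid-stream, so "seeking" means decompressing and discarding every byte before the offset. A self-contained sketch of both paths, using only the standard library (`openFrom` is a hypothetical helper; the commit inlines this in `indexFileFromPosition`):

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"strings"
)

// openFrom returns a reader positioned at startPosition: a real seek for
// plain files, decompress-and-discard for gzip. Sketch only.
func openFrom(path string, startPosition int64) (io.Reader, io.Closer, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, err
	}
	if !strings.HasSuffix(strings.ToLower(path), ".gz") {
		// O(1): the kernel simply moves the file offset.
		if _, err := f.Seek(startPosition, io.SeekStart); err != nil {
			f.Close()
			return nil, nil, err
		}
		return f, f, nil
	}
	// O(startPosition): every skipped byte must still be decompressed.
	gz, err := gzip.NewReader(f)
	if err != nil {
		f.Close()
		return nil, nil, err
	}
	if _, err := io.CopyN(io.Discard, gz, startPosition); err != nil && err != io.EOF {
		f.Close()
		return nil, nil, err
	}
	return gz, f, nil
}

func main() {
	r, c, err := openFrom("access.log", 4096) // hypothetical file
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer c.Close()
	n, _ := io.Copy(io.Discard, r)
	fmt.Printf("read %d new bytes\n", n)
}
```
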
+
+	// Parse only the new content
+	ctx := context.Background()
+	logDocs, err := indexer.ParseLogStream(ctx, reader, filePath)
+	if err != nil {
+		return 0, nil, nil, 0, fmt.Errorf("failed to parse new content: %w", err)
+	}
+
+	// Calculate time range for new documents using stable values
+	var (
+		minTimeVal time.Time
+		maxTimeVal time.Time
+		hasMin     bool
+		hasMax     bool
+	)
+	for _, doc := range logDocs {
+		if doc.Timestamp <= 0 {
+			continue
+		}
+		ts := time.Unix(doc.Timestamp, 0)
+		if !hasMin || ts.Before(minTimeVal) {
+			minTimeVal = ts
+			hasMin = true
 		}
+		if !hasMax || ts.After(maxTimeVal) {
+			maxTimeVal = ts
+			hasMax = true
+		}
+	}
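
Returning `*time.Time` lets "no valid timestamps in this segment" surface as `nil` rather than as a zero `time.Time`, with non-positive timestamps filtered out as in the diff. A compact worked version of the same reduction (function name hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// timeRange reduces Unix timestamps to an optional [min, max] range,
// skipping non-positive values just as the loop above does.
func timeRange(timestamps []int64) (minT, maxT *time.Time) {
	var minVal, maxVal time.Time
	var hasMin, hasMax bool
	for _, sec := range timestamps {
		if sec <= 0 {
			continue
		}
		ts := time.Unix(sec, 0)
		if !hasMin || ts.Before(minVal) {
			minVal, hasMin = ts, true
		}
		if !hasMax || ts.After(maxVal) {
			maxVal, hasMax = ts, true
		}
	}
	if hasMin {
		minT = &minVal
	}
	if hasMax {
		maxT = &maxVal
	}
	return minT, maxT
}

func main() {
	minT, maxT := timeRange([]int64{0, 1700000300, 1700000100, -1})
	fmt.Println(minT.UTC(), maxT.UTC()) // earliest and latest valid stamps

	minT, maxT = timeRange([]int64{0, -5})
	fmt.Println(minT == nil, maxT == nil) // true true: no range, not a zero time
}
```
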

-	// Determine if the file was rotated by checking if the current size is smaller than the last recorded size.
-	// This is a strong indicator of log rotation.
-	fileInfo, statErr := os.Stat(logPath)
-	isRotated := false
-	if statErr == nil && existingIndex != nil && fileInfo.Size() < existingIndex.LastSize {
-		isRotated = true
-		logger.Infof("Log rotation detected for %s: new size %d is smaller than last size %d. Resetting document count.",
-			logPath, fileInfo.Size(), existingIndex.LastSize)
+	var minTime, maxTime *time.Time
+	if hasMin {
+		minTime = &minTimeVal
+	}
+	if hasMax {
+		maxTime = &maxTimeVal
+	}
+
+	// Index the new documents using batch writer
+	var indexedDocCount uint64
+	var finalPosition int64
+
+	// CRITICAL: Calculate finalPosition BEFORE batch operations to ensure it's available
+	// even if batch.Flush() fails. This prevents losing track of where we read to.
+	// Bug fix for issue where flush failure returns position=0, causing duplicate indexing.
+	if !isGzipped {
+		// For regular files, get current file position after ParseLogStream finished reading
+		if pos, err := file.Seek(0, io.SeekCurrent); err == nil {
+			finalPosition = pos
+		} else {
+			logger.Warnf("Failed to determine current read position for %s: %v", filePath, err)
+			// Fallback: assume we read to EOF if we can't get position
+			finalPosition = fileSize
 		}
+	} else {
+		// For gzip files, we've decompressed the entire stream to EOF
+		// LastPosition is not used for seeks but kept for diagnostics
+		finalPosition = fileSize
+	}
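
`file.Seek(0, io.SeekCurrent)` is Go's idiom for "tell": it reports the descriptor's current offset without moving it. Since `ParseLogStream` has consumed the stream by this point, that offset should normally sit at the end of the bytes that existed when parsing finished, which is exactly the resume point the next run needs. A minimal demonstration (file name hypothetical):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Open("access.log") // hypothetical file
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer f.Close()

	// Consume some bytes, as the parser would.
	if _, err := io.CopyN(io.Discard, f, 1024); err != nil && err != io.EOF {
		fmt.Println("read:", err)
		return
	}

	// "Tell": seek zero bytes relative to the current offset.
	pos, err := f.Seek(0, io.SeekCurrent)
	if err != nil {
		fmt.Println("tell:", err)
		return
	}
	fmt.Println("resume point:", pos) // 1024, or less if the file is shorter
}
```
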

-	if existingIndex != nil && !isRotated {
-		// If it's a normal incremental update (not a rotation), we build upon the existing count.
-		existingDocCount = existingIndex.DocumentCount
+	if len(logDocs) > 0 {
+		batch := pi.StartBatch()
+
+		for i, doc := range logDocs {
+			// Deterministic, segment-scoped document ID:
+			//   - filePath: physical log file
+			//   - startPosition: byte offset where this incremental segment begins
+			//   - i: index within this segment
+			// This ensures:
+			//   * Uniqueness within a single run
+			//   * Stable IDs across retries for the same (filePath, startPosition) segment,
+			//     so re-processing due to errors overwrites instead of creating duplicates.
+			docID := fmt.Sprintf("%s_%d_%d", filePath, startPosition, i)
+
+			document := &indexer.Document{
+				ID:     docID,
+				Fields: doc,
+			}
+			if err := batch.Add(document); err != nil {
+				// If Add fails, an auto-flush may have failed internally. We conservatively
+				// treat this document as not indexed and continue with the remaining ones.
+				logger.Warnf("Failed to add document %s: %v", docID, err)
+				continue
+			}
+			indexedDocCount++
 		}
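
The ID scheme is what makes a failed run safe to retry: the same `(filePath, startPosition, i)` triple always yields the same ID, so re-processing a segment overwrites the documents that already landed instead of duplicating them. A quick check of that property:

```go
package main

import "fmt"

func docID(filePath string, startPosition int64, i int) string {
	return fmt.Sprintf("%s_%d_%d", filePath, startPosition, i)
}

func main() {
	// Same segment, same ordinal -> same ID on retry (overwrite, not duplicate).
	fmt.Println(docID("/var/log/nginx/access.log", 4096, 0))
	fmt.Println(docID("/var/log/nginx/access.log", 4096, 0))

	// A later segment starts at a new offset, so its IDs never collide
	// with the previous segment's.
	fmt.Println(docID("/var/log/nginx/access.log", 8192, 0))
}
```
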
-	// If the file was rotated, existingDocCount remains 0, effectively starting the count over for the new file.

-	finalDocCount := existingDocCount + totalDocsIndexed
+		// At this point:
+		//   indexedDocCount = total documents successfully handed to the batch writer
+		//   batch.Size() = documents currently buffered but NOT yet flushed.
+		// Any documents that were auto-flushed due to internal batch limits have already
+		// been sent to the indexer and removed from the internal buffer.
+		pendingBeforeFlush := batch.Size()
+		autoFlushedCount := indexedDocCount
+		if pendingBeforeFlush > 0 && indexedDocCount >= uint64(pendingBeforeFlush) {
+			autoFlushedCount = indexedDocCount - uint64(pendingBeforeFlush)
+		}

-	if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil {
-		return fmt.Errorf("failed to save metadata: %w", err)
+		if _, err := batch.Flush(); err != nil {
+			// CRITICAL BUG FIX: Return the actual finalPosition we calculated earlier,
+			// not 0. This ensures that even on flush failure, the next incremental run
+			// knows where we read to and won't duplicate the auto-flushed documents.
+			logger.Warnf("Final batch flush failed for %s: %v (auto-flushed docs=%d, pending=%d, position will be saved as=%d)",
+				filePath, err, autoFlushedCount, pendingBeforeFlush, finalPosition)
+			return autoFlushedCount, minTime, maxTime, finalPosition, fmt.Errorf("failed to flush batch: %w", err)
 		}
 	}
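
The accounting around the final flush is easiest to follow with concrete numbers. Suppose, hypothetically, that the batch writer auto-flushes every 10,000 documents and a run hands it 25,000:

```go
package main

import "fmt"

func main() {
	// Hypothetical numbers: an internal auto-flush threshold of 10,000
	// and 25,000 documents successfully handed to batch.Add.
	var indexedDocCount uint64 = 25000
	pendingBeforeFlush := 5000 // batch.Size(): the remainder still buffered

	autoFlushedCount := indexedDocCount
	if pendingBeforeFlush > 0 && indexedDocCount >= uint64(pendingBeforeFlush) {
		autoFlushedCount = indexedDocCount - uint64(pendingBeforeFlush)
	}

	// If the final Flush now fails, 20,000 documents are already in the
	// index and 5,000 died with the batch; reporting autoFlushedCount
	// (not 0) keeps the persisted document count honest.
	fmt.Println(autoFlushedCount) // 20000
}
```

On a failed final flush the function then reports the 20,000 documents that genuinely reached the index, and the stable document IDs above make the lost 5,000 safe to re-process on the next run.
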

-	logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed)
-	return nil
+	logger.Debugf("Indexed %d NEW documents from %s (position %d -> %d)",
+		indexedDocCount, filePath, startPosition, fileSize)
+
+	return indexedDocCount, minTime, maxTime, finalPosition, nil
 }

 // setFileIndexStatus updates the index status for a file in the database using enhanced status management

internal/nginx_log/indexer/parallel_indexer.go

Lines changed: 4 additions & 2 deletions
@@ -872,8 +872,10 @@ func (pi *ParallelIndexer) IndexLogGroupWithRotationScanning(basePaths []string,
 	return docsCountMap, overallMinTime, overallMaxTime, nil
 }

-// IndexSingleFileIncrementally is a more efficient version for incremental updates.
-// It indexes only the specified single file instead of the entire log group.
+// IndexSingleFileIncrementally indexes a single file (not the entire log group).
+// Note: The actual incremental logic (using LastPosition) is implemented in the cron job layer
+// to have access to persistence. This method performs a full file scan.
+// For true incremental behavior, see internal/cron/incremental_indexing.go
 func (pi *ParallelIndexer) IndexSingleFileIncrementally(filePath string, progressConfig *ProgressConfig) (map[string]uint64, *time.Time, *time.Time, error) {
 	if !pi.IsHealthy() {
 		return nil, nil, nil, fmt.Errorf("indexer not healthy")
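
Despite its name, the method is now documented as a full-file scan; only the cron layer can honor a resume offset, because that is where persistence lives. A hypothetical illustration of when each entry point applies (the `indexer` import path is inferred from the file layout; `chooseIndexPath` is not part of the commit):

```go
package cron

import (
	"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
)

// chooseIndexPath sketches when each entry point applies.
func chooseIndexPath(pi *indexer.ParallelIndexer, path string, lastPosition int64) error {
	if lastPosition == 0 {
		// No usable resume point: a full single-file scan is the right tool.
		_, _, _, err := pi.IndexSingleFileIncrementally(path, nil)
		return err
	}
	// A resume point exists: only the cron-layer helper can honor it,
	// since it reads persisted LastPosition and seeks before parsing.
	_, _, _, _, err := indexFileFromPosition(pi, path, lastPosition)
	return err
}
```
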

0 commit comments
