From 5e50a2f69537d2ce36ab0663d2e367bd05a9abba Mon Sep 17 00:00:00 2001 From: Masafumi Yamamoto Date: Mon, 13 Feb 2023 18:04:12 +0900 Subject: [PATCH] =?UTF-8?q?scanner=E3=82=92=E4=BD=BF=E3=82=8F=E3=81=AA?= =?UTF-8?q?=E3=81=84=E3=82=88=E3=81=86=E3=81=AB=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.sum | 1 + tail.go | 264 ++++++++++++++++++++++++++++++++++----------------- tail_test.go | 96 +++++++++++++++++++ 3 files changed, 272 insertions(+), 89 deletions(-) create mode 100644 tail_test.go diff --git a/go.sum b/go.sum index 7534661..dd0bc19 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/tail.go b/tail.go index 93573ae..6690cd7 100644 --- a/tail.go +++ b/tail.go @@ -1,7 +1,8 @@ package gotail import ( - "bufio" + "errors" + "io" "io/ioutil" "os" "syscall" @@ -10,6 +11,16 @@ import ( "gopkg.in/yaml.v2" ) +var ( + // DefaultBufSize is default buffer size + // If one line of log is too long, please adjust + DefaultBufSize = 2008 + // If true, there is no pos file Start reading from the end of the file + InitialReadPositionEnd = false + // timeout for readLine + ReadLineTimeout = 200 * time.Millisecond +) + // Tail is tail file struct type Tail struct { file string @@ -17,12 +28,20 @@ type Tail struct { posFile string posFd *os.File Stat Stat - data []byte + buf []byte + start int + end int + n int + offset1 int64 + offset2 int64 + nextStart int + eofCount int + isEnd bool + bufEmpty bool init bool - scanner *bufio.Scanner err error isCreatePosFile bool - InitialReadPositionEnd bool // If true, there is no pos file Start reading from the end of the file + InitialReadPositionEnd bool // deprecated } // Stat tail stats infomation struct @@ -37,11 +56,20 @@ func Open(file string, posfile string) (*Tail, error) { var err error posStat := Stat{} t := Tail{ - file: file, - posFile: posfile, - init: true, + file: file, + posFile: posfile, + init: true, + bufEmpty: true, + } + + // compatibility maintenance + if t.InitialReadPositionEnd { + InitialReadPositionEnd = true } + // create buffer + t.buf = make([]byte, DefaultBufSize) + // open position file if t.posFile != "" { t.posFd, err = os.OpenFile(t.posFile, os.O_RDWR, 0644) @@ -83,6 +111,7 @@ func Open(file string, posfile string) (*Tail, error) { if stat.Ino == posStat.Inode && stat.Size >= posStat.Size { // If the inode is not changed, restart from the subsequent Offset. t.Stat.Offset = posStat.Offset + t.offset1 = posStat.Offset } else { // If the file size is small, set the offset to 0. t.Stat.Offset = 0 @@ -95,7 +124,7 @@ func Open(file string, posfile string) (*Tail, error) { } // tail seek posititon. - _, err = t.fileFd.Seek(t.Stat.Offset, os.SEEK_SET) + _, err = t.fileFd.Seek(t.Stat.Offset, io.SeekStart) if err != nil { return &t, err } @@ -123,7 +152,7 @@ func (t *Tail) PositionUpdate() error { return nil } t.posFd.Truncate(0) - t.posFd.Seek(0, 0) + t.posFd.Seek(0, io.SeekStart) yml, err := yaml.Marshal(&t.Stat) if err != nil { @@ -144,12 +173,12 @@ func (t *Tail) PositionUpdate() error { // Bytes is get one line bytes. func (t *Tail) Bytes() []byte { - return t.data + return t.buf[t.start:t.end] } // Text is get one line strings. func (t *Tail) Text() string { - return string(t.data) + return string(t.Bytes()) } // Err is get Scan error @@ -157,106 +186,163 @@ func (t *Tail) Err() error { return t.err } -// Scan is start scan. -func (t *Tail) Scan() bool { +// scanInit is only executed the first time Scan is run +func (t *Tail) scanInit() { if t.init { // there is no pos file Start reading from the end of the file - if (t.InitialReadPositionEnd && t.isCreatePosFile) || - (t.InitialReadPositionEnd && t.posFile == "") { - t.fileFd.Seek(0, os.SEEK_END) + if (InitialReadPositionEnd && t.isCreatePosFile) || + (InitialReadPositionEnd && t.posFile == "") { + t.fileFd.Seek(0, io.SeekEnd) } - t.scanner = bufio.NewScanner(t.fileFd) t.init = false } +} - for { - if t.scanner.Scan() { - t.data = t.scanner.Bytes() - return true - } - - if err := t.scanner.Err(); err != nil { - t.err = err - return false - } +// Scan is start scan. +func (t *Tail) Scan() bool { + var err error + // Executed only the first time + t.scanInit() - // ステータスをアップデート - fdstat, err := t.fileFd.Stat() - if err != nil { - t.err = err - return false - } - s := fdstat.Sys().(*syscall.Stat_t) - t.Stat.Inode = s.Ino - t.Stat.Size = s.Size - t.Stat.Offset, err = t.fileFd.Seek(0, os.SEEK_CUR) + // Change start to new position + t.start = t.nextStart - // posファイルアップデート - err = t.PositionUpdate() - if err != nil { - t.err = err - return false - } + for { + // buffer empty + if t.bufEmpty { + // change offset + t.offset2, _ = t.fileFd.Seek(t.offset1, io.SeekStart) - for { - // ファイルを開く - fd, err := os.Open(t.file) - if os.IsNotExist(err) { - //ファイルがない場合は待つ - time.Sleep(time.Second) + // read file + t.n, err = t.fileFd.Read(t.buf) + if t.n == 0 || errors.Is(err, io.EOF) { + // EOF file check + t.eofCount++ + if t.eofCount > 5 { + t.eofCount = 0 + t.fileCheck() + continue + } + // sleep & next buffer read + time.Sleep(ReadLineTimeout / 5) continue } else if err != nil { t.err = err - return false } - newFdStat, err := fd.Stat() - if err != nil { - t.err = err - return false + t.bufEmpty = false + } + t.eofCount = 0 + + // search newline + for i := t.start; i < t.n; i++ { + if t.buf[i] == '\n' { + t.end = i + t.nextStart = i + 1 + t.isEnd = false + return true } - newStat := newFdStat.Sys().(*syscall.Stat_t) + } - // 変更がない場合待って再度チェックする - if t.Stat.Inode == newStat.Ino && t.Stat.Size == newStat.Size { - fd.Close() - time.Sleep(100 * time.Millisecond) + // not found newline + // Move offset to last newline + t.offset1 = t.offset1 + int64(t.end) + t.bufEmpty = true + // If offset1 and offset2 are the same, the file has not been updated, + // so wait a certain amount of time and read it again + if t.offset1 == t.offset2 { + if !t.isEnd { + t.isEnd = true + time.Sleep(ReadLineTimeout) continue } + t.isEnd = false + t.end = t.n + // possiton update + t.Stat.Offset = t.offset1 - 1 + t.PositionUpdate() + return true + } else { + // Move offset by line feed code + t.offset1++ + t.start = 0 + t.end = 0 + t.nextStart = 0 + } + } +} - // inodeの変更があったら新しいファイルに差し替える - if t.Stat.Inode != newStat.Ino { - t.Stat.Inode = newStat.Ino - t.Stat.Offset = 0 - t.Stat.Size = newStat.Size - t.fileFd.Close() - t.fileFd = fd - t.scanner = bufio.NewScanner(t.fileFd) - break - } +func (t *Tail) fileCheck() error { + // status update + fdstat, err := t.fileFd.Stat() + if err != nil { + return err + } + s := fdstat.Sys().(*syscall.Stat_t) + t.Stat.Inode = s.Ino + t.Stat.Size = s.Size + t.Stat.Offset = t.offset1 - 1 - // サイズ小さくなっていたらSEEK位置を先頭の戻す - if newStat.Size < t.Stat.Size { - _, err = t.fileFd.Seek(0, os.SEEK_SET) - if err != nil { - t.err = err - return false - } - t.Stat.Size = newStat.Size - t.scanner = bufio.NewScanner(t.fileFd) - fd.Close() - break + // update position file + err = t.PositionUpdate() + if err != nil { + return err + } + + // find new file + for { + // open file + fd, err := os.Open(t.file) + if os.IsNotExist(err) { + // sleep & next file check + time.Sleep(time.Second) + continue + } else if err != nil { + return err + } + newFdStat, err := fd.Stat() + if err != nil { + return err + } + newStat := newFdStat.Sys().(*syscall.Stat_t) + + // If there is no change in inode and size, wait a little longer + if t.Stat.Inode == newStat.Ino && t.Stat.Size == newStat.Size { + fd.Close() + time.Sleep(100 * time.Millisecond) + continue + } + + // Replace any inode changes with new files + if t.Stat.Inode != newStat.Ino { + t.Stat.Inode = newStat.Ino + t.Stat.Offset = 0 + t.offset1 = 0 + t.Stat.Size = newStat.Size + t.fileFd.Close() + t.fileFd = fd + break + } + + // If the size is smaller, move the SEEK position back to the beginning + if newStat.Size < t.Stat.Size { + _, err = t.fileFd.Seek(0, io.SeekStart) + if err != nil { + return err } + t.Stat.Size = newStat.Size + fd.Close() + break + } - if newStat.Size > t.Stat.Size { - _, err := t.fileFd.Seek(t.Stat.Offset, os.SEEK_SET) - if err != nil { - t.err = err - return false - } - t.scanner = bufio.NewScanner(t.fileFd) - fd.Close() - break + if newStat.Size > t.Stat.Size { + _, err := t.fileFd.Seek(t.Stat.Offset, io.SeekStart) + if err != nil { + return err } + fd.Close() + break } } + + return nil } diff --git a/tail_test.go b/tail_test.go new file mode 100644 index 0000000..869d122 --- /dev/null +++ b/tail_test.go @@ -0,0 +1,96 @@ +package gotail + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + "time" +) + +var tmp string + +func TestMain(m *testing.M) { + var err error + tmp, err = ioutil.TempDir("", "gotail_test") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmp) + + // generate test log file + go func() { + path := filepath.Join(tmp, "test.txt") + fd, err := os.Create(path) + if err != nil { + panic(err) + } + defer fd.Close() + i := 0 + for { + now := time.Now() + fd.WriteString(strconv.Itoa(i) + " ") + fd.WriteString(now.Format("random mojiretsu")) + fd.WriteString("\n") + i++ + } + }() + time.Sleep(time.Millisecond) + ret := m.Run() + os.RemoveAll(tmp) + os.Exit(ret) +} + +func TestOpen(t *testing.T) { + path := filepath.Join(tmp, "test.txt") + posPath := filepath.Join(tmp, "test.txt.pos") + _, err := Open(path, posPath) + if err != nil { + t.Fatal(err) + } +} + +func TestScan(t *testing.T) { + path := filepath.Join(tmp, "test.txt") + tail, err := Open(path, "") + if err != nil { + t.Fatal(err) + } + defer tail.Close() + + if !tail.Scan() { + t.Fatal(fmt.Errorf("scan error")) + } +} + +func TestBytes(t *testing.T) { + path := filepath.Join(tmp, "test.txt") + posPath := filepath.Join(tmp, "test.txt.pos") + tail, err := Open(path, posPath) + InitialReadPositionEnd = true + if err != nil { + t.Fatal(err) + } + defer tail.Close() + + i := 0 + for tail.Scan() { + if err := tail.Err(); err != nil { + t.Fatal(err) + } + if !bytes.Contains(tail.Bytes(), []byte(strconv.Itoa(i)+" random mojiretsu")) { + t.Fatal(strconv.Itoa(i)+" lines read miss match", "sample:"+strconv.Itoa(i)+" random mojiretsu", "read:"+string(tail.Bytes())) + } + if i > 100000 { + break + } + i++ + } + + if err := tail.Err(); err != nil { + t.Fatal(err) + } +}