Skip to content

Commit

Permalink
scannerを使わないように変更
Browse files Browse the repository at this point in the history
  • Loading branch information
masa23 committed Feb 14, 2023
1 parent e432cde commit 5e50a2f
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 89 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
264 changes: 175 additions & 89 deletions tail.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package gotail

import (
"bufio"
"errors"
"io"
"io/ioutil"
"os"
"syscall"
Expand All @@ -10,19 +11,37 @@ import (
"gopkg.in/yaml.v2"
)

var (
// DefaultBufSize is default buffer size
// If one line of log is too long, please adjust
DefaultBufSize = 2008
// If true, there is no pos file Start reading from the end of the file
InitialReadPositionEnd = false
// timeout for readLine
ReadLineTimeout = 200 * time.Millisecond
)

// Tail is tail file struct
type Tail struct {
file string
fileFd *os.File
posFile string
posFd *os.File
Stat Stat
data []byte
buf []byte
start int
end int
n int
offset1 int64
offset2 int64
nextStart int
eofCount int
isEnd bool
bufEmpty bool
init bool
scanner *bufio.Scanner
err error
isCreatePosFile bool
InitialReadPositionEnd bool // If true, there is no pos file Start reading from the end of the file
InitialReadPositionEnd bool // deprecated
}

// Stat tail stats infomation struct
Expand All @@ -37,11 +56,20 @@ func Open(file string, posfile string) (*Tail, error) {
var err error
posStat := Stat{}
t := Tail{
file: file,
posFile: posfile,
init: true,
file: file,
posFile: posfile,
init: true,
bufEmpty: true,
}

// compatibility maintenance
if t.InitialReadPositionEnd {
InitialReadPositionEnd = true
}

// create buffer
t.buf = make([]byte, DefaultBufSize)

// open position file
if t.posFile != "" {
t.posFd, err = os.OpenFile(t.posFile, os.O_RDWR, 0644)
Expand Down Expand Up @@ -83,6 +111,7 @@ func Open(file string, posfile string) (*Tail, error) {
if stat.Ino == posStat.Inode && stat.Size >= posStat.Size {
// If the inode is not changed, restart from the subsequent Offset.
t.Stat.Offset = posStat.Offset
t.offset1 = posStat.Offset
} else {
// If the file size is small, set the offset to 0.
t.Stat.Offset = 0
Expand All @@ -95,7 +124,7 @@ func Open(file string, posfile string) (*Tail, error) {
}

// tail seek posititon.
_, err = t.fileFd.Seek(t.Stat.Offset, os.SEEK_SET)
_, err = t.fileFd.Seek(t.Stat.Offset, io.SeekStart)
if err != nil {
return &t, err
}
Expand Down Expand Up @@ -123,7 +152,7 @@ func (t *Tail) PositionUpdate() error {
return nil
}
t.posFd.Truncate(0)
t.posFd.Seek(0, 0)
t.posFd.Seek(0, io.SeekStart)

yml, err := yaml.Marshal(&t.Stat)
if err != nil {
Expand All @@ -144,119 +173,176 @@ func (t *Tail) PositionUpdate() error {

// Bytes is get one line bytes.
func (t *Tail) Bytes() []byte {
return t.data
return t.buf[t.start:t.end]
}

// Text is get one line strings.
func (t *Tail) Text() string {
return string(t.data)
return string(t.Bytes())
}

// Err is get Scan error
func (t *Tail) Err() error {
return t.err
}

// Scan is start scan.
func (t *Tail) Scan() bool {
// scanInit is only executed the first time Scan is run
func (t *Tail) scanInit() {
if t.init {
// there is no pos file Start reading from the end of the file
if (t.InitialReadPositionEnd && t.isCreatePosFile) ||
(t.InitialReadPositionEnd && t.posFile == "") {
t.fileFd.Seek(0, os.SEEK_END)
if (InitialReadPositionEnd && t.isCreatePosFile) ||
(InitialReadPositionEnd && t.posFile == "") {
t.fileFd.Seek(0, io.SeekEnd)
}
t.scanner = bufio.NewScanner(t.fileFd)
t.init = false
}
}

for {
if t.scanner.Scan() {
t.data = t.scanner.Bytes()
return true
}

if err := t.scanner.Err(); err != nil {
t.err = err
return false
}
// Scan is start scan.
func (t *Tail) Scan() bool {
var err error
// Executed only the first time
t.scanInit()

// ステータスをアップデート
fdstat, err := t.fileFd.Stat()
if err != nil {
t.err = err
return false
}
s := fdstat.Sys().(*syscall.Stat_t)
t.Stat.Inode = s.Ino
t.Stat.Size = s.Size
t.Stat.Offset, err = t.fileFd.Seek(0, os.SEEK_CUR)
// Change start to new position
t.start = t.nextStart

// posファイルアップデート
err = t.PositionUpdate()
if err != nil {
t.err = err
return false
}
for {
// buffer empty
if t.bufEmpty {
// change offset
t.offset2, _ = t.fileFd.Seek(t.offset1, io.SeekStart)

for {
// ファイルを開く
fd, err := os.Open(t.file)
if os.IsNotExist(err) {
//ファイルがない場合は待つ
time.Sleep(time.Second)
// read file
t.n, err = t.fileFd.Read(t.buf)
if t.n == 0 || errors.Is(err, io.EOF) {
// EOF file check
t.eofCount++
if t.eofCount > 5 {
t.eofCount = 0
t.fileCheck()
continue
}
// sleep & next buffer read
time.Sleep(ReadLineTimeout / 5)
continue
} else if err != nil {
t.err = err
return false
}
newFdStat, err := fd.Stat()
if err != nil {
t.err = err
return false
t.bufEmpty = false
}
t.eofCount = 0

// search newline
for i := t.start; i < t.n; i++ {
if t.buf[i] == '\n' {
t.end = i
t.nextStart = i + 1
t.isEnd = false
return true
}
newStat := newFdStat.Sys().(*syscall.Stat_t)
}

// 変更がない場合待って再度チェックする
if t.Stat.Inode == newStat.Ino && t.Stat.Size == newStat.Size {
fd.Close()
time.Sleep(100 * time.Millisecond)
// not found newline
// Move offset to last newline
t.offset1 = t.offset1 + int64(t.end)
t.bufEmpty = true
// If offset1 and offset2 are the same, the file has not been updated,
// so wait a certain amount of time and read it again
if t.offset1 == t.offset2 {
if !t.isEnd {
t.isEnd = true
time.Sleep(ReadLineTimeout)
continue
}
t.isEnd = false
t.end = t.n
// possiton update
t.Stat.Offset = t.offset1 - 1
t.PositionUpdate()
return true
} else {
// Move offset by line feed code
t.offset1++
t.start = 0
t.end = 0
t.nextStart = 0
}
}
}

// inodeの変更があったら新しいファイルに差し替える
if t.Stat.Inode != newStat.Ino {
t.Stat.Inode = newStat.Ino
t.Stat.Offset = 0
t.Stat.Size = newStat.Size
t.fileFd.Close()
t.fileFd = fd
t.scanner = bufio.NewScanner(t.fileFd)
break
}
func (t *Tail) fileCheck() error {
// status update
fdstat, err := t.fileFd.Stat()
if err != nil {
return err
}
s := fdstat.Sys().(*syscall.Stat_t)
t.Stat.Inode = s.Ino
t.Stat.Size = s.Size
t.Stat.Offset = t.offset1 - 1

// サイズ小さくなっていたらSEEK位置を先頭の戻す
if newStat.Size < t.Stat.Size {
_, err = t.fileFd.Seek(0, os.SEEK_SET)
if err != nil {
t.err = err
return false
}
t.Stat.Size = newStat.Size
t.scanner = bufio.NewScanner(t.fileFd)
fd.Close()
break
// update position file
err = t.PositionUpdate()
if err != nil {
return err
}

// find new file
for {
// open file
fd, err := os.Open(t.file)
if os.IsNotExist(err) {
// sleep & next file check
time.Sleep(time.Second)
continue
} else if err != nil {
return err
}
newFdStat, err := fd.Stat()
if err != nil {
return err
}
newStat := newFdStat.Sys().(*syscall.Stat_t)

// If there is no change in inode and size, wait a little longer
if t.Stat.Inode == newStat.Ino && t.Stat.Size == newStat.Size {
fd.Close()
time.Sleep(100 * time.Millisecond)
continue
}

// Replace any inode changes with new files
if t.Stat.Inode != newStat.Ino {
t.Stat.Inode = newStat.Ino
t.Stat.Offset = 0
t.offset1 = 0
t.Stat.Size = newStat.Size
t.fileFd.Close()
t.fileFd = fd
break
}

// If the size is smaller, move the SEEK position back to the beginning
if newStat.Size < t.Stat.Size {
_, err = t.fileFd.Seek(0, io.SeekStart)
if err != nil {
return err
}
t.Stat.Size = newStat.Size
fd.Close()
break
}

if newStat.Size > t.Stat.Size {
_, err := t.fileFd.Seek(t.Stat.Offset, os.SEEK_SET)
if err != nil {
t.err = err
return false
}
t.scanner = bufio.NewScanner(t.fileFd)
fd.Close()
break
if newStat.Size > t.Stat.Size {
_, err := t.fileFd.Seek(t.Stat.Offset, io.SeekStart)
if err != nil {
return err
}
fd.Close()
break
}
}

return nil
}
Loading

0 comments on commit 5e50a2f

Please sign in to comment.