Skip to content

Commit

Permalink
チャンネルでの受け渡しを廃止。
Browse files Browse the repository at this point in the history
  • Loading branch information
masa23 committed Apr 30, 2021
1 parent e79b9e5 commit d4b60bd
Showing 1 changed file with 68 additions and 48 deletions.
116 changes: 68 additions & 48 deletions tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"syscall"
"time"

yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)

// Tail is tail file struct
Expand All @@ -17,9 +17,8 @@ type Tail struct {
posFile string
posFd *os.File
Stat Stat
data chan []byte
data []byte
init bool
done bool
scanner *bufio.Scanner
err error
isCreatePosFile bool
Expand All @@ -41,7 +40,6 @@ func Open(file string, posfile string) (*Tail, error) {
file: file,
posFile: posfile,
init: true,
done: false,
}

// open position file
Expand Down Expand Up @@ -91,7 +89,7 @@ func Open(file string, posfile string) (*Tail, error) {
}

// update position file
err = posUpdate(&t)
err = t.PositionUpdate()
if err != nil {
return &t, err
}
Expand Down Expand Up @@ -120,11 +118,7 @@ func (t *Tail) Close() error {
}

// PositionUpdate is pos file update
func (t *Tail) PositionUpdate() {
posUpdate(t)
}

func posUpdate(t *Tail) error {
func (t *Tail) PositionUpdate() error {
if t.posFile == "" {
return nil
}
Expand All @@ -150,12 +144,12 @@ func posUpdate(t *Tail) error {

// Bytes is get one line bytes.
func (t *Tail) Bytes() []byte {
return <-t.data
return t.data
}

// Text is get one line strings.
func (t *Tail) Text() string {
return string(<-t.data)
return string(t.data)
}

// Err is get Scan error
Expand All @@ -165,27 +159,19 @@ func (t *Tail) Err() error {

// Scan is start scan.
func (t *Tail) Scan() bool {
var err error
if t.done {
return false
}
if t.init {
// there is no pos file Start reading from the end of the file
if (t.InitialReadPositionEnd && t.isCreatePosFile) ||
(t.InitialReadPositionEnd && t.posFile == "") {
t.fileFd.Seek(0, os.SEEK_END)
}
t.data = make(chan []byte, 1)
t.scanner = bufio.NewScanner(t.fileFd)
t.init = false
}

for {
if t.scanner.Scan() {
buf := t.scanner.Bytes()
out := make([]byte, len(buf))
copy(out, buf)
t.data <- out
t.data = t.scanner.Bytes()
return true
}

Expand All @@ -194,49 +180,83 @@ func (t *Tail) Scan() bool {
return false
}

t.Stat.Offset, err = t.fileFd.Seek(0, os.SEEK_CUR)
// ステータスをアップデート
fdstat, err := t.fileFd.Stat()
if err != nil {
t.err = err
return false
}
s := fdstat.Sys().(*syscall.Stat_t)
t.Stat.Inode = s.Ino
t.Stat.Size = s.Size
t.Stat.Offset, err = t.fileFd.Seek(0, os.SEEK_CUR)

fd, err := os.Open(t.file)
if os.IsNotExist(err) {
time.Sleep(time.Millisecond * 10)
} else if err != nil {
t.err = err
return false
}
fdStat, err := fd.Stat()
// posファイルアップデート
err = t.PositionUpdate()
if err != nil {
t.err = err
return false
}
stat := fdStat.Sys().(*syscall.Stat_t)
if stat.Ino != t.Stat.Inode {
t.Stat.Inode = stat.Ino
t.Stat.Offset = 0
t.Stat.Size = stat.Size
_ = t.fileFd.Close()
t.fileFd = fd
} else {
if stat.Size < t.Stat.Size {

for {
// ファイルを開く
fd, err := os.Open(t.file)
if os.IsNotExist(err) {
//ファイルがない場合は待つ
time.Sleep(time.Second)
continue
} else if err != nil {
t.err = err
return false
}
newFdStat, err := fd.Stat()
if err != nil {
t.err = err
return false
}
newStat := newFdStat.Sys().(*syscall.Stat_t)

// 変更がない場合待って再度チェックする
if t.Stat.Inode == newStat.Ino && t.Stat.Size == newStat.Size {
fd.Close()
time.Sleep(100 * time.Millisecond)
continue
}

// inodeの変更があったら新しいファイルに差し替える
if t.Stat.Inode != newStat.Ino {
t.Stat.Inode = newStat.Ino
t.Stat.Offset = 0
t.Stat.Size = newStat.Size
t.fileFd.Close()
t.fileFd = fd
t.scanner = bufio.NewScanner(t.fileFd)
break
}

// サイズ小さくなっていたらSEEK位置を先頭の戻す
if newStat.Size < t.Stat.Size {
_, err = t.fileFd.Seek(0, os.SEEK_SET)
if err != nil {
t.err = err
return false
}
t.Stat.Size = newStat.Size
t.scanner = bufio.NewScanner(t.fileFd)
fd.Close()
break
}
t.Stat.Size = stat.Size
time.Sleep(time.Millisecond * 10)
_ = fd.Close()
}
t.scanner = bufio.NewScanner(t.fileFd)

err = posUpdate(t)
if err != nil {
t.err = err
return false
if newStat.Size > t.Stat.Size {
_, err := t.fileFd.Seek(t.Stat.Offset, os.SEEK_SET)
if err != nil {
t.err = err
return false
}
t.scanner = bufio.NewScanner(t.fileFd)
fd.Close()
break
}
}
}
}

0 comments on commit d4b60bd

Please sign in to comment.