Skip to content

Commit

Permalink
Merge pull request #797 from redHJ/fixDirxRead
Browse files Browse the repository at this point in the history
fix dirx can't read data when one match dir is empty
  • Loading branch information
wonderflow authored Oct 24, 2018
2 parents 6d008e1 + 5c52a88 commit e3c8f46
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
12 changes: 10 additions & 2 deletions reader/dirx/dirx.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,8 @@ func (r *Reader) ReadLine() (string, error) {
case msg := <-r.msgChan:
r.currentFile = msg.logpath
return msg.result, nil
case err := <-r.errChan:
return "", err
case <-timer.C:
return "", r.readError()
}

return "", nil
Expand Down Expand Up @@ -398,3 +397,12 @@ func (r *Reader) Close() error {
func (r *Reader) Reset() error {
return r.dirReaders.Reset()
}

func (r *Reader) readError() error {
select {
case err := <-r.errChan:
return err
default:
return nil
}
}
70 changes: 70 additions & 0 deletions reader/dirx/dirx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestStart(t *testing.T) {
funcMap := map[string]func(*testing.T){
"multiReaderOneLineTest": multiReaderOneLineTest,
"multiReaderMultiLineTest": multiReaderMultiLineTest,
"multiReaderEmptyDirxTest": multiReaderEmptyDirxTest,
"multiReaderSyncMetaOneLineTest": multiReaderSyncMetaOneLineTest,
"multiReaderSyncMetaMutilineTest": multiReaderSyncMetaMutilineTest,
}
Expand Down Expand Up @@ -678,3 +679,72 @@ func TestReaderErrBegin(t *testing.T) {
assert.NoError(t, dr.Close())
t.Log("Reader has closed")
}

func multiReaderEmptyDirxTest(t *testing.T) {
dirName := "multiReaderEmptyDirxTest"
dir1 := filepath.Join(dirName, "logs/abc")
dir2 := filepath.Join(dirName, "logs/xyz")
dir1file1 := filepath.Join(dir1, "file1.log")

createDirWithName(dirName)
defer os.RemoveAll(dirName)

createDirWithName(dir1)
createFileWithContent(dir1file1, "abc123\nabc123\nabc123\nabc123\nabc123\n")
createDirWithName(dir2)
go func() {
time.Sleep(5 * time.Second)
dir1file2 := filepath.Join(dir1, "file2.log")
createFileWithContent(dir1file2, "xyz1\nxyz2\nxyz3\nxyz4\nxyz5\nxyz6\nxyz7\nxyz8\nxyz9\nxyz10\n")
}()

logPathPattern := filepath.Join(dirName, "logs/*")
c := conf.MapConf{
"log_path": logPathPattern,
"stat_interval": "1s",
"expire": "0s",
"submeta_expire": "720h",
"max_open_files": "128",
"read_from": "newest",
"reader_buf_size": "1024",
"meta_path": dirName,
"mode": reader.ModeDirx,
}
meta, err := reader.NewMetaWithConf(c)
assert.Nil(t, err)
r, err := NewReader(meta, c)
assert.Nil(t, err)

dr := r.(*Reader)
assert.NoError(t, dr.Start())
t.Log("Reader has started")

maxNum := 0
emptyNum := 0
var lastError error
for {
data, err := dr.ReadLine()
if data != "" {
t.Log("Data:", data, maxNum)
maxNum++
} else {
if err != nil {
lastError = err
}
emptyNum++
}
if err == io.EOF {
break
}
if maxNum >= 10 {
break
}
}
t.Log("Reader has finished reading one")
assert.Equal(t, 10, maxNum)
if emptyNum < 0 {
t.Fatalf("expect > 0, but got: %d", emptyNum)
}
assert.NotNil(t, lastError)
assert.Equal(t, "file does not exist", lastError.Error())
}

0 comments on commit e3c8f46

Please sign in to comment.