Skip to content

Also emit a watch event at startup when using the builder source. #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions src/main/java/com/github/davidmoten/rx/FileObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,7 @@ public final class FileObservable {
*/
public final static Observable<byte[]> tailFile(File file, long startPosition,
long sampleTimeMs, int chunkSize) {
Observable<Object> events = from(file, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW)
// don't care about the event details, just that there
// is one
.cast(Object.class)
// get lines once on subscription so we tail the lines
// in the file at startup
.startWith(new Object());
return tailFile(file, startPosition, sampleTimeMs, chunkSize, events);
return tailFile(file, startPosition, sampleTimeMs, chunkSize, getDefaultWatchEventSource(file, null));
}

/**
Expand Down Expand Up @@ -332,6 +324,17 @@ public Boolean call(Object event) {
}
};

private static Observable<Object> getDefaultWatchEventSource(File file, Action0 onWatchStarted) {
return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW)
// don't care about the event details, just that there
// is one
.cast(Object.class)
// get lines once on subscription so we tail the lines
// in the file at startup
.startWith(new Object());
}

public static Builder tailer() {
return new Builder();
}
Expand Down Expand Up @@ -452,12 +455,7 @@ public Observable<String> tailText() {
}

private Observable<?> getSource() {
if (source == null)
return from(file, onWatchStarted, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW);
else
return source;

return source != null ? source : getDefaultWatchEventSource(file, onWatchStarted);
}

}
Expand Down
25 changes: 25 additions & 0 deletions src/test/java/com/github/davidmoten/rx/FileObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,29 @@ public void call(String line) {
assertEquals("line 3", list.get(1).trim());
sub.unsubscribe();
}

@Test
public void testFileTailingExistingLines() throws InterruptedException, IOException {
final File log = new File("target/test.log");
log.delete();
log.createNewFile();
append(log, "a0");
append(log, "a1");
append(log, "a2");

Observable<String> tailer = FileObservable.tailer().file(log).sampleTimeMs(50).utf8().tailText();
final List<String> list = new ArrayList<String>();
final CountDownLatch latch = new CountDownLatch(3);
Subscription sub = tailer.subscribeOn(Schedulers.io()).subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.println("received: '" + line + "'");
list.add(line);
latch.countDown();
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(Arrays.asList("a0", "a1", "a2"), list);
sub.unsubscribe();
}
}