Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion plugins/ingestion-fs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ For example, create a file `${base_directory}/test-stream/0.ndjson` with data

### 2. Start OpenSearch with the Plugin

Load the ingestion-fs plugin and set the `path.repo` setting so that OpenSearch is allowed to read from
the local file system under `${base_directory}`.

```
./gradlew run -PinstalledPlugins="['ingestion-fs']"
./gradlew run -PinstalledPlugins="['ingestion-fs']" -Dtests.opensearch.path.repo="${base_directory}"
```

### 3. Create a pull-based index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,26 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
* File-based consumer for testing ingestion. Reads from ${baseDir}/${stream}/${shardId}.ndjson.
*/
@SuppressForbidden(reason = "using Java file APIs for local testing purpose")
public class FilePartitionConsumer implements IngestionShardConsumer<FileOffset, FileMessage> {
private static final Logger logger = LogManager.getLogger(FilePartitionConsumer.class);

private final File shardFile;
private final Path shardFile;
private final int shardId;

private BufferedReader reader = null;
Expand All @@ -48,13 +44,13 @@ public class FilePartitionConsumer implements IngestionShardConsumer<FileOffset,
public FilePartitionConsumer(FileSourceConfig config, int shardId) {
String baseDir = config.getBaseDirectory();
String stream = config.getStream();
this.shardFile = new File(baseDir, stream + File.separator + shardId + ".ndjson");
this.shardFile = Path.of(baseDir, stream, shardId + ".ndjson");
this.shardId = shardId;

if (!shardFile.exists()) {
logger.warn("FilePartitionConsumer: File {} does not exist.", shardFile.getAbsolutePath());
if (!Files.exists(shardFile)) {
logger.warn("FilePartitionConsumer: File {} does not exist.", shardFile.toAbsolutePath());
} else {
logger.info("Initialized FilePartitionConsumer for shard {} with file: {}", shardId, shardFile.getAbsolutePath());
logger.info("Initialized FilePartitionConsumer for shard {} with file: {}", shardId, shardFile.toAbsolutePath());
}
}

Expand All @@ -73,19 +69,19 @@ public List<ReadResult<FileOffset, FileMessage>> readNext(long maxMessages, int
private synchronized List<ReadResult<FileOffset, FileMessage>> readFromFile(long startLine, long maxLines) throws TimeoutException {
List<ReadResult<FileOffset, FileMessage>> results = new ArrayList<>();

if (!shardFile.exists()) {
if (!Files.exists(shardFile)) {
return results;
}

try {
if (reader == null) {
reader = new BufferedReader(new FileReader(shardFile));
reader = Files.newBufferedReader(shardFile);
currentLineInReader = 0;
}

if (startLine < currentLineInReader) {
reader.close();
reader = new BufferedReader(new FileReader(shardFile));
reader = Files.newBufferedReader(shardFile);
currentLineInReader = 0;
}

Expand All @@ -103,7 +99,7 @@ private synchronized List<ReadResult<FileOffset, FileMessage>> readFromFile(long
}

} catch (IOException e) {
throw new RuntimeException("Failed to read from file: " + shardFile.getAbsolutePath(), e);
throw new RuntimeException("Failed to read from file: " + shardFile.toAbsolutePath(), e);
}

return results;
Expand All @@ -116,15 +112,11 @@ public IngestionShardPointer earliestPointer() {

@Override
public IngestionShardPointer latestPointer() {
if (!shardFile.exists()) {
if (!Files.exists(shardFile)) {
return new FileOffset(0);
}

try (
LineNumberReader lineNumberReader = new LineNumberReader(
new InputStreamReader(new FileInputStream(shardFile), StandardCharsets.UTF_8)
)
) {
try (LineNumberReader lineNumberReader = new LineNumberReader(Files.newBufferedReader(shardFile))) {
while (lineNumberReader.readLine() != null) {
// do nothing
}
Expand Down Expand Up @@ -156,7 +148,7 @@ public int getShardId() {

@Override
public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) {
if (!shardFile.exists()) {
if (!Files.exists(shardFile)) {
return 0;
}

Expand Down
Loading