Skip to content

Commit

Permalink
SDC-1429 - Cluster mode prints warnings about offsets
Browse files Browse the repository at this point in the history
Change-Id: Ic6f286203cff4c9d50ce4534fa656b14cf62fb1d
Reviewed-on: https://review.streamsets.net/1176
Tested-by: StreamSets CI <streamsets-ci-spam@streamsets.com>
Reviewed-by: Brock Noland <brock@streamsets.com>
  • Loading branch information
brockn committed Sep 2, 2015
1 parent e5273db commit 369d544
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.streamsets.pipeline.impl;

import java.util.Collections;
import java.util.List;

public class OffsetAndResult<T> {
Expand All @@ -12,7 +13,7 @@ public class OffsetAndResult<T> {

public OffsetAndResult(Object offset, List<T> result) {
this.offset = offset;
this.result = result;
this.result = result == null ? Collections.<T>emptyList() : result;
}

public Object getOffset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public void put(OffsetAndResult<Map.Entry> batch) throws InterruptedException {
throw new RuntimeException(Utils.format("Producer encountered error: {}", producerError), producerError);
}
try {
final Object expectedOffset = batch.getOffset();
Object expectedOffset = "EMPTY_BATCH";
if (!batch.getResult().isEmpty()) {
expectedOffset = batch.getResult().get(batch.getResult().size() - 1).getKey(); // get the last one
}
while (!dataChannel.offer(batch, 10, TimeUnit.MILLISECONDS)) {
for (ControlChannel.Message controlMessage : controlChannel.getProducerMessages()) {
switch (controlMessage.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ private static class Record implements Map.Entry {
}
@Override
public Object getKey() {
return null;
return "key-" + i;
}

@Override
public Object getValue() {
return null;
return "val-" + i;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ public List<ConfigIssue> init() {
try {
List<Map.Entry> buffer;
if (dataFormat == DataFormat.AVRO) {
buffer = readAvroBatch(fileStatus, PREVIEW_SIZE);
buffer = previewAvroBatch(fileStatus, PREVIEW_SIZE);
} else {
buffer = readPreviewBatch(fileStatus, PREVIEW_SIZE);
buffer = previewTextBatch(fileStatus, PREVIEW_SIZE);
}
for (int i = 0; i < buffer.size() && previewBuffer.size() < PREVIEW_SIZE; i++) {
Map.Entry entry = buffer.get(i);
Expand Down Expand Up @@ -288,7 +288,7 @@ public List<ConfigIssue> init() {
return issues;
}

private List<Map.Entry> readPreviewBatch(FileStatus fileStatus, int batchSize)
private List<Map.Entry> previewTextBatch(FileStatus fileStatus, int batchSize)
throws IOException, InterruptedException {
TextInputFormat textInputFormat = new TextInputFormat();
InputSplit fileSplit = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), null);
Expand All @@ -306,11 +306,10 @@ private List<Map.Entry> readPreviewBatch(FileStatus fileStatus, int batchSize)
return batch;
}

private List<Map.Entry> readAvroBatch(FileStatus fileStatus, int batchSize) throws IOException, InterruptedException {
private List<Map.Entry> previewAvroBatch(FileStatus fileStatus, int batchSize) throws IOException, InterruptedException {
SeekableInput input = new FsInput(fileStatus.getPath(), hadoopConf);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);
boolean hasNext = fileReader.hasNext();
List<Map.Entry> batch = new ArrayList<>();
int count = 0;
while (fileReader.hasNext() && batch.size() < batchSize) {
Expand All @@ -323,7 +322,6 @@ private List<Map.Entry> readAvroBatch(FileStatus fileStatus, int batchSize) thro
dataFileWriter.close();
out.close();
batch.add(new Pair(fileStatus.getPath().toUri().getPath() + "::" + count, out.toByteArray()));
hasNext = fileReader.hasNext();
count++;
}
return batch;
Expand Down

0 comments on commit 369d544

Please sign in to comment.