Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,79 +220,79 @@ public void close() throws Exception {
* @return Whether or not more data was found.
*/
public boolean next() {
try {
if (completed.isDone() && queue.isEmpty()) {
return false;
}

pending--;
requestOutstanding();
while (true) { // Add a loop to make the logic iterative
try {
if (completed.isDone() && queue.isEmpty()) {
return false;
}

Object data = queue.take();
if (DONE == data) {
queue.put(DONE);
// Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
completed.complete(null);
return false;
} else if (DONE_EX == data) {
queue.put(DONE_EX);
if (ex instanceof Exception) {
throw (Exception) ex;
pending--;
requestOutstanding();

Object data = queue.take();
if (DONE == data) {
queue.put(DONE);
completed.complete(null);
return false;
} else if (DONE_EX == data) {
queue.put(DONE_EX);
if (ex instanceof Exception) {
throw (Exception) ex;
} else {
throw new Exception(ex);
}
} else {
throw new Exception(ex);
}
} else {
try (ArrowMessage msg = ((ArrowMessage) data)) {
if (msg.getMessageType() == HeaderType.NONE) {
updateMetadata(msg);
// We received a message without data, so erase any leftover data
if (fulfilledRoot != null) {
fulfilledRoot.clear();
}
} else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowRecordBatch arb = msg.asRecordBatch()) {
loader.load(arb);
}
updateMetadata(msg);
} else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
final long id = arb.getDictionaryId();
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
try (ArrowMessage msg = ((ArrowMessage) data)) {
if (msg.getMessageType() == HeaderType.NONE) {
updateMetadata(msg);
if (fulfilledRoot != null) {
fulfilledRoot.clear();
}
final Dictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
} else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
checkMetadataVersion(msg);
root.get().clear();
try (ArrowRecordBatch arb = msg.asRecordBatch()) {
loader.load(arb);
}

final FieldVector vector = dictionary.getVector();
final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()),
Collections.singletonList(vector), 0);
final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
dictionaryLoader.load(arb.getDictionary());
updateMetadata(msg);
} else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
checkMetadataVersion(msg);
root.get().clear();
try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
final long id = arb.getDictionaryId();
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
}
final Dictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
}

final FieldVector vector = dictionary.getVector();
final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(
Collections.singletonList(vector.getField()),
Collections.singletonList(vector), 0);
final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
dictionaryLoader.load(arb.getDictionary());
}
continue; // Skip the remaining code and iterate again
} else {
throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType());
}
return next();
} else {
throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType());
return true;
}
return true;
}
} catch (RuntimeException e) {
throw e;
} catch (ExecutionException e) {
throw StatusUtils.fromThrowable(e.getCause());
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (RuntimeException e) {
throw e;
} catch (ExecutionException e) {
throw StatusUtils.fromThrowable(e.getCause());
} catch (Exception e) {
throw new RuntimeException(e);
}
}


/** Update our metadata reference with a new one from this message. */
private void updateMetadata(ArrowMessage msg) {
if (this.applicationMetadata != null) {
Expand Down