Skip to content

[DE-511] Retriable cursor #505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
retriable cursor
  • Loading branch information
rashtao committed May 26, 2023
commit 3627461fbd0e086a8274dbb4c6ef75b81103021d
7 changes: 7 additions & 0 deletions core/src/main/java/com/arangodb/ArangoCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,11 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
*/
boolean isPotentialDirtyRead();

/**
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
* 1 with every batch. Only set if the allowRetry query option is enabled.
* @since ArangoDB 3.11
*/
String getNextBatchId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface ArangoCursorExecute {

InternalCursorEntity next(String id);
InternalCursorEntity next(String id, String nextBatchId);

void close(String id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ private <T> ArangoCursor<T> createCursor(

final ArangoCursorExecute execute = new ArangoCursorExecute() {
@Override
public InternalCursorEntity next(final String id) {
return executor.execute(queryNextRequest(id, options), internalCursorEntityDeserializer(), hostHandle);
public InternalCursorEntity next(final String id, final String nextBatchId) {
InternalRequest request = nextBatchId == null ?
queryNextRequest(id, options) : queryNextByBatchIdRequest(id, nextBatchId, options);
return executor.execute(request, internalCursorEntityDeserializer(), hostHandle);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ protected InternalRequest queryRequest(final String query, final Map<String, Obj

protected InternalRequest queryNextRequest(final String id, final AqlQueryOptions options) {
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id);
return completeQueryNextRequest(request, options);
}

protected InternalRequest queryNextByBatchIdRequest(final String id,
final String nextBatchId,
final AqlQueryOptions options) {
final InternalRequest request = request(name, RequestType.POST, PATH_API_CURSOR, id, nextBatchId);
return completeQueryNextRequest(request, options);
}

private InternalRequest completeQueryNextRequest(final InternalRequest request, final AqlQueryOptions options) {
final AqlQueryOptions opt = options != null ? options : new AqlQueryOptions();
if (Boolean.TRUE.equals(opt.getAllowDirtyRead())) {
RequestUtils.allowDirtyRead(request);
Expand Down

This file was deleted.

112 changes: 80 additions & 32 deletions core/src/main/java/com/arangodb/internal/cursor/ArangoCursorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,49 @@
import com.arangodb.internal.ArangoCursorExecute;
import com.arangodb.internal.InternalArangoDatabase;
import com.arangodb.internal.cursor.entity.InternalCursorEntity;
import com.arangodb.internal.cursor.entity.InternalCursorEntity.Extras;
import com.fasterxml.jackson.databind.JsonNode;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* @author Mark Vollmary
*/
public class ArangoCursorImpl<T> extends AbstractArangoIterable<T> implements ArangoCursor<T> {
public class ArangoCursorImpl<T> implements ArangoCursor<T> {

protected final ArangoCursorIterator<T> iterator;
private final Class<T> type;
private final String id;
private final ArangoCursorExecute execute;
private final boolean isPontentialDirtyRead;
private final boolean pontentialDirtyRead;
private final boolean allowRetry;

public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCursorExecute execute,
final Class<T> type, final InternalCursorEntity result) {
super();
this.execute = execute;
this.type = type;
iterator = createIterator(this, db, execute, result);
id = result.getId();
isPontentialDirtyRead = result.isPontentialDirtyRead();
pontentialDirtyRead = result.isPontentialDirtyRead();
iterator = new ArangoCursorIterator<>(id, type, execute, db, result);
this.allowRetry = result.getNextBatchId() != null;
}

protected ArangoCursorIterator<T> createIterator(
final ArangoCursor<T> cursor,
final InternalArangoDatabase<?, ?> db,
final ArangoCursorExecute execute,
final InternalCursorEntity result) {
return new ArangoCursorIterator<>(cursor, execute, db, result);
@Override
public void close() {
if (getId() != null && (allowRetry || iterator.result.getHasMore())) {
getExecute().close(getId());
}
}

@Override
public T next() {
return iterator.next();
}

@Override
Expand All @@ -74,44 +84,32 @@ public Class<T> getType() {

@Override
public Integer getCount() {
return iterator.getResult().getCount();
return iterator.result.getCount();
}

@Override
public CursorStats getStats() {
final Extras extra = iterator.getResult().getExtra();
final InternalCursorEntity.Extras extra = iterator.result.getExtra();
return extra != null ? extra.getStats() : null;
}

@Override
public Collection<CursorWarning> getWarnings() {
final Extras extra = iterator.getResult().getExtra();
final InternalCursorEntity.Extras extra = iterator.result.getExtra();
return extra != null ? extra.getWarnings() : null;
}

@Override
public boolean isCached() {
final Boolean cached = iterator.getResult().getCached();
final Boolean cached = iterator.result.getCached();
return Boolean.TRUE.equals(cached);
}

@Override
public void close() {
if (id != null && hasNext()) {
execute.close(id);
}
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public T next() {
return iterator.next();
}

@Override
public List<T> asListRemaining() {
final List<T> remaining = new ArrayList<>();
Expand All @@ -123,17 +121,67 @@ public List<T> asListRemaining() {

@Override
public boolean isPotentialDirtyRead() {
return isPontentialDirtyRead;
return pontentialDirtyRead;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
public ArangoIterator<T> iterator() {
return iterator;
}

@Override
public ArangoIterator<T> iterator() {
return iterator;
public String getNextBatchId() {
return iterator.result.getNextBatchId();
}

protected ArangoCursorExecute getExecute() {
return execute;
}

public Stream<T> stream() {
return StreamSupport.stream(spliterator(), false);
}

protected static class ArangoCursorIterator<T> implements ArangoIterator<T> {
private final String cursorId;
private final Class<T> type;
private final InternalArangoDatabase<?, ?> db;
private final ArangoCursorExecute execute;
private InternalCursorEntity result;
private Iterator<JsonNode> arrayIterator;

protected ArangoCursorIterator(final String cursorId, final Class<T> type, final ArangoCursorExecute execute,
final InternalArangoDatabase<?, ?> db, final InternalCursorEntity result) {
this.cursorId = cursorId;
this.type = type;
this.execute = execute;
this.db = db;
this.result = result;
arrayIterator = result.getResult().iterator();
}

@Override
public boolean hasNext() {
return arrayIterator.hasNext() || result.getHasMore();
}

@Override
public T next() {
if (!arrayIterator.hasNext() && Boolean.TRUE.equals(result.getHasMore())) {
result = execute.next(cursorId, result.getNextBatchId());
arrayIterator = result.getResult().iterator();
}
if (!hasNext()) {
throw new NoSuchElementException();
}
return deserialize(db.getSerde().serialize(arrayIterator.next()), type);
}

private <R> R deserialize(final byte[] result, final Class<R> type) {
return db.getSerde().deserializeUserData(result, type);
}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class InternalCursorEntity {
private Boolean hasMore;
private JsonNode result;
private Boolean pontentialDirtyRead;
private String nextBatchId;

public String getId() {
return id;
Expand Down Expand Up @@ -99,6 +100,15 @@ public void setPontentialDirtyRead(final Boolean pontentialDirtyRead) {
this.pontentialDirtyRead = pontentialDirtyRead;
}

/**
* @return The ID of the batch after the current one. The first batch has an ID of 1 and the value is incremented by
* 1 with every batch. Only set if the allowRetry query option is enabled.
* @since ArangoDB 3.11
*/
public String getNextBatchId() {
return nextBatchId;
}

public static final class Extras {
private final Collection<CursorWarning> warnings = Collections.emptyList();
private CursorStats stats;
Expand Down
Loading