Skip to content

Commit a0fa118

Browse files
committed
feat: findTasksStream
1 parent 19bde3e commit a0fa118

File tree

3 files changed

+126
-0
lines changed

3 files changed

+126
-0
lines changed

client/src/main/java/com/influxdb/client/TasksApi.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.time.OffsetDateTime;
2525
import java.util.List;
26+
import java.util.stream.Stream;
2627
import javax.annotation.Nonnull;
2728
import javax.annotation.Nullable;
2829
import javax.annotation.concurrent.ThreadSafe;
@@ -275,6 +276,15 @@ List<Task> findTasks(@Nullable final String afterID,
275276
@Nonnull
276277
List<Task> findTasks(@Nonnull final TasksQuery query);
277278

279+
/**
280+
* Query tasks, automaticaly paged by given limit (default 100).
281+
*
282+
* @param query query params for task
283+
* @return A list of tasks
284+
*/
285+
@Nonnull
286+
Stream<Task> findTasksStream(@Nonnull final TasksQuery query);
287+
278288
/**
279289
* List all task members.
280290
*

client/src/main/java/com/influxdb/client/internal/TasksApiImpl.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@
2222
package com.influxdb.client.internal;
2323

2424
import java.time.OffsetDateTime;
25+
import java.util.Collections;
26+
import java.util.Iterator;
2527
import java.util.List;
28+
import java.util.Spliterator;
29+
import java.util.Spliterators;
2630
import java.util.logging.Level;
2731
import java.util.logging.Logger;
32+
import java.util.stream.Stream;
33+
import java.util.stream.StreamSupport;
2834
import javax.annotation.Nonnull;
2935
import javax.annotation.Nullable;
3036

@@ -147,6 +153,77 @@ public List<Task> findTasks(@Nonnull final TasksQuery query) {
147153
return tasks.getTasks();
148154
}
149155

156+
@Nonnull
157+
@Override
158+
public Stream<Task> findTasksStream(@Nonnull final TasksQuery query) {
159+
Iterator<Task> iterator = new Iterator<Task>() {
160+
private boolean hasNext = true;
161+
162+
@Nonnull
163+
private Iterator<Task> tasksIterator = Collections.emptyIterator();
164+
165+
@Nullable
166+
private String after = query.getAfter();
167+
168+
@Override
169+
public boolean hasNext() {
170+
if (tasksIterator.hasNext()) {
171+
return true;
172+
} else if (hasNext) {
173+
doQueryNext();
174+
return tasksIterator.hasNext();
175+
} else {
176+
return false;
177+
}
178+
}
179+
180+
private void doQueryNext() {
181+
Call<Tasks> call = service.getTasks(null, query.getName(), after, query.getUser(),
182+
query.getOrg(), query.getOrgID(), query.getStatus(), query.getLimit(), query.getType());
183+
184+
Tasks tasks = execute(call);
185+
186+
List<Task> tasksList = tasks.getTasks();
187+
tasksIterator = tasksList.iterator();
188+
if (!tasksList.isEmpty()) {
189+
Task lastTask = tasksList.get(tasksList.size() - 1);
190+
after = lastTask.getId();
191+
}
192+
193+
@Nullable String nextUrl = tasks.getLinks().getNext();
194+
hasNext = nextUrl != null && !nextUrl.isEmpty();
195+
196+
String logMsg = "findTasksStream found: {0} has next page: {1} next after {2}: ";
197+
LOG.log(Level.FINEST, logMsg, new Object[]{tasks, hasNext, after});
198+
}
199+
200+
@Override
201+
public Task next() throws IndexOutOfBoundsException {
202+
if (!tasksIterator.hasNext() && hasNext) {
203+
doQueryNext();
204+
}
205+
206+
if (tasksIterator.hasNext()){
207+
return tasksIterator.next();
208+
} else {
209+
throw new IndexOutOfBoundsException();
210+
}
211+
}
212+
213+
@Override
214+
public void remove() throws UnsupportedOperationException {
215+
throw new UnsupportedOperationException();
216+
}
217+
};
218+
219+
Stream<Task> stream = StreamSupport.stream(
220+
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
221+
false);
222+
223+
return stream;
224+
225+
}
226+
150227
@Nonnull
151228
@Override
152229
public Task createTask(@Nonnull final Task task) {

client/src/test/java/com/influxdb/client/ITTasksApi.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.logging.Logger;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
3234
import javax.annotation.Nonnull;
3335

3436
import com.influxdb.client.domain.Authorization;
@@ -49,6 +51,7 @@
4951
import com.influxdb.exceptions.NotFoundException;
5052

5153
import org.assertj.core.api.Assertions;
54+
import org.jetbrains.annotations.Nullable;
5255
import org.junit.jupiter.api.BeforeEach;
5356
import org.junit.jupiter.api.Disabled;
5457
import org.junit.jupiter.api.Test;
@@ -315,6 +318,42 @@ void findTasksAfterSpecifiedID() {
315318
Assertions.assertThat(tasks.get(0).getId()).isEqualTo(task2.getId());
316319
}
317320

321+
@Test
322+
void findTasksAll() {
323+
String taskName = generateName("it task all");
324+
int numOfTasks = 10;
325+
326+
for (int i = 0; i < numOfTasks; i++) {
327+
tasksApi.createTaskCron(taskName, TASK_FLUX, "0 2 * * *", organization);
328+
}
329+
330+
final TasksQuery tasksQuery = new TasksQuery();
331+
tasksQuery.setName(taskName);
332+
333+
List<Task> tasks;
334+
335+
// get tasks in 3-4 batches
336+
tasksQuery.setLimit(numOfTasks / 3);
337+
tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList());
338+
Assertions.assertThat(tasks).hasSize(numOfTasks);
339+
340+
// get tasks in one equally size batch
341+
tasksQuery.setLimit(numOfTasks);
342+
tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList());
343+
Assertions.assertThat(tasks).hasSize(numOfTasks);
344+
345+
// get tasks in one batch
346+
tasksQuery.setLimit(numOfTasks + 1);
347+
tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList());
348+
Assertions.assertThat(tasks).hasSize(numOfTasks);
349+
350+
// get no tasks
351+
tasksQuery.setLimit(null);
352+
tasksQuery.setName(taskName + "___");
353+
tasks = tasksApi.findTasksStream(tasksQuery).collect(Collectors.toList());
354+
Assertions.assertThat(tasks).hasSize(0);
355+
}
356+
318357
@Test
319358
void deleteTask() {
320359

0 commit comments

Comments
 (0)