-
Notifications
You must be signed in to change notification settings - Fork 275
Open
Description
Java API client version
8.18.5
Java version
17
Elasticsearch Version
8.15.5
Problem description
DateHistogramAggregation dateHistogramAggregation = DateHistogramAggregation.of(d -> d
.field("timestamp")
.calendarInterval(CalendarInterval.Day)
.timeZone("+08:00")
.extendedBounds(b-> b
.min(FieldDateMath.of(fn->fn.expr(String.valueOf(start))))
.max(FieldDateMath.of(fn->fn.expr(String.valueOf(end))))
)
);
Adding .timeZone after DateHistogramAggregation results in a query error.
error: Caused by: co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/search] failed: [search_phase_execution_exception] all shards failed
all code:
`
public Mono<List<Map<String, Object>>> getConsume(String targetDeviceId,
String timeGranularity,
CalendarInterval calendarInterval,
Integer size,
Long start, Long end) {
Query q1 = RangeQuery.of(q -> q
.untyped(nrq -> nrq
.field("timestamp")
.gt(JsonData.of(start))
.lt(JsonData.of(end))
.timeZone("+08:00")
)
)._toQuery();
DateHistogramAggregation dateHistogramAggregation = DateHistogramAggregation.of(d -> d
.field("timestamp")
.calendarInterval(CalendarInterval.Day)
.timeZone("+08:00")
.extendedBounds(b-> b
.min(FieldDateMath.of(fn->fn.expr(String.valueOf(start))))
.max(FieldDateMath.of(fn->fn.expr(String.valueOf(end))))
)
);
CompletableFuture<SearchResponse<Void>> future = asyncClient
.search(s -> s
.index(IndexNameUtil.getDeviceDataIndex())
.size(0)
.query(q1)
.aggregations("by_device", a -> a
.terms(t -> t.field("deviceId.keyword"))
.aggregations("by_hour", a2 -> a2
.dateHistogram(dateHistogramAggregation)
.aggregations("first_record", sub -> sub
.topHits(th -> th
.size(1)
.sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Asc)))
.source(src -> src.filter(f -> f.includes("data.power_consumption")))
)
)
.aggregations("last_record", sub -> sub
.topHits(th -> th
.size(1)
.sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc)))
.source(src -> src.filter(f -> f.includes("data.power_consumption")))
)
)
)
)
.sort(sort -> sort
.field(fs -> fs.field("timestamp").order(SortOrder.Asc))
),
Void.class
);
return Mono.fromFuture(future)
.flatMapMany(resp -> Flux.fromIterable(resp.aggregations().get("by_device").sterms().buckets().array()))
.filter(deviceBucket -> targetDeviceId == null || targetDeviceId.equals(deviceBucket.key().stringValue()))
.flatMap(deviceBucket -> Flux.fromIterable(deviceBucket.aggregations().get("by_hour").dateHistogram().buckets().array())
.map(hourBucket -> {
HitsMetadata<JsonData> firstHits = hourBucket.aggregations().get("first_record").topHits().hits();
double first = Optional.ofNullable(!firstHits.hits().isEmpty()
? firstHits.hits().get(0).source()
: null
)
.map(src -> src.to(Map.class).get("data"))
.map(data -> ((Number) ((Map<?, ?>) data).get("power_consumption")).doubleValue())
.orElse(0.0);
HitsMetadata<JsonData> lastHits = hourBucket.aggregations().get("last_record").topHits().hits();
double last = Optional.ofNullable(!lastHits.hits().isEmpty()
? lastHits.hits().get(0).source()
: null
)
.map(src -> src.to(Map.class).get("data"))
.map(data -> ((Number) ((Map<?, ?>) data).get("power_consumption")).doubleValue())
.orElse(0.0);
double usage = last - first;
LocalDateTime ldt = Instant.ofEpochMilli(hourBucket.key())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
String timeStr;
if (timeGranularity.equals("oneday")) {
timeStr = ldt.format(DateTimeFormatter.ofPattern("H:00"));
} else if (timeGranularity.equals("threeday")) {
timeStr = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd H:00"));
} else if (timeGranularity.equals("sevenday")) {
timeStr = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
} else if (timeGranularity.equals("month")) {
timeStr = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
} else {
timeStr = ldt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
Map<String, Double> map = new TreeMap<>();
map.merge(timeStr, usage, Double::sum);
return map;
}))
.reduce((acc, next) -> {
next.forEach((k, v) -> acc.merge(k, v, Double::sum));
return acc;
})
.map(hourlyTotal -> buildConsumeResults(hourlyTotal, timeGranularity, size))
.switchIfEmpty(Mono.fromCallable(() -> buildConsumeResults(new TreeMap<>(), timeGranularity, size)));
}
private List<Map<String, Object>> buildConsumeResults(Map<String, Double> hourlyTotal, String timeGranularity, int size) {
List<Map<String, Object>> results = new ArrayList<>();
if (timeGranularity.equals("oneday")) {
for (int i = 0; i < size; i++) {
String hourStr = i + ":00";
Double v = hourlyTotal.get(hourStr);
if (v == null) {
v = 0.0;
} else {
v = NumberUtil.round(v, 2);
}
results.add(Map.of(hourStr, v));
}
} else if (timeGranularity.equals("threeday")) {
LocalDate startDate = LocalDate.now().minusDays(3);
for (int dayOffset = 0; dayOffset < 3; dayOffset++) { // 3 天
LocalDate date = startDate.plusDays(dayOffset);
for (int hour = 0; hour < 24; hour++) { // 每天 24 小时
String hourStr = date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + " " + hour + ":00";
Double v = hourlyTotal.getOrDefault(hourStr, 0.0);
v = NumberUtil.round(v, 2);
results.add(Map.of(hourStr, v));
}
}
} else if (timeGranularity.equals("sevenday")) {
LocalDate startDate = LocalDate.now().minusDays(7);
for (int dayOffset = 0; dayOffset < 7; dayOffset++) {
LocalDate date = startDate.plusDays(dayOffset);
String dateStr = date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
Double v = hourlyTotal.getOrDefault(dateStr, 0.0);
v = NumberUtil.round(v, 2);
results.add(Map.of(dateStr, v));
}
} else if (timeGranularity.equals("month")) {
LocalDate startDate = DateUtil.millisecondsToLocalDate(DateUtil.getFirstDayOfMonthTimestamp());
for (int dayOffset = 0; dayOffset < size; dayOffset++) {
LocalDate date = startDate.plusDays(dayOffset);
String dateStr = date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
Double v = hourlyTotal.getOrDefault(dateStr, 0.0);
v = NumberUtil.round(v, 2);
results.add(Map.of(dateStr, v));
}
}
return results;
}
`
Metadata
Metadata
Assignees
Labels
No labels