Skip to content

DateHistogramAggregation specifying timeZone results in an error. #1100

@ty-lq

Description

@ty-lq

Java API client version

8.18.5

Java version

17

Elasticsearch Version

8.15.5

Problem description

    // Calendar-day histogram on "timestamp" in the +08:00 zone, with extended
    // bounds taken from the stringified start and end values.
    FieldDateMath lowerBound = FieldDateMath.of(m -> m.expr(String.valueOf(start)));
    FieldDateMath upperBound = FieldDateMath.of(m -> m.expr(String.valueOf(end)));
    DateHistogramAggregation dateHistogramAggregation = DateHistogramAggregation.of(histogram -> histogram
            .field("timestamp")
            .calendarInterval(CalendarInterval.Day)
            .timeZone("+08:00")
            .extendedBounds(bounds -> bounds.min(lowerBound).max(upperBound))
    );

Adding `.timeZone("+08:00")` to the `DateHistogramAggregation` builder causes the search request to fail with the error below.

error: Caused by: co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/search] failed: [search_phase_execution_exception] all shards failed

Full code:

`
/**
 * Searches the device-data index for readings whose {@code timestamp} lies strictly between
 * {@code start} and {@code end}, groups them per device and per calendar interval, and sums
 * (last reading - first reading) of {@code data.power_consumption} across devices for each
 * time slot.
 *
 * @param targetDeviceId   when non-null, only this device contributes to the totals
 * @param timeGranularity  selects the slot-label format ("oneday", "threeday", "sevenday",
 *                         "month"; anything else uses a full date-time label) and is forwarded
 *                         to {@code buildConsumeResults}
 * @param calendarInterval histogram bucket width; {@code CalendarInterval.Day} is used when null
 * @param size             forwarded to {@code buildConsumeResults}; null is treated as 0
 * @param start            exclusive lower bound of the range query, also the histogram's
 *                         extended lower bound
 * @param end              exclusive upper bound of the range query, also the histogram's
 *                         extended upper bound
 * @return a Mono emitting the list produced by {@code buildConsumeResults}
 */
public Mono<List<Map<String, Object>>> getConsume(String targetDeviceId,
                                                  String timeGranularity,
                                                  CalendarInterval calendarInterval,
                                                  Integer size,
                                                  Long start, Long end) {
    // A single zone drives the range query, the histogram bucketing and the label
    // formatting, so bucket keys and labels cannot drift apart on hosts whose default
    // zone is not +08:00.
    final String reportZoneId = "+08:00";
    final ZoneId reportZone = ZoneId.of(reportZoneId);
    // Honour the caller-supplied interval; it was previously ignored in favour of Day.
    final CalendarInterval interval = calendarInterval != null ? calendarInterval : CalendarInterval.Day;
    // Avoid an NPE from auto-unboxing when size is not supplied.
    final int slotCount = size != null ? size : 0;
    // Built once instead of once per bucket.
    final DateTimeFormatter labelFormatter = slotLabelFormatter(timeGranularity);

    Query timeRange = RangeQuery.of(q -> q
            .untyped(nrq -> nrq
                    .field("timestamp")
                    .gt(JsonData.of(start))
                    .lt(JsonData.of(end))
                    .timeZone(reportZoneId)
            )
    )._toQuery();

    // Restrict the search to the requested device on the server. Filtering only on the
    // client is unreliable because the terms aggregation returns just its top buckets.
    Query q1 = targetDeviceId == null
            ? timeRange
            : Query.of(b -> b.bool(bq -> bq
                    .filter(timeRange)
                    .filter(f -> f.term(t -> t.field("deviceId.keyword").value(targetDeviceId)))
            ));

    // NOTE(review): the reported search_phase_execution_exception appears once time_zone is
    // set here. One possible cause is that "timestamp" is not mapped as a date field -
    // confirm against the index mapping and the shard failure reason in the response.
    DateHistogramAggregation dateHistogramAggregation = DateHistogramAggregation.of(d -> d
            .field("timestamp")
            .calendarInterval(interval)
            .timeZone(reportZoneId)
            .extendedBounds(b -> b
                    .min(FieldDateMath.of(fn -> fn.expr(String.valueOf(start))))
                    .max(FieldDateMath.of(fn -> fn.expr(String.valueOf(end))))
            )
    );

    CompletableFuture<SearchResponse<Void>> future = asyncClient
            .search(s -> s
                            .index(IndexNameUtil.getDeviceDataIndex())
                            .size(0)
                            .query(q1)
                            .aggregations("by_device", a -> a
                                    .terms(t -> t.field("deviceId.keyword"))
                                    .aggregations("by_hour", a2 -> a2
                                            .dateHistogram(dateHistogramAggregation)
                                            // Earliest reading in the bucket.
                                            .aggregations("first_record", sub -> sub
                                                    .topHits(th -> th
                                                            .size(1)
                                                            .sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Asc)))
                                                            .source(src -> src.filter(f -> f.includes("data.power_consumption")))
                                                    )
                                            )
                                            // Latest reading in the bucket.
                                            .aggregations("last_record", sub -> sub
                                                    .topHits(th -> th
                                                            .size(1)
                                                            .sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc)))
                                                            .source(src -> src.filter(f -> f.includes("data.power_consumption")))
                                                    )
                                            )
                                    )
                            )
                            .sort(sort -> sort
                                    .field(fs -> fs.field("timestamp").order(SortOrder.Asc))
                            ),
                    Void.class
            );

    return Mono.fromFuture(future)
            .flatMapMany(resp -> Flux.fromIterable(resp.aggregations().get("by_device").sterms().buckets().array()))
            .filter(deviceBucket -> targetDeviceId == null || targetDeviceId.equals(deviceBucket.key().stringValue()))
            .flatMap(deviceBucket -> Flux.fromIterable(deviceBucket.aggregations().get("by_hour").dateHistogram().buckets().array())
                    .map(slotBucket -> {
                        double first = readPowerConsumption(slotBucket.aggregations().get("first_record").topHits().hits());
                        double last = readPowerConsumption(slotBucket.aggregations().get("last_record").topHits().hits());

                        LocalDateTime slotStart = Instant.ofEpochMilli(slotBucket.key())
                                .atZone(reportZone)
                                .toLocalDateTime();

                        Map<String, Double> slotUsage = new TreeMap<>();
                        slotUsage.put(slotStart.format(labelFormatter), last - first);
                        return slotUsage;
                    }))
            // Sum the per-device usage of each slot label into one map.
            .reduce((acc, next) -> {
                next.forEach((k, v) -> acc.merge(k, v, Double::sum));
                return acc;
            })
            .map(slotTotals -> buildConsumeResults(slotTotals, timeGranularity, slotCount))
            // No buckets at all: still return the zero-filled result list.
            .switchIfEmpty(Mono.fromCallable(() -> buildConsumeResults(new TreeMap<>(), timeGranularity, slotCount)));
}

/** Picks the slot-label format for a granularity; unknown or null values get a full date-time label. */
private DateTimeFormatter slotLabelFormatter(String timeGranularity) {
    final String pattern;
    if ("oneday".equals(timeGranularity)) {
        pattern = "H:00";
    } else if ("threeday".equals(timeGranularity)) {
        pattern = "yyyy-MM-dd H:00";
    } else if ("sevenday".equals(timeGranularity) || "month".equals(timeGranularity)) {
        pattern = "yyyy-MM-dd";
    } else {
        pattern = "yyyy-MM-dd HH:mm:ss";
    }
    return DateTimeFormatter.ofPattern(pattern);
}

/** Reads data.power_consumption from the first hit, or 0.0 when the hit or the value is absent or not numeric. */
private double readPowerConsumption(HitsMetadata<JsonData> hits) {
    if (hits.hits().isEmpty()) {
        return 0.0;
    }
    JsonData source = hits.hits().get(0).source();
    if (source == null) {
        return 0.0;
    }
    Map<?, ?> document = source.to(Map.class);
    Object data = document == null ? null : document.get("data");
    if (!(data instanceof Map)) {
        return 0.0;
    }
    Object reading = ((Map<?, ?>) data).get("power_consumption");
    return reading instanceof Number ? ((Number) reading).doubleValue() : 0.0;
}

/**
 * Expands slot totals into the ordered, zero-filled list of single-entry maps
 * (slot label to total rounded to 2 decimals) expected for the given granularity.
 *
 * <ul>
 *   <li>"oneday": {@code size} hourly slots labelled "0:00", "1:00", ...</li>
 *   <li>"threeday": 24 hourly slots for each of the 3 days starting 3 days before today</li>
 *   <li>"sevenday": one slot for each of the 7 days starting 7 days before today</li>
 *   <li>"month": {@code size} daily slots starting at the date returned by
 *       {@code DateUtil.getFirstDayOfMonthTimestamp()}</li>
 * </ul>
 *
 * @param hourlyTotal     totals keyed by slot label; missing labels count as 0.0
 * @param timeGranularity one of the values above; any other value (including null) yields an empty list
 * @param size            number of slots for "oneday" and "month"; unused otherwise
 * @return one single-entry map per slot, in chronological order
 */
private List<Map<String, Object>> buildConsumeResults(Map<String, Double> hourlyTotal, String timeGranularity, int size) {
    List<Map<String, Object>> results = new ArrayList<>();
    // Created once rather than on every loop iteration.
    DateTimeFormatter dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    // "Today" is resolved in the +08:00 zone the data is queried and bucketed in,
    // not in whatever zone the JVM happens to default to.
    ZoneId reportZone = ZoneId.of("+08:00");

    // Constant-first equals so a null granularity falls through to the empty result.
    if ("oneday".equals(timeGranularity)) {
        for (int hour = 0; hour < size; hour++) {
            results.add(roundedSlot(hourlyTotal, hour + ":00"));
        }
    } else if ("threeday".equals(timeGranularity)) {
        LocalDate firstDay = LocalDate.now(reportZone).minusDays(3);
        for (int dayOffset = 0; dayOffset < 3; dayOffset++) { // 3 days
            String day = firstDay.plusDays(dayOffset).format(dayFormat);
            for (int hour = 0; hour < 24; hour++) { // 24 hours per day
                results.add(roundedSlot(hourlyTotal, day + " " + hour + ":00"));
            }
        }
    } else if ("sevenday".equals(timeGranularity)) {
        LocalDate firstDay = LocalDate.now(reportZone).minusDays(7);
        for (int dayOffset = 0; dayOffset < 7; dayOffset++) {
            results.add(roundedSlot(hourlyTotal, firstDay.plusDays(dayOffset).format(dayFormat)));
        }
    } else if ("month".equals(timeGranularity)) {
        // NOTE(review): the zone used by DateUtil is not visible here - confirm it matches +08:00.
        LocalDate firstDay = DateUtil.millisecondsToLocalDate(DateUtil.getFirstDayOfMonthTimestamp());
        for (int dayOffset = 0; dayOffset < size; dayOffset++) {
            results.add(roundedSlot(hourlyTotal, firstDay.plusDays(dayOffset).format(dayFormat)));
        }
    }

    return results;
}

/** Builds the single-entry map for one slot: its rounded total, or 0.0 when the slot has no total. */
private Map<String, Object> roundedSlot(Map<String, Double> totals, String label) {
    Double total = totals.get(label);
    Double value = total == null ? 0.0 : NumberUtil.round(total, 2);
    return Map.of(label, value);
}

`

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions