Skip to content

Commit

Permalink
Improve Time-Series Bucketing Scalability (#1137)
Browse files Browse the repository at this point in the history
Introduce customizable bucketMaxSpan and bucketRounding options for Time-Series collections, providing users with more control over bucketing behaviours. 

JAVA-4888
  • Loading branch information
vbabanin authored Jun 15, 2023
1 parent 2468e98 commit ce6c119
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum TimeSeriesGranularity {
/**
* Seconds-level granularity.
* <p>
* If granularity of a time-series collection is unspecified, this is the default value.
* This is the default value.
* </p>
*/
SECONDS,
Expand Down
105 changes: 104 additions & 1 deletion driver-core/src/main/com/mongodb/client/model/TimeSeriesOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import com.mongodb.lang.Nullable;

import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;

/**
Expand All @@ -31,6 +35,8 @@ public final class TimeSeriesOptions {
private final String timeField;
private String metaField;
private TimeSeriesGranularity granularity;
private Long bucketMaxSpanSeconds;
private Long bucketRoundingSeconds;

/**
* Construct a new instance.
Expand Down Expand Up @@ -92,24 +98,121 @@ public TimeSeriesGranularity getGranularity() {
/**
 * Sets the granularity of the time-series data.
 * <p>
 * The default value is {@link TimeSeriesGranularity#SECONDS} if neither {@link #bucketMaxSpan(Long, TimeUnit)} nor
 * {@link #bucketRounding(Long, TimeUnit)} is set. If any of these bucketing options are set, the granularity parameter cannot be set.
 * </p>
 *
 * @param granularity the time-series granularity. {@code null} can be provided to unset any previously set value.
 * @return this
 * @throws IllegalStateException if {@code granularity} is not {@code null} while {@code bucketMaxSpan} or
 * {@code bucketRounding} is set.
 * @see #getGranularity()
 */
public TimeSeriesOptions granularity(@Nullable final TimeSeriesGranularity granularity) {
    // Only a real value conflicts with the bucketing options. Passing null merely unsets the granularity, so it is
    // accepted unconditionally, mirroring how bucketMaxSpan(null, ...) and bucketRounding(null, ...) behave.
    if (granularity != null) {
        isTrue("granularity is not allowed when bucketMaxSpan is set", bucketMaxSpanSeconds == null);
        isTrue("granularity is not allowed when bucketRounding is set", bucketRoundingSeconds == null);
    }
    this.granularity = granularity;
    return this;
}

/**
 * Gets the maximum time span between measurements in a bucket, expressed in the requested unit.
 *
 * @param timeUnit the unit in which the span is reported; must not be {@code null}.
 * @return the stored span converted from seconds to {@code timeUnit}, or {@code null} if no span is set.
 * @since 4.10
 * @mongodb.server.release 6.3
 * @see #bucketMaxSpan(Long, TimeUnit)
 */
@Nullable
public Long getBucketMaxSpan(final TimeUnit timeUnit) {
    notNull("timeUnit", timeUnit);
    if (bucketMaxSpanSeconds != null) {
        // The value is stored in seconds; convert it to whatever unit the caller asked for.
        return timeUnit.convert(bucketMaxSpanSeconds, TimeUnit.SECONDS);
    }
    return null;
}

/**
 * Configures the maximum time span between measurements in a bucket.
 * <p>
 * {@code bucketMaxSpan} must have the same value as {@link #bucketRounding(Long, TimeUnit)}; consequently the two options
 * are either both set or both unset. Setting {@code bucketMaxSpan} is mutually exclusive with setting the granularity.
 * </p>
 *
 * @param bucketMaxSpan time span between measurements. Once converted to seconds (see {@link TimeUnit#toSeconds(long)}),
 * the value must be &gt;= 1. Passing {@code null} clears any previously set value.
 * @param timeUnit the time unit of {@code bucketMaxSpan}; must not be {@code null}.
 * @return this
 * @since 4.10
 * @mongodb.server.release 6.3
 * @see #getBucketMaxSpan(TimeUnit)
 */
public TimeSeriesOptions bucketMaxSpan(@Nullable final Long bucketMaxSpan, final TimeUnit timeUnit) {
    notNull("timeUnit", timeUnit);
    if (bucketMaxSpan == null) {
        // Clearing the option needs no further validation.
        this.bucketMaxSpanSeconds = null;
        return this;
    }
    isTrue("bucketMaxSpan is not allowed when granularity is set", granularity == null);
    // The option is stored in whole seconds, so a sub-second span converts to 0 and is rejected below.
    final long spanInSeconds = timeUnit.toSeconds(bucketMaxSpan);
    isTrueArgument("bucketMaxSpan, after conversion to seconds, must be >= 1", spanInSeconds >= 1);
    this.bucketMaxSpanSeconds = spanInSeconds;
    return this;
}

/**
 * Gets the time interval that determines the starting timestamp for a new bucket, expressed in the requested unit.
 *
 * @param timeUnit the unit in which the interval is reported; must not be {@code null}.
 * @return the stored interval converted from seconds to {@code timeUnit}, or {@code null} if no interval is set.
 * @since 4.10
 * @mongodb.server.release 6.3
 * @see #bucketRounding(Long, TimeUnit)
 */
@Nullable
public Long getBucketRounding(final TimeUnit timeUnit) {
    notNull("timeUnit", timeUnit);
    if (bucketRoundingSeconds != null) {
        // The value is stored in seconds; convert it to whatever unit the caller asked for.
        return timeUnit.convert(bucketRoundingSeconds, TimeUnit.SECONDS);
    }
    return null;
}

/**
 * Configures the time interval that determines the starting timestamp for a new bucket.
 * <p>
 * {@code bucketRounding} must have the same value as {@link #bucketMaxSpan(Long, TimeUnit)}; consequently the two options
 * are either both set or both unset. Setting {@code bucketRounding} is mutually exclusive with setting the granularity.
 * </p>
 *
 * @param bucketRounding time interval. Once converted to seconds (see {@link TimeUnit#toSeconds(long)}),
 * the value must be &gt;= 1. Passing {@code null} clears any previously set value.
 * @param timeUnit the time unit of {@code bucketRounding}; must not be {@code null}.
 * @return this
 * @since 4.10
 * @mongodb.server.release 6.3
 * @see #getBucketRounding(TimeUnit)
 */
public TimeSeriesOptions bucketRounding(@Nullable final Long bucketRounding, final TimeUnit timeUnit) {
    notNull("timeUnit", timeUnit);
    if (bucketRounding == null) {
        // Clearing the option needs no further validation.
        this.bucketRoundingSeconds = null;
        return this;
    }
    isTrue("bucketRounding is not allowed when granularity is set", granularity == null);
    // The option is stored in whole seconds, so a sub-second interval converts to 0 and is rejected below.
    final long roundingInSeconds = timeUnit.toSeconds(bucketRounding);
    isTrueArgument("bucketRounding, after conversion to seconds, must be >= 1", roundingInSeconds >= 1);
    this.bucketRoundingSeconds = roundingInSeconds;
    return this;
}

@Override
public String toString() {
    // Renders every option, including unset ones, which appear as the text "null".
    final StringBuilder description = new StringBuilder("TimeSeriesOptions{");
    description.append("timeField='").append(timeField).append('\'');
    description.append(", metaField='").append(metaField).append('\'');
    description.append(", granularity=").append(granularity);
    description.append(", bucketMaxSpanSeconds=").append(bucketMaxSpanSeconds);
    description.append(", bucketRoundingSeconds=").append(bucketRoundingSeconds);
    return description.append('}').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.mongodb.assertions.Assertions.notNull;
Expand Down Expand Up @@ -347,6 +349,14 @@ private BsonDocument getCreateCollectionCommand() {
if (granularity != null) {
timeSeriesDocument.put("granularity", new BsonString(getGranularityAsString(granularity)));
}
Long bucketMaxSpan = timeSeriesOptions.getBucketMaxSpan(TimeUnit.SECONDS);
if (bucketMaxSpan != null){
timeSeriesDocument.put("bucketMaxSpanSeconds", new BsonInt64(bucketMaxSpan));
}
Long bucketRounding = timeSeriesOptions.getBucketRounding(TimeUnit.SECONDS);
if (bucketRounding != null){
timeSeriesDocument.put("bucketRoundingSeconds", new BsonInt64(bucketRounding));
}
document.put("timeseries", timeSeriesDocument);
}
if (changeStreamPreAndPostImagesOptions != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.client.model;

import com.mongodb.lang.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;

/**
 * Unit tests for the state and argument validation performed by the bucketing accessors of {@link TimeSeriesOptions}.
 */
class TimeSeriesOptionsTest {

    /** Fresh instance under test, recreated before every test method. */
    private TimeSeriesOptions options;

    @BeforeEach
    void setUp() {
        options = new TimeSeriesOptions("test");
    }

    /** Once a granularity is set, neither bucketing option may be given a value. */
    @Test
    void shouldThrowErrorWhenGranularityIsAlreadySet() {
        // given: granularity has been configured first
        options.granularity(TimeSeriesGranularity.SECONDS);

        // when & then: both bucketing setters reject a non-null value
        assertAll(
                () -> assertThrows(IllegalStateException.class,
                        () -> options.bucketRounding(1L, TimeUnit.SECONDS)),
                () -> assertThrows(IllegalStateException.class,
                        () -> options.bucketMaxSpan(1L, TimeUnit.SECONDS))
        );
    }

    /** The getters require a time unit to convert into. */
    @Test
    void shouldThrowErrorWhenGetWithNullParameter() {
        assertAll(
                () -> assertThrows(IllegalArgumentException.class,
                        () -> options.getBucketMaxSpan(null)),
                () -> assertThrows(IllegalArgumentException.class,
                        () -> options.getBucketRounding(null))
        );
    }

    /** Every combination supplied by {@link #args()} must be rejected by both setters. */
    @ParameterizedTest
    @MethodSource("args")
    void shouldThrowErrorWhenInvalidArgumentProvided(@Nullable final Long valueToSet, @Nullable final TimeUnit timeUnit) {
        assertAll(
                () -> assertThrows(IllegalArgumentException.class,
                        () -> options.bucketRounding(valueToSet, timeUnit)),
                () -> assertThrows(IllegalArgumentException.class,
                        () -> options.bucketMaxSpan(valueToSet, timeUnit))
        );
    }

    /**
     * Supplies the invalid (value, time unit) pairs: a missing time unit with and without a value,
     * and a value of one millisecond.
     */
    private static Stream<Arguments> args() {
        return Stream.of(
                arguments(1L, null),
                arguments(null, null),
                arguments(1L, TimeUnit.MILLISECONDS)
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,71 @@
]
}
]
},
{
"description": "createCollection with bucketing options",
"runOnRequirements": [
{
"minServerVersion": "6.3"
}
],
"operations": [
{
"name": "dropCollection",
"object": "database0",
"arguments": {
"collection": "test"
}
},
{
"name": "createCollection",
"object": "database0",
"arguments": {
"collection": "test",
"timeseries": {
"timeField": "time",
"bucketMaxSpanSeconds": 3600,
"bucketRoundingSeconds": 3600
}
}
},
{
"name": "assertCollectionExists",
"object": "testRunner",
"arguments": {
"databaseName": "ts-tests",
"collectionName": "test"
}
}
],
"expectEvents": [
{
"client": "client0",
"events": [
{
"commandStartedEvent": {
"command": {
"drop": "test"
},
"databaseName": "ts-tests"
}
},
{
"commandStartedEvent": {
"command": {
"create": "test",
"timeseries": {
"timeField": "time",
"bucketMaxSpanSeconds": 3600,
"bucketRoundingSeconds": 3600
}
},
"databaseName": "ts-tests"
}
}
]
}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class ExtensionMethodsTest {
"FindOneAndReplaceOptions",
"FindOneAndUpdateOptions",
"IndexOptions",
"TransactionOptions")
"TransactionOptions",
"TimeSeriesOptions")

ClassGraph().enableClassInfo().enableMethodInfo().acceptPackages("com.mongodb").scan().use { scanResult ->
val optionsClassesWithTimeUnit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class ExtensionMethodsTest {
"FindOneAndReplaceOptions",
"FindOneAndUpdateOptions",
"IndexOptions",
"TransactionOptions")
"TransactionOptions",
"TimeSeriesOptions")

ClassGraph().enableClassInfo().enableMethodInfo().acceptPackages("com.mongodb").scan().use { scanResult ->
val optionsClassesWithTimeUnit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,12 @@ private TimeSeriesOptions createTimeSeriesOptions(final BsonDocument timeSeriesD
case "metaField":
options.metaField(cur.getValue().asString().getValue());
break;
case "bucketMaxSpanSeconds":
options.bucketMaxSpan(cur.getValue().asInt32().longValue(), TimeUnit.SECONDS);
break;
case "bucketRoundingSeconds":
options.bucketRounding(cur.getValue().asInt32().longValue(), TimeUnit.SECONDS);
break;
case "granularity":
options.granularity(createTimeSeriesGranularity(cur.getValue().asString().getValue()));
break;
Expand Down

0 comments on commit ce6c119

Please sign in to comment.