Skip to content

RUST-1604 Add custom bucketing fields to timeseries options #907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ pub struct CreateCollectionOptions {
/// Used to automatically delete documents in time series collections. See the [`create`
/// command documentation](https://www.mongodb.com/docs/manual/reference/command/create/) for more
/// information.
#[serde(
default,
deserialize_with = "serde_util::deserialize_duration_option_from_u64_seconds",
serialize_with = "serde_util::serialize_duration_option_as_int_secs"
)]
#[serde(default, with = "serde_util::duration_option_as_int_seconds")]
pub expire_after_seconds: Option<Duration>,

/// Options for supporting change stream pre- and post-images.
Expand Down Expand Up @@ -192,8 +188,10 @@ pub struct IndexOptionDefaults {
}

/// Specifies options for creating a timeseries collection.
#[skip_serializing_none]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, TypedBuilder)]
#[serde(rename_all = "camelCase")]
#[builder(field_defaults(default))]
#[non_exhaustive]
pub struct TimeseriesOptions {
/// Name of the top-level field to be used for time. Inserted documents must have this field,
Expand All @@ -208,6 +206,31 @@ pub struct TimeseriesOptions {
/// The units you'd use to describe the expected interval between subsequent measurements for a
/// time-series. Defaults to `TimeseriesGranularity::Seconds` if unset.
pub granularity: Option<TimeseriesGranularity>,

/// The maximum time between timestamps in the same bucket. This value must be between 1 and
/// 31,536,000 seconds. If this value is set, the same value should be set for
/// `bucket_rounding` and `granularity` should not be set.
///
/// This option is only available on MongoDB 6.3+.
#[serde(
default,
with = "serde_util::duration_option_as_int_seconds",
rename = "bucketMaxSpanSeconds"
)]
pub bucket_max_span: Option<Duration>,

/// The time interval that determines the starting timestamp for a new bucket. When a document
/// requires a new bucket, MongoDB rounds down the document's timestamp value by this interval
/// to set the minimum time for the bucket. If this value is set, the same value should be set
/// for `bucket_max_span` and `granularity` should not be set.
///
/// This option is only available on MongoDB 6.3+.
#[serde(
default,
with = "serde_util::duration_option_as_int_seconds",
rename = "bucketRoundingSeconds"
)]
pub bucket_rounding: Option<Duration>,
}

/// The units you'd use to describe the expected interval between subsequent measurements for a
Expand Down
3 changes: 1 addition & 2 deletions src/index/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ pub struct IndexOptions {
#[serde(
rename = "expireAfterSeconds",
default,
deserialize_with = "serde_util::deserialize_duration_option_from_u64_seconds",
serialize_with = "serde_util::serialize_duration_option_as_int_secs"
with = "serde_util::duration_option_as_int_seconds"
)]
pub expire_after: Option<Duration>,

Expand Down
5 changes: 2 additions & 3 deletions src/selection_criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ pub struct ReadPreferenceOptions {
#[serde(
rename = "maxStalenessSeconds",
default,
deserialize_with = "serde_util::deserialize_duration_option_from_u64_seconds",
serialize_with = "serde_util::serialize_duration_option_as_int_secs"
with = "serde_util::duration_option_as_int_seconds"
)]
pub max_staleness: Option<Duration>,

Expand Down Expand Up @@ -390,7 +389,7 @@ impl ReadPreference {

readpreferencetags: Option<&'a Vec<HashMap<String, String>>>,

#[serde(serialize_with = "serde_util::serialize_duration_option_as_int_secs")]
#[serde(serialize_with = "serde_util::duration_option_as_int_seconds::serialize")]
maxstalenessseconds: Option<Duration>,
}

Expand Down
50 changes: 27 additions & 23 deletions src/serde_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,42 @@ use crate::{
error::{Error, Result},
};

pub(crate) fn serialize_duration_option_as_int_millis<S: Serializer>(
val: &Option<Duration>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
match val {
Some(duration) if duration.as_millis() > i32::MAX as u128 => {
serializer.serialize_i64(duration.as_millis() as i64)
pub(crate) mod duration_option_as_int_seconds {
use super::*;

pub(crate) fn serialize<S: Serializer>(
val: &Option<Duration>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
match val {
Some(duration) if duration.as_secs() > i32::MAX as u64 => {
serializer.serialize_i64(duration.as_secs() as i64)
}
Some(duration) => serializer.serialize_i32(duration.as_secs() as i32),
None => serializer.serialize_none(),
}
Some(duration) => serializer.serialize_i32(duration.as_millis() as i32),
None => serializer.serialize_none(),
}

pub(crate) fn deserialize<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let millis = Option::<u64>::deserialize(deserializer)?;
Ok(millis.map(Duration::from_secs))
}
}

pub(crate) fn serialize_duration_option_as_int_secs<S: Serializer>(
pub(crate) fn serialize_duration_option_as_int_millis<S: Serializer>(
val: &Option<Duration>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
match val {
Some(duration) if duration.as_secs() > i32::MAX as u64 => {
serializer.serialize_i64(duration.as_secs() as i64)
Some(duration) if duration.as_millis() > i32::MAX as u128 => {
serializer.serialize_i64(duration.as_millis() as i64)
}
Some(duration) => serializer.serialize_i32(duration.as_secs() as i32),
Some(duration) => serializer.serialize_i32(duration.as_millis() as i32),
None => serializer.serialize_none(),
}
}
Expand All @@ -44,16 +58,6 @@ where
Ok(millis.map(Duration::from_millis))
}

pub(crate) fn deserialize_duration_option_from_u64_seconds<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let millis = Option::<u64>::deserialize(deserializer)?;
Ok(millis.map(Duration::from_secs))
}

#[allow(clippy::trivially_copy_pass_by_ref)]
pub(crate) fn serialize_u32_option_as_i32<S: Serializer>(
val: &Option<u32>,
Expand Down
2 changes: 1 addition & 1 deletion src/test/spec/collection_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ async fn run_unified() {
let _guard = LOCK.run_exclusively().await;
run_unified_tests(&["collection-management"])
// The driver does not support modifyCollection.
.skip_files(&["modifyCollection-pre_and_post_images.json"])
.skip_files(&["modifyCollection-pre_and_post_images.json", "modifyCollection-errorResponse.json"])
.await;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
{
"description": "modifyCollection-errorResponse",
"schemaVersion": "1.12",
"createEntities": [
{
"client": {
"id": "client0",
"observeEvents": [
"commandStartedEvent"
]
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "collMod-tests"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "test"
}
}
],
"initialData": [
{
"collectionName": "test",
"databaseName": "collMod-tests",
"documents": [
{
"_id": 1,
"x": 1
},
{
"_id": 2,
"x": 1
}
]
}
],
"tests": [
{
"description": "modifyCollection prepareUnique violations are accessible",
"runOnRequirements": [
{
"minServerVersion": "5.2"
}
],
"operations": [
{
"name": "createIndex",
"object": "collection0",
"arguments": {
"keys": {
"x": 1
}
}
},
{
"name": "modifyCollection",
"object": "database0",
"arguments": {
"collection": "test",
"index": {
"keyPattern": {
"x": 1
},
"prepareUnique": true
}
}
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 3,
"x": 1
}
},
"expectError": {
"errorCode": 11000
}
},
{
"name": "modifyCollection",
"object": "database0",
"arguments": {
"collection": "test",
"index": {
"keyPattern": {
"x": 1
},
"unique": true
}
},
"expectError": {
"isClientError": false,
"errorCode": 359,
"errorResponse": {
"violations": [
{
"ids": [
1,
2
]
}
]
}
}
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
description: "modifyCollection-errorResponse"

schemaVersion: "1.12"

createEntities:
- client:
id: &client0 client0
observeEvents: [ commandStartedEvent ]
- database:
id: &database0 database0
client: *client0
databaseName: &database0Name collMod-tests
- collection:
id: &collection0 collection0
database: *database0
collectionName: &collection0Name test

initialData: &initialData
- collectionName: *collection0Name
databaseName: *database0Name
documents:
- { _id: 1, x: 1 }
- { _id: 2, x: 1 }

tests:
- description: "modifyCollection prepareUnique violations are accessible"
runOnRequirements:
- minServerVersion: "5.2" # SERVER-61158
operations:
- name: createIndex
object: *collection0
arguments:
keys: { x: 1 }
- name: modifyCollection
object: *database0
arguments:
collection: *collection0Name
index:
keyPattern: { x: 1 }
prepareUnique: true
- name: insertOne
object: *collection0
arguments:
document: { _id: 3, x: 1 }
expectError:
errorCode: 11000 # DuplicateKey
- name: modifyCollection
object: *database0
arguments:
collection: *collection0Name
index:
keyPattern: { x: 1 }
unique: true
expectError:
isClientError: false
errorCode: 359 # CannotConvertIndexToUnique
errorResponse:
violations:
- { ids: [ 1, 2 ] }
Loading