Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ID>MaxLineLength:MapReduceIterable.kt$MapReduceIterable$*</ID>
<ID>SwallowedException:MockitoHelper.kt$MockitoHelper.DeepReflectionEqMatcher$e: Throwable</ID>
<ID>TooManyFunctions:ClientSession.kt$ClientSession : jClientSession</ID>
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : Flow</ID>
<ID>TooManyFunctions:FindFlow.kt$FindFlow&lt;T : Any> : MongoAbstractFlow</ID>
<ID>TooManyFunctions:FindIterable.kt$FindIterable&lt;T : Any> : MongoIterable</ID>
<ID>TooManyFunctions:MongoCollection.kt$MongoCollection&lt;T : Any></ID>
<ID>TooManyFunctions:MongoDatabase.kt$MongoDatabase</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import com.mongodb.ExplainVerbosity
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.AggregatePublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.bson.BsonValue
Expand All @@ -34,7 +31,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Aggregation command](https://www.mongodb.com/docs/manual/reference/command/aggregate)
*/
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : Flow<T> {
public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -167,7 +164,6 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
/**
* Explain the execution plan for this operation with the given verbosity level
*
* @param R the type of the document class
* @param verbosity the verbosity of the explanation
* @return the execution plan
* @see [Explain command](https://www.mongodb.com/docs/manual/reference/command/explain/)
Expand Down Expand Up @@ -198,6 +194,4 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
*/
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
explain(R::class.java, verbosity)

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.mongodb.client.model.changestream.FullDocumentBeforeChange
import com.mongodb.reactivestreams.client.ChangeStreamPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonDocument
import org.bson.BsonTimestamp
Expand All @@ -37,7 +36,8 @@ import org.bson.BsonValue
*
* @param T The type of the result.
*/
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) : Flow<ChangeStreamDocument<T>> {
public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublisher<T>) :
MongoAbstractFlow<ChangeStreamDocument<T>>(wrapped) {

/**
* Sets the fullDocument value.
Expand Down Expand Up @@ -173,6 +173,4 @@ public class ChangeStreamFlow<T : Any>(private val wrapped: ChangeStreamPublishe
public fun showExpandedEvents(showExpandedEvents: Boolean): ChangeStreamFlow<T> = apply {
wrapped.showExpandedEvents(showExpandedEvents)
}
public override suspend fun collect(collector: FlowCollector<ChangeStreamDocument<T>>): Unit =
wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ package com.mongodb.kotlin.client.coroutine
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.DistinctPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -30,7 +27,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Distinct command](https://www.mongodb.com/docs/manual/reference/command/distinct/)
*/
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : Flow<T> {
public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -86,6 +83,4 @@ public class DistinctFlow<T : Any>(private val wrapped: DistinctPublisher<T>) :
* @return this
*/
public fun comment(comment: BsonValue?): DistinctFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import com.mongodb.ExplainVerbosity
import com.mongodb.client.model.Collation
import com.mongodb.reactivestreams.client.FindPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import org.bson.BsonValue
import org.bson.Document
Expand All @@ -34,7 +31,7 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [Collection filter](https://www.mongodb.com/docs/manual/reference/method/db.collection.find/)
*/
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T> {
public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : MongoAbstractFlow<T>(wrapped) {

/**
* Sets the number of documents to return per batch.
Expand Down Expand Up @@ -292,6 +289,4 @@ public class FindFlow<T : Any>(private val wrapped: FindPublisher<T>) : Flow<T>
*/
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
explain(R::class.java, verbosity)

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListCollectionsPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -29,7 +26,8 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [List collections](https://www.mongodb.com/docs/manual/reference/command/listCollections/)
*/
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) : Flow<T> {
public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPublisher<T>) :
MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -74,6 +72,4 @@ public class ListCollectionsFlow<T : Any>(private val wrapped: ListCollectionsPu
* @return this
*/
public fun comment(comment: BsonValue?): ListCollectionsFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListDatabasesPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue
import org.bson.conversions.Bson

Expand All @@ -29,7 +26,8 @@ import org.bson.conversions.Bson
* @param T The type of the result.
* @see [List databases](https://www.mongodb.com/docs/manual/reference/command/listDatabases/)
*/
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) : Flow<T> {
public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublisher<T>) :
MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -93,6 +91,4 @@ public class ListDatabasesFlow<T : Any>(private val wrapped: ListDatabasesPublis
* @return this
*/
public fun comment(comment: BsonValue?): ListDatabasesFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package com.mongodb.kotlin.client.coroutine

import com.mongodb.reactivestreams.client.ListIndexesPublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.bson.BsonValue

/**
Expand All @@ -28,7 +25,7 @@ import org.bson.BsonValue
* @param T The type of the result.
* @see [List indexes](https://www.mongodb.com/docs/manual/reference/command/listIndexes/)
*/
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : Flow<T> {
public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<T>) : MongoAbstractFlow<T>(wrapped) {
/**
* Sets the maximum execution time on the server for this operation.
*
Expand Down Expand Up @@ -65,6 +62,4 @@ public class ListIndexesFlow<T : Any>(private val wrapped: ListIndexesPublisher<
* @return this
*/
public fun comment(comment: BsonValue?): ListIndexesFlow<T> = apply { wrapped.comment(comment) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import com.mongodb.client.model.Collation
import com.mongodb.client.model.MapReduceAction
import com.mongodb.reactivestreams.client.MapReducePublisher
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import org.bson.conversions.Bson

Expand All @@ -36,7 +33,7 @@ import org.bson.conversions.Bson
* @see [Map Reduce](https://www.mongodb.com/docs/manual/reference/command/mapReduce/)
*/
@Deprecated("Map Reduce has been deprecated. Use Aggregation instead", replaceWith = ReplaceWith(""))
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : Flow<T> {
public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>) : MongoAbstractFlow<T>(wrapped) {
/**
* Sets the number of documents to return per batch.
*
Expand Down Expand Up @@ -209,6 +206,4 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
* @return this
*/
public fun collation(collation: Collation?): MapReduceFlow<T> = apply { wrapped.collation(collation) }

public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.kotlin.client.coroutine

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.Publisher

/**
* The Mongo Abstract Flow implementation
*
* @param T The type of the result.
* @param wrapped the underlying publisher
*/
@OptIn(FlowPreview::class)
public sealed class MongoAbstractFlow<T : Any>(private val wrapped: Publisher<T>) : AbstractFlow<T>() {

override suspend fun collectSafely(collector: FlowCollector<T>) {
wrapped.asFlow().collect(collector)
}
}