Skip to content

Add $documents stage #1061

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions driver-core/src/main/com/mongodb/client/model/Aggregates.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,14 @@ public static Bson lookup(final String from, final String localField, final Stri
}

/**
* Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline
*
* @param from the name of the collection in the same database to perform the join with.
* Creates a $lookup pipeline stage, joining the current collection with the
* one specified in from using the given pipeline. If the first stage in the
* pipeline is a {@link Aggregates#documents(List) $documents} stage, then
* the {@code from} collection is ignored.
*
* @param from the name of the collection in the same database to
* perform the join with. May be {$code null} if the
* first pipeline stage is $documents.
* @param pipeline the pipeline to run on the joined collection.
* @param as the name of the new array field to add to the input documents.
* @return the $lookup pipeline stage
Expand All @@ -310,15 +315,20 @@ public static Bson lookup(final String from, final String localField, final Stri
* @since 3.7
*
*/
public static Bson lookup(final String from, final List<? extends Bson> pipeline, final String as) {
public static Bson lookup(@Nullable final String from, final List<? extends Bson> pipeline, final String as) {
return lookup(from, null, pipeline, as);
}

/**
* Creates a $lookup pipeline stage, joining the current collection with the one specified in from using the given pipeline
* Creates a $lookup pipeline stage, joining the current collection with the
* one specified in from using the given pipeline. If the first stage in the
* pipeline is a {@link Aggregates#documents(List) $documents} stage, then
* the {@code from} collection is ignored.
*
* @param <TExpression> the Variable value expression type
* @param from the name of the collection in the same database to perform the join with.
* @param from the name of the collection in the same database to
* perform the join with. May be {$code null} if the
* first pipeline stage is $documents.
* @param let the variables to use in the pipeline field stages.
* @param pipeline the pipeline to run on the joined collection.
* @param as the name of the new array field to add to the input documents.
Expand All @@ -327,7 +337,7 @@ public static Bson lookup(final String from, final List<? extends Bson> pipeline
* @mongodb.server.release 3.6
* @since 3.7
*/
public static <TExpression> Bson lookup(final String from, @Nullable final List<Variable<TExpression>> let,
public static <TExpression> Bson lookup(@Nullable final String from, @Nullable final List<Variable<TExpression>> let,
final List<? extends Bson> pipeline, final String as) {
return new LookupStage<>(from, let, pipeline, as);
}
Expand Down Expand Up @@ -928,7 +938,7 @@ public static Bson searchMeta(final SearchCollector collector, final SearchOptio
*
* @param fields the fields to exclude. May use dot notation.
* @return the $unset pipeline stage
* @mongodb.driver.manual reference/operator/aggregation/project/ $unset
* @mongodb.driver.manual reference/operator/aggregation/unset/ $unset
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catches

* @mongodb.server.release 4.2
* @since 4.8
*/
Expand All @@ -941,7 +951,7 @@ public static Bson unset(final String... fields) {
*
* @param fields the fields to exclude. May use dot notation.
* @return the $unset pipeline stage
* @mongodb.driver.manual reference/operator/aggregation/project/ $unset
* @mongodb.driver.manual reference/operator/aggregation/unset/ $unset
* @mongodb.server.release 4.2
* @since 4.8
*/
Expand All @@ -962,7 +972,7 @@ public static Bson unset(final List<String> fields) {
* To specify a field within an embedded document, use dot notation.
* @param options {@link GeoNearOptions}
* @return the $geoNear pipeline stage
* @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear
* @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear
* @since 4.8
*/
public static Bson geoNear(
Expand Down Expand Up @@ -1012,7 +1022,7 @@ public String toString() {
* @param distanceField The output field that contains the calculated distance.
* To specify a field within an embedded document, use dot notation.
* @return the $geoNear pipeline stage
* @mongodb.driver.manual reference/operator/aggregation/project/ $geoNear
* @mongodb.driver.manual reference/operator/aggregation/geoNear/ $geoNear
* @since 4.8
*/
public static Bson geoNear(
Expand All @@ -1021,6 +1031,33 @@ public static Bson geoNear(
return geoNear(near, distanceField, geoNearOptions());
}

/**
* Creates a $documents pipeline stage.
*
* @param documents the documents.
* @return the $documents pipeline stage.
* @mongodb.driver.manual reference/operator/aggregation/documents/ $documents
* @mongodb.server.release 5.1
* @since 4.9
*/
public static Bson documents(final List<? extends Bson> documents) {
notNull("documents", documents);
return new Bson() {
@Override
public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> documentClass, final CodecRegistry codecRegistry) {
BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());
writer.writeStartDocument();
writer.writeStartArray("$documents");
for (Bson bson : documents) {
BuildersHelper.encodeValue(writer, bson, codecRegistry);
}
writer.writeEndArray();
writer.writeEndDocument();
return writer.getDocument();
}
};
}

static void writeBucketOutput(final CodecRegistry codecRegistry, final BsonDocumentWriter writer,
@Nullable final List<BsonField> output) {
if (output != null) {
Expand Down Expand Up @@ -1242,8 +1279,11 @@ private static final class LookupStage<TExpression> implements Bson {
private final List<? extends Bson> pipeline;
private final String as;

private LookupStage(final String from, @Nullable final List<Variable<TExpression>> let, final List<? extends Bson> pipeline,
final String as) {
private LookupStage(
@Nullable final String from,
@Nullable final List<Variable<TExpression>> let,
final List<? extends Bson> pipeline,
final String as) {
this.from = from;
this.let = let;
this.pipeline = pipeline;
Expand All @@ -1258,7 +1298,9 @@ public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> tDocumentC

writer.writeStartDocument("$lookup");

writer.writeString("from", from);
if (from != null) {
writer.writeString("from", from);
}

if (let != null) {
writer.writeStartDocument("let");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.bson.BsonDocument;
import org.bson.Document;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -160,4 +161,53 @@ public void testGeoNear() {
+ " }\n"
+ "}]");
}

@Test
public void testDocuments() {
assumeTrue(serverVersionAtLeast(5, 1));
Bson stage = Aggregates.documents(asList(
Document.parse("{a: 1, b: {$add: [1, 1]} }"),
BsonDocument.parse("{a: 3, b: 4}")));
assertPipeline(
"{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}",
stage);

List<Bson> pipeline = Arrays.asList(stage);
getCollectionHelper().aggregateDb(pipeline);

assertEquals(
parseToList("[{a: 1, b: 2}, {a: 3, b: 4}]"),
getCollectionHelper().aggregateDb(pipeline));

// accepts lists of Documents and BsonDocuments
List<BsonDocument> documents = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}"));
assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(documents));
List<BsonDocument> bsonDocuments = Arrays.asList(BsonDocument.parse("{a: 1, b: 2}"));
assertPipeline("{$documents: [{a: 1, b: 2}]}", Aggregates.documents(bsonDocuments));
}

@Test
public void testDocumentsLookup() {
assumeTrue(serverVersionAtLeast(5, 1));

getCollectionHelper().insertDocuments("[{_id: 1, a: 8}, {_id: 2, a: 9}]");
Bson documentsStage = Aggregates.documents(asList(Document.parse("{a: 5}")));

Bson lookupStage = Aggregates.lookup("ignored", Arrays.asList(documentsStage), "added");
assertPipeline(
"{'$lookup': {'from': 'ignored', 'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}",
lookupStage);
assertEquals(
parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"),
getCollectionHelper().aggregate(Arrays.asList(lookupStage)));

// null variant
Bson lookupStageNull = Aggregates.lookup(null, Arrays.asList(documentsStage), "added");
assertPipeline(
"{'$lookup': {'pipeline': [{'$documents': [{'a': 5}]}], 'as': 'added'}}",
lookupStageNull);
assertEquals(
parseToList("[{_id:1, a:8, added: [{a: 5}]}, {_id:2, a:9, added: [{a: 5}]}]"),
getCollectionHelper().aggregate(Arrays.asList(lookupStageNull)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.mongodb.internal.bulk.InsertRequest;
import com.mongodb.internal.bulk.UpdateRequest;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.operation.AggregateOperation;
import com.mongodb.internal.operation.BatchCursor;
import com.mongodb.internal.operation.CommandReadOperation;
Expand Down Expand Up @@ -281,12 +282,20 @@ public List<T> aggregate(final List<Bson> pipeline) {
}

public <D> List<D> aggregate(final List<Bson> pipeline, final Decoder<D> decoder) {
List<BsonDocument> bsonDocumentPipeline = new ArrayList<>();
return aggregate(pipeline, decoder, AggregationLevel.COLLECTION);
}

public List<T> aggregateDb(final List<Bson> pipeline) {
return aggregate(pipeline, codec, AggregationLevel.DATABASE);
}

private <D> List<D> aggregate(final List<Bson> pipeline, final Decoder<D> decoder, final AggregationLevel level) {
List<BsonDocument> bsonDocumentPipeline = new ArrayList<BsonDocument>();
for (Bson cur : pipeline) {
bsonDocumentPipeline.add(cur.toBsonDocument(Document.class, registry));
}
BatchCursor<D> cursor = new AggregateOperation<>(namespace, bsonDocumentPipeline, decoder)
.execute(getBinding());
BatchCursor<D> cursor = new AggregateOperation<D>(namespace, bsonDocumentPipeline, decoder, level)
.execute(getBinding());
List<D> results = new ArrayList<>();
while (cursor.hasNext()) {
results.addAll(cursor.next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,14 @@ object Aggregates {
JAggregates.lookup(from, localField, foreignField, as)

/**
* Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline
*
* @param from the name of the collection in the same database to perform the join with.
* Creates a `\$lookup` pipeline stage, joining the current collection with
* the one specified in from using the given pipeline. If the first stage in
* the pipeline is a `\$documents` stage, then the "from" collection is
* ignored.
*
* @param from the name of the collection in the same database to
* perform the join with. May be null if the
* first pipeline stage is `\$documents`.
* @param pipeline the pipeline to run on the joined collection.
* @param as the name of the new array field to add to the input documents.
* @return the `\$lookup` pipeline stage:
Expand All @@ -338,9 +343,14 @@ object Aggregates {
JAggregates.lookup(from, pipeline.asJava, as)

/**
* Creates a `\$lookup` pipeline stage, joining the current collection with the one specified in from using the given pipeline
* Creates a `\$lookup` pipeline stage, joining the current collection with
* the one specified in from using the given pipeline. If the first stage in
* the pipeline is a `\$documents` stage, then the "from" collection is
* ignored.
*
* @param from the name of the collection in the same database to perform the join with.
* @param from the name of the collection in the same database to
* perform the join with. May be null if the
* first pipeline stage is `\$documents`.
* @param let the variables to use in the pipeline field stages.
* @param pipeline the pipeline to run on the joined collection.
* @param as the name of the new array field to add to the input documents.
Expand Down Expand Up @@ -746,4 +756,14 @@ object Aggregates {
*/
def geoNear(near: Point, distanceField: String): Bson =
JAggregates.geoNear(near, distanceField)

/**
* Creates a `\$documents` pipeline stage.
*
* @param documents the documents.
* @return the `\$documents` pipeline stage
* @see [[https://www.mongodb.com/docs/manual/reference/operator/aggregation/documents/ \$documents]]
* @since 4.9
*/
def documents(documents: Bson*): Bson = JAggregates.documents(documents.asJava)
}
Original file line number Diff line number Diff line change
Expand Up @@ -805,4 +805,15 @@ class AggregatesSpec extends BaseSpec {
|}""".stripMargin)
)
}

it should "render $documents" in {
toBson(
Aggregates.documents(
org.mongodb.scala.bson.BsonDocument("""{a: 1, b: {$add: [1, 1]} }"""),
Document("""{a: 3, b: 4}""")
)
) should equal(
Document("""{$documents: [{a: 1, b: {$add: [1, 1]}}, {a: 3, b: 4}]}""")
)
}
}