112 changes: 70 additions & 42 deletions .evergreen/config.yml
@@ -289,6 +289,15 @@ functions:
${PREPARE_SHELL}
MONGODB_URI="${MONGODB_URI}" SCALA_VERSION="${SCALA}" SPARK_VERSION="${SPARK}" .evergreen/run-integration-tests.sh

"run sharded integration tests":
- command: shell.exec
type: test
params:
working_dir: "src"
script: |
${PREPARE_SHELL}
MONGODB_URI="${MONGODB_URI}" SCALA_VERSION="${SCALA}" SPARK_VERSION="${SPARK}" .evergreen/run-sharded-integration-tests.sh

"publish snapshots":
- command: shell.exec
type: test
@@ -321,34 +330,62 @@ post:
- func: "upload test results"
- func: "cleanup"

+ variables:
[Review comment, Member Author]: variables ftw

+   - &run-on
+     run_on: "ubuntu2004-small"
+   - &exclude-spec
+     # Scala 2.12 only works with Spark 3.1.3
+     # Scala 2.13 doesn't work with Spark 3.1.3
+     exclude_spec:
+       - { scala: "2.12", spark: "3.2.4" }
+       - { scala: "2.12", spark: "3.3.4" }
+       - { scala: "2.12", spark: "3.4.2" }
+       - { scala: "2.12", spark: "3.5.1" }
+       - { scala: "2.13", spark: "3.1.3" }

tasks:
- name: "static-checks-task"
+   <<: *run-on
commands:
- func: "run static checks"

- name: "unit-test-task"
+   <<: *run-on
commands:
- func: "run unit tests"

- name: "integration-test-task"
+   <<: *run-on
commands:
- func: "bootstrap mongo-orchestration"
- func: "run integration tests"
- func: "stop mongo-orchestration"
- func: "upload test results"

- name: "sharded-integration-test-task"
<<: *run-on
commands:
- func: "bootstrap mongo-orchestration"
- func: "run sharded integration tests"
- func: "stop mongo-orchestration"
- func: "upload test results"

- name: "publish-snapshots-task"
+   <<: *run-on
depends_on:
- name: "static-checks-task"
variant: ".static-check"
- name: "unit-test-task"
variant: ".unit-test"
- name: "integration-test-task"
variant: ".integration-test"
- name: "sharded-integration-test-task"
variant: ".sharded-integration-test"
commands:
- func: "publish snapshots"

- name: "publish-release-task"
+   <<: *run-on
git_tag_only: true
commands:
- func: "publish release"
@@ -365,24 +402,14 @@ axes:
display_name: "4.4"
variables:
VERSION: "4.4"
- id: "5.0"
display_name: "5.0"
variables:
VERSION: "5.0"
- id: "6.0"
display_name: "6.0"
- id: "7.0"
display_name: "7.0"
variables:
VERSION: "6.0"
VERSION: "7.0"
- id: "latest"
display_name: "latest"
variables:
VERSION: "latest"
- id: "os"
display_name: "OS"
values:
- id: "ubuntu"
display_name: "Ubuntu 1804"
run_on: "ubuntu1804-test"

- id: "topology"
display_name: "Topology"
@@ -391,7 +418,7 @@ axes:
display_name: "Replica Set"
variables:
TOPOLOGY: "replica_set"
- id: "sharded-cluster"
- id: "sharded"
display_name: "Sharded Cluster"
variables:
TOPOLOGY: "sharded_cluster"
@@ -411,71 +438,72 @@ axes:
- id: "spark"
display_name: "Spark"
values:
- id: "3.1.2"
display_name: "Spark 3.1.2"
- id: "3.1.3"
display_name: "Spark 3.1.3"
variables:
SPARK: "3.1.3"
- id: "3.2.4"
display_name: "Spark 3.2.4"
variables:
SPARK: "3.2.4"
- id: "3.3.4"
display_name: "Spark 3.3.4"
variables:
SPARK: "3.3.4"
- id: "3.4.2"
display_name: "Spark 3.4.2"
variables:
SPARK: "3.1.2"
- id: "3.2.2"
display_name: "Spark 3.2.2"
SPARK: "3.4.2"
- id: "3.5.1"
display_name: "Spark 3.5.1"
variables:
SPARK: "3.2.2"
SPARK: "3.5.1"

buildvariants:

- matrix_name: "static-checks"
matrix_spec: { scala: "*", spark: "*" }
display_name: "Static checks: ${scala} ${spark}"
-   exclude_spec:
-     - {scala: "2.12", spark: "3.2.2"}
-     - {scala: "2.13", spark: "3.1.2"}
-   run_on:
-     - "ubuntu1804-test"
+   <<: *exclude-spec
tags: ["static-check"]
tasks:
- name: "static-checks-task"

- matrix_name: "unit-tests"
matrix_spec: { scala: "*", spark: "*" }
-   exclude_spec: {scala: "2.13", spark: "3.1.2"}
+   <<: *exclude-spec
display_name: "Units tests: ${scala} ${spark}"
tags: ["unit-test"]
-   run_on:
-     - "ubuntu1804-test"
tasks:
- name: "unit-test-task"

- matrix_name: "integration-tests-2-12"
-   matrix_spec: { scala: "2.12", spark: "*", version: ["6.0"], topology: "sharded-cluster"}
+   matrix_spec: { scala: "2.12", spark: "3.1.3", version: ["7.0"], topology: "replicaset"}
display_name: "Integration tests: ${scala} ${spark} ${version} ${topology}"
tags: ["integration-test"]
-   run_on:
-     - "ubuntu1804-test"
tasks:
- name: "integration-test-task"

- matrix_name: "integration-tests-2-13"
-   matrix_spec: { scala: "2.13", spark: "3.2.2", version: "*", topology: "*"}
-   exclude_spec:
-     - {scala: "*", spark: "*", version: ["latest"], topology: "replicaset"}
-     - {scala: "*", spark: "*", version: ["4.0", "4.4", "5.0"], topology: "sharded-cluster"}
+   matrix_spec: { scala: "2.13", spark: ["3.2.4", "3.5.1"], version: ["4.0", "7.0", "latest"], topology: "replicaset" }
display_name: "Integration tests: ${scala} ${spark} ${version} ${topology}"
tags: ["integration-test"]
-   run_on:
-     - "ubuntu1804-test"
tasks:
- name: "integration-test-task"

- matrix_name: "integration-tests-2-13-sharded"
matrix_spec: { scala: "2.13", spark: [ "3.2.4", "3.5.1" ], version: ["4.4", "7.0", "latest" ], topology: "sharded" }
display_name: "Integration tests: ${scala} ${spark} ${version} ${topology}"
tags: [ "sharded-integration-test" ]
tasks:
- name: "sharded-integration-test-task"

- name: "publish-snapshots"
display_name: "Publish Snapshots"
-   run_on:
-     - "ubuntu1804-test"
tasks:
- name: "publish-snapshots-task"

- name: "publish-release"
display_name: "Publish Release"
-   run_on:
-     - "ubuntu1804-test"
tasks:
- name: "publish-release-task"
24 changes: 24 additions & 0 deletions .evergreen/run-sharded-integration-tests.sh
@@ -0,0 +1,24 @@
#!/bin/bash

set -o xtrace # Write all commands first to stderr
set -o errexit # Exit the script with error if any of the commands fail

# Supported/used environment variables:
# MONGODB_URI Set the suggested connection MONGODB_URI (including credentials and topology info)
# SCALA_VERSION The Scala version to compile with
# SPARK_VERSION   The Spark version to test against
MONGODB_URI=${MONGODB_URI:-}
SCALA_VERSION=${SCALA_VERSION:-2.12}
SPARK_VERSION=${SPARK_VERSION:-3.1.2}

export JAVA_HOME="/opt/java/jdk11"

############################################
# Main Program #
############################################


echo "Running tests connecting to $MONGODB_URI on JDK${JAVA_VERSION}"

./gradlew -version
./gradlew -Dorg.mongodb.test.uri=${MONGODB_URI} --stacktrace --info integrationTest --tests "com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitionerTest" -DscalaVersion=$SCALA_VERSION -DsparkVersion=$SPARK_VERSION
4 changes: 2 additions & 2 deletions build.gradle.kts
@@ -49,9 +49,9 @@ repositories {
mavenCentral()
}

- // Usage: ./gradlew -DscalaVersion=2.12 -DsparkVersion=3.1.2
+ // Usage: ./gradlew -DscalaVersion=2.12 -DsparkVersion=3.1.4
  val scalaVersion = System.getProperty("scalaVersion", "2.13")
- val sparkVersion = System.getProperty("sparkVersion", "3.2.2")
+ val sparkVersion = System.getProperty("sparkVersion", "3.5.1")

extra.apply {
set("annotationsVersion", "22.0.0")
@@ -186,6 +186,20 @@ void dropNamespaceTest() {
assertTrue(MONGO_CATALOG.dropNamespace(identifier.namespace()));
}

+   @Test
+   void dropNamespaceTestCascade() {
+     Identifier identifier = createIdentifier(DATABASE_NAME, COLLECTION_NAME);
+     assertThrows(
+         IllegalStateException.class,
+         () -> MONGO_CATALOG.dropNamespace(identifier.namespace(), true));
+
+     MONGO_CATALOG.initialize(identifier.namespace()[0], getConnectionProviderOptions());
+     assertFalse(MONGO_CATALOG.dropNamespace(identifier.namespace(), true));
+
+     createCollection(identifier);
+     assertTrue(MONGO_CATALOG.dropNamespace(identifier.namespace(), true));
+   }
+
@Test
void alterNamespaceTest() {
Identifier identifier = createIdentifier(DATABASE_NAME, COLLECTION_NAME);
@@ -50,7 +50,7 @@
import org.jetbrains.annotations.VisibleForTesting;

/** Spark Catalog methods for working with namespaces (databases) and tables (collections). */
- public class MongoCatalog implements TableCatalog, SupportsNamespaces {
+ public class MongoCatalog implements TableCatalog, SupportsNamespaces, SupportsNamespacesAdapter {
private static final Bson NOT_SYSTEM_NAMESPACE =
Filters.not(Filters.regex("name", "^system\\..*"));
private static final Bson IS_COLLECTION =
@@ -178,12 +178,11 @@ public void alterNamespace(final String[] namespace, final NamespaceChange... ch
throw new UnsupportedOperationException("Altering databases is currently not supported");
}

-   /**
-    * Drop a database.
-    *
-    * @param namespace (database) a multi-part namespace
-    * @return true if the namespace (database) was dropped
-    */
+   @Override
+   public boolean dropNamespace(final String[] namespace, final boolean cascade) {
[Review comment, Member Author]: dropNamespace(namespace) was removed in 3.4.
dropNamespace(namespace, cascade) was added in 3.4 - probably with a default value for cascade.
[Reply, Contributor]: Not sure what you mean by a default value for cascade.
[Reply, Member Author]: Scala allows default values for parameters - so they may have just done that.

+     return dropNamespace(namespace);
+   }
+
@Override
public boolean dropNamespace(final String[] namespace) {
assertInitialized();
@@ -0,0 +1,40 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.mongodb.spark.sql.connector;

/**
* An adapter for SupportsNamespaces as the API for dropNamespace changed between versions
*/
interface SupportsNamespacesAdapter {
[Review comment, Member Author]: Declare both dropNamespace methods, then both can be overridden.
[Reply, Contributor]: It's crazy that this works! Ingenious.
[Reply, Member]: This is clever 💡

/**
* Drop a database.
*
* @param namespace (database) a multi-part namespace
* @return true if the namespace (database) was dropped
*/
boolean dropNamespace(String[] namespace);

/**
* Drop a namespace from the catalog with cascade mode
*
* @param namespace a multi-part namespace
* @param cascade ignored for mongodb
* @return true if the namespace was dropped
*/
boolean dropNamespace(String[] namespace, boolean cascade);
}
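
The trick the reviewers are praising: Java's @Override only requires that a method override or implement a method of some supertype, so declaring both dropNamespace shapes in this local interface keeps both @Override annotations in MongoCatalog valid whichever single shape Spark's SupportsNamespaces declares on the compile classpath. A minimal, self-contained sketch of the mechanism (all names below are hypothetical stand-ins, not Spark's or the connector's real types):

    // Hypothetical stand-in for Spark's SupportsNamespaces: pre-3.4 it declared
    // dropNamespace(String[]); from 3.4 it declares dropNamespace(String[], boolean).
    interface ThirdPartyNamespaces {
      boolean dropNamespace(String[] namespace); // the pre-3.4 shape, for this sketch
    }

    // Local adapter declaring BOTH shapes, mirroring SupportsNamespacesAdapter.
    interface BothDropNamespaceShapes {
      boolean dropNamespace(String[] namespace);

      boolean dropNamespace(String[] namespace, boolean cascade);
    }

    // Implementing both interfaces lets both methods carry @Override, whichever
    // single shape ThirdPartyNamespaces happens to declare at compile time.
    class ExampleCatalog implements ThirdPartyNamespaces, BothDropNamespaceShapes {
      @Override
      public boolean dropNamespace(final String[] namespace) {
        return true; // real catalog work would happen here
      }

      @Override
      public boolean dropNamespace(final String[] namespace, final boolean cascade) {
        return dropNamespace(namespace); // cascade is ignored, as in MongoCatalog
      }
    }

Compiled against a pre-3.4-style interface, the single-argument @Override is satisfied by that interface and the two-argument one by the adapter; against a 3.4-style interface the roles swap, so the same source compiles either way.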
@@ -31,6 +31,8 @@

/** Shared converter helper methods and statics */
public final class ConverterHelper {
+   static final SchemaToExpressionEncoderFunction SCHEMA_TO_EXPRESSION_ENCODER_FUNCTION =
+       new SchemaToExpressionEncoderFunction();
[Review comment, Member Author]: Make static the function that is used to convert a schema to an expression encoder.
static final Codec<BsonValue> BSON_VALUE_CODEC = new BsonValueCodec();

static final JsonWriterSettings RELAXED_JSON_WRITER_SETTINGS = JsonWriterSettings.builder()
@@ -17,16 +17,14 @@

package com.mongodb.spark.sql.connector.schema;

+ import static com.mongodb.spark.sql.connector.schema.ConverterHelper.SCHEMA_TO_EXPRESSION_ENCODER_FUNCTION;
+
import java.io.Serializable;
import java.util.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
- import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
- import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
- import org.apache.spark.sql.catalyst.expressions.Attribute;
  import org.apache.spark.sql.types.StructType;
- import scala.collection.immutable.Seq;

/**
* An InternalRow to Row function that uses a resolved and bound encoder for the given schema.
@@ -39,13 +37,8 @@ final class InternalRowToRowFunction implements Function<InternalRow, Row>, Seri

private final ExpressionEncoder.Deserializer<Row> deserializer;

-   @SuppressWarnings("unchecked")
    InternalRowToRowFunction(final StructType schema) {
-     ExpressionEncoder<Row> rowEncoder = RowEncoder$.MODULE$.apply(schema);
-     Seq<Attribute> attributeSeq =
-         (Seq<Attribute>) (Seq<? extends Attribute>) rowEncoder.schema().toAttributes();
-     this.deserializer =
-         rowEncoder.resolveAndBind(attributeSeq, SimpleAnalyzer$.MODULE$).createDeserializer();
+     deserializer = SCHEMA_TO_EXPRESSION_ENCODER_FUNCTION.apply(schema).createDeserializer();
}

@Override
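
SchemaToExpressionEncoderFunction itself is not shown in this diff. Based on the inline code deleted above, a hedged sketch of roughly what the shared helper could look like (an assumption; the real class may differ, for example to absorb encoder API changes across the supported Spark versions):

    import java.io.Serializable;
    import java.util.function.Function;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
    import org.apache.spark.sql.catalyst.expressions.Attribute;
    import org.apache.spark.sql.types.StructType;
    import scala.collection.immutable.Seq;

    // Sketch only: turns a schema into a resolved-and-bound ExpressionEncoder<Row>,
    // so callers can derive a serializer or deserializer from one shared code path.
    class SchemaToExpressionEncoderFunction
        implements Function<StructType, ExpressionEncoder<Row>>, Serializable {

      @Override
      @SuppressWarnings("unchecked")
      public ExpressionEncoder<Row> apply(final StructType schema) {
        ExpressionEncoder<Row> rowEncoder = RowEncoder$.MODULE$.apply(schema);
        Seq<Attribute> attributeSeq =
            (Seq<Attribute>) (Seq<? extends Attribute>) rowEncoder.schema().toAttributes();
        return rowEncoder.resolveAndBind(attributeSeq, SimpleAnalyzer$.MODULE$);
      }
    }

With the encoder construction centralized, InternalRowToRowFunction only needs createDeserializer() from the resolved encoder, and a Row-to-InternalRow conversion could presumably reuse the same encoder via createSerializer().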