Skip to content

Commit 8e44fc6

Browse files
committed
Fixing backwards compatibility for the integration tests.
1 parent e78932e commit 8e44fc6

File tree

7 files changed

+62
-8
lines changed

7 files changed

+62
-8
lines changed

mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMRNewApiSaveTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
4343
import org.elasticsearch.hadoop.mr.MultiOutputFormat;
4444
import org.elasticsearch.hadoop.mr.RestUtils;
45+
import org.elasticsearch.hadoop.util.EsMajorVersion;
4546
import org.elasticsearch.hadoop.util.StringUtils;
4647
import org.elasticsearch.hadoop.util.TestSettings;
4748
import org.elasticsearch.hadoop.util.TestUtils;
@@ -54,6 +55,7 @@
5455
import org.junit.runners.Parameterized.Parameters;
5556

5657
import static org.junit.Assert.assertFalse;
58+
import static org.junit.Assume.assumeTrue;
5759

5860
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
5961
@RunWith(Parameterized.class)
@@ -190,6 +192,11 @@ public void testCreateWithIdShouldFailOnDuplicate() throws Exception {
190192

191193
@Test
192194
public void testSaveWithIngest() throws Exception {
195+
RestUtils.ExtendedRestClient versionTestingClient = new RestUtils.ExtendedRestClient();
196+
EsMajorVersion esMajorVersion = versionTestingClient.remoteEsVersion();
197+
assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X));
198+
versionTestingClient.close();
199+
193200
Configuration conf = createConf();
194201

195202
RestUtils.ExtendedRestClient client = new RestUtils.ExtendedRestClient();

mr/src/itest/java/org/elasticsearch/hadoop/integration/mr/AbstractMROldApiSaveTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,12 @@
5050
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
5151
import org.elasticsearch.hadoop.mr.MultiOutputFormat;
5252
import org.elasticsearch.hadoop.mr.RestUtils;
53+
import org.elasticsearch.hadoop.util.EsMajorVersion;
5354
import org.elasticsearch.hadoop.util.StringUtils;
5455
import org.elasticsearch.hadoop.util.TestSettings;
5556
import org.elasticsearch.hadoop.util.TestUtils;
5657
import org.elasticsearch.hadoop.util.WritableUtils;
58+
import org.junit.Assume;
5759
import org.junit.FixMethodOrder;
5860
import org.junit.Test;
5961
import org.junit.runner.RunWith;
@@ -252,6 +254,11 @@ public void testCreateWithIdShouldFailOnDuplicate() throws Exception {
252254

253255
@Test
254256
public void testSaveWithIngest() throws Exception {
257+
RestUtils.ExtendedRestClient versionTestingClient = new RestUtils.ExtendedRestClient();
258+
EsMajorVersion esMajorVersion = versionTestingClient.remoteEsVersion();
259+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X));
260+
versionTestingClient.close();
261+
255262
JobConf conf = createJobConf();
256263

257264
RestUtils.ExtendedRestClient client = new RestUtils.ExtendedRestClient();

spark/core/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
3838
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_READ_METADATA
3939
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE
4040
import org.elasticsearch.hadoop.mr.RestUtils
41-
import org.elasticsearch.hadoop.util.TestSettings
42-
import org.elasticsearch.hadoop.util.TestUtils
41+
import org.elasticsearch.hadoop.util.{EsMajorVersion, StringUtils, TestSettings, TestUtils}
4342
import org.elasticsearch.spark.rdd.EsSpark
4443
import org.elasticsearch.spark.rdd.Metadata.ID
4544
import org.elasticsearch.spark.rdd.Metadata.TTL
@@ -54,17 +53,14 @@ import org.elasticsearch.spark.sparkStringJsonRDDFunctions
5453
import org.hamcrest.Matchers.both
5554
import org.hamcrest.Matchers.containsString
5655
import org.hamcrest.Matchers.not
57-
import org.junit.AfterClass
56+
import org.junit.{AfterClass, Assume, BeforeClass, Test}
5857
import org.junit.Assert.assertEquals
5958
import org.junit.Assert.assertNotNull
6059
import org.junit.Assert.assertThat
6160
import org.junit.Assert.assertTrue
62-
import org.junit.BeforeClass
63-
import org.junit.Test
6461
import org.junit.runner.RunWith
6562
import org.junit.runners.Parameterized
6663
import org.junit.runners.Parameterized.Parameters
67-
import org.elasticsearch.hadoop.util.StringUtils
6864
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
6965
import java.nio.file.Paths
7066
import java.nio.charset.StandardCharsets
@@ -289,6 +285,16 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
289285

290286
@Test
291287
def testEsRDDIngest() {
288+
try {
289+
val versionTestingClient: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
290+
try {
291+
val esMajorVersion: EsMajorVersion = versionTestingClient.remoteEsVersion
292+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X))
293+
} finally {
294+
if (versionTestingClient != null) versionTestingClient.close()
295+
}
296+
}
297+
292298
val client: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
293299
val prefix: String = "spark"
294300
val pipeline: String = "{\"description\":\"Test Pipeline\",\"processors\":[{\"set\":{\"field\":\"pipeTEST\",\"value\":true,\"override\":true}}]}"

spark/sql-13/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
4040
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
4141
import org.elasticsearch.hadoop.mr.RestUtils;
42+
import org.elasticsearch.hadoop.util.EsMajorVersion;
4243
import org.elasticsearch.hadoop.util.StringUtils;
4344
import org.elasticsearch.hadoop.util.TestSettings;
4445
import org.elasticsearch.spark.rdd.Metadata;
@@ -47,6 +48,7 @@
4748
import org.junit.After;
4849
import org.junit.AfterClass;
4950
import org.junit.Assert;
51+
import org.junit.Assume;
5052
import org.junit.Before;
5153
import org.junit.BeforeClass;
5254
import org.junit.FixMethodOrder;
@@ -377,6 +379,11 @@ public void testEsRDDWriteWithMappingExclude() throws Exception {
377379

378380
@Test
379381
public void testEsRDDIngest() throws Exception {
382+
try (RestUtils.ExtendedRestClient versionTestingClient = new RestUtils.ExtendedRestClient()) {
383+
EsMajorVersion esMajorVersion = versionTestingClient.remoteEsVersion();
384+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X));
385+
}
386+
380387
RestUtils.ExtendedRestClient client = new RestUtils.ExtendedRestClient();
381388
String pipelineName = prefix + "-pipeline";
382389
String pipeline = "{\"description\":\"Test Pipeline\",\"processors\":[{\"set\":{\"field\":\"pipeTEST\",\"value\":true,\"override\":true}}]}";

spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
3131
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
3232
import org.elasticsearch.hadoop.cfg.ConfigurationOptions._
3333
import org.elasticsearch.hadoop.mr.RestUtils
34-
import org.elasticsearch.hadoop.util.{StringUtils, TestSettings}
34+
import org.elasticsearch.hadoop.util.{EsMajorVersion, StringUtils, TestSettings}
3535
import org.elasticsearch.spark.rdd.EsSpark
3636
import org.elasticsearch.spark.rdd.Metadata._
3737
import org.elasticsearch.spark.serialization.{Bean, ReflectionUtils}
@@ -241,6 +241,16 @@ class AbstractScalaEsScalaSparkStreaming(val prefix: String, readMetadata: jl.Bo
241241

242242
@Test
243243
def testEsRDDIngest() {
244+
try {
245+
val versionTestingClient: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
246+
try {
247+
val esMajorVersion: EsMajorVersion = versionTestingClient.remoteEsVersion
248+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X))
249+
} finally {
250+
if (versionTestingClient != null) versionTestingClient.close()
251+
}
252+
}
253+
244254
val client: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
245255
val pipelineName: String = prefix + "-pipeline"
246256
val pipeline: String = "{\"description\":\"Test Pipeline\",\"processors\":[{\"set\":{\"field\":\"pipeTEST\",\"value\":true,\"override\":true}}]}"

spark/sql-20/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
4040
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
4141
import org.elasticsearch.hadoop.mr.RestUtils;
42+
import org.elasticsearch.hadoop.util.EsMajorVersion;
4243
import org.elasticsearch.hadoop.util.StringUtils;
4344
import org.elasticsearch.hadoop.util.TestSettings;
4445
import org.elasticsearch.spark.rdd.Metadata;
@@ -47,6 +48,7 @@
4748
import org.junit.After;
4849
import org.junit.AfterClass;
4950
import org.junit.Assert;
51+
import org.junit.Assume;
5052
import org.junit.Before;
5153
import org.junit.BeforeClass;
5254
import org.junit.FixMethodOrder;
@@ -377,6 +379,11 @@ public void testEsRDDWriteWithMappingExclude() throws Exception {
377379

378380
@Test
379381
public void testEsRDDIngest() throws Exception {
382+
try (RestUtils.ExtendedRestClient versionTestingClient = new RestUtils.ExtendedRestClient()) {
383+
EsMajorVersion esMajorVersion = versionTestingClient.remoteEsVersion();
384+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X));
385+
}
386+
380387
RestUtils.ExtendedRestClient client = new RestUtils.ExtendedRestClient();
381388
String pipelineName = prefix + "-pipeline";
382389
String pipeline = "{\"description\":\"Test Pipeline\",\"processors\":[{\"set\":{\"field\":\"pipeTEST\",\"value\":true,\"override\":true}}]}";

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
3131
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
3232
import org.elasticsearch.hadoop.cfg.ConfigurationOptions._
3333
import org.elasticsearch.hadoop.mr.RestUtils
34-
import org.elasticsearch.hadoop.util.{StringUtils, TestSettings}
34+
import org.elasticsearch.hadoop.util.{EsMajorVersion, StringUtils, TestSettings}
3535
import org.elasticsearch.spark.rdd.EsSpark
3636
import org.elasticsearch.spark.rdd.Metadata._
3737
import org.elasticsearch.spark.serialization.{Bean, ReflectionUtils}
@@ -241,6 +241,16 @@ class AbstractScalaEsScalaSparkStreaming(val prefix: String, readMetadata: jl.Bo
241241

242242
@Test
243243
def testEsRDDIngest() {
244+
try {
245+
val versionTestingClient: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
246+
try {
247+
val esMajorVersion: EsMajorVersion = versionTestingClient.remoteEsVersion
248+
Assume.assumeTrue("Ingest Supported in 5.x and above only", esMajorVersion.onOrAfter(EsMajorVersion.V_5_X))
249+
} finally {
250+
if (versionTestingClient != null) versionTestingClient.close()
251+
}
252+
}
253+
244254
val client: RestUtils.ExtendedRestClient = new RestUtils.ExtendedRestClient
245255
val pipelineName: String = prefix + "-pipeline"
246256
val pipeline: String = "{\"description\":\"Test Pipeline\",\"processors\":[{\"set\":{\"field\":\"pipeTEST\",\"value\":true,\"override\":true}}]}"

0 commit comments

Comments
 (0)