Skip to content

Commit c239baf

Browse files
committed
[REST] Fixing connection leak in the InitializationUtils.
Mismanagement of a RestRepository connection would occur if no connection was passed to the check, the index auto create was disabled, and the index already exists upon check. Fixes elastic#883
1 parent 49f1565 commit c239baf

File tree

5 files changed

+25
-20
lines changed

5 files changed

+25
-20
lines changed

mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,8 @@ private void init(Configuration cfg) throws IOException {
256256
Settings settings = HadoopSettingsManager.loadFrom(cfg);
257257
Assert.hasText(settings.getResourceWrite(), String.format("No resource ['%s'] (index/query/location) specified", ES_RESOURCE));
258258

259-
// lazy-init
260-
RestRepository client = null;
261-
262259
InitializationUtils.checkIdForOperation(settings);
263-
InitializationUtils.checkIndexExistence(settings, client);
260+
InitializationUtils.checkIndexExistence(settings);
264261

265262
if (HadoopCfgUtils.getReduceTasks(cfg) != null) {
266263
if (HadoopCfgUtils.getSpeculativeReduce(cfg)) {

mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,23 +253,31 @@ public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
253253
}
254254

255255
public static void checkIndexExistence(RestRepository client) {
256-
checkIndexExistence(client.getSettings(), client);
256+
if (!client.getSettings().getIndexAutoCreate()) {
257+
doCheckIndexExistence(client.getSettings(), client);
258+
}
257259
}
258260

259-
public static void checkIndexExistence(Settings settings, RestRepository client) {
260-
// check index existence
261+
public static void checkIndexExistence(Settings settings) {
262+
// Only open a connection and check if autocreate is disabled
261263
if (!settings.getIndexAutoCreate()) {
262-
if (client == null) {
263-
client = new RestRepository(settings);
264-
}
265-
if (!client.indexExists(false)) {
266-
client.close();
267-
throw new EsHadoopIllegalArgumentException(String.format("Target index [%s] does not exist and auto-creation is disabled [setting '%s' is '%s']",
268-
settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
264+
RestRepository repository = new RestRepository(settings);
265+
try {
266+
doCheckIndexExistence(settings, repository);
267+
} finally {
268+
repository.close();
269269
}
270270
}
271271
}
272272

273+
private static void doCheckIndexExistence(Settings settings, RestRepository client) {
274+
// check index existence
275+
if (!client.indexExists(false)) {
276+
throw new EsHadoopIllegalArgumentException(String.format("Target index [%s] does not exist and auto-creation is disabled [setting '%s' is '%s']",
277+
settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate()));
278+
}
279+
}
280+
273281
public static boolean setFieldExtractorIfNotSet(Settings settings, Class<? extends FieldExtractor> clazz, Log log) {
274282
if (!StringUtils.hasText(settings.getMappingIdExtractorClassName())) {
275283
Log logger = (log != null ? log : LogFactory.getLog(clazz));

spark/core/main/scala/org/elasticsearch/spark/rdd/EsSpark.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ object EsSpark {
9696
val config = new PropertiesSettings().load(sparkCfg.save())
9797
config.merge(cfg.asJava)
9898

99-
InitializationUtils.checkIdForOperation(config);
100-
InitializationUtils.checkIndexExistence(config, null);
99+
InitializationUtils.checkIdForOperation(config)
100+
InitializationUtils.checkIndexExistence(config)
101101

102102
rdd.sparkContext.runJob(rdd, new EsRDDWriter(config.save(), hasMeta).write _)
103103
}

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ object EsSparkSQL {
7171
val esCfg = new PropertiesSettings().load(sparkCfg.save())
7272
esCfg.merge(cfg.asJava)
7373

74-
InitializationUtils.checkIdForOperation(esCfg);
75-
InitializationUtils.checkIndexExistence(esCfg, null);
74+
InitializationUtils.checkIdForOperation(esCfg)
75+
InitializationUtils.checkIndexExistence(esCfg)
7676

7777
sparkCtx.runJob(srdd.rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
7878
}

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/EsSparkSQL.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ object EsSparkSQL {
8888
val esCfg = new PropertiesSettings().load(sparkCfg.save())
8989
esCfg.merge(cfg.asJava)
9090

91-
InitializationUtils.checkIdForOperation(esCfg);
92-
InitializationUtils.checkIndexExistence(esCfg, null);
91+
InitializationUtils.checkIdForOperation(esCfg)
92+
InitializationUtils.checkIndexExistence(esCfg)
9393

9494
sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
9595
}

0 commit comments

Comments
 (0)