
Setting X-Opaque-ID header for all reads and writes for mapreduce and spark (#1770) #1771


Merged: 1 commit, Oct 7, 2021

4 changes: 4 additions & 0 deletions mr/build.gradle
@@ -129,6 +129,10 @@ itestJar {
}
}

tasks.named("test").configure {
jvmArgs "--add-opens=java.base/java.io=ALL-UNNAMED" // Needed for IOUtils's BYTE_ARRAY_BUFFER reflection
}

eclipse.classpath.file {
whenMerged { cp ->
// export all jars (to be used upstream by dependent projects) <-- for some reason Gradle removes all jars
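
A note on the new test configuration: the flag is a JDK 9+ module-system requirement. The test JVM has to open java.base/java.io to reflective access before IOUtils can reach a private buffer field; the same flag is added to spark/core/build.gradle below. A minimal sketch of the kind of reflection that fails without it (which exact field IOUtils reads is an assumption here; ByteArrayInputStream.buf is used for illustration):

    import java.io.ByteArrayInputStream;
    import java.lang.reflect.Field;

    public class AddOpensDemo {
        public static void main(String[] args) throws Exception {
            // On JDK 16+ this throws InaccessibleObjectException unless the JVM
            // was started with --add-opens=java.base/java.io=ALL-UNNAMED
            Field buf = ByteArrayInputStream.class.getDeclaredField("buf");
            buf.setAccessible(true);
            byte[] data = (byte[]) buf.get(new ByteArrayInputStream(new byte[] { 1, 2, 3 }));
            System.out.println(data.length); // 3
        }
    }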

mr/src/main/java/org/elasticsearch/hadoop/cfg/ConfigurationOptions.java
@@ -284,6 +284,7 @@ public interface ConfigurationOptions {
String ES_NET_SSL_CERT_ALLOW_SELF_SIGNED_DEFAULT = "false";

String ES_NET_HTTP_HEADER_PREFIX = "es.net.http.header.";
String ES_NET_HTTP_HEADER_OPAQUE_ID = ES_NET_HTTP_HEADER_PREFIX + "X-Opaque-ID";

String ES_NET_HTTP_AUTH_USER = "es.net.http.auth.user";
String ES_NET_HTTP_AUTH_PASS = "es.net.http.auth.pass";
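
The new constant extends the existing pass-through header prefix, so the opaque ID travels through the same es.net.http.header.* channel as any other custom header. A quick sketch of what the key resolves to (names here only mirror the constants above):

    public class HeaderKeyDemo {
        public static void main(String[] args) {
            String prefix = "es.net.http.header.";      // ES_NET_HTTP_HEADER_PREFIX
            String opaqueIdKey = prefix + "X-Opaque-ID"; // ES_NET_HTTP_HEADER_OPAQUE_ID
            System.out.println(opaqueIdKey);             // es.net.http.header.X-Opaque-ID
        }
    }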

mr/src/main/java/org/elasticsearch/hadoop/cfg/HadoopSettings.java
@@ -19,9 +19,11 @@
package org.elasticsearch.hadoop.cfg;

import java.io.InputStream;
import java.util.Locale;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.mr.HadoopIOUtils;
import org.elasticsearch.hadoop.util.Assert;
@@ -33,6 +35,11 @@ public class HadoopSettings extends Settings {
public HadoopSettings(Configuration cfg) {
Assert.notNull(cfg, "Non-null properties expected");
this.cfg = cfg;
String jobName = cfg.get(JobContext.JOB_NAME, "");
String user = cfg.get(JobContext.USER_NAME, "");
String taskAttemptId = cfg.get(JobContext.TASK_ATTEMPT_ID, "");
String opaqueId = String.format(Locale.ROOT, "[mapreduce] [%s] [%s] [%s]", user, jobName, taskAttemptId);
setOpaqueId(opaqueId);
}

@Override
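
With this, every MapReduce job gets an ID of the form [mapreduce] [user] [job name] [task attempt id], read from the job Configuration via the JobContext constants. A sketch with made-up values (only the format string is taken from the constructor above):

    import java.util.Locale;

    public class MapReduceOpaqueIdDemo {
        public static void main(String[] args) {
            // The real values come from the Configuration entries behind
            // JobContext.USER_NAME, JobContext.JOB_NAME and JobContext.TASK_ATTEMPT_ID.
            String opaqueId = String.format(Locale.ROOT, "[mapreduce] [%s] [%s] [%s]",
                    "etl_user", "nightly-index-build", "attempt_1633536000000_0001_m_000003_0");
            System.out.println(opaqueId);
            // [mapreduce] [etl_user] [nightly-index-build] [attempt_1633536000000_0001_m_000003_0]
        }
    }
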
10 changes: 10 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -38,6 +38,7 @@
import org.elasticsearch.hadoop.util.unit.TimeValue;

import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.*;
import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_NET_HTTP_HEADER_OPAQUE_ID;
import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.*;

/**
@@ -765,5 +766,14 @@ public String save() {
}

public abstract Properties asProperties();

public Settings setOpaqueId(String opaqueId) {
setProperty(ES_NET_HTTP_HEADER_OPAQUE_ID, opaqueId);
return this;
}

public String getOpaqueId() {
return getProperty(ES_NET_HTTP_HEADER_OPAQUE_ID);
}
}
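
Since both accessors delegate straight to the header property, a caller can also override the generated ID before a job runs. A small sketch using PropertiesSettings, the concrete Settings implementation exercised by the test at the end of this diff:

    import org.elasticsearch.hadoop.cfg.PropertiesSettings;
    import org.elasticsearch.hadoop.cfg.Settings;

    public class OpaqueIdOverrideDemo {
        public static void main(String[] args) {
            Settings settings = new PropertiesSettings();
            settings.setOpaqueId("[custom] [my-pipeline] [run-42]");
            // Stored under es.net.http.header.X-Opaque-ID and sent with every request.
            System.out.println(settings.getOpaqueId()); // [custom] [my-pipeline] [run-42]
        }
    }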

4 changes: 4 additions & 0 deletions spark/core/build.gradle
@@ -143,6 +143,10 @@ configurations.matching{ it.name.contains('CompilerPlugin') == false }.all { Configuration conf ->
conf.exclude group: "org.mortbay.jetty"
}

tasks.named("test").configure {
jvmArgs "--add-opens=java.base/java.io=ALL-UNNAMED" // Needed for IOUtils's BYTE_ARRAY_BUFFER reflection
}

// Set minimum compatibility and java home for compiler task
tasks.withType(ScalaCompile) { ScalaCompile task ->
task.sourceCompatibility = project.ext.minimumRuntimeVersion

spark/core/src/main/java/org/elasticsearch/spark/cfg/SparkSettings.java
@@ -18,9 +18,12 @@
*/
package org.elasticsearch.spark.cfg;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Properties;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.util.Assert;
@@ -36,6 +39,18 @@ public class SparkSettings extends Settings {
public SparkSettings(SparkConf cfg) {
Assert.notNull(cfg, "non-null spark configuration expected");
this.cfg = cfg;
String user;
try {
user = System.getenv("SPARK_USER") == null ?
UserGroupInformation.getCurrentUser().getShortUserName() :
System.getenv("SPARK_USER");
} catch (IOException e) {
user = "";
}
String appName = cfg.get("app.name", cfg.get("spark.app.name", ""));
String appId = cfg.get("spark.app.id", "");
String opaqueId = String.format(Locale.ROOT, "[spark] [%s] [%s] [%s]", user, appName, appId);
this.setOpaqueId(opaqueId);
}

@Override
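
On the Spark side the ID carries the submitting user (SPARK_USER if set, otherwise the Hadoop UGI short name, or empty if that lookup fails), the app name, and the application ID. With made-up values the result looks like this (only the format string comes from the constructor above):

    import java.util.Locale;

    public class SparkOpaqueIdDemo {
        public static void main(String[] args) {
            String opaqueId = String.format(Locale.ROOT, "[spark] [%s] [%s] [%s]",
                    "etl_user", "clickstream-ingest", "application_1633536000000_0042");
            System.out.println(opaqueId);
            // [spark] [etl_user] [clickstream-ingest] [application_1633536000000_0042]
        }
    }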

spark/core/src/main/scala/org/elasticsearch/spark/rdd/AbstractEsRDDIterator.scala
@@ -25,6 +25,8 @@ import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.rest.PartitionDefinition

import java.util.Locale

private[spark] abstract class AbstractEsRDDIterator[T](
val context: TaskContext,
partition: PartitionDefinition)
@@ -45,7 +47,10 @@ private[spark] abstract class AbstractEsRDDIterator[T](

// initialize mapping/ scroll reader
initReader(settings, log)

if (settings.getOpaqueId() != null && settings.getOpaqueId().contains("task attempt") == false) {
settings.setOpaqueId(String.format(Locale.ROOT, "%s, stage %s, task attempt %s", settings.getOpaqueId(),
context.stageId().toString, context.taskAttemptId.toString))
}
val readr = RestService.createReader(settings, partition, log)
readr.scrollQuery()
}

spark/core/src/main/scala/org/elasticsearch/spark/rdd/EsRDDWriter.scala
@@ -39,6 +39,7 @@ import org.elasticsearch.spark.serialization.ScalaMapFieldExtractor
import org.elasticsearch.spark.serialization.ScalaMetadataExtractor
import org.elasticsearch.spark.serialization.ScalaValueWriter

import java.util.Locale
import scala.reflect.ClassTag


@@ -63,6 +64,10 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
lazy val metaExtractor = ObjectUtils.instantiate[MetadataExtractor](settings.getMappingMetadataExtractorClassName, settings)

def write(taskContext: TaskContext, data: Iterator[T]): Unit = {
if (settings.getOpaqueId() != null && settings.getOpaqueId().contains("] [task attempt ") == false) {
settings.setOpaqueId(String.format(Locale.ROOT, "%s [stage %s] [task attempt %s]", settings.getOpaqueId(),
taskContext.stageId().toString, taskContext.taskAttemptId.toString))
}
val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

val listener = new TaskCompletionListener {
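
Both the iterator above and this writer extend the job-level ID with per-task detail, and the contains("] [task attempt ") check keeps the append idempotent when a reused Settings object already carries it (for example on task retry). A condensed sketch of that guard, with assumed stage and attempt numbers:

    import java.util.Locale;

    import org.elasticsearch.hadoop.cfg.PropertiesSettings;
    import org.elasticsearch.hadoop.cfg.Settings;

    public class TaskAttemptSuffixDemo {
        public static void main(String[] args) {
            Settings settings = new PropertiesSettings();
            settings.setOpaqueId("[spark] [etl_user] [clickstream-ingest] [app-42]");
            appendTaskInfo(settings, 3, 17);
            appendTaskInfo(settings, 3, 17); // second call is a no-op thanks to the guard
            System.out.println(settings.getOpaqueId());
            // [spark] [etl_user] [clickstream-ingest] [app-42] [stage 3] [task attempt 17]
        }

        static void appendTaskInfo(Settings settings, int stageId, long taskAttemptId) {
            String id = settings.getOpaqueId();
            if (id != null && !id.contains("] [task attempt ")) {
                settings.setOpaqueId(String.format(Locale.ROOT, "%s [stage %s] [task attempt %s]",
                        id, stageId, taskAttemptId));
            }
        }
    }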

spark/core/src/test/scala/org/elasticsearch/spark/cfg/SparkConfigTest.scala
@@ -18,13 +18,16 @@
*/
package org.elasticsearch.spark.cfg

import org.apache.hadoop.security.UserGroupInformation
import org.elasticsearch.spark.serialization.ReflectionUtils._
import org.junit.Test
import org.junit.Assert._
import org.hamcrest.Matchers._
import org.apache.spark.SparkConf
import org.elasticsearch.hadoop.cfg.PropertiesSettings

import java.util.Locale

class SparkConfigTest {

@Test
@@ -50,4 +53,16 @@ class SparkConfigTest {
val props = new PropertiesSettings().load(settings.save())
assertEquals("win", props.getProperty("type"))
}

@Test
def testOpaqueId(): Unit = {
var cfg = new SparkConf()
assertEquals(String.format(Locale.ROOT, "[spark] [%s] [] []", UserGroupInformation.getCurrentUser.getShortUserName),
new SparkSettingsManager().load(cfg).getOpaqueId)
val appName = "some app"
val appdId = "some app id"
cfg = new SparkConf().set("spark.app.name", appName).set("spark.app.id", appdId)
assertEquals(String.format(Locale.ROOT, "[spark] [%s] [%s] [%s]", UserGroupInformation.getCurrentUser.getShortUserName, appName,
appdId), new SparkSettingsManager().load(cfg).getOpaqueId)
}
}