6 changes: 5 additions & 1 deletion runners/spark/build.gradle
@@ -29,6 +29,7 @@ description = "Apache Beam :: Runners :: Spark"
* we are attempting to reference the "sourceSets.test.output" directly.
*/
evaluationDependsOn(":beam-sdks-java-core")
evaluationDependsOn(":beam-sdks-java-io-hadoop-format")

configurations {
validatesRunner
@@ -87,6 +88,8 @@ dependencies {
shadowTest library.java.jackson_dataformat_yaml
shadowTest "org.apache.kafka:kafka_2.11:0.11.0.1"
validatesRunner project(path: ":beam-sdks-java-core", configuration: "shadowTest")
validatesRunner project(path: ":beam-sdks-java-io-hadoop-format", configuration: "shadowTest")
validatesRunner project(path: ":beam-examples-java", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "shadow")
validatesRunner project(path: project.path, configuration: "provided")
@@ -114,8 +117,9 @@ task validatesRunnerBatch(type: Test) {
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"


classpath = configurations.validatesRunner
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + files(project.sourceSets.test.output.classesDirs)
testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + files(project(":beam-sdks-java-io-hadoop-format").sourceSets.test.output.classesDirs) + files(project.sourceSets.test.output.classesDirs)
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
96 changes: 96 additions & 0 deletions sdks/java/io/hadoop-format/build.gradle
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import groovy.json.JsonOutput

apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyJavaNature()
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: Hadoop Format"
ext.summary = "IO to read data from sources and to write data to sinks that implement Hadoop MapReduce Format."

configurations.create("sparkRunner")
configurations.sparkRunner {
// Ban certain dependencies to prevent a StackOverflow within Spark
// because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
exclude group: "org.slf4j", module: "jul-to-slf4j"
exclude group: "org.slf4j", module: "slf4j-jdk14"
}

// Ban dependencies from the test runtime classpath
configurations.testRuntimeClasspath {
// Prevent a StackOverflow because of wiring LOG4J -> SLF4J -> LOG4J
exclude group: "org.slf4j", module: "log4j-over-slf4j"
}

dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
compile library.java.guava
shadow library.java.slf4j_api
shadow project(path: ":beam-sdks-java-io-hadoop-common", configuration: "shadow")
provided library.java.hadoop_common
provided library.java.hadoop_hdfs
provided library.java.hadoop_mapreduce_client_core
testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadow")
testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
testCompile project(path: ":beam-sdks-java-io-jdbc", configuration: "shadow")
testCompile project(path: ":beam-sdks-java-io-hadoop-input-format", configuration: "shadowTest")
testCompile project(path: ":beam-examples-java", configuration: "shadowTest")
testCompile library.java.postgres
testCompile library.java.slf4j_jdk14
testCompile library.java.junit
testCompile library.java.hamcrest_core
testCompile library.java.hamcrest_library
shadow library.java.commons_io_2x

delegate.add("sparkRunner", project(path: ":beam-sdks-java-io-hadoop-format", configuration: "shadow"))
delegate.add("sparkRunner", project(path: ":beam-sdks-java-io-hadoop-format", configuration: "shadowTest"))

sparkRunner project(path: ":beam-examples-java", configuration: "shadowTest")
sparkRunner project(path: ":beam-runners-spark", configuration: "shadow")
sparkRunner project(path: ":beam-sdks-java-io-hadoop-file-system", configuration: "shadow")
sparkRunner library.java.spark_streaming
sparkRunner library.java.spark_core
}

def runnerClass = "org.apache.beam.runners.spark.TestSparkRunner"
task sparkRunner(type: Test) {
group = "Verification"
def beamTestPipelineOptions = [
"--project=hadoop-format",
"--tempRoot=/tmp/hadoop-format/",
"--streaming=false",
"--runner=" + runnerClass,
"--enableSparkMetricSinks=false",
]
classpath = configurations.sparkRunner
include "**/HadoopFormatIOSequenceFileTest.class"
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
}
forkEvery 1
maxParallelForks 4
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(beamTestPipelineOptions)
}
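For reference, assuming the standard Gradle task invocation and the project name used elsewhere in this file, the verification task defined above would presumably be run with: ./gradlew :beam-sdks-java-io-hadoop-format:sparkRunner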
62 changes: 62 additions & 0 deletions sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/ExternalSynchronization.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.hadoop.format;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;

/**
* Provides a mechanism for acquiring locks related to a job. Also serves as a source of
* identifiers that are unique within the job.
*/
public interface ExternalSynchronization extends Serializable {

/**
* Tries to acquire a lock for the given job.
*
* @param conf configuration bound to the given job.
* @return {@code true} if the lock was acquired, {@code false} otherwise.
*/
boolean tryAcquireJobLock(Configuration conf);

/**
* Deletes lock ids bound to the given job, if any exist.
*
* @param conf Hadoop configuration of the given job.
*/
void releaseJobIdLock(Configuration conf);

/**
* Creates a {@link TaskID} with an id that is unique within the given job.
*
* @param conf Hadoop configuration of the given job.
* @return {@link TaskID} with an id that is unique within the given job.
*/
TaskID acquireTaskIdLock(Configuration conf);

/**
* Creates a unique {@link TaskAttemptID} for the given taskId.
*
* @param conf configuration of the given task and job
* @param taskId id of the task
* @return unique {@link TaskAttemptID} for the given taskId.
*/
TaskAttemptID acquireTaskAttemptIdLock(Configuration conf, int taskId);
}
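For orientation, here is a minimal sketch of how the four methods above could be exercised together by a writer. It assumes the HDFSSynchronization implementation added below, a Configuration that already carries the job id under HadoopFormatIO.JOB_ID, and a hypothetical locks directory; the actual call sites in HadoopFormatIO's write path may sequence these calls differently.

package org.apache.beam.sdk.io.hadoop.format;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;

class ExternalSynchronizationSketch {

  void writeWithLocks(Configuration conf) {
    // Hypothetical locks directory; it must differ from the job's output directory.
    ExternalSynchronization sync = new HDFSSynchronization("/tmp/hadoop-format/locks");

    if (sync.tryAcquireJobLock(conf)) {
      // Only the worker that wins the job lock would run one-time job setup here.
    }

    // Each worker obtains a task id unique within the job, plus a fresh attempt id
    // for every (re)try of that task.
    TaskID taskId = sync.acquireTaskIdLock(conf);
    TaskAttemptID attemptId = sync.acquireTaskAttemptIdLock(conf, taskId.getId());

    // ... write the bundle under attemptId ...

    // Once the whole job has finished, the lock directory is removed.
    sync.releaseJobIdLock(conf);
  }
}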
186 changes: 186 additions & 0 deletions sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HDFSSynchronization.java
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.hadoop.format;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link ExternalSynchronization} which registers locks in HDFS.
*
* <p>Requires {@code locksDir} to be specified. This directory MUST be different from the
* directory that may be stored under the {@code "mapreduce.output.fileoutputformat.outputdir"}
* key. Otherwise job setup will fail because the directory will already exist before the job is
* set up.
*/
public class HDFSSynchronization implements ExternalSynchronization {

private static final Logger LOGGER = LoggerFactory.getLogger(HDFSSynchronization.class);

private static final String LOCKS_DIR_PATTERN = "%s/";
private static final String LOCKS_DIR_TASK_PATTERN = LOCKS_DIR_PATTERN + "%s";
private static final String LOCKS_DIR_TASK_ATTEMPT_PATTERN = LOCKS_DIR_TASK_PATTERN + "_%s";
private static final String LOCKS_DIR_JOB_FILENAME = LOCKS_DIR_PATTERN + "_job";

private static final transient Random RANDOM_GEN = new Random();

private final String locksDir;
private final ThrowingFunction<Configuration, FileSystem, IOException> fileSystemFactory;

/**
* Creates an instance of {@link HDFSSynchronization}.
*
* @param locksDir directory where locks will be stored. This directory MUST be different from
* the directory that may be stored under the {@code
* "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise job setup will fail because the
* directory will already exist before the job is set up.
*/
public HDFSSynchronization(String locksDir) {
this(locksDir, FileSystem::newInstance);
}

/**
* Creates an instance of {@link HDFSSynchronization}. Exists only for easier testing.
*
* @param locksDir directory where locks will be stored. This directory MUST be different from
* the directory that may be stored under the {@code
* "mapreduce.output.fileoutputformat.outputdir"} key. Otherwise job setup will fail because the
* directory will already exist before the job is set up.
* @param fileSystemFactory supplier of the file system
*/
HDFSSynchronization(
String locksDir, ThrowingFunction<Configuration, FileSystem, IOException> fileSystemFactory) {
this.locksDir = locksDir;
this.fileSystemFactory = fileSystemFactory;
}

@Override
public boolean tryAcquireJobLock(Configuration conf) {
Path path = new Path(locksDir, String.format(LOCKS_DIR_JOB_FILENAME, getJobJtIdentifier(conf)));

return tryCreateFile(conf, path);
}

@Override
public void releaseJobIdLock(Configuration conf) {
Path path = new Path(locksDir, String.format(LOCKS_DIR_PATTERN, getJobJtIdentifier(conf)));

try (FileSystem fileSystem = fileSystemFactory.apply(conf)) {
if (fileSystem.delete(path, true)) {
LOGGER.info("Delete of lock directory {} was successful", path);
} else {
LOGGER.warn("Delete of lock directory {} was unsuccessful", path);
}

} catch (IOException e) {
String formattedExceptionMessage =
String.format("Delete of lock directory %s was unsuccessful", path);
LOGGER.warn(formattedExceptionMessage, e);
throw new IllegalStateException(formattedExceptionMessage, e);
}
}

@Override
public TaskID acquireTaskIdLock(Configuration conf) {
JobID jobId = HadoopFormats.getJobId(conf);
boolean lockAcquired = false;
int taskIdCandidate = 0;

while (!lockAcquired) {
taskIdCandidate = RANDOM_GEN.nextInt(Integer.MAX_VALUE);
Path path =
new Path(
locksDir,
String.format(LOCKS_DIR_TASK_PATTERN, getJobJtIdentifier(conf), taskIdCandidate));
lockAcquired = tryCreateFile(conf, path);
}

return HadoopFormats.createTaskID(jobId, taskIdCandidate);
}

@Override
public TaskAttemptID acquireTaskAttemptIdLock(Configuration conf, int taskId) {
String jobJtIdentifier = getJobJtIdentifier(conf);
JobID jobId = HadoopFormats.getJobId(conf);
int taskAttemptCandidate = 0;
boolean taskAttemptAcquired = false;

while (!taskAttemptAcquired) {
taskAttemptCandidate++;
Path path =
new Path(
locksDir,
String.format(
LOCKS_DIR_TASK_ATTEMPT_PATTERN, jobJtIdentifier, taskId, taskAttemptCandidate));
taskAttemptAcquired = tryCreateFile(conf, path);
}

return HadoopFormats.createTaskAttemptID(jobId, taskId, taskAttemptCandidate);
}

private boolean tryCreateFile(Configuration conf, Path path) {
try (FileSystem fileSystem = fileSystemFactory.apply(conf)) {
try {
return fileSystem.createNewFile(path);
} catch (FileAlreadyExistsException | org.apache.hadoop.fs.FileAlreadyExistsException e) {
return false;
} catch (RemoteException e) {
// Remote HDFS exception
if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())) {
return false;
}
throw e;
}
} catch (IOException e) {
throw new IllegalStateException(String.format("Creation of file on path %s failed", path), e);
}
}

private String getJobJtIdentifier(Configuration conf) {
JobID job =
Preconditions.checkNotNull(
HadoopFormats.getJobId(conf),
"Configuration must contain jobID under key %s.",
HadoopFormatIO.JOB_ID);
return job.getJtIdentifier();
}

/**
* Function which can throw an exception.
*
* @param <T1> parameter type
* @param <T2> result type
* @param <X> exception type
*/
@FunctionalInterface
public interface ThrowingFunction<T1, T2, X extends Exception> extends Serializable {
T2 apply(T1 value) throws X;
}
}
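To illustrate the lock layout these patterns produce (with hypothetical values locksDir = /tmp/locks and a job jtIdentifier of job_123): the job lock is created at /tmp/locks/job_123/_job, a task lock for task id 42 at /tmp/locks/job_123/42, and its attempt locks at /tmp/locks/job_123/42_1, /tmp/locks/job_123/42_2, and so on; releaseJobIdLock then recursively deletes /tmp/locks/job_123/.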