Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b850458
Fixing data race in DataflowSideInputReadCounter
pabloem Mar 18, 2019
1d7056c
Remove redundent array creation code when calling vararg functions in…
robinyqiu Apr 1, 2019
44044d8
Remove redundant array creation code in SimpleFunction as well
robinyqiu Apr 1, 2019
85c81bd
[BEAM-7010] MAX/MIN(STRING)
amaliujia Apr 4, 2019
7d6b7b8
[BEAM-5433] Deprecate environment url field.
lukecwik Apr 3, 2019
7840f4a
[BEAM-5433] Deprecate environment url field.
lukecwik Apr 8, 2019
684f813
[BEAM-7011] Update Beam SDKs to use the StandardSideInputType enums. …
lukecwik Apr 8, 2019
37556f2
[BEAM-6994] SamzaRunner: further improvements for upgrading Samza (#8…
xinyuiscool Apr 9, 2019
a0c4650
Merge pull request #8230: [BEAM-7010] MAX/MIN(STRING)
kanterov Apr 9, 2019
f0abb97
[BEAM-7006] Fix some race conditions in sdf splitting.
robertwb Apr 9, 2019
f5029b4
Revert "[BEAM-3279] Deprecate and remove Coder.consistentWithEquals (…
aaltay Apr 9, 2019
83fe192
[BEAM-6825] Improve Combine error messages. (#8243)
lostluck Apr 9, 2019
109eb32
Merge pull request #8256 from robertwb/flaky-sdf
aaltay Apr 9, 2019
9f43c11
[BEAM-6978] Disable SDF testing for Dataflow using portability. (#8261)
lukecwik Apr 9, 2019
a142470
[BEAM-5328] Starter Archetype generation fix & correctness test (#7715)
Mikhail-Ivanov Apr 9, 2019
bcebd8b
Merge pull request #8258 from aaltay/j7038
aaltay Apr 9, 2019
476e75e
[BEAM-6934] Fixing timer firing timing issue (#8252)
pabloem Apr 9, 2019
574d56e
Merge pull request #8080 from pabloem/fix-sic
pabloem Apr 9, 2019
a490868
Merge pull request #8192 from robinyqiu/master
aaltay Apr 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1029,15 +1029,14 @@ message SideInput {
// An environment for executing UDFs. By default, an SDK container URL, but
// can also be a process forked by a command, or an externally managed process.
message Environment {
// Deprecated. Tracked in BEAM-5433
string url = 1;

// (Required) The URN of the payload
string urn = 2;

// (Optional) The data specifying any parameters to the URN. If
// the URN does not require any arguments, this may be omitted.
bytes payload = 3;

reserved 1;
}

message StandardEnvironments {
Expand Down
43 changes: 43 additions & 0 deletions release/src/main/groovy/StarterArchetype.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!groovy
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
 * Helper that generates a project from the Beam starter Maven archetype and
 * checks that the expected files were produced.
 */
class StarterArchetype {
  /**
   * Generates the starter project through the given test script driver and
   * verifies the build result and the generated layout.
   *
   * @param t the script driver used to run shell commands and check their output
   */
  static def generate(TestScripts t) {
    // Create the project from the archetype published in the snapshot repository.
    // The continuation lines stay unindented so the command text is unchanged.
    String mavenOutput = t.run("""mvn archetype:generate \
--update-snapshots \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-starter \
-DarchetypeVersion=${t.ver()} \
-DgroupId=org.example \
-DartifactId=beam-starter \
-Dversion="0.1" \
-Dpackage=org.apache.beam.starter \
-DinteractiveMode=false""")

    // The Maven build must have reported success.
    t.see("[INFO] BUILD SUCCESS", mavenOutput)

    // Step into the generated project and confirm its top-level layout.
    t.run("cd beam-starter")
    String projectListing = t.run("ls")
    t.see("pom.xml", projectListing)
    t.see("src", projectListing)

    // The starter pipeline source must exist in the requested package.
    String packageListing = t.run("ls src/main/java/org/apache/beam/starter/")
    t.see("StarterPipeline.java", packageListing)
  }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!groovy
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand All @@ -15,24 +16,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;

import static org.junit.Assert.assertEquals;
t = new TestScripts(args)

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/*
* Generate a starter archetype project and ensure its correctness
*/

t.describe 'Generate project from starter archetype'

t.intent 'Generates starter archetype project'
StarterArchetype.generate(t)

/** Tests for {@link FloatCoder}. */
@RunWith(JUnit4.class)
public class FloatCoderTest {
t.intent 'Runs the StarterPipeline Code with Direct runner'
// Run the generated StarterPipeline with the Direct runner.
// The closing quote of -Dexec.args is escaped (\") so it stays inside the command text;
// unescaped, it would be consumed as part of the GString terminator and leave the shell
// command with an unbalanced double quote.
t.run """mvn compile exec:java -q \
-Dexec.mainClass=org.apache.beam.starter.StarterPipeline \
-Dexec.args="--inputFile=pom.xml --output=starterOutput\""""

private static final Coder<Float> TEST_CODER = FloatCoder.of();
// Verify output correctness
String result = t.run "grep INFO starterOutput*"
t.see "INFO: HELLO", result
t.see "INFO: WORLD", result

@Test
public void testStructuralValueReturnTheSameValue() {
Float expected = 23.45F;
Object actual = TEST_CODER.structuralValue(expected);
assertEquals(expected, actual);
}
}
// Clean up
t.done()
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public static Environment createOrGetDefaultEnvironment(String type, String conf

public static Environment createDockerEnvironment(String dockerImageUrl) {
return Environment.newBuilder()
.setUrl(dockerImageUrl)
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
DockerPayload.newBuilder().setContainerImage(dockerImageUrl).build().toByteString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public void createEnvironments() throws IOException {
Environments.createOrGetDefaultEnvironment(Environments.ENVIRONMENT_DOCKER, "java"),
is(
Environment.newBuilder()
.setUrl("java")
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
DockerPayload.newBuilder().setContainerImage("java").build().toByteString())
Expand Down
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ def commonExcludeCategories = [
]

def fnApiWorkerExcludeCategories = [
'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesCustomWindowMerging',
'org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported',
'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders',
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo',
'org.apache.beam.sdk.testing.UsesSchema'
'org.apache.beam.sdk.testing.UsesSchema',
]

// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ public DataflowSideInputReadCounter(
this.declaringOperationContext = operationContext;
byteCounters = new HashMap<>();
executionStates = new HashMap<>();
checkState();
updateCurrentStateIfOutdated();
}

private void checkState() {
private void updateCurrentStateIfOutdated() {
DataflowExecutionState currentState =
(DataflowExecutionState) executionContext.getExecutionStateTracker().getCurrentState();
if (currentState == null
Expand Down Expand Up @@ -160,11 +160,11 @@ public void addBytesRead(long n) {

@Override
public Closeable enter() {
checkState();
// Only update status from tracked thread to avoid race condition and inconsistent state updates
if (executionContext.getExecutionStateTracker().getTrackedThread() != Thread.currentThread()) {
return () -> {};
}
updateCurrentStateIfOutdated();
return executionContext.getExecutionStateTracker().enterState(currentExecutionState);
}

Expand Down
49 changes: 49 additions & 0 deletions runners/samza/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import org.apache.beam.gradle.BeamModulePlugin

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Build script for the Samza job server: applies the Beam module plugin plus the
// 'application' plugin, and declares SamzaJobServerDriver as the program entry point.
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.samza.SamzaJobServerDriver"

// Extra Maven repository consulted when resolving dependencies.
// NOTE(review): the host (artifactory.corp.linkedin.com) looks like a company-internal
// Artifactory instance; builds outside that network presumably cannot reach it — confirm
// whether this entry is intended for the public build.
repositories {
  maven {
    url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
  }
}

// Java build configuration: shadow-jar validation and Javadoc export are switched off,
// and the shadow closure appends "reference.conf".
// NOTE(review): presumably so reference.conf files contributed by several dependencies
// are merged in the shaded jar rather than overwriting each other — confirm.
applyJavaNature(
  validateShadowJar: false,
  exportJavadoc: false,
  shadowClosure: {
    append "reference.conf"
  },
)

dependencies {
  // The Samza runner is consumed through its "shadow" configuration.
  compile project(path: ":beam-runners-samza", configuration: "shadow")
  // jcl-over-slf4j takes its version from the slf4j_api library coordinate, so the two
  // versions are kept the same.
  compile group: "org.slf4j", name: "jcl-over-slf4j", version: dependencies.create(project.library.java.slf4j_api).getVersion()
  compile library.java.slf4j_simple
  shadow library.java.guava
}

// No default program arguments are passed when launching via runShadow.
runShadow {
  args = []
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.coders;
package org.apache.beam.runners.samza;

import static org.junit.Assert.assertEquals;
import org.apache.samza.config.Config;
import org.apache.samza.context.ExternalContext;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Life cycle listener for a Samza pipeline during runtime. */
public interface SamzaPipelineLifeCycleListener {
/** Callback when the pipeline options are created. */
void onInit(Config config);

/** Tests for {@link BooleanCoder}. */
@RunWith(JUnit4.class)
public class BooleanCoderTest {
private static final Coder<Boolean> TEST_CODER = BooleanCoder.of();
/** Callback when the pipeline is started. */
ExternalContext onStart();

@Test
public void testStructuralValueReturnTheSameValue() {
Boolean expected = Boolean.TRUE;
Object actual = TEST_CODER.structuralValue(expected);
assertEquals(expected, actual);
/**
* Callback after the pipeline is submitted. This will be invoked only for Samza jobs submitted
* to a cluster.
*/
void onSubmit();

/** Callback after the pipeline is finished. */
void onFinish();

/** A registrar for {@link SamzaPipelineLifeCycleListener}. */
interface Registrar {
SamzaPipelineLifeCycleListener getLifeCycleListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/
package org.apache.beam.runners.samza;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.factories.PropertiesConfigFactory;
import org.apache.samza.metrics.MetricsReporter;

/** Options which can be used to configure a Samza PortablePipelineRunner. */
public interface SamzaPipelineOptions extends PipelineOptions {
Expand Down Expand Up @@ -102,4 +105,10 @@ public interface SamzaPipelineOptions extends PipelineOptions {
int getTimerBufferSize();

void setTimerBufferSize(int timerBufferSize);

@JsonIgnore
@Description("The metrics reporters that will be used to emit metrics.")
List<MetricsReporter> getMetricsReporters();

void setMetricsReporters(List<MetricsReporter> reporters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,17 @@ public class SamzaPipelineResult implements PipelineResult {
private final SamzaExecutionContext executionContext;
private final ApplicationRunner runner;
private final StreamApplication app;
private final SamzaPipelineLifeCycleListener listener;

public SamzaPipelineResult(
StreamApplication app, ApplicationRunner runner, SamzaExecutionContext executionContext) {
StreamApplication app,
ApplicationRunner runner,
SamzaExecutionContext executionContext,
SamzaPipelineLifeCycleListener listener) {
this.executionContext = executionContext;
this.runner = runner;
this.app = app;
this.listener = listener;
}

@Override
Expand All @@ -70,6 +75,11 @@ public State waitUntilFinish(@Nullable Duration duration) {
}

final StateInfo stateInfo = getStateInfo();

if (listener != null && (stateInfo.state == State.DONE || stateInfo.state == State.FAILED)) {
listener.onFinish();
}

if (stateInfo.state == State.FAILED) {
throw stateInfo.error;
}
Expand Down
Loading