Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Or on a YARN cluster:
# Building


To retrieve the latest development version (0.4.0-SNAPSHOT) of the Cascading Connector for Apache Flink™, run the following command
To retrieve the latest development version (0.4.1-SNAPSHOT) of the Cascading Connector for Apache Flink™, run the following command

git clone https://github.com/dataArtisans/cascading-flink.git

Expand Down
143 changes: 93 additions & 50 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ limitations under the License.

<groupId>com.data-artisans</groupId>
<artifactId>cascading-flink</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.4.1-SNAPSHOT</version>

<name>Cascading on Flink</name>
<packaging>jar</packaging>
Expand Down Expand Up @@ -52,9 +52,11 @@ limitations under the License.

<properties>
<!-- dependency versions -->
<cascading.version>3.1.0</cascading.version>
<flink.version>1.0.3</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<cascading.version>3.1.2</cascading.version>
<flink.version>1.18.1</flink.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<hadoop.version>2.10.0</hadoop.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Expand All @@ -68,27 +70,21 @@ limitations under the License.
</properties>

<repositories>

<repository>
<id>conjars.org</id>
<url>https://conjars.org/repo</url>
</repository>

<!--
<repository>
<id>conjars.org</id>
<url>https://conjars.wensel.net/repo</url>
</repository>
<repository>
<id>flink.release-staging</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/orgapacheflink-1063</url>
<releases>
<enabled>true</enabled>
</releases>
<id>repository.jboss.org</id>
<url>https://repository.jboss.org/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
-->

</repositories>
</repositories>

<distributionManagement>
<snapshotRepository>
Expand All @@ -107,7 +103,7 @@ limitations under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>

Expand All @@ -117,6 +113,12 @@ limitations under the License.
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>${flink.version}</version>
</dependency>

<!-- CASCADING -->

<dependency>
Expand All @@ -129,6 +131,12 @@ limitations under the License.
<groupId>cascading</groupId>
<artifactId>cascading-hadoop2-io</artifactId>
<version>${cascading.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand All @@ -137,6 +145,20 @@ limitations under the License.
<version>${cascading.version}</version>
</dependency>

<!-- Hadoop -->

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>

<!-- CASCADING TESTS -->

<dependency>
Expand Down Expand Up @@ -164,11 +186,13 @@ limitations under the License.
</dependency>

<!-- OTHER LIBRARIES -->
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->


<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.2.0</version>
<version>${hadoop.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand All @@ -186,15 +210,27 @@ limitations under the License.
</dependency>

<dependency>
<groupId>org.clapper</groupId>
<artifactId>grizzled-slf4j_2.10</artifactId>
<version>1.0.2</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<groupId>org.clapper</groupId>
<artifactId>grizzled-slf4j_2.10</artifactId>
<version>1.3.4</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -289,6 +325,31 @@ limitations under the License.
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version> <!-- Use the latest version -->
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- The main class to execute when running the JAR -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.dataartisans.flink.cascading.example.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down Expand Up @@ -363,7 +424,8 @@ limitations under the License.
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.

-->
<license
implementation="org.apache.rat.analysis.license.SimplePatternBasedLicense">
Expand Down Expand Up @@ -406,34 +468,15 @@ limitations under the License.
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.12.1</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<configLocation>/tools/maven/checkstyle.xml</configLocation>
<logViolationsToConsole>true</logViolationsToConsole>
</configuration>
</plugin>

<plugin>
<!-- just define the Java version to be used for compiling and plugins -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<!--$NO-MVN-MAN-VER$-->
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public static void main(String[] args) {
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "\\s+" );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new AggregateBy( wcPipe, token, new CountBy(new Fields("count")));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cascading.flow.planner.PlatformInfo;
import com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ProgramAbortException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import riffle.process.ProcessComplete;
Expand Down Expand Up @@ -81,8 +82,8 @@ public void complete() {
catch(FlowException fe) {
// check if we need to unwrap a ProgramAbortException
Throwable t = fe.getCause();
if (t instanceof OptimizerPlanEnvironment.ProgramAbortException) {
throw (OptimizerPlanEnvironment.ProgramAbortException)t;
if (t instanceof ProgramAbortException) {
throw (ProgramAbortException)t;
}
else {
throw fe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import com.dataartisans.flink.cascading.runtime.coGroup.regularJoin.CoGroupReducer;
import com.dataartisans.flink.cascading.runtime.coGroup.regularJoin.TupleAppendOuterJoiner;
import com.dataartisans.flink.cascading.runtime.coGroup.regularJoin.TupleOuterJoiner;
import com.dataartisans.flink.cascading.runtime.each.EachMapper;
import com.dataartisans.flink.cascading.runtime.groupBy.GroupByReducer;
import com.dataartisans.flink.cascading.runtime.hashJoin.NaryHashJoinJoiner;
import com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess;
Expand All @@ -57,9 +59,9 @@
import com.dataartisans.flink.cascading.runtime.hashJoin.TupleAppendCrosser;
import com.dataartisans.flink.cascading.runtime.hashJoin.TupleAppendJoiner;
import com.dataartisans.flink.cascading.runtime.hashJoin.HashJoinMapper;
import com.dataartisans.flink.cascading.runtime.each.EachMapper;
import com.dataartisans.flink.cascading.runtime.sink.TapOutputFormat;
import com.dataartisans.flink.cascading.runtime.source.TapInputFormat;
import com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess;
import com.dataartisans.flink.cascading.runtime.util.IdMapper;
import com.dataartisans.flink.cascading.types.tuple.TupleTypeInfo;
import com.dataartisans.flink.cascading.types.tuplearray.TupleArrayTypeInfo;
Expand All @@ -71,9 +73,9 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
Expand Down Expand Up @@ -102,8 +104,8 @@ public class FlinkFlowStep extends BaseFlowStep<Configuration> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkFlowStep.class);

private ExecutionEnvironment env;
private List<String> classPath;
private final ExecutionEnvironment env;
private final List<String> classPath;

public FlinkFlowStep(ExecutionEnvironment env, ElementGraph elementGraph, FlowNodeGraph flowNodeGraph, List<String> classPath) {
super(elementGraph, flowNodeGraph);
Expand Down
Loading