Added Spark Slf4jSink.java and configurations
muttcg committed Oct 2, 2018
1 parent 71546ea commit ceda564
Showing 14 changed files with 155 additions and 71 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -9,4 +9,5 @@ bin
.DS_Store
logs
log
temp
temp
tmp
10 changes: 10 additions & 0 deletions pipelines/beam-common/pom.xml
@@ -39,6 +39,16 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<scope>provided</scope>
</dependency>

<!-- Utils -->
<dependency>
@@ -10,6 +10,8 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -85,7 +87,7 @@ private Read(String filePath, String workingPath, boolean unCompressed) {

@Override
public PCollection<ExtendedRecord> expand(PBegin input) {
DwCASource source = new DwCASource(this);
DwcaSource source = new DwcaSource(this);
return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}

@@ -97,11 +99,11 @@ public void populateDisplayData(DisplayData.Builder builder) {
}

/** A non-splittable bounded source. */
private static class DwCASource extends BoundedSource<ExtendedRecord> {
private static class DwcaSource extends BoundedSource<ExtendedRecord> {

private final Read read;

DwCASource(Read read) {
DwcaSource(Read read) {
this.read = read;
}

@@ -131,10 +133,12 @@ public BoundedReader<ExtendedRecord> createReader(PipelineOptions options) {
/** A wrapper around the standard DwC-IO provided NormalizedDwcArchive. */
private static class BoundedDwCAReader extends BoundedSource.BoundedReader<ExtendedRecord> {

private final DwCASource source;
private final Counter dwcaCount = Metrics.counter("DwcaIO", "dwcaCount");

private final DwcaSource source;
private DwcaReader reader;

private BoundedDwCAReader(DwCASource source) {
private BoundedDwCAReader(DwcaSource source) {
this.source = source;
}

@@ -149,6 +153,7 @@ public boolean start() throws IOException {

@Override
public boolean advance() {
dwcaCount.inc();
return reader.advance();
}

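The dwcaCount counter introduced above uses Beam's standard metrics API (Metrics.counter / Counter.inc()). As a sketch that is not part of this commit, a counter registered under the DwcaIO namespace could later be read back from the pipeline result; only the namespace and name come from the diff, the surrounding method is illustrative:

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

public class DwcaCountReport {

  /** Runs the pipeline and prints the "DwcaIO/dwcaCount" counter. */
  public static void runAndReport(Pipeline pipeline) {
    PipelineResult result = pipeline.run();
    result.waitUntilFinish();

    MetricQueryResults metrics =
        result.metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.named("DwcaIO", "dwcaCount"))
                    .build());

    // "Attempted" values are available even when a runner does not report committed metrics
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + ": " + counter.getAttempted());
    }
  }
}
```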
@@ -0,0 +1,20 @@
package org.gbif.pipelines.common.beam;

import java.util.Properties;

import com.codahale.metrics.MetricRegistry;
import org.apache.beam.runners.spark.metrics.AggregatorMetric;
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;

/**
* A Spark {@link org.apache.spark.metrics.sink.Sink} that is tailored to report {@link
* AggregatorMetric} metrics to Slf4j.
*/
public class Slf4jSink extends org.apache.spark.metrics.sink.Slf4jSink {
public Slf4jSink(
final Properties properties,
final MetricRegistry metricRegistry,
final org.apache.spark.SecurityManager securityMgr) {
super(properties, WithMetricsSupport.forRegistry(metricRegistry), securityMgr);
}
}
8 changes: 4 additions & 4 deletions pipelines/ingest-gbif-standalone/README.md
@@ -19,7 +19,7 @@ Please change:

#### From DwCA to ExtendedRecord *.avro file:
```
java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelines.standalone.DwcaPipeline \
java -jar target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar \
--runner=SparkRunner \
--pipelineStep=DWCA_TO_VERBATIM \
--datasetId=DATASET_ID \
@@ -31,7 +31,7 @@ java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelin

#### From DwCA to GBIF interpreted *.avro files:
```
java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelines.standalone.DwcaPipeline \
java -jar target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar \
--runner=SparkRunner \
--pipelineStep=DWCA_TO_INTERPRETED \
--datasetId=DATASET_ID \
@@ -43,7 +43,7 @@ java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelin

#### From DwCA to Elasticsearch index:
```
java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelines.standalone.DwcaPipeline \
java -jar target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar \
--runner=SparkRunner \
--pipelineStep=DWCA_TO_ES_INDEX \
--datasetId=DATASET_ID \
@@ -56,7 +56,7 @@ java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelin

#### From GBIF interpreted *.avro files to Elasticsearch index:
```
java -cp target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar org.gbif.pipelines.standalone.DwcaPipeline \
java -jar target/ingest-gbif-standalone-BUILD_VERSION-shaded.jar \
--runner=SparkRunner \
--pipelineStep=INTERPRETED_TO_ES_INDEX \
--datasetId=DATASET_ID \
49 changes: 35 additions & 14 deletions pipelines/ingest-gbif-standalone/pom.xml
@@ -56,29 +56,24 @@
<pattern>okio</pattern>
<shadedPattern>o113.okio</shadedPattern>
</relocation>

<relocation>
<pattern>retrofit2</pattern>
<shadedPattern>r240.retrofit2</shadedPattern>
</relocation>

<!-- Transitive from Elasticsearch Beam: conflicts with hive-jdbc-1.1.0-cdh5.12.1-standalone.jar -->
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>hc45.org.apache.http</shadedPattern>
</relocation>

<!-- Transitive from core: conflicts with Hadoop on Spark -->
<relocation>
<pattern>org.hsqldb</pattern>
<shadedPattern>h228.org.hsqldb</shadedPattern>
</relocation>

<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>g20.com.google.common</shadedPattern>
</relocation>

</relocations>
</configuration>
</execution>
@@ -109,18 +104,37 @@
<artifactId>beam-runners-spark</artifactId>
</dependency>

<!-- Logging -->
<!-- Hadoop -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>

<!-- Spark -->
@@ -150,10 +164,17 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<groupId>biz.paluch.logging</groupId>
<artifactId>logstash-gelf</artifactId>
</dependency>

</dependencies>

</project>
@@ -0,0 +1,44 @@
# Log everything at INFO level to the console, a rolling file and the GELF appender
log4j.rootCategory=INFO, console, file, gelf
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%X{datasetId} %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# File appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=logs/ingest-gbif.log
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%X{datasetId} %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# ELK appender
log4j.appender.gelf=biz.paluch.logging.gelf.log4j.GelfLogAppender
log4j.appender.gelf.Threshold=INFO
log4j.appender.gelf.Host=udp:127.0.0.1
log4j.appender.gelf.Port=12201
log4j.appender.gelf.Version=1.1
log4j.appender.gelf.Facility=ingest-gbif
log4j.appender.gelf.ExtractStackTrace=true
log4j.appender.gelf.FilterStackTrace=true
log4j.appender.gelf.MdcProfiling=true
log4j.appender.gelf.TimestampPattern=yyyy-MM-dd HH:mm:ss,SSSS
log4j.appender.gelf.MaximumMessageSize=8192
log4j.appender.gelf.MdcFields=datasetId,attempt,uuid
log4j.appender.gelf.IncludeFullMdc=true
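The console and file layouts above use %X{datasetId}, and the GELF appender forwards MdcFields=datasetId,attempt,uuid, so these keys are expected to be present in the logging MDC. A minimal sketch of how they might be populated (the class and values are hypothetical; only the key names come from the configuration above):

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class MdcUsageSketch {

  private static final Logger LOG = LoggerFactory.getLogger(MdcUsageSketch.class);

  public static void main(String[] args) {
    // Hypothetical values; the keys mirror MdcFields in log4j.properties
    MDC.put("datasetId", "some-dataset-uuid");
    MDC.put("attempt", "1");
    try {
      LOG.info("Starting ingestion"); // %X{datasetId} is resolved by the pattern layouts
    } finally {
      MDC.clear(); // avoid leaking MDC values across reused threads
    }
  }
}
```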
22 changes: 0 additions & 22 deletions pipelines/ingest-gbif-standalone/src/main/resources/logback.xml

This file was deleted.

@@ -0,0 +1,2 @@
executor.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
driver.sink.slf4j.class=org.gbif.pipelines.common.beam.Slf4jSink
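These two lines register the SLF4J sinks with Spark's metrics system: the stock org.apache.spark.metrics.sink.Slf4jSink on executors and the new Beam-aware org.gbif.pipelines.common.beam.Slf4jSink on the driver. Spark still has to be pointed at this file; the standard property for that is spark.metrics.conf. A sketch of one way to set it programmatically (the app name and file location are illustrative, not taken from this commit):

```
import org.apache.spark.SparkConf;

public class MetricsConfSketch {

  /** Builds a SparkConf that loads the metrics sinks from metrics.properties. */
  public static SparkConf withSlf4jMetricsSink() {
    return new SparkConf()
        .setAppName("ingest-gbif")                        // illustrative application name
        .set("spark.metrics.conf", "metrics.properties"); // path assumed resolvable on driver and executors
  }
}
```

When submitting with spark-submit, the same effect is typically achieved with --conf spark.metrics.conf=metrics.properties together with --files metrics.properties.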
9 changes: 3 additions & 6 deletions pipelines/ingest-gbif/pom.xml
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
@@ -44,7 +45,7 @@
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<relocations>
<!-- Transitive from core: conflicts with Beam on Spark -->
@@ -149,10 +150,6 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
</dependency>

<!-- Json -->
<dependency>
@@ -1,13 +1,12 @@
package org.gbif.pipelines.ingest.options;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.Validation;

public interface DwcaPipelineOptions
extends EsIndexingPipelineOptions, InterpretationPipelineOptions, SparkPipelineOptions {
extends EsIndexingPipelineOptions, InterpretationPipelineOptions {

enum PipelineStep {
DWCA_TO_VERBATIM, // only reads a Dwca and converts it to an avro file
@@ -4,6 +4,7 @@
import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -15,7 +16,7 @@
* a {@link HadoopFileSystemOptions} when exporting/reading files.
*/
public interface InterpretationPipelineOptions
extends BasePipelineOptions, HadoopFileSystemOptions {
extends BasePipelineOptions, HadoopFileSystemOptions, SparkPipelineOptions {

@Override
@Description(
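Moving SparkPipelineOptions from DwcaPipelineOptions onto InterpretationPipelineOptions makes the Spark runner flags available to every interpretation pipeline rather than only the DwCA one. For context, a minimal sketch of how an options interface like this is usually materialized in Beam (argument values are illustrative, and the import path for the options class is assumed from the package shown earlier in this diff):

```
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.gbif.pipelines.ingest.options.InterpretationPipelineOptions;

public class OptionsParsingSketch {

  /** Parses CLI args, e.g. {"--runner=SparkRunner", "--datasetId=DATASET_ID"}. */
  public static InterpretationPipelineOptions parse(String[] args) {
    return PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(InterpretationPipelineOptions.class);
  }
}
```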