Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions v2/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<guava.version>20.0</guava.version>
<http-client.version>1.27.0</http-client.version>
<jackson.version>2.9.9</jackson.version>
<parquet.version>2.14.0</parquet.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -81,5 +82,11 @@
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-parquet</artifactId>
<version>${parquet.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.io;

import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link WindowedFilenamePolicy} class outputs filenames for file sinks which handle windowed
 * writes.
 *
 * <p>The output directory may contain the literal date tokens {@code YYYY}, {@code MM}, {@code DD}
 * and {@code HH}. When the window being written is an {@link IntervalWindow}, each token is
 * replaced with the matching component of the window's end time before the filename is built.
 */
@SuppressWarnings("serial")
public class WindowedFilenamePolicy extends FilenamePolicy {

  /** The logger to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(WindowedFilenamePolicy.class);

  /** Formatter which renders the value substituted for the {@code YYYY} directory token. */
  private static final DateTimeFormatter YEAR = DateTimeFormat.forPattern("YYYY");

  /** Formatter which renders the value substituted for the {@code MM} directory token. */
  private static final DateTimeFormatter MONTH = DateTimeFormat.forPattern("MM");

  /**
   * Formatter which renders the value substituted for the {@code DD} directory token. The token
   * seen by users is upper case while the formatter pattern is the lower case {@code dd}.
   */
  private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("dd");

  /** Formatter which renders the value substituted for the {@code HH} directory token. */
  private static final DateTimeFormatter HOUR = DateTimeFormat.forPattern("HH");

  /** The directory files are written to. May contain date tokens. */
  private final ValueProvider<String> outputDirectory;

  /** The prefix of the file to output. */
  private final ValueProvider<String> outputFilenamePrefix;

  /** The filename suffix. */
  private final ValueProvider<String> suffix;

  /** The shard template used during file formatting. */
  private final ValueProvider<String> shardTemplate;

  /**
   * Constructs a new {@link WindowedFilenamePolicy} from plain string values. Each value is
   * wrapped in a {@link StaticValueProvider} and passed to the {@link ValueProvider} based
   * constructor.
   *
   * @param outputDirectory The output directory for all files.
   * @param outputFilenamePrefix The common prefix for output files.
   * @param shardTemplate The template used to create uniquely named sharded files.
   * @param suffix The suffix to append to all files output by the policy.
   */
  public WindowedFilenamePolicy(
      String outputDirectory, String outputFilenamePrefix, String shardTemplate, String suffix) {
    this(
        StaticValueProvider.of(outputDirectory),
        StaticValueProvider.of(outputFilenamePrefix),
        StaticValueProvider.of(shardTemplate),
        StaticValueProvider.of(suffix));
  }

  /**
   * Constructs a new {@link WindowedFilenamePolicy} from value providers. The providers are only
   * stored here; their values are read each time a filename is requested.
   *
   * @param outputDirectory The output directory for all files.
   * @param outputFilenamePrefix The common prefix for output files.
   * @param shardTemplate The template used to create uniquely named sharded files.
   * @param suffix The suffix to append to all files output by the policy.
   */
  public WindowedFilenamePolicy(
      ValueProvider<String> outputDirectory,
      ValueProvider<String> outputFilenamePrefix,
      ValueProvider<String> shardTemplate,
      ValueProvider<String> suffix) {
    this.outputDirectory = outputDirectory;
    this.outputFilenamePrefix = outputFilenamePrefix;
    this.shardTemplate = shardTemplate;
    this.suffix = suffix;
  }

  /**
   * Constructs the filename for one shard of a windowed write from the output directory, filename
   * prefix, shard template and suffix supplied to this policy. Date tokens in the output directory
   * are resolved first. For example the outputDirectory of /YYYY/MM/DD would resolve to
   * /2017/01/08 for an interval window ending on January 8th, 2017.
   *
   * <p>The final name is produced by delegating to a {@link DefaultFilenamePolicy} built from the
   * resolved location.
   *
   * @param shardNumber The shard being written.
   * @param numShards The total number of shards.
   * @param window The window the file belongs to.
   * @param paneInfo The pane the file belongs to.
   * @param outputFileHints Hints supplied by the sink for the output file.
   * @return The resource the shard should be written to.
   */
  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      OutputFileHints outputFileHints) {

    ResourceId resolvedDirectory = resolveWithDateTemplates(outputDirectory, window);
    ResourceId filePrefix =
        resolvedDirectory.resolve(outputFilenamePrefix.get(), StandardResolveOptions.RESOLVE_FILE);

    DefaultFilenamePolicy delegate =
        DefaultFilenamePolicy.fromStandardParameters(
            StaticValueProvider.of(filePrefix), shardTemplate.get(), suffix.get(), true);

    ResourceId result =
        delegate.windowedFilename(shardNumber, numShards, window, paneInfo, outputFileHints);

    // Hand the resource itself to the logger so the string is only rendered when debug logging is
    // enabled.
    LOG.debug("Windowed file name policy created: {}", result);
    return result;
  }

  /**
   * Unwindowed writes are unsupported by this filename policy so an {@link
   * UnsupportedOperationException} will be thrown if invoked.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    throw new UnsupportedOperationException(
        "There is no windowed filename policy for "
            + "unwindowed file output. Please use the WindowedFilenamePolicy with windowed "
            + "writes or switch filename policies.");
  }

  /**
   * Resolves any date tokens which exist in the output directory path. This allows the output
   * location to change dynamically based on the window end time.
   *
   * <p>The replacement is a literal substitution applied to every occurrence of {@code YYYY},
   * {@code MM}, {@code DD} and {@code HH} anywhere in the directory string, so other path
   * components containing those character sequences are rewritten as well. Windows which are not
   * an {@link IntervalWindow} leave the directory untouched.
   *
   * @param outputDirectoryStr The provider of the output directory, possibly containing tokens.
   * @param window The window whose end time supplies the date values.
   * @return The new output directory with all variables resolved.
   */
  private ResourceId resolveWithDateTemplates(
      ValueProvider<String> outputDirectoryStr, BoundedWindow window) {
    ResourceId directory = FileSystems.matchNewResource(outputDirectoryStr.get(), true);

    if (!(window instanceof IntervalWindow)) {
      return directory;
    }

    // NOTE(review): Instant#toDateTime() appears to use the JVM default time zone, which would
    // make the resolved directory depend on the worker's zone setting - confirm this is intended.
    DateTime windowEnd = ((IntervalWindow) window).end().toDateTime();

    String resolvedPath =
        directory
            .toString()
            .replace("YYYY", YEAR.print(windowEnd))
            .replace("MM", MONTH.print(windowEnd))
            .replace("DD", DAY.print(windowEnd))
            .replace("HH", HOUR.print(windowEnd));

    return FileSystems.matchNewResource(resolvedPath, true);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

/** IO Utility classes for templates. */
package com.google.cloud.teleport.v2.io;
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.transforms;

import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
 * The {@link KeyValueToGenericRecordFn} class converts a {@code KV<String, String>} to a {@link
 * GenericRecord} using a static schema.
 *
 * <p>Each output record carries the element's value as {@code message}, a map of at most one
 * entry (element key to element value) as {@code attributes}, and the element timestamp in
 * milliseconds as {@code timestamp}.
 */
public class KeyValueToGenericRecordFn extends DoFn<KV<String, String>, GenericRecord> {

  /** Schema used for generating a GenericRecord. */
  private static final String SCHEMA_STRING =
      "{\n"
          + " \"namespace\": \"com.google.cloud.teleport.v2.avro\",\n"
          + " \"type\": \"record\",\n"
          + " \"name\": \"GenericKafkaRecord\",\n"
          + " \"fields\": [\n"
          + " {\"name\": \"message\",\"type\": \"string\"},\n"
          + " {\"name\": \"attributes\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},\n"
          + " {\"name\": \"timestamp\", \"type\": \"long\"}\n"
          + " ]\n"
          + "}";

  /** The parsed form of the static schema, shared by every record this function emits. */
  public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_STRING);

  /**
   * Generates the records using {@link GenericRecordBuilder}.
   *
   * <p>When the element value is non-null the attributes map holds a single entry keyed by the
   * element key, or by the empty string when the key is null. When the value is null the
   * attributes map is empty.
   *
   * @param c The process context supplying the input element and its timestamp.
   */
  @ProcessElement
  public void processElement(ProcessContext c) {

    KV<String, String> message = c.element();
    String attributeKey = message.getKey();
    String attributeValue = message.getValue();

    final Map<String, String> attributeMap;
    if (attributeValue == null) {
      // Type-safe empty map; the raw Collections.EMPTY_MAP constant needs an unchecked conversion.
      attributeMap = Collections.emptyMap();
    } else {
      String mapKey = attributeKey != null ? attributeKey : "";
      attributeMap = Collections.singletonMap(mapKey, attributeValue);
    }

    // NOTE(review): the schema declares "message" as a non-nullable string, so a null value is
    // presumably rejected by the builder below - confirm how null-valued elements should be
    // handled.
    c.output(
        new GenericRecordBuilder(SCHEMA)
            .set("message", attributeValue)
            .set("attributes", attributeMap)
            .set("timestamp", c.timestamp().getMillis())
            .build());
  }
}
Loading