NIFI-12923 Added append avro mode to PutHDFS #8544

Closed
wants to merge 4 commits into from
pom.xml
@@ -40,6 +40,10 @@
<artifactId>nifi-hadoop-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
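The new avro-mapred dependency is what provides org.apache.avro.mapred.FsInput, the adapter used in PutHDFS below so that Avro's writer can re-read an existing container file through Hadoop's FileSystem API. A minimal sketch of what FsInput exposes, with a hypothetical local path, not taken from this PR:

import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class FsInputSketch {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // FsInput implements Avro's SeekableInput over a Hadoop Path, which is
        // what DataFileWriter.appendTo() needs in order to locate the existing
        // file's schema, codec, and sync marker. The path here is hypothetical.
        try (FsInput input = new FsInput(new Path("/tmp/existing.avro"), conf)) {
            System.out.println("length=" + input.length() + ", position=" + input.tell());
        }
    }
}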
PutHDFS.java
@@ -18,6 +18,11 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
@@ -45,6 +50,8 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -66,6 +73,7 @@
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -114,6 +122,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
.build();

// properties
public static final String DEFAULT_APPEND_MODE = "DEFAULT";
public static final String AVRO_APPEND_MODE = "AVRO";

protected static final String REPLACE_RESOLUTION = "replace";
protected static final String IGNORE_RESOLUTION = "ignore";
@@ -154,6 +164,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
.allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
.build();

public static final PropertyDescriptor APPEND_MODE = new PropertyDescriptor.Builder()
.name("Append Mode")
.description("Defines the append strategy to use when the Conflict Resolution Strategy is set to 'append'.")
.allowableValues(DEFAULT_APPEND_MODE, AVRO_APPEND_MODE)
.defaultValue(DEFAULT_APPEND_MODE)
.dependsOn(CONFLICT_RESOLUTION, APPEND_RESOLUTION)
.required(true)
.build();
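        // Only considered when the Conflict Resolution Strategy is 'append' (see
        // dependsOn above): DEFAULT keeps the plain byte-level append, while AVRO
        // appends records into the existing Avro container file.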

public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -231,6 +250,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
.description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
.build());
props.add(CONFLICT_RESOLUTION);
props.add(APPEND_MODE);
props.add(WRITING_STRATEGY);
props.add(BLOCK_SIZE);
props.add(BUFFER_SIZE);
@@ -243,6 +263,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return props;
}

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final PropertyValue codec = validationContext.getProperty(COMPRESSION_CODEC);
final boolean isCodecSet = codec.isSet() && !CompressionType.NONE.name().equals(codec.getValue());
if (isCodecSet && APPEND_RESOLUTION.equals(validationContext.getProperty(CONFLICT_RESOLUTION).getValue())
&& AVRO_APPEND_MODE.equals(validationContext.getProperty(APPEND_MODE).getValue())) {
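            // Avro append copies container blocks onto the existing file as-is, so a
            // NiFi-side compression codec wrapping the output stream would corrupt it.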
problems.add(new ValidationResult.Builder()
.subject("Codec")
.valid(false)
                .explanation("Compression codec cannot be set when Avro append mode is used")
.build());
}
return problems;
}

@Override
protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
// Set umask once, to avoid thread safety issues doing it in onTrigger
@@ -384,14 +420,32 @@ public Object run() {
null, null);
}

-                if (codec != null) {
-                    fos = codec.createOutputStream(fos);
-                }
-                createdFile = actualCopyFile;
-                BufferedInputStream bis = new BufferedInputStream(in);
-                StreamUtils.copy(bis, fos);
-                bis = null;
-                fos.flush();
+                if (codec != null) {
+                    fos = codec.createOutputStream(fos);
+                }
+                createdFile = actualCopyFile;
+
+                final String appendMode = context.getProperty(APPEND_MODE).getValue();
+                if (APPEND_RESOLUTION.equals(conflictResponse)
+                        && AVRO_APPEND_MODE.equals(appendMode)
+                        && destinationExists) {
+                    getLogger().info("Appending avro record to existing avro file");
+                    try (final DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>());
+                         final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
+                        writer.appendTo(new FsInput(copyFile, configuration), fos); // open writer to existing file
+                        writer.appendAllFrom(reader, false); // append flowfile content
+                        writer.flush();
+                        getLogger().info("Successfully appended avro record");
+                    } catch (Exception e) {
+                        getLogger().error("Error occurred during appending to existing avro file", e);
+                        throw new ProcessException(e);
+                    }
+                } else {
+                    BufferedInputStream bis = new BufferedInputStream(in);
+                    StreamUtils.copy(bis, fos);
+                    bis = null;
+                    fos.flush();
+                }
} finally {
try {
if (fos != null) {
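For readers unfamiliar with Avro's append API, the new branch above combines three calls: appendTo() reopens the existing container and positions the writer after its last block, appendAllFrom() copies the incoming blocks, and flush() pushes them out. A minimal local-file sketch of the same pattern, not the processor's exact code (paths are hypothetical, the File-based appendTo overload stands in for the FsInput one used above, and both files must share a schema; with recompress=false they must share a codec too):

import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

public class AvroAppendSketch {
    public static void main(String[] args) throws Exception {
        final File existing = new File("/tmp/existing.avro"); // hypothetical target
        try (InputStream in = new FileInputStream("/tmp/incoming.avro"); // hypothetical source
             DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>());
             DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
            // Reopen the existing container and position the writer after its
            // last block; PutHDFS does the same via FsInput plus the HDFS
            // output stream instead of a local File.
            writer.appendTo(existing);
            // Copy the incoming blocks verbatim; 'false' skips decode/re-encode.
            writer.appendAllFrom(reader, false);
        }
    }
}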
PutHDFSTest.java
@@ -53,17 +53,26 @@
import java.util.List;
import java.util.Map;

import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
import static org.apache.nifi.processors.hadoop.PutHDFS.APPEND_RESOLUTION;
import static org.apache.nifi.processors.hadoop.PutHDFS.AVRO_APPEND_MODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class PutHDFSTest {
private final static String TARGET_DIRECTORY = "target/test-classes";
private final static String AVRO_TARGET_DIRECTORY = TARGET_DIRECTORY + "/testdata-avro";
private final static String SOURCE_DIRECTORY = "src/test/resources/testdata";
private final static String AVRO_SOURCE_DIRECTORY = "src/test/resources/testdata-avro";
private final static String FILE_NAME = "randombytes-1";
private final static String AVRO_FILE_NAME = "input.avro";

private KerberosProperties kerberosProperties;
private MockFileSystem mockFileSystem;
@@ -186,6 +195,34 @@ public void testValidators() {
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}

results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "target");
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.COMPRESSION_CODEC, GZIP.name());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(1, results.size());
for (ValidationResult vr : results) {
assertEquals("Codec", vr.getSubject());
assertEquals("Compression codec cannot be set when Avro append mode is used", vr.getExplanation());
}

results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "target");
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.COMPRESSION_CODEC, NONE.name());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(0, results.size());
}

@Test
@@ -229,6 +266,58 @@ public void testPutFile() throws IOException {
verify(spyFileSystem, times(1)).rename(any(Path.class), any(Path.class));
}

@Test
public void testPutFileWithAppendAvroModeNewFileCreated() throws IOException {
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME);

// when
try (final FileInputStream fis = new FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
runner.enqueue(fis, Map.of(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
assertTrue(runner.isValid());
runner.run();
}

// then
assertAvroAppendValues(runner, spyFileSystem, targetPath);
verify(spyFileSystem, times(0)).append(eq(targetPath), anyInt());
verify(spyFileSystem, times(1)).rename(any(Path.class), eq(targetPath));
assertEquals(100, spyFileSystem.getFileStatus(targetPath).getLen());
}

@Test
public void testPutFileWithAppendAvroModeWhenTargetFileAlreadyExists() throws IOException {
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
spyFileSystem.setConf(new Configuration());
final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME);
spyFileSystem.createNewFile(targetPath);

// when
try (final FileInputStream fis = new FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
runner.enqueue(fis, Map.of(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
assertTrue(runner.isValid());
runner.run();
}

// then
assertAvroAppendValues(runner, spyFileSystem, targetPath);
verify(spyFileSystem).append(eq(targetPath), anyInt());
verify(spyFileSystem, times(0)).rename(any(Path.class), eq(targetPath));
assertEquals(200, spyFileSystem.getFileStatus(targetPath).getLen());
}

@Test
public void testPutFileWithSimpleWrite() throws IOException {
// given
@@ -642,7 +731,29 @@ public void testPutFileWithCreateException() throws IOException {
mockFileSystem.delete(p, true);
}

-    private class TestablePutHDFS extends PutHDFS {
private static void assertAvroAppendValues(TestRunner runner, FileSystem spyFileSystem, Path targetPath) throws IOException {
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
assertTrue(failedFlowFiles.isEmpty());

final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());

final MockFlowFile flowFile = flowFiles.get(0);
assertTrue(spyFileSystem.exists(targetPath));
assertEquals(AVRO_FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals(AVRO_TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));

final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME));
}

+    private static class TestablePutHDFS extends PutHDFS {

private final KerberosProperties testKerberosProperties;
private final FileSystem fileSystem;
MockFileSystem.java
@@ -41,17 +41,19 @@
import java.util.Set;

public class MockFileSystem extends FileSystem {
private static final long DIR_LENGTH = 1L;
private static final long FILE_LENGTH = 100L;
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
private final Map<Path, FSDataOutputStream> pathToOutputStream = new HashMap<>();

private boolean failOnOpen;
private boolean failOnClose;
private boolean failOnCreate;
private boolean failOnFileStatus;
private boolean failOnExists;


public void setFailOnClose(final boolean failOnClose) {
this.failOnClose = failOnClose;
}
@@ -102,22 +104,18 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
throw new IOException(new AuthenticationException("test auth error"));
}
pathToStatus.put(f, newFile(f, permission));
-        if(failOnClose) {
-            return new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("")) {
-                @Override
-                public void close() throws IOException {
-                    super.close();
-                    throw new IOException("Fail on close");
-                }
-            };
-        } else {
-            return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
-        }
+        final FSDataOutputStream outputStream = createOutputStream();
+        pathToOutputStream.put(f, outputStream);
+        return outputStream;
}

@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
-        return null;
+        pathToOutputStream.computeIfAbsent(f, f2 -> createOutputStream());
+        final FileStatus oldStatus = pathToStatus.get(f);
+        final long newLength = oldStatus.getLen() + FILE_LENGTH;
+        pathToStatus.put(f, updateLength(oldStatus, newLength));
+        return pathToOutputStream.get(f);
}

@Override
Expand Down Expand Up @@ -192,19 +190,45 @@ public boolean exists(Path f) throws IOException {
return pathToStatus.containsKey(f);
}

private FSDataOutputStream createOutputStream() {
        if (failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
}
}

    private FileStatus updateLength(FileStatus oldStatus, long newLength) {
try {
return new FileStatus(newLength, oldStatus.isDirectory(), oldStatus.getReplication(),
oldStatus.getBlockSize(), oldStatus.getModificationTime(), oldStatus.getAccessTime(),
oldStatus.getPermission(), oldStatus.getOwner(), oldStatus.getGroup(),
(oldStatus.isSymlink() ? oldStatus.getSymlink() : null),
oldStatus.getPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public FileStatus newFile(Path p, FsPermission permission) {
-        return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
+        return new FileStatus(FILE_LENGTH, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
}

public FileStatus newDir(Path p) {
-        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
+        return new FileStatus(DIR_LENGTH, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
}

public FileStatus newFile(String p) {
-        return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+        return new FileStatus(FILE_LENGTH, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
}
public FileStatus newDir(String p) {
-        return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+        return new FileStatus(DIR_LENGTH, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
}

@Override
Binary file not shown.
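The binary file not shown is the new src/test/resources/testdata-avro/input.avro fixture used by the Avro append tests. Its schema and contents are not visible in the diff; a comparable fixture could be generated along these lines (the single-field schema and record value are hypothetical, not the actual fixture's):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;

public class MakeAvroFixture {
    public static void main(String[] args) throws Exception {
        // Hypothetical single-field schema; the real fixture's schema is unknown.
        final Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"value\",\"type\":\"string\"}]}");
        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            // create() writes the container header (schema, codec, sync marker).
            writer.create(schema, new File("src/test/resources/testdata-avro/input.avro"));
            final GenericRecord record = new GenericData.Record(schema);
            record.put("value", "hello");
            writer.append(record);
        }
    }
}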