NIFI-12923 Added Avro append mode to PutHDFS
NIFI-12923 remove var keyword

NIFI-12923 change property name

NIFI-12923 Added property dependency for append_mode

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes apache#8544
balazsgerner authored and mattyb149 committed Apr 19, 2024
1 parent 36c0ec4 commit 3239f59
Showing 6 changed files with 224 additions and 26 deletions.
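The core of the change: when the Conflict Resolution Strategy is 'append' and the new Append Mode property is set to AVRO, PutHDFS no longer concatenates raw bytes onto the existing file. Byte concatenation corrupts an Avro container file, since each container carries its own header, schema, and sync markers; instead, the processor reopens the target with DataFileWriter.appendTo() and copies the incoming records into it, as the diff below shows. A minimal standalone sketch of the same pattern, with hypothetical paths and none of the processor's surrounding logic (conflict resolution, compression checks, temp-file renaming):

import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.InputStream;

public class AvroAppendSketch {

    // Appends every record from 'incoming' (an Avro container stream) to the
    // existing Avro file at 'target'. appendTo() recovers the schema and sync
    // marker from the existing file, so the incoming schema must be compatible.
    static void appendAvro(FileSystem fs, Configuration conf, Path target, InputStream incoming) throws Exception {
        try (DataFileStream<Object> reader = new DataFileStream<>(incoming, new GenericDatumReader<>());
             FsInput existing = new FsInput(target, conf);
             DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
            // FsInput gives DataFileWriter random access to the existing file;
            // fs.append() supplies the stream that new blocks are written to.
            writer.appendTo(existing, fs.append(target));
            writer.appendAllFrom(reader, false); // false = do not recompress blocks
        }
    }
}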
pom.xml
@@ -40,6 +40,10 @@
<artifactId>nifi-hadoop-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
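The avro-mapred artifact supplies FsInput, the adapter that lets Avro's DataFileWriter seek within a file on HDFS. It is declared above without a <version>, which implies the version is pinned by dependencyManagement further up the build; a sketch of what that managed entry presumably looks like (the property name is an assumption):

<!-- Hypothetical entry in a parent POM's dependencyManagement; the module
     above inherits the version, which is why it can omit <version>. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>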
PutHDFS.java
@@ -18,6 +18,11 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
@@ -45,6 +50,8 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -66,6 +73,7 @@
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -114,6 +122,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
.build();

// properties
public static final String DEFAULT_APPEND_MODE = "DEFAULT";
public static final String AVRO_APPEND_MODE = "AVRO";

protected static final String REPLACE_RESOLUTION = "replace";
protected static final String IGNORE_RESOLUTION = "ignore";
@@ -154,6 +164,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
.allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
.build();

public static final PropertyDescriptor APPEND_MODE = new PropertyDescriptor.Builder()
.name("Append Mode")
.description("Defines the append strategy to use when the Conflict Resolution Strategy is set to 'append'.")
.allowableValues(DEFAULT_APPEND_MODE, AVRO_APPEND_MODE)
.defaultValue(DEFAULT_APPEND_MODE)
.dependsOn(CONFLICT_RESOLUTION, APPEND_RESOLUTION)
.required(true)
.build();

public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -231,6 +250,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
.description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
.build());
props.add(CONFLICT_RESOLUTION);
props.add(APPEND_MODE);
props.add(WRITING_STRATEGY);
props.add(BLOCK_SIZE);
props.add(BUFFER_SIZE);
@@ -243,6 +263,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return props;
}

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final PropertyValue codec = validationContext.getProperty(COMPRESSION_CODEC);
final boolean isCodecSet = codec.isSet() && !CompressionType.NONE.name().equals(codec.getValue());
if (isCodecSet && APPEND_RESOLUTION.equals(validationContext.getProperty(CONFLICT_RESOLUTION).getValue())
&& AVRO_APPEND_MODE.equals(validationContext.getProperty(APPEND_MODE).getValue())) {
problems.add(new ValidationResult.Builder()
.subject("Codec")
.valid(false)
.explanation("Compression codec cannot be set when used in 'append avro' mode")
.build());
}
return problems;
}

@Override
protected void preProcessConfiguration(final Configuration config, final ProcessContext context) {
// Set umask once, to avoid thread safety issues doing it in onTrigger
@@ -384,14 +420,32 @@ public Object run() {
null, null);
}

if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = actualCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = actualCopyFile;

final String appendMode = context.getProperty(APPEND_MODE).getValue();
if (APPEND_RESOLUTION.equals(conflictResponse)
&& AVRO_APPEND_MODE.equals(appendMode)
&& destinationExists) {
getLogger().info("Appending avro record to existing avro file");
try (final DataFileStream<Object> reader = new DataFileStream<>(in, new GenericDatumReader<>());
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.appendTo(new FsInput(copyFile, configuration), fos); // open writer to existing file
writer.appendAllFrom(reader, false); // append flowfile content
writer.flush();
getLogger().info("Successfully appended avro record");
} catch (Exception e) {
getLogger().error("Error occurred during appending to existing avro file", e);
throw new ProcessException(e);
}
} else {
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
fos.flush();
}
} finally {
try {
if (fos != null) {
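Configuration-wise, the new property only appears when the Conflict Resolution Strategy is 'append' (the dependsOn above), and customValidate() rejects combining Avro append with a compression codec. A sketch of the property wiring, using only names that appear in this diff; the directory is made up, and the committed tests below use a TestablePutHDFS with a mocked FileSystem rather than a plain PutHDFS:

import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class AppendModeConfigSketch {
    public static void main(String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(new PutHDFS());
        runner.setProperty(PutHDFS.DIRECTORY, "/data/avro");       // hypothetical target directory
        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "append"); // APPEND_RESOLUTION; exposes Append Mode
        runner.setProperty(PutHDFS.APPEND_MODE, PutHDFS.AVRO_APPEND_MODE);
        // Setting a codec on top of this would fail validation with:
        // "Compression codec cannot be set when used in 'append avro' mode"
        // runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
        System.out.println(runner.getProcessContext().getProperty(PutHDFS.APPEND_MODE).getValue()); // AVRO
    }
}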
PutHDFSTest.java
@@ -53,17 +53,26 @@
import java.util.List;
import java.util.Map;

import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
import static org.apache.nifi.processors.hadoop.PutHDFS.APPEND_RESOLUTION;
import static org.apache.nifi.processors.hadoop.PutHDFS.AVRO_APPEND_MODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class PutHDFSTest {
private final static String TARGET_DIRECTORY = "target/test-classes";
private final static String AVRO_TARGET_DIRECTORY = TARGET_DIRECTORY + "/testdata-avro";
private final static String SOURCE_DIRECTORY = "src/test/resources/testdata";
private final static String AVRO_SOURCE_DIRECTORY = "src/test/resources/testdata-avro";
private final static String FILE_NAME = "randombytes-1";
private final static String AVRO_FILE_NAME = "input.avro";

private KerberosProperties kerberosProperties;
private MockFileSystem mockFileSystem;
@@ -186,6 +195,34 @@ public void testValidators() {
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}

results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "target");
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.COMPRESSION_CODEC, GZIP.name());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(1, results.size());
for (ValidationResult vr : results) {
assertEquals(vr.getSubject(), "Codec");
assertEquals(vr.getExplanation(), "Compression codec cannot be set when used in 'append avro' mode");
}

results = new HashSet<>();
runner.setProperty(PutHDFS.DIRECTORY, "target");
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.COMPRESSION_CODEC, NONE.name());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(0, results.size());
}

@Test
@@ -229,6 +266,58 @@ public void testPutFile() throws IOException {
verify(spyFileSystem, times(1)).rename(any(Path.class), any(Path.class));
}

@Test
public void testPutFileWithAppendAvroModeNewFileCreated() throws IOException {
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME);

// when
try (final FileInputStream fis = new FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
runner.enqueue(fis, Map.of(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
assertTrue(runner.isValid());
runner.run();
}

// then
assertAvroAppendValues(runner, spyFileSystem, targetPath);
verify(spyFileSystem, times(0)).append(eq(targetPath), anyInt());
verify(spyFileSystem, times(1)).rename(any(Path.class), eq(targetPath));
assertEquals(100, spyFileSystem.getFileStatus(targetPath).getLen());
}

@Test
public void testPutFileWithAppendAvroModeWhenTargetFileAlreadyExists() throws IOException {
// given
final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
spyFileSystem.setConf(new Configuration());
final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME);
spyFileSystem.createNewFile(targetPath);

// when
try (final FileInputStream fis = new FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
runner.enqueue(fis, Map.of(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
assertTrue(runner.isValid());
runner.run();
}

// then
assertAvroAppendValues(runner, spyFileSystem, targetPath);
verify(spyFileSystem).append(eq(targetPath), anyInt());
verify(spyFileSystem, times(0)).rename(any(Path.class), eq(targetPath));
assertEquals(200, spyFileSystem.getFileStatus(targetPath).getLen());
}

@Test
public void testPutFileWithSimpleWrite() throws IOException {
// given
@@ -642,7 +731,29 @@ public void testPutFileWithCreateException() throws IOException {
mockFileSystem.delete(p, true);
}

private class TestablePutHDFS extends PutHDFS {
private static void assertAvroAppendValues(TestRunner runner, FileSystem spyFileSystem, Path targetPath) throws IOException {
final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
assertTrue(failedFlowFiles.isEmpty());

final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());

final MockFlowFile flowFile = flowFiles.get(0);
assertTrue(spyFileSystem.exists(targetPath));
assertEquals(AVRO_FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals(AVRO_TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));

final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
assertTrue(sendEvent.getTransitUri().endsWith(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME));
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(AVRO_TARGET_DIRECTORY + "/" + AVRO_FILE_NAME));
}

private static class TestablePutHDFS extends PutHDFS {

private final KerberosProperties testKerberosProperties;
private final FileSystem fileSystem;
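The tests above read a committed binary fixture, src/test/resources/testdata-avro/input.avro, which the diff viewer cannot display. A fixture of that shape could be generated with plain Avro; the schema below is a placeholder assumption, not the schema of the actual file:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;

public class MakeAvroFixture {
    public static void main(String[] args) throws Exception {
        // Placeholder schema; the real fixture's schema is not visible in the diff.
        final Schema schema = SchemaBuilder.record("Example").fields()
                .requiredString("name")
                .requiredInt("value")
                .endRecord();
        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, new File("src/test/resources/testdata-avro/input.avro"));
            final GenericRecord record = new GenericData.Record(schema);
            record.put("name", "example");
            record.put("value", 1);
            writer.append(record);
        }
    }
}

Note that the 100- and 200-byte length assertions in the tests come from MockFileSystem's fixed FILE_LENGTH (see below), not from the actual size of this fixture.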
MockFileSystem.java
@@ -41,17 +41,19 @@
import java.util.Set;

public class MockFileSystem extends FileSystem {
private static final long DIR_LENGTH = 1L;
private static final long FILE_LENGTH = 100L;
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
private final Map<Path, FSDataOutputStream> pathToOutputStream = new HashMap<>();

private boolean failOnOpen;
private boolean failOnClose;
private boolean failOnCreate;
private boolean failOnFileStatus;
private boolean failOnExists;


public void setFailOnClose(final boolean failOnClose) {
this.failOnClose = failOnClose;
}
@@ -102,22 +104,18 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
throw new IOException(new AuthenticationException("test auth error"));
}
pathToStatus.put(f, newFile(f, permission));
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new FileSystem.Statistics("")) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
}
final FSDataOutputStream outputStream = createOutputStream();
pathToOutputStream.put(f, outputStream);
return outputStream;
}

@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) {
return null;
pathToOutputStream.computeIfAbsent(f, f2 -> createOutputStream());
final FileStatus oldStatus = pathToStatus.get(f);
final long newLength = oldStatus.getLen() + FILE_LENGTH;
pathToStatus.put(f, updateLength(oldStatus, newLength));
return pathToOutputStream.get(f);
}

@Override
@@ -192,19 +190,45 @@ public boolean exists(Path f) throws IOException {
return pathToStatus.containsKey(f);
}

private FSDataOutputStream createOutputStream() {
if(failOnClose) {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
@Override
public void close() throws IOException {
super.close();
throw new IOException("Fail on close");
}
};
} else {
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
}
}

private FileStatus updateLength(FileStatus oldStatus, Long newLength) {
try {
return new FileStatus(newLength, oldStatus.isDirectory(), oldStatus.getReplication(),
oldStatus.getBlockSize(), oldStatus.getModificationTime(), oldStatus.getAccessTime(),
oldStatus.getPermission(), oldStatus.getOwner(), oldStatus.getGroup(),
(oldStatus.isSymlink() ? oldStatus.getSymlink() : null),
oldStatus.getPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public FileStatus newFile(Path p, FsPermission permission) {
return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
return new FileStatus(FILE_LENGTH, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
}

public FileStatus newDir(Path p) {
return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
return new FileStatus(DIR_LENGTH, true, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, false, false);
}

public FileStatus newFile(String p) {
return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
return new FileStatus(FILE_LENGTH, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
}
public FileStatus newDir(String p) {
return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
return new FileStatus(DIR_LENGTH, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
}

@Override
Binary file not shown.
