Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static java.lang.String.valueOf;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -106,6 +107,11 @@ public class S3BitStoreService extends BaseBitStoreService {
private String endpoint;
private boolean pathStyleAccessEnabled;

/**
* The maximum size, in bytes, of an individual chunk to download from S3 when a file is accessed. Default: 5 MiB
*/
private long bufferSize = 5 * 1024 * 1024;

/**
* container for all the assets
*/
Expand Down Expand Up @@ -191,7 +197,7 @@ public boolean isEnabled() {
@Override
public void init() throws IOException {

if (this.isInitialized()) {
if (this.isInitialized() || !this.isEnabled()) {
return;
}

Expand Down Expand Up @@ -289,20 +295,7 @@ public InputStream get(Bitstream bitstream) throws IOException {
if (isRegisteredBitstream(key)) {
key = key.substring(REGISTERED_FLAG.length());
}
try {
File tempFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp");
tempFile.deleteOnExit();

GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, key);

Download download = tm.download(getObjectRequest, tempFile);
download.waitForCompletion();

return new DeleteOnCloseFileInputStream(tempFile);
} catch (AmazonClientException | InterruptedException e) {
log.error("get(" + key + ")", e);
throw new IOException(e);
}
return new S3LazyInputStream(key, bufferSize, bitstream.getSizeBytes());
}

/**
Expand Down Expand Up @@ -669,4 +662,84 @@ public boolean isRegisteredBitstream(String internalId) {
return internalId.startsWith(REGISTERED_FLAG);
}

/**
 * Sets the buffer size that {@code get(Bitstream)} hands to each new
 * {@code S3LazyInputStream} as its maximum chunk size.
 *
 * @param bufferSize the maximum chunk size, in bytes
 */
public void setBufferSize(long bufferSize) {
this.bufferSize = bufferSize;
}

/**
* This inner class represent an InputStream that uses temporary files to
* represent chunk of the object downloaded from S3. When the input stream is
* read the class look first to the current chunk and download a new one once if
* the current one as been fully read. The class is responsible to close a chunk
* as soon as a new one is retrieved, the last chunk is closed when the input
* stream itself is closed or the last byte is read (the first of the two)
*/
public class S3LazyInputStream extends InputStream {
private InputStream currentChunkStream;
private String objectKey;
private long endOfChunk = -1;
private long chunkMaxSize;
private long currPos = 0;
private long fileSize;

public S3LazyInputStream(String objectKey, long chunkMaxSize, long fileSize) throws IOException {
this.objectKey = objectKey;
this.chunkMaxSize = chunkMaxSize;
this.endOfChunk = 0;
this.fileSize = fileSize;
downloadChunk();
}

@Override
public int read() throws IOException {
// is the current chunk completely read and other are available?
if (currPos == endOfChunk && currPos < fileSize) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this condition is (at least) superfluous
currPos == endOfChunk && currPos < fileSize

currentChunkStream.close();
downloadChunk();
}

int byteRead = currPos < endOfChunk ? currentChunkStream.read() : -1;
// do we get any data or are we at the end of the file?
if (byteRead != -1) {
currPos++;
} else {
currentChunkStream.close();
}
return byteRead;
}

/**
* This method download the next chunk from S3
*
* @throws IOException
* @throws FileNotFoundException
*/
private void downloadChunk() throws IOException, FileNotFoundException {
// Create a DownloadFileRequest with the desired byte range
long startByte = currPos; // Start byte (inclusive)
long endByte = Long.min(startByte + chunkMaxSize - 1, fileSize - 1); // End byte (inclusive)
GetObjectRequest getRequest = new GetObjectRequest(bucketName, objectKey)
.withRange(startByte, endByte);

File currentChunkFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp");
currentChunkFile.deleteOnExit();
try {
Download download = tm.download(getRequest, currentChunkFile);
download.waitForCompletion();
currentChunkStream = new DeleteOnCloseFileInputStream(currentChunkFile);
endOfChunk = endOfChunk + download.getProgress().getBytesTransferred();
} catch (AmazonClientException | InterruptedException e) {
currentChunkFile.delete();
throw new IOException(e);
}
}

@Override
public void close() throws IOException {
if (currentChunkStream != null) {
currentChunkStream.close();
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

Expand All @@ -42,6 +43,7 @@
import io.findify.s3mock.S3Mock;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.BooleanUtils;
import org.dspace.AbstractIntegrationTestWithDatabase;
import org.dspace.app.matcher.LambdaMatcher;
import org.dspace.authorize.AuthorizeException;
Expand All @@ -53,6 +55,8 @@
import org.dspace.content.Collection;
import org.dspace.content.Item;
import org.dspace.core.Utils;
import org.dspace.services.ConfigurationService;
import org.dspace.services.factory.DSpaceServicesFactory;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
Expand All @@ -77,9 +81,12 @@ public class S3BitStoreServiceIT extends AbstractIntegrationTestWithDatabase {

private File s3Directory;

private ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService();

@Before
public void setup() throws Exception {

configurationService.setProperty("assetstore.s3.enabled", "true");
s3Directory = new File(System.getProperty("java.io.tmpdir"), "s3");

s3Mock = S3Mock.create(8001, s3Directory.getAbsolutePath());
Expand All @@ -88,7 +95,9 @@ public void setup() throws Exception {
amazonS3Client = createAmazonS3Client();

s3BitStoreService = new S3BitStoreService(amazonS3Client);

s3BitStoreService.setEnabled(BooleanUtils.toBoolean(
configurationService.getProperty("assetstore.s3.enabled")));
s3BitStoreService.setBufferSize(22);
context.turnOffAuthorisationSystem();

parentCommunity = CommunityBuilder.createCommunity(context)
Expand Down Expand Up @@ -119,12 +128,25 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException
assertThat(amazonS3Client.listBuckets(), contains(bucketNamed(bucketName)));

context.turnOffAuthorisationSystem();
String content = "Test bitstream content";
String content = "Test bitstream content";
String contentOverOneSpan = "This content span two chunks";
String contentExactlyTwoSpans = "Test bitstream contentTest bitstream content";
String contentOverOneTwoSpans = "Test bitstream contentThis content span three chunks";
Bitstream bitstream = createBitstream(content);
Bitstream bitstreamOverOneSpan = createBitstream(contentOverOneSpan);
Bitstream bitstreamExactlyTwoSpans = createBitstream(contentExactlyTwoSpans);
Bitstream bitstreamOverOneTwoSpans = createBitstream(contentOverOneTwoSpans);
context.restoreAuthSystemState();

s3BitStoreService.put(bitstream, toInputStream(content));
checkGetPut(bucketName, content, bitstream);
checkGetPut(bucketName, contentOverOneSpan, bitstreamOverOneSpan);
checkGetPut(bucketName, contentExactlyTwoSpans, bitstreamExactlyTwoSpans);
checkGetPut(bucketName, contentOverOneTwoSpans, bitstreamOverOneTwoSpans);

}

private void checkGetPut(String bucketName, String content, Bitstream bitstream) throws IOException {
s3BitStoreService.put(bitstream, toInputStream(content));
String expectedChecksum = Utils.toHex(generateChecksum(content));

assertThat(bitstream.getSizeBytes(), is((long) content.length()));
Expand All @@ -137,7 +159,6 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException
String key = s3BitStoreService.getFullKey(bitstream.getInternalId());
ObjectMetadata objectMetadata = amazonS3Client.getObjectMetadata(bucketName, key);
assertThat(objectMetadata.getContentMD5(), is(expectedChecksum));

}

@Test
Expand Down Expand Up @@ -382,6 +403,17 @@ public void givenBitStreamIdentifierWithSlashesWhenSanitizedThenSlashesMustBeRem
assertThat(computedPath, Matchers.not(Matchers.containsString(File.separator)));
}

/**
 * Checks that calling {@code init()} on a freshly created service while
 * {@code assetstore.s3.enabled} is set to false leaves the service both
 * uninitialized and disabled.
 */
@Test
public void testDoNotInitializeConfigured() throws Exception {
    String assetstores3enabledOldValue = configurationService.getProperty("assetstore.s3.enabled");
    configurationService.setProperty("assetstore.s3.enabled", false);
    try {
        s3BitStoreService = new S3BitStoreService(amazonS3Client);
        s3BitStoreService.init();
        assertFalse(s3BitStoreService.isInitialized());
        assertFalse(s3BitStoreService.isEnabled());
    } finally {
        // restore the previous value even when an assertion fails, so the
        // changed setting does not leak into other tests
        configurationService.setProperty("assetstore.s3.enabled", assetstores3enabledOldValue);
    }
}

private byte[] generateChecksum(String content) {
try {
MessageDigest m = MessageDigest.getInstance("MD5");
Expand Down