Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions apps/dav/lib/Connector/Sabre/File.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ public function put($data) {
}
}

$lengthHeader = $this->request->getHeader('content-length');
$expected = $lengthHeader !== '' ? (int)$lengthHeader : null;

if ($partStorage->instanceOfStorage(IWriteStreamStorage::class)) {
$isEOF = false;
$wrappedData = CallbackWrapper::wrap($data, null, null, null, null, function ($stream) use (&$isEOF): void {
Expand All @@ -215,7 +218,7 @@ public function put($data) {
$count = -1;
try {
/** @var IWriteStreamStorage $partStorage */
$count = $partStorage->writeStream($internalPartPath, $wrappedData);
$count = $partStorage->writeStream($internalPartPath, $wrappedData, $expected);
} catch (GenericFileException $e) {
$logger = Server::get(LoggerInterface::class);
$logger->error('Error while writing stream to storage: ' . $e->getMessage(), ['exception' => $e, 'app' => 'webdav']);
Expand All @@ -235,10 +238,7 @@ public function put($data) {
[$count, $result] = Files::streamCopy($data, $target, true);
fclose($target);
}

$lengthHeader = $this->request->getHeader('content-length');
$expected = $lengthHeader !== '' ? (int)$lengthHeader : -1;
if ($result === false && $expected >= 0) {
if ($result === false && $expected !== null) {
throw new Exception(
$this->l10n->t(
'Error while copying file to target location (copied: %1$s, expected filesize: %2$s)',
Expand All @@ -253,7 +253,7 @@ public function put($data) {
// if content length is sent by client:
// double check if the file was fully received
// compare expected and actual size
if ($expected >= 0
if ($expected !== null
&& $expected !== $count
&& $this->request->getMethod() === 'PUT'
) {
Expand Down
42 changes: 20 additions & 22 deletions lib/private/Files/ObjectStore/ObjectStoreStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,9 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
'original-storage' => $this->getId(),
'original-path' => $path,
];
if ($size) {
$metadata['size'] = $size;
}

$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($path);
Expand All @@ -496,32 +499,27 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
$urn = $this->getURN($fileId);
try {
//upload to object storage
if ($size === null) {
$countStream = CountWrapper::wrap($stream, function ($writtenSize) use ($fileId, &$size) {

$totalWritten = 0;
$countStream = CountWrapper::wrap($stream, function ($writtenSize) use ($fileId, $size, $exists, &$totalWritten) {
if (is_null($size) && !$exists) {
$this->getCache()->update($fileId, [
'size' => $writtenSize,
]);
$size = $writtenSize;
});
if ($this->objectStore instanceof IObjectStoreMetaData) {
$this->objectStore->writeObjectWithMetaData($urn, $countStream, $metadata);
} else {
$this->objectStore->writeObject($urn, $countStream, $metadata['mimetype']);
}
if (is_resource($countStream)) {
fclose($countStream);
}
$stat['size'] = $size;
$totalWritten = $writtenSize;
});

if ($this->objectStore instanceof IObjectStoreMetaData) {
$this->objectStore->writeObjectWithMetaData($urn, $countStream, $metadata);
} else {
if ($this->objectStore instanceof IObjectStoreMetaData) {
$this->objectStore->writeObjectWithMetaData($urn, $stream, $metadata);
} else {
$this->objectStore->writeObject($urn, $stream, $metadata['mimetype']);
}
if (is_resource($stream)) {
fclose($stream);
}
$this->objectStore->writeObject($urn, $countStream, $metadata['mimetype']);
}
if (is_resource($countStream)) {
fclose($countStream);
}

$stat['size'] = $totalWritten;
} catch (\Exception $ex) {
if (!$exists) {
/*
Expand All @@ -545,7 +543,7 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
]
);
}
throw $ex; // make this bubble up
throw new GenericFileException('Error while writing stream to object store', 0, $ex);
}

if ($exists) {
Expand All @@ -561,7 +559,7 @@ public function writeStream(string $path, $stream, ?int $size = null): int {
}
}

return $size;
return $totalWritten;
}

public function getObjectStore(): IObjectStore {
Expand Down
33 changes: 30 additions & 3 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
namespace OC\Files\ObjectStore;

use Aws\Command;
use Aws\Exception\MultipartUploadException;
use Aws\S3\Exception\S3MultipartUploadException;
use Aws\S3\MultipartCopy;
use Aws\S3\MultipartUploader;
Expand Down Expand Up @@ -96,15 +98,23 @@ private function buildS3Metadata(array $metadata): array {
protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void {
$mimetype = $metaData['mimetype'] ?? null;
unset($metaData['mimetype']);
$this->getConnection()->putObject([
unset($metaData['size']);

$args = [
'Bucket' => $this->bucket,
'Key' => $urn,
'Body' => $stream,
'ACL' => 'private',
'ContentType' => $mimetype,
'Metadata' => $this->buildS3Metadata($metaData),
'StorageClass' => $this->storageClass,
] + $this->getSSECParameters());
] + $this->getSSECParameters();

if ($size = $stream->getSize()) {
$args['ContentLength'] = $size;
}

$this->getConnection()->putObject($args);
}


Expand All @@ -119,12 +129,15 @@ protected function writeSingle(string $urn, StreamInterface $stream, array $meta
protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void {
$mimetype = $metaData['mimetype'] ?? null;
unset($metaData['mimetype']);
unset($metaData['size']);

$attempts = 0;
$uploaded = false;
$concurrency = $this->concurrency;
$exception = null;
$state = null;
$size = $stream->getSize();
$totalWritten = 0;

// retry multipart upload once with concurrency at half on failure
while (!$uploaded && $attempts <= 1) {
Expand All @@ -139,6 +152,15 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, array $m
'Metadata' => $this->buildS3Metadata($metaData),
'StorageClass' => $this->storageClass,
] + $this->getSSECParameters(),
'before_upload' => function (Command $command) use (&$totalWritten) {
$totalWritten += $command['ContentLength'];
},
'before_complete' => function ($_command) use (&$totalWritten, $size, &$uploader, &$attempts) {
if ($size !== null && $totalWritten != $size) {
$e = new \Exception('Incomplete multi part upload, expected ' . $size . ' bytes, wrote ' . $totalWritten);
throw new MultipartUploadException($uploader->getState(), $e);
}
},
]);

try {
Expand All @@ -155,6 +177,9 @@ protected function writeMultiPart(string $urn, StreamInterface $stream, array $m
if ($stream->isSeekable()) {
$stream->rewind();
}
} catch (MultipartUploadException $e) {
$exception = $e;
break;
}
}

Expand All @@ -180,7 +205,9 @@ public function writeObject($urn, $stream, ?string $mimetype = null) {

public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void {
$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
$psrStream = Utils::streamFor($stream);
$psrStream = Utils::streamFor($stream, [
'size' => $metaData['size'] ?? null,
]);


$size = $psrStream->getSize();
Expand Down
Loading