Skip to content

Commit 71bcab4

Browse files
author
Eduard
committed
Merge branch 'develop' of https://github.com/managedcode/Storage into develop
2 parents 66c059e + 33ee859 commit 71bcab4

File tree

1 file changed

+187
-0
lines changed

1 file changed

+187
-0
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.IO;
5+
using System.Linq;
6+
using System.Threading.Tasks;
7+
using Amazon.S3;
8+
using Amazon.S3.Model;
9+
10+
namespace ManagedCode.Storage.Aws;

/// <summary>
/// A write-only, forward-only <see cref="Stream"/> that streams its contents to an
/// S3 object via the multipart-upload API. Bytes are buffered in memory per part;
/// full parts are shipped by background tasks, and <see cref="Dispose(bool)"/>
/// flushes the final part and completes the upload.
/// </summary>
public class BlobStream : Stream
{
    /* Note that the maximum size (as of now) of a file in S3 is 5TB, so it isn't
     * safe to assume all uploads will work here. MAX_PART_LENGTH times MAX_PART_COUNT
     * is ~50TB, which is too big for S3. */
    const long MIN_PART_LENGTH = 5L * 1024 * 1024; // all parts but the last must be this size or greater
    const long MAX_PART_LENGTH = 5L * 1024 * 1024 * 1024; // 5GB max per PUT
    const long MAX_PART_COUNT = 10000; // no more than 10,000 parts total
    const long DEFAULT_PART_LENGTH = MIN_PART_LENGTH;

    /// <summary>Mutable upload state, grouped so Dispose can drop it all at once.</summary>
    internal class Metadata
    {
        public string BucketName;
        public string Key;
        public long PartLength = DEFAULT_PART_LENGTH;

        public int PartCount = 0;
        public string UploadId;            // null until the multipart upload is initiated
        public MemoryStream CurrentStream; // buffer for the part currently being filled

        public long Position = 0; // based on bytes written
        public long Length = 0;   // based on bytes written or SetLength, whichever is larger (no truncation)

        public List<Task> Tasks = new List<Task>(); // in-flight part-upload tasks
        public ConcurrentDictionary<int, string> PartETags = new ConcurrentDictionary<int, string>();
    }

    Metadata _metadata = new Metadata();
    IAmazonS3 _s3 = null;

    public BlobStream(IAmazonS3 s3, string s3uri, long partLength = DEFAULT_PART_LENGTH)
        : this(s3, new Uri(s3uri), partLength)
    {
    }

    public BlobStream(IAmazonS3 s3, Uri s3uri, long partLength = DEFAULT_PART_LENGTH)
        : this(s3, s3uri.Host, s3uri.LocalPath.Substring(1), partLength)
    {
    }

    public BlobStream(IAmazonS3 s3, string bucket, string key, long partLength = DEFAULT_PART_LENGTH)
    {
        _s3 = s3;
        _metadata.BucketName = bucket;
        _metadata.Key = key;
        _metadata.PartLength = partLength;
    }

    /// <summary>
    /// Flushes any remaining buffered bytes as the final part and completes the
    /// multipart upload. NOTE(review): upload failures surface here as exceptions
    /// thrown from Dispose, which callers rarely expect — a dedicated Close/FlushAsync
    /// path would be safer, but is an interface change.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_metadata != null)
            {
                Flush(true);      // ship whatever is still buffered as the last part
                CompleteUpload(); // wait for all parts, then commit the object
            }
        }
        _metadata = null; // makes a second Dispose a no-op
        base.Dispose(disposing);
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    // High-water mark of bytes written (or pre-declared via SetLength).
    // The getter intentionally refreshes the stored value from Position.
    public override long Length => _metadata.Length = Math.Max(_metadata.Length, _metadata.Position);

    public override long Position
    {
        get => _metadata.Position;
        set => throw new NotImplementedException(); // write-only, forward-only stream
    }

    public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();

    /// <summary>
    /// Pre-declares the expected object size so a part length can be chosen up
    /// front that keeps the whole object within the 10,000-part limit.
    /// Never truncates: Length only grows.
    /// </summary>
    public override void SetLength(long value)
    {
        _metadata.Length = Math.Max(_metadata.Length, value);
        _metadata.PartLength = Math.Max(MIN_PART_LENGTH, Math.Min(MAX_PART_LENGTH, _metadata.Length / MAX_PART_COUNT));
    }

    /// <summary>Ships the current buffer (if any) and starts a fresh part buffer.</summary>
    private void StartNewPart()
    {
        if (_metadata.CurrentStream != null)
        {
            Flush(false);
        }
        _metadata.CurrentStream = new MemoryStream();
        // Grow the part size as parts accumulate so that input of unknown length
        // still fits inside the 10,000-part budget.
        _metadata.PartLength = Math.Min(MAX_PART_LENGTH, Math.Max(_metadata.PartLength, (_metadata.PartCount / 2 + 1) * MIN_PART_LENGTH));
    }

    public override void Flush()
    {
        Flush(false);
    }

    /// <summary>
    /// Uploads the buffered part in the background. Parts shorter than 5MB stay
    /// buffered (S3 rejects non-final parts under 5MB) unless we are disposing,
    /// in which case the remainder is sent as the last part.
    /// </summary>
    private void Flush(bool disposing)
    {
        if ((_metadata.CurrentStream == null || _metadata.CurrentStream.Length < MIN_PART_LENGTH) &&
            !disposing)
            return;

        if (_metadata.CurrentStream != null)
        {
            // FIX: initiate the multipart upload lazily, only when there is an
            // actual part to send. Previously the upload was initiated even when
            // disposing an empty stream, leaving an orphaned (and billable)
            // incomplete multipart upload behind on S3.
            if (_metadata.UploadId == null)
            {
                // Stream.Flush is a synchronous API, so we must block here.
                _metadata.UploadId = _s3.InitiateMultipartUploadAsync(new InitiateMultipartUploadRequest()
                {
                    BucketName = _metadata.BucketName,
                    Key = _metadata.Key
                }).GetAwaiter().GetResult().UploadId;
            }

            var partNumber = ++_metadata.PartCount;

            _metadata.CurrentStream.Seek(0, SeekOrigin.Begin);
            var request = new UploadPartRequest()
            {
                BucketName = _metadata.BucketName,
                Key = _metadata.Key,
                UploadId = _metadata.UploadId,
                PartNumber = partNumber,
                IsLastPart = disposing,
                InputStream = _metadata.CurrentStream
            };
            _metadata.CurrentStream = null; // ownership of the buffer moves to the upload task

            var upload = Task.Run(async () =>
            {
                var response = await _s3.UploadPartAsync(request);
                _metadata.PartETags.AddOrUpdate(partNumber, response.ETag,
                    (n, s) => response.ETag);
                request.InputStream.Dispose();
            });
            _metadata.Tasks.Add(upload);
        }
    }

    /// <summary>Waits for all in-flight part uploads, then commits the S3 object.</summary>
    private void CompleteUpload()
    {
        Task.WaitAll(_metadata.Tasks.ToArray());

        // FIX: complete only when an upload was actually started and at least one
        // part succeeded. The old guard (Length > 0) attempted to complete an
        // upload with zero parts after SetLength() with no writes, which S3 rejects.
        if (_metadata.UploadId != null && !_metadata.PartETags.IsEmpty)
        {
            _s3.CompleteMultipartUploadAsync(new CompleteMultipartUploadRequest()
            {
                BucketName = _metadata.BucketName,
                Key = _metadata.Key,
                // FIX: S3 requires the part list in ascending part-number order;
                // ConcurrentDictionary enumeration order is unspecified, so sort.
                PartETags = _metadata.PartETags.OrderBy(e => e.Key).Select(e => new PartETag(e.Key, e.Value)).ToList(),
                UploadId = _metadata.UploadId
            }).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Buffers <paramref name="count"/> bytes, shipping each part as its buffer
    /// fills. The count is clamped to the bytes available after
    /// <paramref name="offset"/> rather than throwing (preserved original contract).
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;

        // Write as much of the buffer as fits into the current part; when a part
        // fills up, start a new one and continue with the remainder.
        var o = offset;
        var c = Math.Min(count, buffer.Length - offset); // don't over-read the buffer, even if asked to
        do
        {
            if (_metadata.CurrentStream == null || _metadata.CurrentStream.Length >= _metadata.PartLength)
                StartNewPart();

            var remaining = _metadata.PartLength - _metadata.CurrentStream.Length;
            var w = Math.Min(c, (int)remaining);
            _metadata.CurrentStream.Write(buffer, o, w);

            _metadata.Position += w;
            c -= w;
            o += w;
        } while (c > 0);
    }
}

0 commit comments

Comments
 (0)