Merge pull request #490 from paillave/develop
Develop
paillave authored Jul 8, 2024
2 parents da6c451 + 8167c3a commit 2f526bb
Showing 16 changed files with 458 additions and 6 deletions.
@@ -59,7 +59,6 @@ public GraphApiMailFileValueProcessor(string code, string name, string connectio
}
}
}

}
protected override void Process(
IFileValue fileValue, GraphApiAdapterConnectionParameters connectionParameters, GraphApiAdapterProcessorParameters processorParameters,
11 changes: 11 additions & 0 deletions src/Paillave.Etl.S3/IS3ConnectionInfo.cs
@@ -0,0 +1,11 @@
namespace Paillave.Etl.S3;
public interface IS3ConnectionInfo
{
string RootFolder { get; set; }
string ServiceUrl { get; set; }
int? PortNumber { get; set; }
int MaxAttempts { get; set; }
string Bucket { get; set; }
string AccessKeyId { get; set; }
string AccessKeySecret { get; set; }
}
24 changes: 24 additions & 0 deletions src/Paillave.Etl.S3/Paillave.Etl.S3.csproj
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\SharedSettings.props" />
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<PackageId>Paillave.EtlNet.S3</PackageId>
<PackageTags>ETL .net core SSIS reactive S3</PackageTags>
<Product>ETL.net S3 extensions</Product>
<Description>Extensions for Etl.Net to send or read files on S3</Description>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.Core" Version="3.7.304.24" />
<PackageReference Include="AWSSDK.S3" Version="3.7.310.1" />
<PackageReference Include="Microsoft.Extensions.FileSystemGlobbing" Version="7.0.0" allowedVersions="6.0.0" />
<PackageReference Include="MimeTypes" Version="2.4.1" allowedVersions="2.1.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" allowedVersions="13.0.1" />

</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Paillave.Etl\Paillave.Etl.csproj" />
</ItemGroup>
</Project>
85 changes: 85 additions & 0 deletions src/Paillave.Etl.S3/S3Bucket.cs
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;

namespace Paillave.Etl.S3;
public class S3Bucket : IDisposable
{
private readonly IAmazonS3 _client;
private readonly string _bucketName;

public S3Bucket(AWSCredentials credentials, AmazonS3Config config, string bucketName)
{
_bucketName = bucketName;
_client = new AmazonS3Client(credentials, config);
}

public async Task UploadAsync(string objectName, Stream stream)
{
var response = await _client.PutObjectAsync(new PutObjectRequest
{
BucketName = _bucketName,
Key = objectName,
InputStream = stream
// FilePath = filePath,
});
if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
{
throw new Exception();
}
}

public async Task<Stream> DownloadAsync(string objectName)
{
GetObjectResponse response = await _client.GetObjectAsync(new GetObjectRequest
{
BucketName = _bucketName,
Key = objectName,
});
if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
{
throw new Exception();
}
return response.ResponseStream;
}

public async Task<List<S3Object>> ListAsync()
{
List<S3Object> objects = new List<S3Object>();
var request = new ListObjectsV2Request
{
BucketName = _bucketName,
MaxKeys = 100,
};

ListObjectsV2Response response;

do
{
response = await _client.ListObjectsV2Async(request);
objects.AddRange(response.S3Objects);

request.ContinuationToken = response.NextContinuationToken;
}
while (response.IsTruncated);
return objects;
}

public async Task DeleteAsync(string objectName)
{
var response = await _client.DeleteObjectAsync(_bucketName, objectName);
if (response.HttpStatusCode != System.Net.HttpStatusCode.OK && response.HttpStatusCode != System.Net.HttpStatusCode.NoContent)
{
throw new Exception();
}
}

public void Dispose()
{
_client.Dispose();
}
}
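
The S3Bucket class above wraps IAmazonS3 with upload, download, list, and delete operations. Below is a minimal usage sketch; the endpoint, credentials, bucket name, and object key are placeholders, not values from this commit.

using System;
using System.IO;
using System.Text;
using Amazon.Runtime;
using Amazon.S3;
using Paillave.Etl.S3;

// Hypothetical endpoint, credentials, and bucket name for illustration only.
var credentials = new BasicAWSCredentials("MY_ACCESS_KEY_ID", "MY_ACCESS_KEY_SECRET");
var config = new AmazonS3Config
{
    ServiceURL = "https://s3.example.com",
    ForcePathStyle = true
};

using var bucket = new S3Bucket(credentials, config, "my-bucket");

// Upload an in-memory payload, read it back, then delete it.
using var payload = new MemoryStream(Encoding.UTF8.GetBytes("hello"));
await bucket.UploadAsync("samples/hello.txt", payload);

using (var downloaded = await bucket.DownloadAsync("samples/hello.txt"))
using (var reader = new StreamReader(downloaded))
    Console.WriteLine(await reader.ReadToEndAsync());

await bucket.DeleteAsync("samples/hello.txt");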
16 changes: 16 additions & 0 deletions src/Paillave.Etl.S3/S3ConnectionInfo.cs
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.IO;

namespace Paillave.Etl.S3;

public class S3ConnectionInfo : IS3ConnectionInfo
{
public string RootFolder { get; set; }
public string ServiceUrl { get; set; }
public int? PortNumber { get; set; }
public int MaxAttempts { get; set; } = 3;
public string Bucket { get; set; }
public string AccessKeyId { get; set; }
public string AccessKeySecret { get; set; }
}
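
S3ConnectionInfo is the concrete, settable implementation of IS3ConnectionInfo used by the connector. A sketch of populating it; every value below is a placeholder, not taken from the repository.

using Paillave.Etl.S3;

// Hypothetical configuration values for illustration only.
var connectionInfo = new S3ConnectionInfo
{
    ServiceUrl = "https://s3.example.com",
    PortNumber = null,                  // leave null to use ServiceUrl unchanged
    Bucket = "my-bucket",
    RootFolder = "inbound",
    AccessKeyId = "MY_ACCESS_KEY_ID",
    AccessKeySecret = "MY_ACCESS_KEY_SECRET"
    // MaxAttempts keeps its default of 3
};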
17 changes: 17 additions & 0 deletions src/Paillave.Etl.S3/S3ConnectionInfo.ex.cs
@@ -0,0 +1,17 @@
using Amazon.Runtime;
using Amazon.S3;

namespace Paillave.Etl.S3;
internal static class S3ConnectionInfoEx
{
public static S3Bucket CreateBucketConnection(this IS3ConnectionInfo info)
=> new S3Bucket(
new BasicAWSCredentials(info.AccessKeyId, info.AccessKeySecret),
new AmazonS3Config
{
ServiceURL = info.PortNumber == null ? info.ServiceUrl : $"{info.ServiceUrl}:{info.PortNumber}",
UseHttp = false,
ForcePathStyle = true
},
info.Bucket);
}
58 changes: 58 additions & 0 deletions src/Paillave.Etl.S3/S3FileValue.cs
@@ -0,0 +1,58 @@
using System.IO;
using Paillave.Etl.Core;

namespace Paillave.Etl.S3;
public class S3FileValue : FileValueBase<S3FileValueMetadata>
{
public override string Name { get; }
private readonly string _folder;
private readonly IS3ConnectionInfo _connectionInfo;

public S3FileValue(IS3ConnectionInfo connectionInfo, string folder, string fileName)
: this(connectionInfo, folder, fileName, null, null, null) { }
public S3FileValue(IS3ConnectionInfo connectionInfo, string folder, string fileName, string connectorCode, string connectionName, string connectorName)
: base(new S3FileValueMetadata
{
ServiceUrl = connectionInfo.ServiceUrl,
Bucket = connectionInfo.Bucket,
Folder = folder,
Name = fileName,
ConnectorCode = connectorCode,
ConnectionName = connectionName,
ConnectorName = connectorName
}) => (Name, _folder, _connectionInfo) = (fileName, folder, connectionInfo);
protected override void DeleteFile() => ActionRunner.TryExecute(_connectionInfo.MaxAttempts, DeleteFileSingleTime);
protected void DeleteFileSingleTime()
{
using (var client = _connectionInfo.CreateBucketConnection())
{
client.DeleteAsync(Path.Combine(_folder, Name)).Wait();
}
}
public override Stream GetContent() => ActionRunner.TryExecute(_connectionInfo.MaxAttempts, GetContentSingleTime);
private Stream GetContentSingleTime()
{
using (S3Bucket? client = _connectionInfo.CreateBucketConnection())
{
var ms = new MemoryStream();
using (var stream = client.DownloadAsync(Path.Combine(_folder, Name)).Result)
stream.CopyTo(ms);
ms.Seek(0, SeekOrigin.Begin);
return ms;
}
}
private StreamWithResource OpenContentSingleTime()
{
var client = _connectionInfo.CreateBucketConnection();
return new StreamWithResource(client.DownloadAsync(Path.Combine(_folder, Name)).Result, client);
}

public override StreamWithResource OpenContent() => ActionRunner.TryExecute(_connectionInfo.MaxAttempts, OpenContentSingleTime);
}
public class S3FileValueMetadata : FileValueMetadataBase
{
public string Bucket { get; set; }
public string ServiceUrl { get; set; }
public string Folder { get; set; }
public string Name { get; set; }
}
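
An S3FileValue can also be built directly from a connection info, a folder, and a file name; GetContent buffers the object into a seekable MemoryStream, retrying up to MaxAttempts times. A minimal sketch with placeholder names:

using System;
using System.IO;
using Paillave.Etl.S3;

// Hypothetical connection values, folder, and file name for illustration only.
var connectionInfo = new S3ConnectionInfo
{
    ServiceUrl = "https://s3.example.com",
    Bucket = "my-bucket",
    AccessKeyId = "MY_ACCESS_KEY_ID",
    AccessKeySecret = "MY_ACCESS_KEY_SECRET"
};
var fileValue = new S3FileValue(connectionInfo, "inbound", "report.csv");

// GetContent downloads the object and returns it fully buffered in memory.
using (Stream content = fileValue.GetContent())
using (var reader = new StreamReader(content))
    Console.WriteLine(reader.ReadToEnd());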
50 changes: 50 additions & 0 deletions src/Paillave.Etl.S3/S3FileValueProcessor.cs
@@ -0,0 +1,50 @@
using System;
using System.IO;
using System.Threading;
using Paillave.Etl.Core;

namespace Paillave.Etl.S3;
public class S3FileValueProcessor : FileValueProcessorBase<S3AdapterConnectionParameters, S3AdapterProcessorParameters>
{
public S3FileValueProcessor(string code, string name, string connectionName, S3AdapterConnectionParameters connectionParameters, S3AdapterProcessorParameters processorParameters)
: base(code, name, connectionName, connectionParameters, processorParameters) { }
public S3FileValueProcessor(string code, string name, string serviceUrl, string bucket, string accessKeyId, string accessKeySecret)
: base(code, name, name, new S3AdapterConnectionParameters
{
AccessKeyId = accessKeyId,
AccessKeySecret = accessKeySecret,
Bucket = bucket,
ServiceUrl = serviceUrl
},
new S3AdapterProcessorParameters { })
{ }
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
protected override void Process(IFileValue fileValue, S3AdapterConnectionParameters connectionParameters, S3AdapterProcessorParameters processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
{
var folder = string.IsNullOrWhiteSpace(connectionParameters.RootFolder) ? (processorParameters.SubFolder ?? "") : Path.Combine(connectionParameters.RootFolder, processorParameters.SubFolder ?? "");
using var stream = fileValue.Get(processorParameters.UseStreamCopy);
ActionRunner.TryExecute(connectionParameters.MaxAttempts, () => UploadSingleTime(connectionParameters, stream, Path.Combine(folder, fileValue.Name)));
push(fileValue);
}
private void UploadSingleTime(S3AdapterConnectionParameters connectionParameters, Stream stream, string filePath)
{
using (var client = connectionParameters.CreateBucketConnection())
{
client.UploadAsync(filePath, stream).Wait();
}
}
protected override void Test(S3AdapterConnectionParameters connectionParameters, S3AdapterProcessorParameters processorParameters)
{
var fileName = Guid.NewGuid().ToString();
var folder = string.IsNullOrWhiteSpace(connectionParameters.RootFolder) ? (processorParameters.SubFolder ?? "") : Path.Combine(connectionParameters.RootFolder, processorParameters.SubFolder ?? "");
using (var client = connectionParameters.CreateBucketConnection())
{
client.UploadAsync(Path.Combine(folder, fileName), new MemoryStream()).Wait();
}
using (var client = connectionParameters.CreateBucketConnection())
{
client.DeleteAsync(Path.Combine(folder, fileName)).Wait();
}
}
}
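
The processor uploads each incoming IFileValue to the bucket (under RootFolder/SubFolder when set) and pushes the file value downstream. A construction-only sketch using the simplified constructor shown above; the code, name, and credentials are placeholders, and wiring the processor into an Etl.Net stream definition is outside this commit.

using Paillave.Etl.S3;

// Hypothetical identifiers and credentials for illustration only.
var processor = new S3FileValueProcessor(
    code: "S3OUT",
    name: "exportToS3",
    serviceUrl: "https://s3.example.com",
    bucket: "my-bucket",
    accessKeyId: "MY_ACCESS_KEY_ID",
    accessKeySecret: "MY_ACCESS_KEY_SECRET");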
112 changes: 112 additions & 0 deletions src/Paillave.Etl.S3/S3FileValueProvider.cs
@@ -0,0 +1,112 @@
using System;
using System.IO;
using System.Threading;
using Paillave.Etl.Core;
using Microsoft.Extensions.FileSystemGlobbing;
using System.Linq;
using System.Collections.Generic;
using Amazon.S3.Model;

namespace Paillave.Etl.S3;
public class S3FileValueProvider : FileValueProviderBase<S3AdapterConnectionParameters, S3AdapterProviderParameters>
{
public S3FileValueProvider(string code, string name, string connectionName, S3AdapterConnectionParameters connectionParameters, S3AdapterProviderParameters providerParameters)
: base(code, name, connectionName, connectionParameters, providerParameters) { }
public S3FileValueProvider(string code, string name, string serviceUrl, string bucket, string accessKeyId, string accessKeySecret, string? fileNamePattern = null)
: base(code, name, name, new S3AdapterConnectionParameters
{
AccessKeyId = accessKeyId,
AccessKeySecret = accessKeySecret,
Bucket = bucket,
ServiceUrl = serviceUrl
}, new S3AdapterProviderParameters
{
FileNamePattern = fileNamePattern
})
{ }
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
protected override void Provide(Action<IFileValue> pushFileValue, S3AdapterConnectionParameters connectionParameters, S3AdapterProviderParameters providerParameters, CancellationToken cancellationToken, IExecutionContext context)
{
var searchPattern = string.IsNullOrEmpty(providerParameters.FileNamePattern) ? "*" : providerParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);
var folder = string.IsNullOrWhiteSpace(connectionParameters.RootFolder) ? (providerParameters.SubFolder ?? "") : Path.Combine(connectionParameters.RootFolder, providerParameters.SubFolder ?? "");

var files = ActionRunner.TryExecute(connectionParameters.MaxAttempts, () => GetFileList(connectionParameters, providerParameters));
foreach (var file in files)
{
if (cancellationToken.IsCancellationRequested) break;
if (matcher.Match(file.Name).HasMatches)
pushFileValue(new S3FileValue(connectionParameters, folder, file.Name, this.Code, this.Name, this.ConnectionName));
}
}
private class S3FileItem
{
public string? FolderName { get; set; }
public string Name { get; set; }
public S3Object S3Object { get; set; }
}
private List<S3FileItem> GetFileList(S3AdapterConnectionParameters connectionParameters, S3AdapterProviderParameters providerParameters)
{
var folder = string.IsNullOrWhiteSpace(connectionParameters.RootFolder) ? (providerParameters.SubFolder ?? "") : Path.Combine(connectionParameters.RootFolder, providerParameters.SubFolder ?? "");
using (var client = connectionParameters.CreateBucketConnection())
{
// folder
var items = client.ListAsync().Result;
return items
.Select(i =>
{
if (i.Key.EndsWith('/'))
{
return null;
}
var parent = Path.GetDirectoryName(i.Key);
string? fileName = Path.GetFileName(i.Key);
return new S3FileItem
{
FolderName = string.IsNullOrWhiteSpace(parent) ? null : parent,
Name = fileName,
S3Object = i
};
})
.Where(i => i != null)
.Where(i =>
{
if (string.IsNullOrWhiteSpace(folder))
{
if (providerParameters.Recursive ?? false)
{
return true;
}
else
{
return i!.FolderName == null;
}
}
else
{
if (providerParameters.Recursive ?? false)
{
return i!.FolderName?.Equals(folder, StringComparison.InvariantCultureIgnoreCase) ?? false;
}
else
{
return i!.FolderName?.StartsWith(folder, StringComparison.InvariantCultureIgnoreCase) ?? false;
}
}
})
.ToList()!;
}
}
protected override void Test(S3AdapterConnectionParameters connectionParameters, S3AdapterProviderParameters providerParameters)
{
// var folder = string.IsNullOrWhiteSpace(connectionParameters.RootFolder) ? (providerParameters.SubFolder ?? "") : Path.Combine(connectionParameters.RootFolder, providerParameters.SubFolder ?? "");
// var searchPattern = string.IsNullOrEmpty(providerParameters.FileNamePattern) ? "*" : providerParameters.FileNamePattern;
// var matcher = new Matcher().AddInclude(searchPattern);
using (var client = connectionParameters.CreateBucketConnection())
{
client.ListAsync().Wait();
}
}
}
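
The provider lists every object in the bucket, applies the file name pattern with Microsoft.Extensions.FileSystemGlobbing, filters by folder and the Recursive flag, and pushes each match as an S3FileValue. A construction-only sketch using the simplified constructor shown above, with placeholder values:

using Paillave.Etl.S3;

// Hypothetical identifiers, credentials, and pattern for illustration only.
var provider = new S3FileValueProvider(
    code: "S3IN",
    name: "importFromS3",
    serviceUrl: "https://s3.example.com",
    bucket: "my-bucket",
    accessKeyId: "MY_ACCESS_KEY_ID",
    accessKeySecret: "MY_ACCESS_KEY_SECRET",
    fileNamePattern: "*.csv");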