Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide processors and providers in connectors #498

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 44 additions & 44 deletions src/Paillave.Etl.Zip/UnzipFileProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,60 @@
using System.Linq;
using System.Threading;

namespace Paillave.Etl.Zip
namespace Paillave.Etl.Zip;

public class UnzipFileProcessorParams
{
public class UnzipFileProcessorParams
{
public string Password { get; set; }
public string FileNamePattern { get; set; }
public bool UseStreamCopy { get; set; } = true;
}
public class UnzippedFileValueMetadata : FileValueMetadataBase, IFileValueWithDestinationMetadata
public string Password { get; set; }
public string FileNamePattern { get; set; }
public bool UseStreamCopy { get; set; } = true;
}
public class UnzippedFileValueMetadata : FileValueMetadataBase, IFileValueWithDestinationMetadata
{
public string ParentFileName { get; set; }
public IFileValueMetadata ParentFileMetadata { get; set; }
public Dictionary<string, IEnumerable<Destination>> Destinations { get; set; }
}

public class UnzipFileProcessor : FileValueProcessorBase<object, UnzipFileProcessorParams>
{
public UnzipFileProcessor(string code, string name, string connectionName, object connectionParameters, UnzipFileProcessorParams processorParameters) : base(code, name, connectionName, connectionParameters, processorParameters)
{
public string ParentFileName { get; set; }
public IFileValueMetadata ParentFileMetadata { get; set; }
public Dictionary<string, IEnumerable<Destination>> Destinations { get; set; }
}
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;

public class UnzipFileProcessor : FileValueProcessorBase<object, UnzipFileProcessorParams>
{
public UnzipFileProcessor(string code, string name, string connectionName, object connectionParameters, UnzipFileProcessorParams processorParameters) : base(code, name, connectionName, connectionParameters, processorParameters)
{
}
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;

public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
protected override void Process(IFileValue fileValue, object connectionParameters, UnzipFileProcessorParams processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
{
var destinations = (fileValue.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
if (cancellationToken.IsCancellationRequested) return;
using var stream = fileValue.Get(processorParameters.UseStreamCopy);
using var zf = new ZipFile(stream);
var searchPattern = string.IsNullOrEmpty(processorParameters.FileNamePattern) ? "*" : processorParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);

protected override void Process(IFileValue fileValue, object connectionParameters, UnzipFileProcessorParams processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
if (!String.IsNullOrEmpty(processorParameters.Password))
zf.Password = processorParameters.Password;
var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
foreach (ZipEntry zipEntry in zf)
{
var destinations = (fileValue.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
if (cancellationToken.IsCancellationRequested) return;
using var stream = fileValue.Get(processorParameters.UseStreamCopy);
using var zf = new ZipFile(stream);
var searchPattern = string.IsNullOrEmpty(processorParameters.FileNamePattern) ? "*" : processorParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);

if (!String.IsNullOrEmpty(processorParameters.Password))
zf.Password = processorParameters.Password;
var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
foreach (ZipEntry zipEntry in zf)
if (cancellationToken.IsCancellationRequested) break;
if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
{
if (cancellationToken.IsCancellationRequested) break;
if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
MemoryStream outputStream = new MemoryStream();
using (var zipStream = zf.GetInputStream(zipEntry))
zipStream.CopyTo(outputStream, 4096);
outputStream.Seek(0, SeekOrigin.Begin);
push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
{
MemoryStream outputStream = new MemoryStream();
using (var zipStream = zf.GetInputStream(zipEntry))
zipStream.CopyTo(outputStream, 4096);
outputStream.Seek(0, SeekOrigin.Begin);
push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
{
ParentFileName = fileValue.Name,
ParentFileMetadata = fileValue.Metadata,
Destinations = destinations
}, fileValue, fileNames, zipEntry.Name));
}
ParentFileName = fileValue.Name,
ParentFileMetadata = fileValue.Metadata,
Destinations = destinations
}, fileValue, fileNames, zipEntry.Name));
}
}
protected override void Test(object connectionParameters, UnzipFileProcessorParams processorParameters) { }
}
protected override void Test(object connectionParameters, UnzipFileProcessorParams processorParameters) { }
}

52 changes: 52 additions & 0 deletions src/Paillave.Etl.Zip/ZipFileProcessorValuesProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// using System;
// using System.Linq;
// using System.IO;
// using ICSharpCode.SharpZipLib.Zip;
// using Paillave.Etl.Core;
// using System.Threading;
// using Microsoft.Extensions.FileSystemGlobbing;

// namespace Paillave.Etl.Zip
// {
// public class ZipFileProcessorValuesProvider : ValuesProviderBase<IFileValue, IFileValue>
// {
// private ZipFileProcessorParams _args;
// public ZipFileProcessorValuesProvider(UnzipFileProcessorParams args)
// => _args = args;
// public override ProcessImpact PerformanceImpact => ProcessImpact.Average;
// public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
// public override void PushValues(IFileValue input, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
// {
// var destinations = (input.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
// if (cancellationToken.IsCancellationRequested) return;
// using var stream = input.Get(_args.UseStreamCopy);
// using var zf = new ZipFile(stream);
// var searchPattern = string.IsNullOrEmpty(_args.FileNamePattern) ? "*" : _args.FileNamePattern;
// var matcher = new Matcher().AddInclude(searchPattern);

// if (!String.IsNullOrEmpty(_args.Password))
// zf.Password = _args.Password;
// var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
// foreach (ZipEntry zipEntry in zf)
// {
// if (cancellationToken.IsCancellationRequested) break;
// if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
// {
// MemoryStream outputStream = new MemoryStream();
// using (var zipStream = zf.GetInputStream(zipEntry))
// zipStream.CopyTo(outputStream, 4096);
// outputStream.Seek(0, SeekOrigin.Begin);
// push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
// {
// ParentFileName = input.Name,
// ParentFileMetadata = input.Metadata,
// Destinations = destinations,
// ConnectorCode = input.Metadata.ConnectorCode,
// ConnectionName = input.Metadata.ConnectionName,
// ConnectorName = input.Metadata.ConnectorName
// }, input, fileNames, zipEntry.Name));
// }
// }
// }
// }
// }
3 changes: 3 additions & 0 deletions src/Paillave.Etl/Core/FileValueConnectors.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;

namespace Paillave.Etl.Core
{
Expand All @@ -8,6 +9,8 @@ private Dictionary<string, IFileValueProcessor> _processors
= new Dictionary<string, IFileValueProcessor>();
private Dictionary<string, IFileValueProvider> _providers
= new Dictionary<string, IFileValueProvider>();
public string[] Processors => _processors.Keys.ToArray();
public string[] Providers => _providers.Keys.ToArray();
public FileValueConnectors Register(IFileValueProvider provider)
{
this._providers[provider.Code] = provider;
Expand Down
2 changes: 1 addition & 1 deletion src/SharedSettings.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Version>2.1.27-beta</Version>
<Version>2.1.28-beta</Version>
<PackageIcon>NugetIcon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<Authors>Stéphane Royer</Authors>
Expand Down
Loading