Skip to content

Commit

Permalink
Merge pull request #498 from paillave/v
Browse files Browse the repository at this point in the history
provide processors and providers in connectors
  • Loading branch information
paillave authored Jul 31, 2024
2 parents c692394 + d8faf86 commit 0c9cb8d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 45 deletions.
88 changes: 44 additions & 44 deletions src/Paillave.Etl.Zip/UnzipFileProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,60 +7,60 @@
using System.Linq;
using System.Threading;

namespace Paillave.Etl.Zip
namespace Paillave.Etl.Zip;

public class UnzipFileProcessorParams
{
public class UnzipFileProcessorParams
{
public string Password { get; set; }
public string FileNamePattern { get; set; }
public bool UseStreamCopy { get; set; } = true;
}
public class UnzippedFileValueMetadata : FileValueMetadataBase, IFileValueWithDestinationMetadata
public string Password { get; set; }
public string FileNamePattern { get; set; }
public bool UseStreamCopy { get; set; } = true;
}
public class UnzippedFileValueMetadata : FileValueMetadataBase, IFileValueWithDestinationMetadata
{
public string ParentFileName { get; set; }
public IFileValueMetadata ParentFileMetadata { get; set; }
public Dictionary<string, IEnumerable<Destination>> Destinations { get; set; }
}

public class UnzipFileProcessor : FileValueProcessorBase<object, UnzipFileProcessorParams>
{
public UnzipFileProcessor(string code, string name, string connectionName, object connectionParameters, UnzipFileProcessorParams processorParameters) : base(code, name, connectionName, connectionParameters, processorParameters)
{
public string ParentFileName { get; set; }
public IFileValueMetadata ParentFileMetadata { get; set; }
public Dictionary<string, IEnumerable<Destination>> Destinations { get; set; }
}
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;

public class UnzipFileProcessor : FileValueProcessorBase<object, UnzipFileProcessorParams>
{
public UnzipFileProcessor(string code, string name, string connectionName, object connectionParameters, UnzipFileProcessorParams processorParameters) : base(code, name, connectionName, connectionParameters, processorParameters)
{
}
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;
public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;

public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
protected override void Process(IFileValue fileValue, object connectionParameters, UnzipFileProcessorParams processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
{
var destinations = (fileValue.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
if (cancellationToken.IsCancellationRequested) return;
using var stream = fileValue.Get(processorParameters.UseStreamCopy);
using var zf = new ZipFile(stream);
var searchPattern = string.IsNullOrEmpty(processorParameters.FileNamePattern) ? "*" : processorParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);

protected override void Process(IFileValue fileValue, object connectionParameters, UnzipFileProcessorParams processorParameters, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
if (!String.IsNullOrEmpty(processorParameters.Password))
zf.Password = processorParameters.Password;
var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
foreach (ZipEntry zipEntry in zf)
{
var destinations = (fileValue.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
if (cancellationToken.IsCancellationRequested) return;
using var stream = fileValue.Get(processorParameters.UseStreamCopy);
using var zf = new ZipFile(stream);
var searchPattern = string.IsNullOrEmpty(processorParameters.FileNamePattern) ? "*" : processorParameters.FileNamePattern;
var matcher = new Matcher().AddInclude(searchPattern);

if (!String.IsNullOrEmpty(processorParameters.Password))
zf.Password = processorParameters.Password;
var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
foreach (ZipEntry zipEntry in zf)
if (cancellationToken.IsCancellationRequested) break;
if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
{
if (cancellationToken.IsCancellationRequested) break;
if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
MemoryStream outputStream = new MemoryStream();
using (var zipStream = zf.GetInputStream(zipEntry))
zipStream.CopyTo(outputStream, 4096);
outputStream.Seek(0, SeekOrigin.Begin);
push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
{
MemoryStream outputStream = new MemoryStream();
using (var zipStream = zf.GetInputStream(zipEntry))
zipStream.CopyTo(outputStream, 4096);
outputStream.Seek(0, SeekOrigin.Begin);
push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
{
ParentFileName = fileValue.Name,
ParentFileMetadata = fileValue.Metadata,
Destinations = destinations
}, fileValue, fileNames, zipEntry.Name));
}
ParentFileName = fileValue.Name,
ParentFileMetadata = fileValue.Metadata,
Destinations = destinations
}, fileValue, fileNames, zipEntry.Name));
}
}
protected override void Test(object connectionParameters, UnzipFileProcessorParams processorParameters) { }
}
protected override void Test(object connectionParameters, UnzipFileProcessorParams processorParameters) { }
}

52 changes: 52 additions & 0 deletions src/Paillave.Etl.Zip/ZipFileProcessorValuesProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// using System;
// using System.Linq;
// using System.IO;
// using ICSharpCode.SharpZipLib.Zip;
// using Paillave.Etl.Core;
// using System.Threading;
// using Microsoft.Extensions.FileSystemGlobbing;

// namespace Paillave.Etl.Zip
// {
// public class ZipFileProcessorValuesProvider : ValuesProviderBase<IFileValue, IFileValue>
// {
// private ZipFileProcessorParams _args;
// public ZipFileProcessorValuesProvider(UnzipFileProcessorParams args)
// => _args = args;
// public override ProcessImpact PerformanceImpact => ProcessImpact.Average;
// public override ProcessImpact MemoryFootPrint => ProcessImpact.Average;
// public override void PushValues(IFileValue input, Action<IFileValue> push, CancellationToken cancellationToken, IExecutionContext context)
// {
// var destinations = (input.Metadata as IFileValueWithDestinationMetadata)?.Destinations;
// if (cancellationToken.IsCancellationRequested) return;
// using var stream = input.Get(_args.UseStreamCopy);
// using var zf = new ZipFile(stream);
// var searchPattern = string.IsNullOrEmpty(_args.FileNamePattern) ? "*" : _args.FileNamePattern;
// var matcher = new Matcher().AddInclude(searchPattern);

// if (!String.IsNullOrEmpty(_args.Password))
// zf.Password = _args.Password;
// var fileNames = zf.OfType<ZipEntry>().Where(i => i.IsFile && matcher.Match(Path.GetFileName(i.Name)).HasMatches).Select(i => i.Name).ToHashSet();
// foreach (ZipEntry zipEntry in zf)
// {
// if (cancellationToken.IsCancellationRequested) break;
// if (zipEntry.IsFile && matcher.Match(Path.GetFileName(zipEntry.Name)).HasMatches)
// {
// MemoryStream outputStream = new MemoryStream();
// using (var zipStream = zf.GetInputStream(zipEntry))
// zipStream.CopyTo(outputStream, 4096);
// outputStream.Seek(0, SeekOrigin.Begin);
// push(new UnzippedFileValue<UnzippedFileValueMetadata>(outputStream, zipEntry.Name, new UnzippedFileValueMetadata
// {
// ParentFileName = input.Name,
// ParentFileMetadata = input.Metadata,
// Destinations = destinations,
// ConnectorCode = input.Metadata.ConnectorCode,
// ConnectionName = input.Metadata.ConnectionName,
// ConnectorName = input.Metadata.ConnectorName
// }, input, fileNames, zipEntry.Name));
// }
// }
// }
// }
// }
3 changes: 3 additions & 0 deletions src/Paillave.Etl/Core/FileValueConnectors.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;

namespace Paillave.Etl.Core
{
Expand All @@ -8,6 +9,8 @@ private Dictionary<string, IFileValueProcessor> _processors
= new Dictionary<string, IFileValueProcessor>();
private Dictionary<string, IFileValueProvider> _providers
= new Dictionary<string, IFileValueProvider>();
public string[] Processors => _processors.Keys.ToArray();
public string[] Providers => _providers.Keys.ToArray();
public FileValueConnectors Register(IFileValueProvider provider)
{
this._providers[provider.Code] = provider;
Expand Down
2 changes: 1 addition & 1 deletion src/SharedSettings.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Version>2.1.27-beta</Version>
<Version>2.1.28-beta</Version>
<PackageIcon>NugetIcon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<Authors>Stéphane Royer</Authors>
Expand Down

0 comments on commit 0c9cb8d

Please sign in to comment.