Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Data.Common/Utils/StreamedTableManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class StreamedTableManager : IDisposable
{
private const int streamResetsAllowed = 4;
private const int streamResetsAllowed = 6;
private readonly Dictionary<string, StreamHolder> tables = new();

public void AddTable(string tableName, Func<Stream> streamCreationFunc)
Expand Down
40 changes: 40 additions & 0 deletions src/Data.Common/Utils/SubStreamReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace Data.Common.Utils;

/// <summary>
/// Reads a bounded sample of text lines from the current position of a seekable stream
/// and then rewinds the stream to its beginning so other consumers can read it from the start.
/// </summary>
public class SubStreamReader
{
    private readonly Stream stream;
    private readonly int linesLimit;
    private readonly StreamReader streamReader;

    /// <summary>
    /// Creates a reader over <paramref name="stream"/>. The stream is left open; the caller keeps ownership.
    /// </summary>
    /// <param name="stream">The stream to sample. It must support seeking for <see cref="ReadToLimit"/> to succeed.</param>
    /// <param name="linesLimit">Maximum number of lines (blank lines included) consumed per call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="linesLimit"/> is zero or negative.</exception>
    public SubStreamReader(Stream stream, int linesLimit)
    {
        this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
        this.linesLimit = linesLimit > 0 ? linesLimit : throw new ArgumentOutOfRangeException(nameof(linesLimit), "Must be greater than zero.");
        streamReader = new StreamReader(stream, null, true, -1, leaveOpen: true);
    }

    /// <summary>
    /// Reads up to the configured number of lines, joins the non-empty ones with "\n",
    /// and rewinds the underlying stream to position 0.
    /// </summary>
    /// <returns>The sampled non-empty lines joined by a line feed; an empty string when nothing was read.</returns>
    public string ReadToLimit()
    {
        var lines = string.Join("\n", ReadLines());

        stream.Seek(0, SeekOrigin.Begin);

        // The StreamReader keeps its own read-ahead buffer. Without discarding it, a subsequent
        // call would continue from the stale buffered data instead of from the rewound stream.
        streamReader.DiscardBufferedData();

        return lines;
    }

    /// <summary>
    /// Lazily yields the non-empty lines among the next <c>linesLimit</c> lines of the reader.
    /// Empty lines are skipped in the output but still count towards the limit.
    /// </summary>
    private IEnumerable<string> ReadLines()
    {
        for (var i = 0; i < linesLimit; i++)
        {
            var line = streamReader.ReadLine();
            if (line is null)
            {
                // End of stream reached before the limit.
                yield break;
            }

            if (!string.IsNullOrEmpty(line))
            {
                yield return line;
            }
        }
    }
}
5 changes: 4 additions & 1 deletion src/Data.Csv/ADO.NET/CsvConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Data.Common.DataSource;
using Data.Common.Utils.ConnectionString;
using Data.Csv.CsvIO;
using Data.Csv.Utils;

namespace System.Data.CsvClient;

Expand Down Expand Up @@ -32,9 +33,11 @@ public CsvConnection(FileConnectionString connectionString)
/// <inheritdoc />
public override string FileExtension => "csv";

public Func<IEnumerable<string>, Type> GuessTypeFunction { get; set; }
public Func<IEnumerable<string>, Type> GuessTypeFunction { get; set; } = TypeGuesser.GuessType;
public int GuessTypeRows { get; set; } = 1000;

public string[] SupportedSeparators { get; set; } = new string[] { ",", ";", "|", "\t" };
public int GuessSeparatorRows { get; set; } = 100;
protected override FileReader CreateFileReader => new CsvReader(this);

/// <inheritdoc />
Expand Down
6 changes: 5 additions & 1 deletion src/Data.Csv/CsvIO/Read/CsvReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ protected override void UpdateFromFile() =>
private VirtualDataTable PrepareDataTable(StreamReader streamReader, string tableName)
{
CsvConnection csvConnection = (CsvConnection)fileConnection;
CsvSeparatorDetector separatorDetector = new(streamReader.BaseStream, csvConnection.SupportedSeparators, csvConnection.GuessSeparatorRows);
var separator = separatorDetector.Detect();

CsvVirtualDataTable virtualDataTable = new(streamReader, tableName, pageSize, csvConnection.GuessTypeRows,
fileConnection.PreferredFloatingPointDataType, csvConnection.GuessTypeFunction);
fileConnection.PreferredFloatingPointDataType, csvConnection.GuessTypeFunction,
separator);

return virtualDataTable;
}
Expand Down
50 changes: 50 additions & 0 deletions src/Data.Csv/Utils/CsvSeparatorDetector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using CsvHelper.Configuration;
using CsvHelper.Delegates;
using Data.Common.Utils;
using System.Globalization;

namespace Data.Csv.Utils;

/// <summary>
/// Detects which of a set of candidate separators a CSV stream uses by sampling its first lines
/// and delegating the ranking to CsvHelper's delimiter detection.
/// </summary>
public class CsvSeparatorDetector
{
    // Separator returned when detection does not produce a usable single-character delimiter.
    private const char FallbackSeparator = ',';

    private readonly Stream stream;
    private readonly string[] supportedSeparators;
    private readonly int linesCount;

    /// <summary>
    /// Creates a detector and rewinds <paramref name="stream"/> to its beginning.
    /// </summary>
    /// <param name="stream">The CSV stream to inspect; it is seeked to position 0 here.</param>
    /// <param name="supportedSeparators">Candidate separators handed to the delimiter detection.</param>
    /// <param name="linesCount">Number of lines to sample from the start of the stream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> or <paramref name="supportedSeparators"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="linesCount"/> is zero or negative.</exception>
    public CsvSeparatorDetector(Stream stream, string[] supportedSeparators, int linesCount)
    {
        this.stream = stream ?? throw new ArgumentNullException(nameof(stream));
        this.supportedSeparators = supportedSeparators ?? throw new ArgumentNullException(nameof(supportedSeparators));
        this.linesCount = linesCount > 0 ? linesCount : throw new ArgumentOutOfRangeException(nameof(linesCount));

        this.stream.Seek(0, SeekOrigin.Begin);
    }

    /// <summary>
    /// Samples the stream and returns the detected separator character.
    /// </summary>
    /// <returns>
    /// The detected separator when it is a single character; otherwise a comma.
    /// </returns>
    public char Detect()
    {
        SubStreamReader subStreamReader = new(stream, linesCount);
        var sample = subStreamReader.ReadToLimit();

        // The culture's ListSeparator is set to "*DoNotDetect" because CsvHelper checks it when determining the separator.
        // If the ListSeparator is present in every sampled line, CsvHelper uses it as the separator without
        // considering the ranking of the other candidates. For CultureInfo.InvariantCulture the ListSeparator is a comma,
        // so a comma appearing in every line would be returned by GetDelimiter even when it is not the real separator.
        var culture = (CultureInfo)CultureInfo.InvariantCulture.Clone();
        culture.TextInfo.ListSeparator = "*DoNotDetect";
        var config = new CsvConfiguration(culture)
        {
            DetectDelimiterValues = supportedSeparators
        };

        var detectedDelimiter =
            ConfigurationFunctions.GetDelimiter(new GetDelimiterArgs(sample, config));

        // Honour any single-character candidate supplied by the caller instead of only a hard-coded set.
        // Anything else (null, the "*DoNotDetect" placeholder, multi-character values) falls back to a comma,
        // because the return type can only carry one character.
        return detectedDelimiter is { Length: 1 } ? detectedDelimiter[0] : FallbackSeparator;
    }
}
22 changes: 12 additions & 10 deletions src/Data.Csv/Utils/CsvTransformStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@
/// </summary>
public class CsvTransformStream : Stream
{
private StreamReader streamReader;
private readonly StreamReader streamReader;
private readonly char separator;
private int expectedSeparatorCount;
private MemoryStream bufferStream;
private int expectedCommaCount;
private long logicalPosition;

public string HeaderLine { get; private set; }

public CsvTransformStream(StreamReader streamReader)
public CsvTransformStream(StreamReader streamReader, char separator)
{
this.streamReader = streamReader ?? throw new ArgumentNullException(nameof(streamReader));
bufferStream = new MemoryStream();
logicalPosition = 0;
this.separator = separator;
}

private void InitializeHeader()
Expand All @@ -51,9 +53,9 @@ private void InitializeHeader()
}

// Replace all whitespace (including non-breaking) with a single space
HeaderLine = Regex.Replace(rawHeaderLine, @"\s+", " ").Replace("\uFFFD", "");
HeaderLine = Regex.Replace(rawHeaderLine, @"[^\S\t]+", " ").Replace("\uFFFD", "");

expectedCommaCount = HeaderLine.Split(',').Length - 1;
expectedSeparatorCount = HeaderLine.Split(separator).Length - 1;
WriteToBuffer(HeaderLine + "\n");
}

Expand Down Expand Up @@ -86,10 +88,10 @@ public override int Read(byte[] buffer, int offset, int count)
string line;
while ((line = streamReader.ReadLine()) != null)
{
int commaCount = CountCommasOutsideQuotes(line);
if (commaCount < expectedCommaCount)
int separatorCount = CountSeparatorsOutsideQuotes(line);
if (separatorCount < expectedSeparatorCount)
{
line += new string(',', expectedCommaCount - commaCount);
line += new string(separator, expectedSeparatorCount - separatorCount);
}

WriteToBuffer(line + "\n");
Expand Down Expand Up @@ -123,15 +125,15 @@ public override int Read(byte[] buffer, int offset, int count)
return bytesRead;
}

private int CountCommasOutsideQuotes(string line)
private int CountSeparatorsOutsideQuotes(string line)
{
bool inQuotes = false;
int commaCount = 0;

foreach (char c in line)
{
if (c == '"') inQuotes = !inQuotes;
else if (c == ',' && !inQuotes) commaCount++;
else if (c == separator && !inQuotes) commaCount++;
}

return commaCount;
Expand Down
38 changes: 20 additions & 18 deletions src/Data.Csv/Utils/CsvVirtualDataTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Data.Csv.Utils;
public class CsvVirtualDataTable : VirtualDataTable, IDisposable
{
private readonly int _pageSize;
private readonly int _guessRows;
private readonly int _guessTypeRows;
private readonly FloatingPointDataType _preferredFloatingPointDataType;
private readonly Func<IEnumerable<string>, Type> _guessTypeFunction;

Expand All @@ -36,44 +36,46 @@ public class CsvVirtualDataTable : VirtualDataTable, IDisposable
/// <param name="pageSize">
/// The number of rows to load per page.
/// </param>
/// <param name="guessRows">
/// <param name="guessTypeRows">
/// The number of rows to use when guessing data types.
/// </param>
/// <param name="preferredFloatingPointDataType">
/// The preferred floating point data type for numeric columns.
/// </param>
public CsvVirtualDataTable(
StreamReader streamReader,
string tableName,
int pageSize,
int guessRows,
FloatingPointDataType preferredFloatingPointDataType,
Func<IEnumerable<string>, Type> guessTypeFunction)
StreamReader streamReader,
string tableName,
int pageSize,
int guessTypeRows,
FloatingPointDataType preferredFloatingPointDataType,
Func<IEnumerable<string>, Type> guessTypeFunction,
char separator
)
: base(tableName)
{
_pageSize = pageSize;
_guessRows = guessRows > 0 ? guessRows : throw new ArgumentOutOfRangeException(nameof(guessRows), $"Guess row must be greater than 0. GuessRows: {guessRows}");
_guessTypeRows = guessTypeRows > 0 ? guessTypeRows : throw new ArgumentOutOfRangeException(nameof(guessTypeRows), $"Guess type row must be greater than 0. GuessRows: {guessTypeRows}");
_preferredFloatingPointDataType = preferredFloatingPointDataType;
_guessTypeFunction = guessTypeFunction;

// Instead of using "using", store the reader and transform stream for later use.
_baseReader = streamReader ?? throw new ArgumentNullException(nameof(streamReader));
_transformStream = new CsvTransformStream(_baseReader);
_transformStream = new CsvTransformStream(_baseReader, separator);
_transformReader = new StreamReader(_transformStream, Encoding.UTF8, detectEncodingFromByteOrderMarks: false, bufferSize: 4096, leaveOpen: true);

// Determine the schema and column data types using the first page.
DetermineColumns();
DetermineColumns(separator);

// Set the Rows property to a lazy iterator that pages in DataRows on demand.
Rows = GetRowsIterator();
Rows = GetRowsIterator(separator);
}

private void DetermineColumns()
private void DetermineColumns(char separator)
{
// Load the first page of data (which also reads the header). Note: this LoadCsv call uses _transformStream directly because we want the stream
// to be sought back to its origin. Since the data itself is not used here, numberOfRowsToRead is limited to _guessTypeRows.
DataFrame firstPage = DataFrame.LoadCsv(_transformStream, numberOfRowsToRead: _guessRows, guessRows: _guessRows,
guessTypeFunction: _guessTypeFunction);
DataFrame firstPage = DataFrame.LoadCsv(_transformStream, numberOfRowsToRead: _guessTypeRows, guessRows: _guessTypeRows,
guessTypeFunction: _guessTypeFunction, separator: separator);

Columns.Clear();
if (firstPage.Columns.Count > 0)
Expand All @@ -89,7 +91,7 @@ private void DetermineColumns()
else if (!string.IsNullOrEmpty(_transformStream.HeaderLine))
{
// Fallback: if DataFrame didn't yield columns, use the header line with string type.
var columnNamesFromStream = _transformStream.HeaderLine.Split(',');
var columnNamesFromStream = _transformStream.HeaderLine.Split(separator);
foreach (var name in columnNamesFromStream)
{
Columns.Add(new DataColumn(name.Trim(), typeof(string)));
Expand All @@ -101,7 +103,7 @@ private void DetermineColumns()
/// Returns an iterator that lazily loads rows from the CSV file in pages using the predetermined column types.
/// </summary>
/// <returns>An enumerable sequence of <see cref="DataRow"/>.</returns>
private IEnumerable<DataRow> GetRowsIterator()
private IEnumerable<DataRow> GetRowsIterator(char separator)
{
bool firstPage = true;
_transformStream.Seek(0, SeekOrigin.Begin);
Expand All @@ -119,7 +121,7 @@ private IEnumerable<DataRow> GetRowsIterator()
yield break;
}

DataFrame page = DataFrame.LoadCsvFromString(pageData, header: firstPage, columnNames: columnNames, dataTypes: columnTypes);
DataFrame page = DataFrame.LoadCsvFromString(pageData, header: firstPage, columnNames: columnNames, dataTypes: columnTypes, separator: separator);
firstPage = false;
if (page.Rows.Count == 0)
yield break;
Expand Down
80 changes: 80 additions & 0 deletions src/Data.Csv/Utils/TypeGuesser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Text.RegularExpressions;

namespace Data.Csv.Utils;

// This class is a copy of DataFrame.DefaultGuessTypeFunction in the DataFrame library (https://learn.microsoft.com/en-us/dotnet/machine-learning/how-to-guides/getting-started-dataframe)
// with the following modifications to fix issues when uploading a CSV:
// 1. Adding a RegEx condition when parsing DateTime to avoid treating non-date strings such as "1 1" as DateTime object
// 2. Currently, if a column has mixture of data types that does not contain a string, it will guess the type depending on the priority given in MaxKind method.
// This can lead to incorrect format exceptions afterward when casting column values to the guessed data type. So, a condition is added in MaxKind method
// to set the guessed type as String if the current suggested and previous data type is not equal.
/// <summary>
/// Guesses the CLR type of a CSV column from a sample of its raw string values.
/// </summary>
public static class TypeGuesser
{
    // A value only counts as a date when it starts with three numeric components separated by '-', '/' or '.'.
    private static readonly Regex LeadingDatePattern = new(@"^(?:\d{1,4}[-/.]\d{1,2}[-/.]\d{1,4}).*$");

    /// <summary>
    /// Inspects every value and returns the single type all meaningful values agree on.
    /// </summary>
    /// <param name="columnValues">Raw cell values; null, empty and "null" (any casing) entries are ignored.</param>
    /// <returns>
    /// <see cref="bool"/>, <see cref="float"/> or <see cref="DateTime"/> when every considered value parses as that type;
    /// otherwise <see cref="string"/> (also when no value was considered).
    /// </returns>
    public static Type GuessType(IEnumerable<string> columnValues)
    {
        Type guessed = typeof(string);
        bool anyValueSeen = false;

        foreach (string value in columnValues)
        {
            if (string.IsNullOrEmpty(value) ||
                string.Equals(value, "null", StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            Type candidate = Classify(value);

            // The first meaningful value sets the guess; later ones must agree with it.
            guessed = anyValueSeen ? Reconcile(candidate, guessed) : candidate;
            anyValueSeen = true;
        }

        return guessed;
    }

    /// <summary>
    /// Classifies one value, trying bool first, then float, then DateTime, and defaulting to string.
    /// </summary>
    private static Type Classify(string value)
    {
        if (bool.TryParse(value, out bool _))
        {
            return typeof(bool);
        }

        if (float.TryParse(value, out float _))
        {
            return typeof(float);
        }

        if (LooksLikeDate(value))
        {
            return typeof(DateTime);
        }

        return typeof(string);
    }

    /// <summary>
    /// Returns true when the value matches the leading date pattern and also parses as a DateTime.
    /// </summary>
    private static bool LooksLikeDate(string value)
    {
        return LeadingDatePattern.IsMatch(value) && DateTime.TryParse(value, out DateTime _);
    }

    /// <summary>
    /// Combines a new candidate type with the running guess: any disagreement degrades to string.
    /// </summary>
    private static Type Reconcile(Type candidate, Type current)
    {
        if (candidate != current)
        {
            return typeof(string);
        }

        // Both are the same type here; only the four types produced by Classify are kept as-is.
        bool isKnownKind = candidate == typeof(string)
                           || candidate == typeof(float)
                           || candidate == typeof(bool)
                           || candidate == typeof(DateTime);

        return isKnownKind ? candidate : typeof(string);
    }
}
4 changes: 3 additions & 1 deletion src/Data.Xls/XlsIO/Read/XlsReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ private StreamReader Read(string tableName)
private VirtualDataTable PrepareDataTable(StreamReader streamReader, string tableName)
{
XlsConnection xlsConnection = (XlsConnection)fileConnection;
char separator = ','; //Since the XlsSheetStream composed the stream as comma separated, we are ensured that the separator is a comma and don't need to detect it.
CsvVirtualDataTable virtualDataTable = new(streamReader, tableName, pageSize, xlsConnection.GuessTypeRows,
fileConnection.PreferredFloatingPointDataType, xlsConnection.GuessTypeFunction);
fileConnection.PreferredFloatingPointDataType, xlsConnection.GuessTypeFunction,
separator);

return virtualDataTable;
}
Expand Down
Loading