Skip to content

Added Block Size Checks for ParquetLoader #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Microsoft.ML.Core/Utilities/MathUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -871,5 +871,16 @@ public static double Cos(double a)
var res = Math.Cos(a);
return Math.Abs(res) > 1 ? double.NaN : res;
}

/// <summary>
/// Returns the smallest integral value that is greater than or equal to the result of the division.
/// </summary>
/// <param name="numerator">Number to be divided.</param>
/// <param name="denomenator">Number with which to divide the numerator.</param>
/// <returns></returns>
public static long DivisionCeiling(long numerator, long denomenator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DivisionCeiling [](start = 27, length = 15)

FYI we do this a lot throughout the codebase where we could benefit from this instead -- hmm, but I guess I won't make you fix everywhere we do it. I suppose. ;)

{
return (checked(numerator + denomenator) - 1) / denomenator;
}
}
}
49 changes: 38 additions & 11 deletions src/Microsoft.ML.Parquet/ParquetLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public sealed class Arguments
private readonly int _columnChunkReadSize;
private readonly Column[] _columnsLoaded;
private readonly DataSet _schemaDataSet;
private const int _defaultColumnChunkReadSize = 100; // Should ideally be close to Rowgroup size
private const int _defaultColumnChunkReadSize = 1000000;

private bool _disposed;

Expand Down Expand Up @@ -368,8 +368,8 @@ private sealed class Cursor : RootCursorBase, IRowCursor
private readonly Delegate[] _getters;
private readonly ReaderOptions _readerOptions;
private int _curDataSetRow;
private IEnumerator _dataSetEnumerator;
private IEnumerator _blockEnumerator;
private IEnumerator<int> _dataSetEnumerator;
private IEnumerator<int> _blockEnumerator;
private IList[] _columnValues;
private IRandom _rand;

Expand All @@ -390,11 +390,18 @@ public Cursor(ParquetLoader parent, Func<int, bool> predicate, IRandom rand)
Columns = _loader._columnsLoaded.Select(i => i.Name).ToArray()
};

int numBlocks = (int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count));
int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks);
// The number of blocks is calculated based on the specified rows in a block (defaults to 1M).
// Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks
// are put in place to ensure we can produce a shuffle order for the blocks.
var numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count);
if (numBlocks > int.MaxValue)
{
throw _loader._host.ExceptParam(nameof(Arguments.ColumnChunkReadSize), "Error due to too many blocks. Try increasing block size.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base class has opened a channel and made it available via Ch property, should probably use that instead since it's a more specific exception context.

}
var blockOrder = CreateOrderSequence((int)numBlocks);
_blockEnumerator = blockOrder.GetEnumerator();

_dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started
_dataSetEnumerator = Enumerable.Empty<int>().GetEnumerator();
_columnValues = new IList[_actives.Length];
_getters = new Delegate[_actives.Length];
for (int i = 0; i < _actives.Length; ++i)
Expand Down Expand Up @@ -472,12 +479,12 @@ protected override bool MoveNextCore()
{
if (_dataSetEnumerator.MoveNext())
{
_curDataSetRow = (int)_dataSetEnumerator.Current;
_curDataSetRow = _dataSetEnumerator.Current;
return true;
}
else if (_blockEnumerator.MoveNext())
{
_readerOptions.Offset = (int)_blockEnumerator.Current * _readerOptions.Count;
_readerOptions.Offset = (long)_blockEnumerator.Current * _readerOptions.Count;

// When current dataset runs out, read the next portion of the parquet file.
DataSet ds;
Expand All @@ -486,9 +493,9 @@ protected override bool MoveNextCore()
ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions);
}

int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount);
var dataSetOrder = CreateOrderSequence(ds.RowCount);
_dataSetEnumerator = dataSetOrder.GetEnumerator();
_curDataSetRow = dataSetOrder[0];
_curDataSetRow = dataSetOrder.ElementAt(0);

// Cache list for each active column
for (int i = 0; i < _actives.Length; i++)
Expand Down Expand Up @@ -533,6 +540,26 @@ public bool IsColumnActive(int col)
Ch.CheckParam(0 <= col && col < _colToActivesIndex.Length, nameof(col));
return _colToActivesIndex[col] >= 0;
}

/// <summary>
/// Creates a in-order or shuffled sequence, based on whether _rand is specified.
/// If unable to create a shuffle sequence, will default to sequential.
/// </summary>
/// <param name="size">Number of elements in the sequence.</param>
/// <returns></returns>
private IEnumerable<int> CreateOrderSequence(int size)
{
IEnumerable<int> order;
try
{
order = _rand == null ? Enumerable.Range(0, size) : Utils.GetRandomPermutation(_rand, size);
}
catch (OutOfMemoryException)
{
order = Enumerable.Range(0, size);
}
return order;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor note (minor because I am fairly certain this condition will never occur), it is somewhat of a corruption of the IDataView contract` to have an implementation that delivers different results depending on runtime conditions... but since this will never occur and it's for shuffling case only anyway, it is probably not so bad.


#region Dispose
Expand Down Expand Up @@ -671,4 +698,4 @@ private string ConvertListToString(IList list)
}
}
}
}
}