-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Added Block Size Checks for ParquetLoader #120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
49bb968
64de2d6
6e4d46c
9967d12
ec1e78a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,7 +94,7 @@ public sealed class Arguments | |
private readonly int _columnChunkReadSize; | ||
private readonly Column[] _columnsLoaded; | ||
private readonly DataSet _schemaDataSet; | ||
private const int _defaultColumnChunkReadSize = 100; // Should ideally be close to Rowgroup size | ||
private const int _defaultColumnChunkReadSize = 1000000; | ||
|
||
private bool _disposed; | ||
|
||
|
@@ -368,8 +368,8 @@ private sealed class Cursor : RootCursorBase, IRowCursor | |
private readonly Delegate[] _getters; | ||
private readonly ReaderOptions _readerOptions; | ||
private int _curDataSetRow; | ||
private IEnumerator _dataSetEnumerator; | ||
private IEnumerator _blockEnumerator; | ||
private IEnumerator<int> _dataSetEnumerator; | ||
private IEnumerator<int> _blockEnumerator; | ||
private IList[] _columnValues; | ||
private IRandom _rand; | ||
|
||
|
@@ -390,11 +390,18 @@ public Cursor(ParquetLoader parent, Func<int, bool> predicate, IRandom rand) | |
Columns = _loader._columnsLoaded.Select(i => i.Name).ToArray() | ||
}; | ||
|
||
int numBlocks = (int)Math.Ceiling(((decimal)parent.GetRowCount() / _readerOptions.Count)); | ||
int[] blockOrder = _rand == null ? Utils.GetIdentityPermutation(numBlocks) : Utils.GetRandomPermutation(rand, numBlocks); | ||
// The number of blocks is calculated based on the specified rows in a block (defaults to 1M). | ||
// Since we want to shuffle the blocks in addition to shuffling the rows in each block, checks | ||
// are put in place to ensure we can produce a shuffle order for the blocks. | ||
var numBlocks = MathUtils.DivisionCeiling((long)parent.GetRowCount(), _readerOptions.Count); | ||
if (numBlocks > int.MaxValue) | ||
{ | ||
throw _loader._host.ExceptParam(nameof(Arguments.ColumnChunkReadSize), "Error due to too many blocks. Try increasing block size."); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The base class has opened a channel and made it available via |
||
} | ||
var blockOrder = CreateOrderSequence((int)numBlocks); | ||
_blockEnumerator = blockOrder.GetEnumerator(); | ||
|
||
_dataSetEnumerator = new int[0].GetEnumerator(); // Initialize an empty enumerator to get started | ||
_dataSetEnumerator = Enumerable.Empty<int>().GetEnumerator(); | ||
_columnValues = new IList[_actives.Length]; | ||
_getters = new Delegate[_actives.Length]; | ||
for (int i = 0; i < _actives.Length; ++i) | ||
|
@@ -472,12 +479,12 @@ protected override bool MoveNextCore() | |
{ | ||
if (_dataSetEnumerator.MoveNext()) | ||
{ | ||
_curDataSetRow = (int)_dataSetEnumerator.Current; | ||
_curDataSetRow = _dataSetEnumerator.Current; | ||
return true; | ||
} | ||
else if (_blockEnumerator.MoveNext()) | ||
{ | ||
_readerOptions.Offset = (int)_blockEnumerator.Current * _readerOptions.Count; | ||
_readerOptions.Offset = (long)_blockEnumerator.Current * _readerOptions.Count; | ||
|
||
// When current dataset runs out, read the next portion of the parquet file. | ||
DataSet ds; | ||
|
@@ -486,9 +493,9 @@ protected override bool MoveNextCore() | |
ds = ParquetReader.Read(_loader._parquetStream, _loader._parquetOptions, _readerOptions); | ||
} | ||
|
||
int[] dataSetOrder = _rand == null ? Utils.GetIdentityPermutation(ds.RowCount) : Utils.GetRandomPermutation(_rand, ds.RowCount); | ||
var dataSetOrder = CreateOrderSequence(ds.RowCount); | ||
_dataSetEnumerator = dataSetOrder.GetEnumerator(); | ||
_curDataSetRow = dataSetOrder[0]; | ||
_curDataSetRow = dataSetOrder.ElementAt(0); | ||
|
||
// Cache list for each active column | ||
for (int i = 0; i < _actives.Length; i++) | ||
|
@@ -533,6 +540,26 @@ public bool IsColumnActive(int col) | |
Ch.CheckParam(0 <= col && col < _colToActivesIndex.Length, nameof(col)); | ||
return _colToActivesIndex[col] >= 0; | ||
} | ||
|
||
/// <summary> | ||
/// Creates an in-order or shuffled sequence of indices, based on whether _rand is specified. | ||
/// If unable to create a shuffled sequence (an OutOfMemoryException is thrown while building it), | ||
/// will default to sequential. | ||
/// </summary> | ||
/// <param name="size">Number of elements in the sequence.</param> | ||
/// <returns> | ||
/// Enumerable.Range(0, size) when _rand is null or when the shuffle attempt ran out of memory; | ||
/// otherwise the result of Utils.GetRandomPermutation(_rand, size). | ||
/// </returns> | ||
private IEnumerable<int> CreateOrderSequence(int size) | ||
{ | ||
IEnumerable<int> order; | ||
try | ||
{ | ||
order = _rand == null ? Enumerable.Range(0, size) : Utils.GetRandomPermutation(_rand, size); | ||
} | ||
// Best-effort fallback: an out-of-memory failure inside the try degrades to in-order iteration instead of propagating. | ||
catch (OutOfMemoryException) | ||
{ | ||
order = Enumerable.Range(0, size); | ||
} | ||
return order; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A minor note (minor because I am fairly certain this condition will never occur), it is somewhat of a corruption of the |
||
|
||
#region Dispose | ||
|
@@ -671,4 +698,4 @@ private string ConvertListToString(IList list) | |
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI we do this a lot throughout the codebase where we could benefit from this instead -- hmm, but I guess I won't make you fix everywhere we do it. I suppose. ;)