Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to the Merge routine #5778

Merged
merged 4 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -460,34 +460,42 @@ private ArrowStringDataFrameColumn Clone(PrimitiveDataFrameColumn<int> mapIndice
/// <inheritdoc/>
public override DataFrame ValueCounts()
{
Dictionary<string, ICollection<long>> groupedValues = GroupColumnValues<string>();
Dictionary<string, ICollection<long>> groupedValues = GroupColumnValues<string>(out HashSet<long> _);
return StringDataFrameColumn.ValueCountsImplementation(groupedValues);
}

/// <inheritdoc/>
public override GroupBy GroupBy(int columnIndex, DataFrame parent)
{
Dictionary<string, ICollection<long>> dictionary = GroupColumnValues<string>();
Dictionary<string, ICollection<long>> dictionary = GroupColumnValues<string>(out HashSet<long> _);
return new GroupBy<string>(parent, columnIndex, dictionary);
}

/// <inheritdoc/>
public override Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>()
public override Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>(out HashSet<long> nullIndices)
{
if (typeof(TKey) == typeof(string))
{
nullIndices = new HashSet<long>();
Dictionary<string, ICollection<long>> multimap = new Dictionary<string, ICollection<long>>(EqualityComparer<string>.Default);
for (long i = 0; i < Length; i++)
{
string str = this[i] ?? "__null__";
bool containsKey = multimap.TryGetValue(str, out ICollection<long> values);
if (containsKey)
string str = this[i];
if (str != null)
{
values.Add(i);
bool containsKey = multimap.TryGetValue(str, out ICollection<long> values);
if (containsKey)
{
values.Add(i);
}
else
{
multimap.Add(str, new List<long>() { i });
}
}
else
{
multimap.Add(str, new List<long>() { i });
nullIndices.Add(i);
}
}
return multimap as Dictionary<TKey, ICollection<long>>;
Expand All @@ -499,7 +507,7 @@ public override Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>()
}

/// <inheritdoc/>
public ArrowStringDataFrameColumn FillNulls(string value, bool inPlace = false)
public ArrowStringDataFrameColumn FillNulls(string value, bool inPlace = false)
{
if (value == null)
{
Expand Down
183 changes: 91 additions & 92 deletions src/Microsoft.Data.Analysis/DataFrame.Join.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,82 +168,72 @@ public DataFrame Merge<TKey>(DataFrame other, string leftJoinColumn, string righ
{
// First hash other dataframe on the rightJoinColumn
DataFrameColumn otherColumn = other.Columns[rightJoinColumn];
Dictionary<TKey, ICollection<long>> multimap = otherColumn.GroupColumnValues<TKey>();
Dictionary<TKey, ICollection<long>> multimap = otherColumn.GroupColumnValues<TKey>(out HashSet<long> otherColumnNullIndices);

// Go over the records in this dataframe and match with the dictionary
DataFrameColumn thisColumn = Columns[leftJoinColumn];

for (long i = 0; i < thisColumn.Length; i++)
{
var thisColumnValue = thisColumn[i];
TKey thisColumnValueOrDefault = (TKey)(thisColumnValue == null ? default(TKey) : thisColumnValue);
if (multimap.TryGetValue(thisColumnValueOrDefault, out ICollection<long> rowNumbers))
if (thisColumnValue != null)
{
foreach (long row in rowNumbers)
if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection<long> rowNumbers))
{
if (thisColumnValue == null)
foreach (long row in rowNumbers)
{
// Match only with nulls in otherColumn
if (otherColumn[row] == null)
{
leftRowIndices.Append(i);
rightRowIndices.Append(row);
}
}
else
{
// Cannot match nulls in otherColumn
if (otherColumn[row] != null)
{
leftRowIndices.Append(i);
rightRowIndices.Append(row);
}
leftRowIndices.Append(i);
rightRowIndices.Append(row);
}
}
else
{
leftRowIndices.Append(i);
rightRowIndices.Append(null);
}
}
else
{
leftRowIndices.Append(i);
rightRowIndices.Append(null);
foreach (long row in otherColumnNullIndices)
{
leftRowIndices.Append(i);
rightRowIndices.Append(row);
}
}
}
}
else if (joinAlgorithm == JoinAlgorithm.Right)
{
DataFrameColumn thisColumn = Columns[leftJoinColumn];
Dictionary<TKey, ICollection<long>> multimap = thisColumn.GroupColumnValues<TKey>();
Dictionary<TKey, ICollection<long>> multimap = thisColumn.GroupColumnValues<TKey>(out HashSet<long> thisColumnNullIndices);

DataFrameColumn otherColumn = other.Columns[rightJoinColumn];
for (long i = 0; i < otherColumn.Length; i++)
{
var otherColumnValue = otherColumn[i];
TKey otherColumnValueOrDefault = (TKey)(otherColumnValue == null ? default(TKey) : otherColumnValue);
if (multimap.TryGetValue(otherColumnValueOrDefault, out ICollection<long> rowNumbers))
if (otherColumnValue != null)
{
foreach (long row in rowNumbers)
if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection<long> rowNumbers))
{
if (otherColumnValue == null)
foreach (long row in rowNumbers)
{
if (thisColumn[row] == null)
{
leftRowIndices.Append(row);
rightRowIndices.Append(i);
}
}
else
{
if (thisColumn[row] != null)
{
leftRowIndices.Append(row);
rightRowIndices.Append(i);
}
leftRowIndices.Append(row);
rightRowIndices.Append(i);
}
}
else
{
leftRowIndices.Append(null);
rightRowIndices.Append(i);
}
}
else
{
leftRowIndices.Append(null);
rightRowIndices.Append(i);
foreach (long thisColumnNullIndex in thisColumnNullIndices)
{
leftRowIndices.Append(thisColumnNullIndex);
rightRowIndices.Append(i);
}
}
}
}
Expand All @@ -253,97 +243,106 @@ public DataFrame Merge<TKey>(DataFrame other, string leftJoinColumn, string righ
long leftRowCount = Rows.Count;
long rightRowCount = other.Rows.Count;

var leftColumnIsSmaller = (leftRowCount <= rightRowCount);
bool leftColumnIsSmaller = leftRowCount <= rightRowCount;
DataFrameColumn hashColumn = leftColumnIsSmaller ? Columns[leftJoinColumn] : other.Columns[rightJoinColumn];
DataFrameColumn otherColumn = ReferenceEquals(hashColumn, Columns[leftJoinColumn]) ? other.Columns[rightJoinColumn] : Columns[leftJoinColumn];
Dictionary<TKey, ICollection<long>> multimap = hashColumn.GroupColumnValues<TKey>();
Dictionary<TKey, ICollection<long>> multimap = hashColumn.GroupColumnValues<TKey>(out HashSet<long> smallerDataFrameColumnNullIndices);

for (long i = 0; i < otherColumn.Length; i++)
{
var otherColumnValue = otherColumn[i];
TKey otherColumnValueOrDefault = (TKey)(otherColumnValue == null ? default(TKey) : otherColumnValue);
if (multimap.TryGetValue(otherColumnValueOrDefault, out ICollection<long> rowNumbers))
if (otherColumnValue != null)
{
foreach (long row in rowNumbers)
if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection<long> rowNumbers))
{
if (otherColumnValue == null)
{
if (hashColumn[row] == null)
{
leftRowIndices.Append(leftColumnIsSmaller ? row : i);
rightRowIndices.Append(leftColumnIsSmaller ? i : row);
}
}
else
foreach (long row in rowNumbers)
{
if (hashColumn[row] != null)
{
leftRowIndices.Append(leftColumnIsSmaller ? row : i);
rightRowIndices.Append(leftColumnIsSmaller ? i : row);
}
leftRowIndices.Append(leftColumnIsSmaller ? row : i);
rightRowIndices.Append(leftColumnIsSmaller ? i : row);
}
}
}
else
{
foreach (long nullIndex in smallerDataFrameColumnNullIndices)
{
leftRowIndices.Append(leftColumnIsSmaller ? nullIndex : i);
rightRowIndices.Append(leftColumnIsSmaller ? i : nullIndex);
}
}
}
}
else if (joinAlgorithm == JoinAlgorithm.FullOuter)
{
DataFrameColumn otherColumn = other.Columns[rightJoinColumn];
Dictionary<TKey, ICollection<long>> multimap = otherColumn.GroupColumnValues<TKey>();
Dictionary<TKey, ICollection<long>> multimap = otherColumn.GroupColumnValues<TKey>(out HashSet<long> otherColumnNullIndices);
Dictionary<TKey, long> intersection = new Dictionary<TKey, long>(EqualityComparer<TKey>.Default);

// Go over the records in this dataframe and match with the dictionary
DataFrameColumn thisColumn = Columns[leftJoinColumn];
Int64DataFrameColumn thisColumnNullIndices = new Int64DataFrameColumn("ThisColumnNullIndices");

for (long i = 0; i < thisColumn.Length; i++)
{
var thisColumnValue = thisColumn[i];
TKey thisColumnValueOrDefault = (TKey)(thisColumnValue == null ? default(TKey) : thisColumnValue);
if (multimap.TryGetValue(thisColumnValueOrDefault, out ICollection<long> rowNumbers))
if (thisColumnValue != null)
{
foreach (long row in rowNumbers)
if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection<long> rowNumbers))
{
if (thisColumnValue == null)
{
// Has to match only with nulls in otherColumn
if (otherColumn[row] == null)
{
leftRowIndices.Append(i);
rightRowIndices.Append(row);
if (!intersection.ContainsKey(thisColumnValueOrDefault))
{
intersection.Add(thisColumnValueOrDefault, rowNumber);
}
}
}
else
foreach (long row in rowNumbers)
{
// Cannot match to nulls in otherColumn
if (otherColumn[row] != null)
leftRowIndices.Append(i);
rightRowIndices.Append(row);
if (!intersection.ContainsKey((TKey)thisColumnValue))
{
leftRowIndices.Append(i);
rightRowIndices.Append(row);
if (!intersection.ContainsKey(thisColumnValueOrDefault))
{
intersection.Add(thisColumnValueOrDefault, rowNumber);
}
intersection.Add((TKey)thisColumnValue, rowNumber);
}
}
}
else
{
leftRowIndices.Append(i);
rightRowIndices.Append(null);
}
}
else
{
leftRowIndices.Append(i);
rightRowIndices.Append(null);
thisColumnNullIndices.Append(i);
}
}
for (long i = 0; i < otherColumn.Length; i++)
{
TKey value = (TKey)(otherColumn[i] ?? default(TKey));
if (!intersection.ContainsKey(value))
var value = otherColumn[i];
if (value != null)
{
if (!intersection.ContainsKey((TKey)value))
{
leftRowIndices.Append(null);
rightRowIndices.Append(i);
}
}
}

// Now handle the null rows
foreach (long? thisColumnNullIndex in thisColumnNullIndices)
{
foreach (long otherColumnNullIndex in otherColumnNullIndices)
{
leftRowIndices.Append(thisColumnNullIndex.Value);
rightRowIndices.Append(otherColumnNullIndex);
}
if (otherColumnNullIndices.Count == 0)
{
leftRowIndices.Append(thisColumnNullIndex.Value);
rightRowIndices.Append(null);
}
}
if (thisColumnNullIndices.Length == 0)
{
foreach (long otherColumnNullIndex in otherColumnNullIndices)
{
leftRowIndices.Append(null);
rightRowIndices.Append(i);
rightRowIndices.Append(otherColumnNullIndex);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/Microsoft.Data.Analysis/DataFrameColumn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,12 @@ public virtual DataFrameColumn Sort(bool ascending = true)
return Clone(sortIndices, !ascending, NullCount);
}

public virtual Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>() => throw new NotImplementedException();
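/// <summary>
/// Groups the rows of this column by their value.
/// </summary>
/// <typeparam name="TKey">The type of data held by this column</typeparam>
/// <param name="nullIndices">When implemented by a derived column, receives the indices of the rows whose value is null; those rows are reported here instead of being added to the returned mapping</param>
/// <returns>A mapping of each value (of type <typeparamref name="TKey"/>) to the indices of the rows containing that value</returns>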
public virtual Dictionary<TKey, ICollection<long>> GroupColumnValues<TKey>(out HashSet<long> nullIndices) => throw new NotImplementedException();

/// <summary>
/// Returns a DataFrame containing counts of unique values
Expand Down
Loading