Add kill_duplicates function to remove all duplicates from a table
MrPowers committed Dec 10, 2022
1 parent 3a0fbb5 commit 5c2e899
Showing 5 changed files with 513 additions and 370 deletions.
37 changes: 37 additions & 0 deletions README.md
@@ -68,3 +68,40 @@ You can leverage the upsert code if your SCD table meets these requirements:
* Contains a unique primary key column
* Any change in an attribute column triggers an upsert
* SCD logic is exposed via `effective_time`, `end_time` and `is_current` columns (you can also use date or version columns for SCD upserts)
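
As a rough sketch, a table that satisfies these requirements might be laid out like this (the `pkey` and `attr` column names are placeholders, not part of the library; only `effective_time`, `end_time`, and `is_current` come from the list above):

```python
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

spark = SparkSession.builder.getOrCreate()

# Hypothetical SCD table layout: a unique primary key, an attribute column
# whose changes trigger upserts, and the SCD2 bookkeeping columns.
schema = StructType(
    [
        StructField("pkey", IntegerType(), False),
        StructField("attr", StringType(), True),
        StructField("effective_time", TimestampType(), True),
        StructField("end_time", TimestampType(), True),
        StructField("is_current", BooleanType(), True),
    ]
)

scd_df = spark.createDataFrame(
    [
        (1, "A", datetime.datetime(2019, 1, 1), None, True),
        (2, "B", datetime.datetime(2019, 1, 1), None, True),
    ],
    schema,
)
```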

## Kill duplicates

The `kill_duplicates` function completely removes all duplicate rows from a Delta table.

Suppose you have the following table:

```
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| A| # duplicate
| 2| A| B|
| 3| A| A| # duplicate
| 4| A| A| # duplicate
| 5| B| B| # duplicate
| 6| D| D|
| 9| B| B| # duplicate
+----+----+----+
```

Run the `kill_duplicates` function:

```python
mack.kill_duplicates(deltaTable, "col1", ["col2", "col3"])
```

Here's the ending state of the table:

```
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 2| A| B|
| 6| D| D|
+----+----+----+
```
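
For completeness, here is a rough sketch of how `deltaTable` could be set up before the call; the `/tmp/some_delta_table` path is a placeholder for wherever your Delta table lives:

```python
from delta import DeltaTable
from pyspark.sql import SparkSession

import mack

spark = SparkSession.builder.getOrCreate()

# Placeholder path; point this at the Delta table you want to deduplicate.
deltaTable = DeltaTable.forPath(spark, "/tmp/some_delta_table")

# Delete every row whose (col2, col3) combination appears more than once.
mack.kill_duplicates(deltaTable, "col1", ["col2", "col3"])
```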
19 changes: 19 additions & 0 deletions mack/__init__.py
@@ -1,5 +1,6 @@
from delta import *
import pyspark
import pyspark.sql.functions as F


class MackValidationError(ValueError):
@@ -84,3 +85,21 @@ def type_2_scd_generic_upsert(
.execute()
)
return res


def kill_duplicates(deltaTable, pkey, cols):
    spark = pyspark.sql.SparkSession.getActiveSession()
    colsA = ", ".join(cols)
    deltaTable.toDF().createOrReplaceTempView("temp")
    # Collect every row beyond the first in each group of rows that share the
    # same values for `cols` (ROW_NUMBER is ordered by the primary key).
    dfTemp = (
        spark.sql(f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {colsA} ORDER BY {pkey} DESC) rn FROM temp")
        .filter(F.col("rn") > 1)
        .drop("rn")
        .distinct()
    )

    # Build the merge condition on the duplicate columns only, e.g.
    # "main.col2 = nodups.col2 AND main.col3 = nodups.col3".
    q = []
    for col in cols:
        q.append(f"main.{col} = nodups.{col}")
    q = " AND ".join(q)

    # Because the condition ignores the primary key, every row in a duplicate
    # group matches and is deleted, including the first one.
    deltaTable.alias("main").merge(
        dfTemp.alias("nodups"), q
    ).whenMatchedDelete().execute()