Update
trajanov committed Feb 2, 2024
1 parent 1d0ec1d commit b14376a
Showing 1 changed file with 89 additions and 106 deletions.
195 changes: 89 additions & 106 deletions Notebooks/Spark-Example-09-Data-Partitioning-TreeAggregate.ipynb
@@ -24,23 +24,23 @@
"\n",
"### 1. Hash Partitioning\n",
"\n",
"Spark assigns each record to a partition based on the hash value of its key. For example, if you have 1000 keys and want to create 10 partitions, each key is mapped to one of the 10 partitions using its hash value. The target partition is calculated with the following formula:\n",
"\n",
"$$\n",
"\\text{Partition} = \\text{hash}(key) \\mod \\text{numPartitions}\n",
"$$\n",
"\n",
"### 2. Range Partitioning\n",
"\n",
"\n",
"\n",
"In range partitioning, data is partitioned based on a range of key values. All records with keys within a specific range fall into the same partition.\n",
"Spark will create the partitions based on the range of the key. For example, if you have 1000 keys and you want to create 10 partitions, then Spark will create 10 partitions, each covering roughly one tenth of the key range. The width of each range, and the partition for a given key, can be approximated with the following formulas:\n",
"\n",
"$$\n",
"\\text{rangeWidth} = (\\text{maxKey} - \\text{minKey}) / \\text{numPartitions}, \\qquad \\text{Partition} = \\lfloor (\\text{key} - \\text{minKey}) / \\text{rangeWidth} \\rfloor\n",
"$$\n",
"\n",
"\n",
"\n",
"### 3. Custom Partitioning\n",
"\n",
"You can also implement custom partitioning logic based on the specific requirements of your application.\n",
@@ -56,7 +56,7 @@
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
@@ -70,7 +70,7 @@
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
@@ -79,7 +79,7 @@
"8"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
@@ -100,29 +100,29 @@
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Repartition the RDD\n",
"rdd=rdd.repartition(4)\n",
"rdd.getNumPartitions()"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
@@ -132,7 +132,7 @@
"All data\n",
" [2, 2, 2, 2, 2, 2, 2, 2]\n",
"partitions\n",
" [[2, 2, 2], [2, 2, 2], [2], [2]]\n"
]
}
],
@@ -141,82 +141,6 @@
"print(\"partitions\\n\",rdd.glom().collect())"
]
},
{
"attachments": {},
"cell_type": "markdown",
Expand All @@ -228,7 +152,7 @@
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
@@ -273,7 +197,7 @@
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
@@ -315,7 +239,7 @@
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
@@ -340,12 +264,80 @@
"print(\"result\\n\",sum_count.collect())\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Aggregate functions and initialization\n",
"The number of partitions, in combination with the initial (zero) value, can affect the result of aggregate functions: the zero value is folded into every partition's local aggregate, so it is counted once per partition.\n",
"\n",
"We therefore need to be careful when using aggregate functions with initial values that are not neutral elements for the aggregation function."
]
},
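The effect can be predicted with plain arithmetic before running Spark: since the zero value `(1, 1)` is folded into each partition's local aggregate once, the result for P partitions is `(sum + P, count + P)`. A small pure-Python sketch (not part of the notebook):

```python
# Predict treeAggregate results when the zero value (1, 1) is not neutral:
# the zero value is counted once per partition, so P partitions add it P times.
data = [2] * 8
init = (1, 1)

def expected(num_partitions):
    return (sum(data) + num_partitions * init[0],
            len(data) + num_partitions * init[1])

print([expected(p) for p in (1, 4, 8)])
# [(17, 9), (20, 12), (24, 16)] — matching the notebook's outputs
```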
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(16, 8)\n"
]
}
],
"source": [
"# Create a threeAggregate that calculate the sum of the elements and counts the elements\n",
"rdd = sc.parallelize([2, 2, 2, 2, 2, 2, 2, 2], 8)\n",
"a = rdd.treeAggregate((0,0), lambda x, y: (x[0]+y, x[1]+1) , lambda x, y: (x[0]+y[0],x[1]+y[1]))\n",
"print(a)"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 partition (17, 9)\n",
"4 partitions (20, 12)\n",
"8 partitions (24, 16)\n"
]
}
],
"source": [
"a = rdd.repartition(1)\\\n",
" .treeAggregate((1, 1), lambda x, y: (x[0]+y, x[1]+1) , lambda x, y: (x[0]+y[0],x[1]+y[1]))\n",
"print(\"1 partition\", a)\n",
"\n",
"a = rdd.repartition(4)\\\n",
" .treeAggregate((1, 1), lambda x, y: (x[0]+y, x[1]+1) , lambda x, y: (x[0]+y[0],x[1]+y[1]))\n",
"print(\"4 partitions\", a)\n",
"\n",
"a = rdd.repartition(8)\\\n",
".treeAggregate((1, 1), lambda x, y: (x[0]+y, x[1]+1) , lambda x, y: (x[0]+y[0],x[1]+y[1]))\n",
"print(\"8 partitions\", a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Spark DataFrame partitioning\n",
"\n",
"Spark DataFrame partitioning is the process of dividing a large DataFrame into smaller chunks (partitions) based on a column value or an expression."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Spark DataFrame repartition()\n",
"Unlike with RDDs, you can’t specify the number of partitions (the parallelism) when creating a DataFrame. \n",
"\n",
"The Spark DataFrame repartition() method is used to increase or decrease the number of partitions.\n"
@@ -388,15 +380,6 @@
"print(df2.rdd.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -407,7 +390,7 @@
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
@@ -450,7 +433,7 @@
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
@@ -493,7 +476,7 @@
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{