Skip to content

Commit 0759ccb

Browse files
turboFeiGitHub Enterprise
authored andcommitted
[HADP-59723] Add celeborn plugins (apache#848)
1 parent 9daf4e0 commit 0759ccb

File tree

5 files changed

+105
-1
lines changed

5 files changed

+105
-1
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@
313313
<!-- Needed for consistent times -->
314314
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss z</maven.build.timestamp.format>
315315

316-
<celeborn.client.version>0.6.0.0.3.2</celeborn.client.version>
316+
<celeborn.client.version>0.6.1.0.0.0</celeborn.client.version>
317317
<iceberg.version>1.6.1.1.9.0</iceberg.version>
318318

319319
<!-- SPARK-36796 for JDK-17 test-->

sql/ebay-etl/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@
7171
<artifactId>spark-hive_${scala.binary.version}</artifactId>
7272
<version>${project.version}</version>
7373
</dependency>
74+
<dependency>
75+
<groupId>org.apache.celeborn</groupId>
76+
<artifactId>celeborn-client-spark-3-shaded_${scala.binary.version}</artifactId>
77+
</dependency>
7478
</dependencies>
7579

7680
<build>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.celeborn.client.spark.ebay;
19+
20+
import org.apache.spark.ShuffleDependency;
21+
import org.apache.spark.shuffle.celeborn.ShuffleFallbackPolicy;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import org.apache.celeborn.client.LifecycleManager;
26+
import org.apache.celeborn.common.CelebornConf;
27+
28+
public class ShuffleMapPartitionsFallbackPolicy implements ShuffleFallbackPolicy {
29+
public static final Logger LOG =
30+
LoggerFactory.getLogger(ShuffleMapPartitionsFallbackPolicy.class);
31+
public static final String SPARK_SHUFFLE_FALLBACK_MAP_PARTITION_THRESHOLD =
32+
"celeborn.client.spark.shuffle.fallback.numMapPartitionsThreshold";
33+
34+
@Override
35+
public boolean needFallback(
36+
ShuffleDependency<?, ?, ?> shuffleDependency,
37+
CelebornConf celebornConf,
38+
LifecycleManager lifecycleManager) {
39+
Integer shuffleMapPartitions = shuffleDependency.rdd().getNumPartitions();
40+
Integer largeMapPartitionsThreshold =
41+
celebornConf.getInt(SPARK_SHUFFLE_FALLBACK_MAP_PARTITION_THRESHOLD, Integer.MAX_VALUE);
42+
boolean needFallback = shuffleMapPartitions > largeMapPartitionsThreshold;
43+
if (needFallback) {
44+
LOG.warn(
45+
"Shuffle map partition number {} exceeds threshold {}, fallback to spark built-in shuffle implementation.",
46+
shuffleMapPartitions,
47+
largeMapPartitionsThreshold);
48+
}
49+
return needFallback;
50+
}
51+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
org.apache.celeborn.client.spark.ebay.ShuffleMapPartitionsFallbackPolicy
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.ebay.celeborn
19+
20+
import org.apache.celeborn.common.CelebornConf
21+
import org.apache.celeborn.common.client.ApplicationInfoProvider
22+
23+
import org.apache.spark.SparkContext
24+
25+
class ApplicationInfoProviderImpl(conf: CelebornConf) extends ApplicationInfoProvider(conf) {
26+
override def provide(): Map[String, String] = {
27+
SparkContext.getActive.map { sc =>
28+
Map("queue" -> sc.getConf.get("spark.yarn.queue", "default"))
29+
}.getOrElse(Map.empty[String, String])
30+
}
31+
}

0 commit comments

Comments
 (0)