MAPREDUCE-7341. Intermediate Manifest Committer #2971
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.util; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceStability; | ||
|
|
||
| /** | ||
| * Minimal subset of google rate limiter class. | ||
| * Can be used to throttle use of object stores where excess load | ||
| * will trigger cluster-wide throttling, backoff etc. and so collapse | ||
| * performance. | ||
| * The time waited is returned as a Duration type. | ||
| * The google rate limiter implements this by allowing a caller to ask for | ||
| * more capacity than is available. This will be granted | ||
| * but the subsequent request will be blocked if the bucket of | ||
| * capacity hasn't yet refilled to the point where there is | ||
| * capacity again. | ||
| */ | ||
| @InterfaceAudience.Private | ||
| @InterfaceStability.Unstable | ||
| public interface RateLimiting { | ||
|
Contributor
General usage of this in the patch: should something like this be used within the FileSystem implementation itself, potentially shared across multiple instances pointing to the same FileSystem, instead of by the individual commit operations?
Contributor
Author
I think we may want to go that way with the s3a client. For this committer I was trying to move it up and let us manage it here. Interesting point though: yes, if you do it behind the FS API then applications don't need to ask for permission themselves, but they do need to consider that some calls may block for a while. Putting the feature in here helps address the key problem we've been seeing of "many renames in job commit overloading abfs"; it does nothing for any other operations. Imagine if we added the source of rate limiting to the StoreOperations, where the default limiter would be the no-op one, and abfs would return one which is shared by all users of that FS instance. This would let all Spark jobs in the same VM share the same limiter. It would be hidden away in our new private interface.
Contributor
I think we need to experiment and test more before moving this into the FS itself for all operations. |
||
|
|
||
| /** | ||
| * Acquire rate limiter capacity. | ||
| * If there is not enough space, the permits will be acquired, | ||
| * but the subsequent call will block until the capacity has been | ||
| * refilled. | ||
| * @param requestedCapacity capacity to acquire. | ||
| * @return time spent waiting for the capacity. | ||
| */ | ||
| Duration acquire(int requestedCapacity); | ||
|
|
||
| } | ||
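
The Javadoc above describes a "grant now, charge the next caller" contract. The sketch below illustrates that contract in plain Java without Guava. It is not the code in this patch: `SketchRateLimiting`, `BorrowAheadLimiter`, and the injected nanosecond clock are hypothetical names introduced here, and the limiter only reports how long a caller would have to wait rather than actually sleeping.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the RateLimiting interface in the diff. */
interface SketchRateLimiting {
  Duration acquire(int requestedCapacity);
}

/**
 * Illustrative limiter: a request is always granted, and its cost is
 * charged to whoever calls next. It only computes the wait; a real
 * limiter would also block for that long.
 */
final class BorrowAheadLimiter implements SketchRateLimiting {
  private final long nanosPerPermit;
  private final LongSupplier clock;  // injected so the sketch is deterministic
  private long nextFreeNanos;        // earliest time the next request may proceed

  BorrowAheadLimiter(int permitsPerSecond, LongSupplier nanoClock) {
    this.nanosPerPermit = 1_000_000_000L / permitsPerSecond;
    this.clock = nanoClock;
    this.nextFreeNanos = nanoClock.getAsLong();
  }

  @Override
  public synchronized Duration acquire(int requestedCapacity) {
    long now = clock.getAsLong();
    // Pay off whatever earlier callers borrowed.
    long wait = Math.max(0, nextFreeNanos - now);
    // Charge this request to the future, starting from when it proceeds.
    nextFreeNanos = Math.max(now, nextFreeNanos)
        + requestedCapacity * nanosPerPermit;
    return Duration.ofNanos(wait);
  }
}
```

With a rate of 10 permits per second, a first request for 20 permits returns a zero wait, and the request that follows it is the one told to wait two seconds.
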
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.util; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceStability; | ||
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; | ||
|
|
||
| /** | ||
| * Factory for Rate Limiting. | ||
| * This should be the only place in the code where the guava RateLimiter is imported. | ||
| */ | ||
| @InterfaceAudience.Private | ||
| @InterfaceStability.Unstable | ||
| public final class RateLimitingFactory { | ||
|
|
||
| private static final RateLimiting UNLIMITED = new NoRateLimiting(); | ||
|
|
||
| /** | ||
| * No waiting took place. | ||
| */ | ||
| private static final Duration INSTANTLY = Duration.ofMillis(0); | ||
|
|
||
| private RateLimitingFactory() { | ||
| } | ||
|
|
||
| /** | ||
| * No Rate Limiting. | ||
| */ | ||
| private static class NoRateLimiting implements RateLimiting { | ||
|
|
||
|
|
||
| @Override | ||
| public Duration acquire(int requestedCapacity) { | ||
| return INSTANTLY; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Rate limiting restricted to that of a google rate limiter. | ||
| */ | ||
| private static final class RestrictedRateLimiting implements RateLimiting { | ||
| private final RateLimiter limiter; | ||
|
|
||
| /** | ||
| * Constructor. | ||
| * @param capacityPerSecond capacity in permits/second. | ||
| */ | ||
| private RestrictedRateLimiting(int capacityPerSecond) { | ||
| this.limiter = RateLimiter.create(capacityPerSecond); | ||
| } | ||
|
|
||
| @Override | ||
| public Duration acquire(int requestedCapacity) { | ||
| // guava returns the time spent sleeping in seconds, not milliseconds | ||
| final double delaySeconds = limiter.acquire(requestedCapacity); | ||
| return delaySeconds == 0 | ||
| ? INSTANTLY | ||
| : Duration.ofMillis((long) (delaySeconds * 1000)); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * Get the unlimited rate. | ||
| * @return a rate limiter which always has capacity. | ||
| */ | ||
| public static RateLimiting unlimitedRate() { | ||
| return UNLIMITED; | ||
| } | ||
|
|
||
| /** | ||
| * Create an instance. | ||
| * If the rate is 0, return the unlimited rate. | ||
| * @param capacity capacity in permits/second. | ||
| * @return limiter restricted to the given capacity. | ||
| */ | ||
| public static RateLimiting create(int capacity) { | ||
|
|
||
| return capacity == 0 | ||
| ? unlimitedRate() | ||
| : new RestrictedRateLimiting(capacity); | ||
| } | ||
|
|
||
| } |
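
Guava's `RateLimiter.acquire(int)` reports the time spent sleeping as a `double` in seconds, which is why `RestrictedRateLimiting.acquire()` multiplies by 1000 before building a millisecond `Duration`. The sketch below isolates that conversion so it can be checked without Guava on the classpath. `DelayConversion` and `fromSeconds` are hypothetical names used only for illustration.

```java
import java.time.Duration;

/** Illustrative helper; not part of the patch. */
final class DelayConversion {
  private DelayConversion() {
  }

  /**
   * Convert a wait reported in fractional seconds into a Duration
   * with millisecond resolution; sub-millisecond waits truncate to zero.
   */
  static Duration fromSeconds(double delaySeconds) {
    return delaySeconds == 0
        ? Duration.ZERO
        : Duration.ofMillis((long) (delaySeconds * 1000));
  }
}
```

The cast truncates rather than rounds, so a reported wait of 0.0004 seconds is returned as a zero-length duration.
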
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hadoop.util.functional; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Future; | ||
|
|
||
| import org.apache.hadoop.classification.InterfaceAudience; | ||
| import org.apache.hadoop.classification.InterfaceStability; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| /** | ||
| * A task submitter which is closeable, and whose close() call | ||
| * shuts down the pool. This can help manage | ||
| * thread pool lifecycles. | ||
| */ | ||
| @InterfaceAudience.Public | ||
| @InterfaceStability.Unstable | ||
| public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter, | ||
| Closeable { | ||
|
|
||
| /** Executors. */ | ||
| private ExecutorService pool; | ||
|
|
||
| /** | ||
| * Constructor. | ||
| * @param pool non-null executor. | ||
| */ | ||
| public CloseableTaskPoolSubmitter(final ExecutorService pool) { | ||
| this.pool = requireNonNull(pool); | ||
| } | ||
|
|
||
| /** | ||
| * Get the pool. | ||
| * @return the pool. | ||
| */ | ||
| public ExecutorService getPool() { | ||
| return pool; | ||
| } | ||
|
|
||
| /** | ||
| * Shut down the pool. | ||
| */ | ||
| @Override | ||
| public void close() { | ||
| if (pool != null) { | ||
| pool.shutdown(); | ||
| pool = null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Future<?> submit(final Runnable task) { | ||
| return pool.submit(task); | ||
| } | ||
| } |
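
Because the submitter is `Closeable`, it can be scoped with try-with-resources so the pool is shut down even if task submission fails. The sketch below shows that pattern with a hypothetical stand-in, `ClosingSubmitterSketch`, which mirrors the class in the diff but drops the `TaskPool.Submitter` interface so the example needs only the JDK. `ClosingSubmitterDemo` and `runTasks` are likewise illustrative names.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for CloseableTaskPoolSubmitter. */
final class ClosingSubmitterSketch implements Closeable {
  private ExecutorService pool;

  ClosingSubmitterSketch(ExecutorService pool) {
    this.pool = Objects.requireNonNull(pool);
  }

  Future<?> submit(Runnable task) {
    return pool.submit(task);
  }

  /** Shut down the pool; safe to call more than once. */
  @Override
  public void close() {
    if (pool != null) {
      pool.shutdown();
      pool = null;
    }
  }
}

final class ClosingSubmitterDemo {
  private ClosingSubmitterDemo() {
  }

  /** Run {@code count} trivial tasks and return how many completed. */
  static int runTasks(int count) {
    AtomicInteger done = new AtomicInteger();
    try (ClosingSubmitterSketch submitter =
             new ClosingSubmitterSketch(Executors.newFixedThreadPool(2))) {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        futures.add(submitter.submit(done::incrementAndGet));
      }
      // Wait for every task before the pool is closed.
      for (Future<?> future : futures) {
        try {
          future.get();
        } catch (InterruptedException | ExecutionException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    return done.get();
  }
}
```

Note that in both the sketch and the class in the diff, `close()` nulls the pool reference, so calling `submit()` after `close()` fails with a `NullPointerException`; callers are expected to finish submitting before the block exits.
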