Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
6f1cbd3
MAPREDUCE-7341. Intermediate Manifest Committer
steveloughran Apr 30, 2021
17a9e8c
MAPREDUCE-7341. remove unused imports in TestCleanupStage
steveloughran Jul 28, 2021
20fc78b
MAPREDUCE-7341. cleanup protocol test a bit
steveloughran Jul 30, 2021
2cf738e
MAPREDUCE-7341. Remove AuditingIntegration; test review.
steveloughran Aug 3, 2021
9521077
MAPREDUCE-7341. use msync() before listings
steveloughran Aug 20, 2021
be9ae4b
MAPREDUCE-7341. improving task manifest loading
steveloughran Sep 8, 2021
f275061
MAPREDUCE-7341. creating output
steveloughran Sep 15, 2021
dcb7937
MAPREDUCE-7341. code refactoring
steveloughran Sep 16, 2021
d36c691
MAPREDUCE-7341. Directory Creation
steveloughran Sep 30, 2021
1c74983
MAPREDUCE-7341. Directory Creation
steveloughran Oct 4, 2021
7c75b1d
MAPREDUCE-7341. Rate Limiting and logging
steveloughran Oct 5, 2021
55a81f2
MAPREDUCE-7341. improving logging
steveloughran Oct 7, 2021
d9ccac4
MAPREDUCE-7341. Design of etag extraction extension point
steveloughran Oct 21, 2021
4bde5a1
MAPREDUCE-7341. Etags through ABFS
steveloughran Oct 21, 2021
dc5d1d1
MAPREDUCE-7341. Manifest committer
steveloughran Oct 22, 2021
146469c
MAPREDUCE-7341. Manifest committer
steveloughran Oct 22, 2021
1b38a96
MAPREDUCE-7341. Use Etags for rename resilience
steveloughran Oct 26, 2021
1ab671a
MAPREDUCE-7341. ResilientCommitByRename
steveloughran Oct 27, 2021
70f92c3
MAPREDUCE-7341. ResilientCommitByRename
steveloughran Oct 28, 2021
c144bd9
MAPREDUCE-7341. manifest committer
steveloughran Jan 7, 2022
002c2cf
MAPREDUCE-7341. IOStats aggregation
steveloughran Jan 21, 2022
d36bf26
MAPREDUCE-7341. revisit
steveloughran Feb 1, 2022
4ec5c85
MAPREDUCE-7341. directory preparation
steveloughran Feb 2, 2022
94637e2
MAPREDUCE-7341. directory preparation
steveloughran Feb 3, 2022
8a742e2
MAPREDUCE-7341. treating missing task attempt path as ok.
steveloughran Feb 8, 2022
5488213
MAPREDUCE-7341. testing and tuning
steveloughran Feb 9, 2022
1c42a4f
MAPREDUCE-7341. Specifying commit protocol rigorously
steveloughran Feb 11, 2022
c1528b2
MAPREDUCE-7341. feedback from sseth
steveloughran Feb 25, 2022
d3ab02a
MAPREDUCE-7341. track size of created files
steveloughran Mar 1, 2022
6660db1
MAPREDUCE-7341. improve dir creation performance.
steveloughran Mar 3, 2022
2bac8fd
Merge branch 'trunk' into mr/MAPREDUCE-7341-manifest-committer
steveloughran Mar 3, 2022
8193ff2
MAPREDUCE-7341. changes from sseth's review
steveloughran Mar 4, 2022
252c89e
MAPREDUCE-7341. fix test failure in abfs itests
steveloughran Mar 7, 2022
024ff5a
MAPREDUCE-7341. clean up the abfs changes
steveloughran Mar 8, 2022
d4223ad
MAPREDUCE-7341. move directory probes to task commit
steveloughran Mar 8, 2022
1f4227d
MAPREDUCE-7341. More directory creation performance.
steveloughran Mar 8, 2022
e568b5c
MAPREDUCE-7341. Optimise task commit directory probes
steveloughran Mar 9, 2022
c706621
MAPREDUCE-7341. docs, options and setting as default
steveloughran Mar 9, 2022
0dafe3d
MAPREDUCE-7341. mapreduce.manifest.committer.delete.target.files
steveloughran Mar 10, 2022
d599707
MAPREDUCE-7341 doc changes.
steveloughran Mar 10, 2022
aac5ffc
MAPREDUCE-7341 doc changes.
steveloughran Mar 10, 2022
a550b42
MAPREDUCE-7341 doc changes.
steveloughran Mar 10, 2022
af50fef
MAPREDUCE-7341 doc changes.
steveloughran Mar 10, 2022
453fbae
MAPREDUCE-7341 doc changes.
steveloughran Mar 10, 2022
8691286
MAPREDUCE-7341. some of the review actions
steveloughran Mar 12, 2022
5a6c0c1
MAPREDUCE-7341. Remove the option of moving the temporary directory t…
steveloughran Mar 12, 2022
2684397
MAPREDUCE-7341. tests on manifest file list
steveloughran Mar 14, 2022
dccf376
MAPREDUCE-7341. PathOutputCommitter logs at info
steveloughran Mar 14, 2022
5019013
MAPREDUCE-7341. yetus -unused imports and spaces at EOL in markdown
steveloughran Mar 15, 2022
2dd859a
MAPREDUCE-7341. mukund's comments.
steveloughran Mar 15, 2022
73c7a39
MAPREDUCE-7341. checkstyles
steveloughran Mar 15, 2022
74177c4
MAPREDUCE-7341. feedback before merge
steveloughran Mar 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";

/** {@value}. */
public static final String OP_MSYNC = "op_msync";

/** {@value}. */
public static final String OP_OPEN = "op_open";

Expand Down Expand Up @@ -172,6 +175,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_THROTTLED
= "store_io_throttled";

/** Rate limiting was reported {@value}. */
public static final String STORE_IO_RATE_LIMITED = "store_io_rate_limited";

/** Requests made of a store: {@value}. */
public static final String STORE_IO_REQUEST
= "store_io_request";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -450,12 +451,37 @@ public static <B> B trackDuration(
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @throws IOException IO failure.
*/
public static void trackDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {

measureDurationOfInvocation(factory, statistic, input);
}

/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic,
* returning the measured duration.
*
* {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
* with the duration returned for logging etc.; added as a new
* method to avoid linking problems with any code calling the existing
* method.
*
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @return the duration of the operation, as measured by the duration tracker.
* @throws IOException IO failure.
*/
public static Duration measureDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {

// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
Expand All @@ -473,6 +499,7 @@ public static void trackDurationOfInvocation(
// set the failed flag.
tracker.close();
}
return tracker.asDuration();
}

/**
Expand Down Expand Up @@ -622,7 +649,7 @@ public static <B> B trackDurationOfSupplier(
* @param statistic statistic to track
* @return a duration tracker.
*/
private static DurationTracker createTracker(
public static DurationTracker createTracker(
@Nullable final DurationTrackerFactory factory,
final String statistic) {
return factory != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,15 @@ default long incrementCounter(String key) {
*/
void addTimedOperation(String prefix, Duration duration);

/**
* Add a statistics sample as a min, max and mean and count.
* @param key key to add.
* @param count count.
*/
default void addSample(String key, long count) {
incrementCounter(key, count);
addMeanStatisticSample(key, count);
addMaximumSample(key, count);
addMinimumSample(key, count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void close() {
public Duration asDuration() {
return firstDuration.asDuration();
}

@Override
public String toString() {
return firstDuration.toString();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ public void close() {
}
iostats.addTimedOperation(name, asDuration());
}

@Override
public String toString() {
return " Duration of " +
(failed? (key + StoreStatisticNames.SUFFIX_FAILURES) : key)
+ ": " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ public synchronized T fromJsonStream(InputStream stream) throws IOException {
@SuppressWarnings("unchecked")
public synchronized T load(File jsonFile)
throws IOException, JsonParseException, JsonMappingException {
if (!jsonFile.exists()) {
throw new FileNotFoundException("No such file: " + jsonFile);
}
if (!jsonFile.isFile()) {
throw new FileNotFoundException("Not a file: " + jsonFile);
}
Expand All @@ -181,7 +184,7 @@ public synchronized T load(File jsonFile)
try {
return mapper.readValue(jsonFile, classType);
} catch (IOException e) {
LOG.error("Exception while parsing json file {}", jsonFile, e);
LOG.warn("Exception while parsing json file {}", jsonFile, e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Minimal subset of google rate limiter class.
* Can be used to throttle use of object stores where excess load
* will trigger cluster-wide throttling, backoff etc. and so collapse
* performance.
* The time waited is returned as a Duration type.
* The google rate limiter implements this by allowing a caller to ask for
* more capacity than is available. This will be granted
* but the subsequent request will be blocked if the bucket of
* capacity hasn't let refilled to the point where there is
* capacity again.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General usage of this in the patch: Should something like this be used within the FileSystem implementation itself, potentially shared across multiple instances pointing to the same FileSystem, instead of by the individual commit operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we may want to go that way with the s3a client. for this committer i was trying to move it up and let us manage it here.

interesting point though, as yes, if you do it behind the FS api then the applications don't need to ask for permission themselves...but they do need to consider that some calls may block a while.

putting the feature in here what helps address the key problem we've been seeing of "many renames in job commit overloading abfs"; does nothing for any other operations.

imagine if we added the source of rate limiting to the StoreOperations, where the default limiter would be the no-op one, and abfs would return one which is shared by all users of that FS instance? this would let all spark jobs in the same VM share the same limiter. it would be hidden away in our new private interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to experiment/test more with test before moving for all operations in FS itself.


/**
* Acquire rate limiter capacity.
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* @param requestedCapacity capacity to acquire.
* @return time spent waiting for output.
*/
Duration acquire(int requestedCapacity);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;

/**
* Factory for Rate Limiting.
* This should be only place in the code where the guava RateLimiter is imported.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class RateLimitingFactory {

private static final RateLimiting UNLIMITED = new NoRateLimiting();

/**
* No waiting took place.
*/
private static final Duration INSTANTLY = Duration.ofMillis(0);

private RateLimitingFactory() {
}

/**
* No Rate Limiting.
*/
private static class NoRateLimiting implements RateLimiting {


@Override
public Duration acquire(int requestedCapacity) {
return INSTANTLY;
}
}

/**
* Rate limiting restricted to that of a google rate limiter.
*/
private static final class RestrictedRateLimiting implements RateLimiting {
private final RateLimiter limiter;

/**
* Constructor.
* @param capacityPerSecond capacity in permits/second.
*/
private RestrictedRateLimiting(int capacityPerSecond) {
this.limiter = RateLimiter.create(capacityPerSecond);
}

@Override
public Duration acquire(int requestedCapacity) {
final double delayMillis = limiter.acquire(requestedCapacity);
return delayMillis == 0
? INSTANTLY
: Duration.ofMillis((long) (delayMillis * 1000));
}

}

/**
* Get the unlimited rate.
* @return a rate limiter which always has capacity.
*/
public static RateLimiting unlimitedRate() {
return UNLIMITED;
}

/**
* Create an instance.
* If the rate is 0; return the unlimited rate.
* @param capacity capacity in permits/second.
* @return limiter restricted to the given capacity.
*/
public static RateLimiting create(int capacity) {

return capacity == 0
? unlimitedRate()
: new RestrictedRateLimiting(capacity);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static java.util.Objects.requireNonNull;

/**
* A task submitter which is closeable, and whose close() call
* shuts down the pool. This can help manage
* thread pool lifecycles.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
Closeable {

/** Executors. */
private ExecutorService pool;

/**
* Constructor.
* @param pool non-null executor.
*/
public CloseableTaskPoolSubmitter(final ExecutorService pool) {
this.pool = requireNonNull(pool);
}

/**
* Get the pool.
* @return the pool.
*/
public ExecutorService getPool() {
return pool;
}

/**
* Shut down the pool.
*/
@Override
public void close() {
if (pool != null) {
pool.shutdown();
pool = null;
}
}

@Override
public Future<?> submit(final Runnable task) {
return pool.submit(task);
}
}
Loading