Skip to content

HADOOP-17511. Add audit/telemetry logging to S3A connector #2675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;

/**
* An abstract class for the execution of a file system command.
*/
Expand Down Expand Up @@ -361,6 +363,7 @@ protected void processPaths(PathData parent,
}
}
}
cleanupRemoteIterator(itemsIterator);
}

private void processPathInternal(PathData item) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;

import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;

/**
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
* PathData ensures that the returned path string will be the same as the
Expand Down Expand Up @@ -287,20 +289,8 @@ public RemoteIterator<PathData> getDirectoryContentsIterator()
throws IOException {
checkIfExists(FileTypeRequirement.SHOULD_BE_DIRECTORY);
final RemoteIterator<FileStatus> stats = this.fs.listStatusIterator(path);
return new RemoteIterator<PathData>() {

@Override
public boolean hasNext() throws IOException {
return stats.hasNext();
}

@Override
public PathData next() throws IOException {
FileStatus file = stats.next();
String child = getStringForChildPath(file.getPath());
return new PathData(fs, child, file);
}
};
return mappingRemoteIterator(stats,
file -> new PathData(fs, getStringForChildPath(file.getPath()), file));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ public final class StoreStatisticNames {
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";

/** Probe for store existing: {@value}. */
public static final String STORE_EXISTS_PROBE
= "store_exists_probe";

/** Requests throttled and retried: {@value}. */
public static final String STORE_IO_THROTTLED
= "store_io_throttled";
Expand Down Expand Up @@ -349,6 +353,9 @@ public final class StoreStatisticNames {
public static final String MULTIPART_UPLOAD_STARTED
= "multipart_upload_started";

/**
* Statistic name for listing multipart uploads
* (meaning inferred from the key; confirm against usage): {@value}.
*/
public static final String MULTIPART_UPLOAD_LIST
= "multipart_upload_list";

private StoreStatisticNames() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.statistics.impl;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.MeanStatistic;

import static java.util.Collections.emptyMap;

/**
 * An {@link IOStatisticsStore} which stores nothing.
 * <p>
 * Every map view is empty, every update operation is a no-op,
 * every operation returning a {@code long} returns zero and
 * every lookup of a reference or mean statistic returns null.
 * There is a single shared instance, obtained through
 * {@link #getInstance()}.
 */
final class EmptyIOStatisticsStore implements IOStatisticsStore {

  /**
   * The one and only instance.
   */
  private static final EmptyIOStatisticsStore INSTANCE =
      new EmptyIOStatisticsStore();

  /**
   * Private constructor: use {@link #getInstance()}.
   */
  private EmptyIOStatisticsStore() {
  }

  /**
   * Get the shared instance.
   * @return the shared, empty store.
   */
  static IOStatisticsStore getInstance() {
    return INSTANCE;
  }

  // ---- statistics views: always empty ----

  /** @return an empty map. */
  @Override
  public Map<String, Long> counters() {
    return emptyMap();
  }

  /** @return an empty map. */
  @Override
  public Map<String, Long> gauges() {
    return emptyMap();
  }

  /** @return an empty map. */
  @Override
  public Map<String, Long> minimums() {
    return emptyMap();
  }

  /** @return an empty map. */
  @Override
  public Map<String, Long> maximums() {
    return emptyMap();
  }

  /** @return an empty map. */
  @Override
  public Map<String, MeanStatistic> meanStatistics() {
    return emptyMap();
  }

  // ---- updates: all discarded ----

  /**
   * Nothing is aggregated.
   * @param statistics ignored.
   * @return false, always.
   */
  @Override
  public boolean aggregate(@Nullable IOStatistics statistics) {
    return false;
  }

  /** @return 0, always; the increment is discarded. */
  @Override
  public long incrementCounter(String key, long value) {
    return 0L;
  }

  @Override
  public void setCounter(String key, long value) {
    // no-op
  }

  @Override
  public void setGauge(String key, long value) {
    // no-op
  }

  /** @return 0, always; the increment is discarded. */
  @Override
  public long incrementGauge(String key, long value) {
    return 0L;
  }

  @Override
  public void setMaximum(String key, long value) {
    // no-op
  }

  /** @return 0, always; the increment is discarded. */
  @Override
  public long incrementMaximum(String key, long value) {
    return 0L;
  }

  @Override
  public void setMinimum(String key, long value) {
    // no-op
  }

  /** @return 0, always; the increment is discarded. */
  @Override
  public long incrementMinimum(String key, long value) {
    return 0L;
  }

  @Override
  public void addMinimumSample(String key, long value) {
    // no-op
  }

  @Override
  public void addMaximumSample(String key, long value) {
    // no-op
  }

  @Override
  public void setMeanStatistic(String key, MeanStatistic value) {
    // no-op
  }

  @Override
  public void addMeanStatisticSample(String key, long value) {
    // no-op
  }

  @Override
  public void reset() {
    // no-op: there is no state to reset
  }

  // ---- lookups: nothing is ever found ----

  /** @return null, always. */
  @Override
  public AtomicLong getCounterReference(String key) {
    return null;
  }

  /** @return null, always. */
  @Override
  public AtomicLong getMaximumReference(String key) {
    return null;
  }

  /** @return null, always. */
  @Override
  public AtomicLong getMinimumReference(String key) {
    return null;
  }

  /** @return null, always. */
  @Override
  public AtomicLong getGaugeReference(String key) {
    return null;
  }

  /** @return null, always. */
  @Override
  public MeanStatistic getMeanStatistic(String key) {
    return null;
  }

  // ---- timed operations: discarded ----

  @Override
  public void addTimedOperation(String prefix, long durationMillis) {
    // no-op
  }

  @Override
  public void addTimedOperation(String prefix, Duration duration) {
    // no-op
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -97,6 +98,15 @@ public static IOStatistics emptyStatistics() {
return EmptyIOStatistics.getInstance();
}

/**
* Get the shared instance of the empty statistics store.
* This is the singleton returned by
* {@code EmptyIOStatisticsStore.getInstance()}; its update
* operations are all no-ops, so it never holds any statistics.
* @return the shared empty statistics store.
*/
public static IOStatisticsStore emptyStatisticsStore() {
return EmptyIOStatisticsStore.getInstance();
}

/**
* Take an IOStatistics instance and wrap it in a source.
* @param statistics statistics.
Expand Down Expand Up @@ -573,6 +583,38 @@ public static <B> Callable<B> trackDurationOfCallable(
};
}

/**
 * Evaluate a Java supplier, tracking the duration of the
 * operation and whether it succeeded or failed.
 * A {@code RuntimeException} raised by the supplier marks the
 * tracker as failed and is then rethrown to the caller.
 * @param factory factory of duration trackers; may be null.
 * @param statistic statistic key
 * @param input supplier to evaluate.
 * @param <B> return type.
 * @return the output of the supplier.
 */
public static <B> B trackDurationOfSupplier(
    @Nullable DurationTrackerFactory factory,
    String statistic,
    Supplier<B> input) {
  // The tracker is deliberately not a try-with-resources resource:
  // the catch clause needs to reach it to record the failure.
  final DurationTracker durationTracker = createTracker(factory, statistic);
  try {
    // evaluate the supplier and hand back whatever it produced
    final B result = input.get();
    return result;
  } catch (RuntimeException ex) {
    // the supplier failed: record that, then propagate unchanged
    durationTracker.failed();
    throw ex;
  } finally {
    // always close the tracker; this runs after the catch clause
    // has had the chance to set the failed flag.
    durationTracker.close();
  }
}

/**
* Create the tracker. If the factory is null, a stub
* tracker is returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public static <S> RemoteIterator<S> closingRemoteIterator(

/**
* Build a list from a RemoteIterator.
* @param source source iterator
* @param <T> type
* @return a list of the values.
* @throws IOException if the source RemoteIterator raises it.
Expand All @@ -202,12 +203,17 @@ public static <T> List<T> toList(RemoteIterator<T> source)

/**
* Build an array from a RemoteIterator.
* @param source source iterator
* @param a destination array; if too small a new array
* of the same type is created
* @param <T> type
* @return an array of the values.
* @throws IOException if the source RemoteIterator raises it.
*/
public static <T> T[] toArray(RemoteIterator<T> source) throws IOException {
return (T[]) toList(source).toArray();
public static <T> T[] toArray(RemoteIterator<T> source,
T[] a) throws IOException {
List<T> list = toList(source);
return list.toArray(a);
}

/**
Expand Down Expand Up @@ -240,18 +246,28 @@ public static <T> long foreach(
consumer.accept(source.next());
}

// maybe log the results
logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}", source);
} finally {
if (source instanceof Closeable) {
// source is closeable, so close.
IOUtils.cleanupWithLogger(LOG, (Closeable) source);
}
cleanupRemoteIterator(source);
}

return count;
}

/**
 * Finish off an iteration over a remote iterator.
 * The iterator is first handed to
 * {@code logIOStatisticsAtDebug()} so that its statistics may
 * be logged; then, if it implements {@link Closeable}, it is
 * closed through {@code IOUtils.cleanupWithLogger()}.
 * @param source iterator source
 * @param <T> type of source
 */
public static <T> void cleanupRemoteIterator(RemoteIterator<T> source) {
  // statistics logging comes first, before any close
  logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}", source);
  if (!(source instanceof Closeable)) {
    // nothing to close
    return;
  }
  // the source is closeable: cast and clean it up
  IOUtils.cleanupWithLogger(LOG, (Closeable) source);
}

/**
* A remote iterator from a singleton. It has a single next()
* value, after which hasNext() returns false and next() fails.
Expand Down
Loading