Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.ADLStoreOptions;
import com.microsoft.azure.datalake.store.DirectoryEntry;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.LatencyTracker;
import com.microsoft.azure.datalake.store.UserGroupRepresentation;
import com.microsoft.azure.datalake.store.*;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider;
Expand All @@ -54,6 +49,8 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.metrics.AdlFileSystemInstrumentation;
import org.apache.hadoop.fs.adl.metrics.Statistic;
import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
Expand All @@ -67,6 +64,7 @@
import org.apache.hadoop.util.VersionInfo;

import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
import static org.apache.hadoop.fs.adl.metrics.Statistic.*;

/**
* A FileSystem to access Azure Data Lake Store.
Expand All @@ -83,7 +81,7 @@ public class AdlFileSystem extends FileSystem {
private Path workingDirectory;
private boolean aclBitStatus;
private UserGroupRepresentation oidOrUpn;

private AdlFileSystemInstrumentation instrumentation;

// retained for tests
private AccessTokenProvider tokenProvider;
Expand Down Expand Up @@ -162,6 +160,8 @@ public void initialize(URI storeUri, Configuration conf) throws IOException {
adlClient = ADLStoreClient
.createClient(accountFQDN, getAccessTokenProvider(conf));

instrumentation = new AdlFileSystemInstrumentation(storeUri, adlClient);

ADLStoreOptions options = new ADLStoreOptions();
options.enableThrowingRemoteExceptions();

Expand Down Expand Up @@ -357,12 +357,16 @@ public Path getHomeDirectory() {
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
Long start = System.currentTimeMillis();
entryPoint(CREATE, 1);
statistics.incrementWriteOps(1);
IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL;
return new FSDataOutputStream(new AdlFsOutputStream(adlClient
FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(new AdlFsOutputStream(adlClient
.createFile(toRelativeFilePath(f), overwriteRule,
Integer.toOctalString(applyUMask(permission).toShort()), true),
getConf()), this.statistics);
entryPoint(CREATE_LATENCY, System.currentTimeMillis() - start);
return fsDataOutputStream;
}

/**
Expand All @@ -387,6 +391,8 @@ public FSDataOutputStream create(Path f, FsPermission permission,
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
Long start = System.currentTimeMillis();
entryPoint(CREATENONRECURSIVE, 1);
statistics.incrementWriteOps(1);
IfExists overwriteRule = IfExists.FAIL;
for (CreateFlag flag : flags) {
Expand All @@ -396,10 +402,12 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
}
}

return new FSDataOutputStream(new AdlFsOutputStream(adlClient
FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(new AdlFsOutputStream(adlClient
.createFile(toRelativeFilePath(f), overwriteRule,
Integer.toOctalString(applyUMask(permission).toShort()), false),
getConf()), this.statistics);
entryPoint(CREATENONRECURSIVE_LATENCY, System.currentTimeMillis() - start);
return fsDataOutputStream;
}

/**
Expand All @@ -414,10 +422,14 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
Long start = System.currentTimeMillis();
entryPoint(APPEND, 1);
statistics.incrementWriteOps(1);
return new FSDataOutputStream(
FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(
new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)),
getConf()), this.statistics);
entryPoint(APPEND_LATENCY, System.currentTimeMillis() - start);
return fsDataOutputStream;
}

/**
Expand Down Expand Up @@ -455,10 +467,14 @@ public boolean setReplication(final Path p, final short replication)
@Override
public FSDataInputStream open(final Path f, final int buffersize)
throws IOException {
Long start = System.currentTimeMillis();
entryPoint(OPEN, 1);
statistics.incrementReadOps(1);
return new FSDataInputStream(
FSDataInputStream fsDataInputStream = new FSDataInputStream(
new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)),
statistics, getConf()));
entryPoint(OPEN_LATENCY, System.currentTimeMillis() - start);
return fsDataInputStream;
}

/**
Expand All @@ -471,10 +487,14 @@ public FSDataInputStream open(final Path f, final int buffersize)
*/
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
  // Fetches the directory entry for the path and converts it to a FileStatus.
  // Primitive long: a boxed Long here only adds an allocation and an unbox.
  final long start = System.currentTimeMillis();
  entryPoint(GETFILESTATUS, 1);
  statistics.incrementReadOps(1);
  DirectoryEntry entry =
      adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn);
  FileStatus fileStatus = toFileStatus(entry, f);
  // Latency is recorded only on success; an exception above skips this line.
  entryPoint(GETFILESTATUS_LATENCY, System.currentTimeMillis() - start);
  return fileStatus;
}

/**
Expand All @@ -488,10 +508,19 @@ public FileStatus getFileStatus(final Path f) throws IOException {
*/
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
  // Enumerates the directory on the store and converts each entry to a
  // FileStatus.
  // Primitive long: a boxed Long here only adds an allocation and an unbox.
  final long start = System.currentTimeMillis();
  entryPoint(LISTSTATUS, 1);
  statistics.incrementReadOps(1);
  List<DirectoryEntry> entries =
      adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn);
  FileStatus[] fileStatuses = toFileStatuses(entries, f);
  // Latency is recorded only on success; an exception above skips this line.
  entryPoint(LISTSTATUS_LATENCY, System.currentTimeMillis() - start);

  // The metric list is used here only for debug output, so skip fetching it
  // and building the message strings when debug logging is off.
  if (LOG.isDebugEnabled()) {
    for (Metric metric : adlClient.getMetricList()) {
      LOG.debug("listStatus - metricName: " + metric.getName()
          + " with metricTotal of: " + metric.getTotal());
    }
  }
  return fileStatuses;
}

/**
Expand Down Expand Up @@ -985,4 +1014,14 @@ public void setUserGroupRepresentationAsUPN(boolean enableUPN) {
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
UserGroupRepresentation.OID;
}

/**
 * Records the start of (or a measurement for) a file system operation by
 * forwarding it to this file system's instrumentation.
 *
 * @param statistic the statistic whose counter is to be updated
 * @param value the amount passed on as the counter increment
 */
protected void entryPoint(Statistic statistic, long value) {
  this.instrumentation.incrementCounter(statistic, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.apache.hadoop.fs.adl.metrics;

import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.Metric;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.net.URI;
import java.util.List;
import java.util.UUID;

/**
 * A metrics source for the ADL file system to track all the metrics we care
 * about for getting a clear picture of the performance/reliability/interaction
 * of the Hadoop cluster with Azure Data Lake Store.
 * Derived from S3AMetrics.
 *
 * <p>All instances share one static {@link MetricsSystem}; it is created when
 * first needed and shut down when the last registered instance is closed.
 *
 * <p>NOTE(review): nothing in this class registers counters for the
 * {@link Statistic} values, so {@link #incrementCounter(Statistic, long)} is
 * a no-op unless counters with matching names are added to the registry
 * elsewhere - confirm whether that registration is intended to live here.
 */
@Metrics(about="Metrics for ADLS", context="ADLSFileSystem")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AdlFileSystemInstrumentation implements Closeable, MetricsSource {
  static final Logger LOG =
      LoggerFactory.getLogger(AdlFileSystemInstrumentation.class);
  private static final String METRICS_SOURCE_BASENAME = "AdlMetrics";

  /**
   * {@value #METRICS_SYSTEM_NAME} The name of the adl-specific metrics
   * system instance used for adl metrics.
   */
  public static final String METRICS_SYSTEM_NAME = "adl-file-system";

  /**
   * {@value #METRIC_TAG_BUCKET} The name of a field added to metrics records
   * that indicates the hostname portion of the FS URL.
   */
  public static final String METRIC_TAG_BUCKET = "bucket";

  /** Name under which this instance is registered with the metrics system. */
  private String metricsSourceName;

  /** Client whose metric list is mirrored into the registry on snapshot. */
  private final ADLStoreClient adlStoreClient;

  // metricsSystemLock must be used to synchronize modifications to
  // metricsSystem and the following counters.
  private static final Object metricsSystemLock = new Object();
  private static MetricsSystem metricsSystem = null;
  private static int metricsSourceNameCounter = 0;
  private static int metricsSourceActiveCounter = 0;

  /** Set once {@link #close()} has run; guarded by metricsSystemLock. */
  private boolean closed;

  private final MetricsRegistry registry =
      new MetricsRegistry("adlFileSystem").setContext("adlFileSystem");

  /**
   * Create the instrumentation, tag its registry and register it as a
   * metrics source.
   *
   * @param uri URI of the file system; its host is used as a tag value and
   *            as part of the metrics source name
   * @param adlStoreClient client whose metrics are published by this source
   */
  public AdlFileSystemInstrumentation(URI uri, ADLStoreClient adlStoreClient) {
    // Assign the client before registering: once registered, the metrics
    // system may call getMetrics(), which reads this field.
    this.adlStoreClient = adlStoreClient;
    UUID fileSystemInstanceId = UUID.randomUUID();
    registry.tag("adlFileSystemId",
        "A unique identifier for the file ",
        fileSystemInstanceId.toString());
    registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", uri.getHost());
    registerAsMetricsSource(uri);
  }

  /**
   * Get the shared metrics system, creating and initializing it if there is
   * none.
   *
   * @return the shared metrics system
   */
  @VisibleForTesting
  public MetricsSystem getMetricsSystem() {
    synchronized (metricsSystemLock) {
      if (metricsSystem == null) {
        metricsSystem = new MetricsSystemImpl();
        metricsSystem.init(METRICS_SYSTEM_NAME);
      }
      // Read the static field while still holding the lock so a concurrent
      // close() cannot null it between the check and the return.
      return metricsSystem;
    }
  }

  /**
   * Get the metrics registry.
   * @return the registry
   */
  public MetricsRegistry getRegistry() {
    return registry;
  }

  /**
   * Register this instance as a metrics source.
   * The source name is the base name, a per-JVM sequence number and the
   * host of the URI.
   *
   * @param uri file system URI supplying the host part of the source name
   */
  private void registerAsMetricsSource(URI uri) {
    int number;
    MetricsSystem system;
    synchronized (metricsSystemLock) {
      system = getMetricsSystem();

      metricsSourceActiveCounter++;
      number = ++metricsSourceNameCounter;
    }
    // The sequence number is appended exactly once; previously it was
    // appended a second time for every instance after the first.
    metricsSourceName = METRICS_SOURCE_BASENAME + number + "-" + uri.getHost();
    system.register(metricsSourceName, "", this);
  }

  /**
   * Increment a specific counter.
   * No-op if not defined.
   * @param op operation
   * @param count increment value
   */
  public void incrementCounter(Statistic op, long count) {
    MutableCounterLong counter = lookupCounter(op.getSymbol());
    if (counter != null) {
      counter.incr(count);
      // Logged only when a counter was really updated, and at debug level
      // because this runs on every instrumented file system call.
      LOG.debug("{} metrics counter incremented!", op.getSymbol());
    }
  }

  /**
   * Lookup a counter by name. Return null if it is not known.
   * @param name counter name
   * @return the counter
   * @throws IllegalStateException if the metric is not a counter
   */
  private MutableCounterLong lookupCounter(String name) {
    MutableMetric metric = lookupMetric(name);
    if (metric == null) {
      return null;
    }
    if (!(metric instanceof MutableCounterLong)) {
      throw new IllegalStateException("Metric " + name
          + " is not a MutableCounterLong: " + metric);
    }
    LOG.debug("lookup counter log for {}", name);
    return (MutableCounterLong) metric;
  }

  /**
   * Look up a metric in this instance's registry.
   * @param name metric name
   * @return the metric or null
   */
  public MutableMetric lookupMetric(String name) {
    return getRegistry().get(name);
  }

  /**
   * Unregister this source; when it was the last active source, publish the
   * metrics and shut the shared metrics system down.
   * Calling this more than once has no further effect.
   */
  @Override
  public void close() {
    synchronized (metricsSystemLock) {
      if (closed) {
        // A repeated close must not decrement the active counter again or
        // touch a metrics system that has already been shut down.
        return;
      }
      closed = true;
      metricsSystem.unregisterSource(metricsSourceName);
      int activeSources = --metricsSourceActiveCounter;
      if (activeSources == 0) {
        metricsSystem.publishMetricsNow();
        metricsSystem.shutdown();
        metricsSystem = null;
      }
    }
  }

  /**
   * Refresh the registry from the store client and snapshot every metric in
   * the registry into a new record.
   *
   * @param builder collector receiving the record
   * @param all not consulted; the registry is always snapshotted in full
   */
  @Override
  public void getMetrics(MetricsCollector builder, boolean all) {
    dumpAllMetrics();
    registry.snapshot(builder.addRecord(registry.info().name()), true);
  }

  /**
   * Copy the store client's metric totals into the registry.
   * A counter is created the first time a metric name is seen; on later
   * calls the existing counter is moved to the reported total, because
   * creating a counter under a name that already exists is rejected by the
   * registry.
   */
  public synchronized void dumpAllMetrics() {
    // synchronized so the look-up-then-create sequence below cannot be
    // interleaved by two concurrent snapshots of this instance.
    List<Metric> metricList = adlStoreClient.getMetricList();
    for (Metric metric : metricList) {
      String name = metric.getName();
      long total = metric.getTotal();
      MutableMetric existing = registry.get(name);
      if (existing == null) {
        registry.newCounter(name, metric.getDescription(), total);
      } else if (existing instanceof MutableCounterLong) {
        MutableCounterLong counter = (MutableCounterLong) existing;
        // The client reports running totals, so apply only the difference.
        counter.incr(total - counter.value());
      } else {
        LOG.warn("Metric {} is not a MutableCounterLong: {}", name, existing);
      }
      LOG.debug("metricName: {} with metricTotal of: {}", name, total);
    }
  }
}
Loading