Skip to content

YARN-11153. Make proxy server support YARN federation. #4314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.sun.jersey.spi.container.servlet.ServletContainer;

import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics;
import org.apache.hadoop.yarn.server.webproxy.DefaultAppReportFetcher;
import org.apache.hadoop.yarn.webapp.WebAppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1426,9 +1427,9 @@ protected void startWepApp() {
if(WebAppUtils.getResolvedRMWebAppURLWithoutScheme(conf).
equals(proxyHostAndPort)) {
if (HAUtil.isHAEnabled(conf)) {
fetcher = new AppReportFetcher(conf);
fetcher = new DefaultAppReportFetcher(conf);
} else {
fetcher = new AppReportFetcher(conf, getClientRMService());
fetcher = new DefaultAppReportFetcher(conf, getClientRMService());
}
builder.withServlet(ProxyUriUtils.PROXY_SERVLET_NAME,
ProxyUriUtils.PROXY_PATH_SPEC, WebAppProxyServlet.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.hadoop.yarn.server.webproxy;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
Expand All @@ -27,81 +30,59 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.AHSProxy;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;

/**
* This class abstracts away how ApplicationReports are fetched.
*/
public class AppReportFetcher {
enum AppReportSource { RM, AHS }
public abstract class AppReportFetcher {

protected enum AppReportSource {RM, AHS}

private final Configuration conf;
private final ApplicationClientProtocol applicationsManager;
private final ApplicationHistoryProtocol historyManager;
private ApplicationHistoryProtocol historyManager;
private String ahsAppPageUrlBase;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean isAHSEnabled;

/**
* Create a new Connection to the RM/Application History Server
* to fetch Application reports.
* Create a new Connection to the RM/Application History Server to fetch Application reports.
*
* @param conf the conf to use to know where the RM is.
*/
public AppReportFetcher(Configuration conf) {
this.conf = conf;
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
this.isAHSEnabled = true;
String scheme = WebAppUtils.getHttpSchemePrefix(conf);
String historyUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
this.ahsAppPageUrlBase = StringHelper.pjoin(scheme + historyUrl, "applicationhistory", "app");
}
this.conf = conf;
try {
applicationsManager = ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class);
if (isAHSEnabled) {
historyManager = getAHSProxy(conf);
if (this.isAHSEnabled) {
this.historyManager = getAHSProxy(conf);
} else {
this.historyManager = null;
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}

/**
* Create a direct connection to RM instead of a remote connection when
* the proxy is running as part of the RM. Also create a remote connection to
* Application History Server if it is enabled.
* @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports.
*/
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
}
this.conf = conf;
this.applicationsManager = applicationsManager;
if (isAHSEnabled) {
try {
historyManager = getAHSProxy(conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
} else {
this.historyManager = null;
}
}

protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
throws IOException {
return AHSProxy.createAHSProxy(configuration,
ApplicationHistoryProtocol.class,
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
InetSocketAddress addr = configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
return AHSProxy.createAHSProxy(configuration, ApplicationHistoryProtocol.class, addr);
}

/**
Expand All @@ -112,51 +93,73 @@ protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
* @throws YarnException on any error.
* @throws IOException
*/
public FetchedAppReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
public abstract FetchedAppReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException;

/**
* Get an application report for the specified application id from the RM and
* fall back to the Application History Server if not found in RM.
*
* @param applicationsManager what to use to get the RM reports.
* @param appId id of the application to get.
* @return the ApplicationReport for the appId.
* @throws YarnException on any error.
* @throws IOException connection exception.
*/
protected FetchedAppReport getApplicationReport(ApplicationClientProtocol applicationsManager,
ApplicationId appId) throws YarnException, IOException {
GetApplicationReportRequest request =
this.recordFactory.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId);

ApplicationReport appReport;
FetchedAppReport fetchedAppReport;
try {
appReport = applicationsManager.
getApplicationReport(request).getApplicationReport();
appReport = applicationsManager.getApplicationReport(request).getApplicationReport();
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.RM);
} catch (ApplicationNotFoundException e) {
if (!isAHSEnabled) {
// Just throw it as usual if historyService is not enabled.
throw e;
}
//Fetch the application report from AHS
appReport = historyManager.
getApplicationReport(request).getApplicationReport();
appReport = historyManager.getApplicationReport(request).getApplicationReport();
fetchedAppReport = new FetchedAppReport(appReport, AppReportSource.AHS);
}
return fetchedAppReport;
}

public abstract String getRmAppPageUrlBase(ApplicationId appId) throws IOException, YarnException;

public String getAhsAppPageUrlBase() {
return this.ahsAppPageUrlBase;
}

protected Configuration getConf() {
return this.conf;
}

public void stop() {
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
if (this.historyManager != null) {
RPC.stopProxy(this.historyManager);
}
}

@VisibleForTesting
public void setHistoryManager(ApplicationHistoryProtocol historyManager) {
this.historyManager = historyManager;
}

/*
* This class creates a bundle of the application report and the source from
* where the report was fetched. This allows the WebAppProxyServlet
* to make decisions for the application report based on the source.
*/
static class FetchedAppReport {
protected static class FetchedAppReport {
private ApplicationReport appReport;
private AppReportSource appReportSource;

public FetchedAppReport(ApplicationReport appReport,
AppReportSource appReportSource) {
public FetchedAppReport(ApplicationReport appReport, AppReportSource appReportSource) {
this.appReport = appReport;
this.appReportSource = appReportSource;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.yarn.server.webproxy;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;

public class DefaultAppReportFetcher extends AppReportFetcher {

private final ApplicationClientProtocol applicationsManager;
private String rmAppPageUrlBase;

/**
* Create a new Connection to the RM/Application History Server
* to fetch Application reports.
*
* @param conf the conf to use to know where the RM is.
*/
public DefaultAppReportFetcher(Configuration conf) {
super(conf);
this.rmAppPageUrlBase =
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
try {
this.applicationsManager = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}

/**
* Create a direct connection to RM instead of a remote connection when
* the proxy is running as part of the RM. Also create a remote connection to
* Application History Server if it is enabled.
*
* @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports.
*/
public DefaultAppReportFetcher(Configuration conf,
ApplicationClientProtocol applicationsManager) {
super(conf);
this.rmAppPageUrlBase =
StringHelper.pjoin(WebAppUtils.getResolvedRMWebAppURLWithScheme(conf), "cluster", "app");
this.applicationsManager = applicationsManager;
}

/**
* Get an application report for the specified application id from the RM and
* fall back to the Application History Server if not found in RM.
*
* @param appId id of the application to get.
* @return the ApplicationReport for the appId.
* @throws YarnException on any error.
* @throws IOException connection exception.
*/
@Override
public FetchedAppReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException {
return super.getApplicationReport(applicationsManager, appId);
}

public String getRmAppPageUrlBase(ApplicationId appId) throws YarnException, IOException {
return this.rmAppPageUrlBase;
}

public void stop() {
super.stop();
if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager);
}
}
}
Loading