Skip to content

Commit

Permalink
AMBARI-21722 - Begin Using Service Versions In Python stack_feature C…
Browse files Browse the repository at this point in the history
…ode (jonathanhurley)
  • Loading branch information
Jonathan Hurley committed Aug 16, 2017
1 parent 12c0588 commit 330a61c
Show file tree
Hide file tree
Showing 73 changed files with 1,578 additions and 1,013 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from resource_management.libraries.script.script import Script

def get_component_repository_version(service_name, component_name = None):
"""
Gets the version associated with the specified component from the structure in the command.
Every command should contain a mapping of service/component to the desired repository it's set
to.
:service_name: the name of the service
:component_name: the name of the component
"""
versions = _get_component_repositories()
if versions is None:
return None

if service_name not in versions:
return None

component_versions = versions[service_name]
if len(component_versions) == 0:
return None

if component_name is None:
for component in component_versions:
return component_versions[component]

if not component_name in component_versions:
return None

return component_versions[component_name]


def _get_component_repositories():
"""
Gets an initialized dictionary from the value in componentVersionMap. This structure is
sent on every command by Ambari and should contain each service & component's desired repository.
:return:
"""
config = Script.get_config()
if "componentVersionMap" not in config or config["componentVersionMap"] is "":
return None

return config["componentVersionMap"]
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class StackFeature:
SPARK_LIVY2 = "spark_livy2"
STORM_KERBEROS = "storm_kerberos"
STORM_AMS = "storm_ams"
CREATE_KAFKA_BROKER_ID = "create_kafka_broker_id"
KAFKA_LISTENERS = "kafka_listeners"
KAFKA_KERBEROS = "kafka_kerberos"
PIG_ON_TEZ = "pig_on_tez"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
__all__ = ["copy_to_hdfs", "get_sysprep_skip_copy_tarballs_hdfs"]

import os
import uuid
import tempfile
import re

Expand All @@ -30,7 +29,7 @@
from resource_management.libraries.functions.default import default
from resource_management.core import shell
from resource_management.core.logger import Logger
from resource_management.libraries.functions import stack_tools
from resource_management.libraries.functions import stack_tools, stack_features, stack_select

STACK_NAME_PATTERN = "{{ stack_name }}"
STACK_ROOT_PATTERN = "{{ stack_root }}"
Expand Down Expand Up @@ -141,32 +140,23 @@ def get_current_version(use_upgrading_version_during_upgrade=True):
:param use_upgrading_version_during_upgrade: True, except when the RU/EU hasn't started yet.
:return: Version, or False if an error occurred.
"""
upgrade_direction = default("/commandParams/upgrade_direction", None)
is_stack_upgrade = upgrade_direction is not None
current_version = default("/hostLevelParams/current_version", None)
Logger.info("Default version is {0}".format(current_version))
if is_stack_upgrade:
if use_upgrading_version_during_upgrade:
# This is the version going to. In the case of a downgrade, it is the lower version.
current_version = default("/commandParams/version", None)
Logger.info("Because this is a Stack Upgrade, will use version {0}".format(current_version))
else:
Logger.info("This is a Stack Upgrade, but keep the version unchanged.")
else:
if current_version is None:
# During normal operation, the first installation of services won't yet know about the version, so must rely
# on <stack-selector> to get it.
stack_version = _get_single_version_from_stack_select()
if stack_version:
Logger.info("Will use stack version {0}".format(stack_version))
current_version = stack_version
# get the version for this command
version = stack_features.get_stack_feature_version(Script.get_config())

# if there is no upgrade, then use the command's version
if not Script.in_stack_upgrade() or use_upgrading_version_during_upgrade:
Logger.info("Tarball version was calcuated as {0}. Use Command Version: {1}".format(
version, use_upgrading_version_during_upgrade))

return version

# we're in an upgrade and we need to use an older version
current_version = stack_select.get_role_component_current_stack_version()
if current_version is None:
message_suffix = "during stack %s" % str(upgrade_direction) if is_stack_upgrade else ""
Logger.warning("Cannot copy tarball because unable to determine current version {0}.".format(message_suffix))
Logger.warning("Unable to determine the current version of the component for this command; unable to copy the tarball")
return False

return current_version
return current_version;


def _get_single_version_from_stack_select():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ def get_stack_feature_version(config):
command_version = default("/commandParams/version", None)
command_stack = default("/commandParams/target_stack", None)

# something like 2.4.0.0-1234
# (or None if this is a cluster install and it hasn't been calculated yet)
current_cluster_version = default("/hostLevelParams/current_version", None)

# UPGRADE or DOWNGRADE (or None)
upgrade_direction = default("/commandParams/upgrade_direction", None)

Expand All @@ -123,8 +119,8 @@ def get_stack_feature_version(config):
# if this is not an upgrade, then we take the simple path
if upgrade_direction is None:
Logger.info(
"Stack Feature Version Info: Cluster Stack={0}, Cluster Current Version={1}, Command Stack={2}, Command Version={3} -> {4}".format(
stack_version, current_cluster_version, command_stack, command_version, version_for_stack_feature_checks))
"Stack Feature Version Info: Cluster Stack={0}, Command Stack={1}, Command Version={2} -> {3}".format(
stack_version, command_stack, command_version, version_for_stack_feature_checks))

return version_for_stack_feature_checks

Expand All @@ -133,33 +129,24 @@ def get_stack_feature_version(config):
is_stop_command = _is_stop_command(config)
if not is_stop_command:
Logger.info(
"Stack Feature Version Info: Cluster Stack={0}, Cluster Current Version={1}, Command Stack={2}, Command Version={3}, Upgrade Direction={4} -> {5}".format(
stack_version, current_cluster_version, command_stack, command_version, upgrade_direction,
"Stack Feature Version Info: Cluster Stack={0}, Command Stack={1}, Command Version={2}, Upgrade Direction={3} -> {4}".format(
stack_version, command_stack, command_version, upgrade_direction,
version_for_stack_feature_checks))

return version_for_stack_feature_checks

# something like 2.5.0.0-5678 (or None)
downgrade_from_version = default("/commandParams/downgrade_from_version", None)

is_downgrade = upgrade_direction.lower() == Direction.DOWNGRADE.lower()
# guaranteed to have a STOP command now during an UPGRADE/DOWNGRADE, check direction
if upgrade_direction.lower() == Direction.DOWNGRADE.lower():
if downgrade_from_version is None:
Logger.warning(
"Unable to determine the version being downgraded when stopping services, using {0}".format(
version_for_stack_feature_checks))
else:
version_for_stack_feature_checks = downgrade_from_version
if is_downgrade:
from resource_management.libraries.functions import upgrade_summary
version_for_stack_feature_checks = upgrade_summary.get_source_version(default_version = version_for_stack_feature_checks)
else:
# UPGRADE
if current_cluster_version is not None:
version_for_stack_feature_checks = current_cluster_version
else:
version_for_stack_feature_checks = command_version if command_version is not None else stack_version

Logger.info(
"Stack Feature Version Info: Cluster Stack={0}, Cluster Current Version={1}, Command Stack={2}, Command Version={3}, Upgrade Direction={4}, stop_command={5} -> {6}".format(
stack_version, current_cluster_version, command_stack, command_version, upgrade_direction,
"Stack Feature Version Info: Cluster Stack={0}, Command Stack={1}, Command Version={2}, Upgrade Direction={3}, stop_command={4} -> {5}".format(
stack_version, command_stack, command_version, upgrade_direction,
is_stop_command, version_for_stack_feature_checks))

return version_for_stack_feature_checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from collections import namedtuple
from resource_management.libraries.script.script import Script
from resource_management.libraries.functions.constants import Direction

UpgradeSummary = namedtuple("UpgradeSummary", "type direction orchestration is_revert services")
UpgradeServiceSummary = namedtuple("UpgradeServiceSummary", "service_name source_stack source_version target_stack target_version")

def get_source_version(service_name = None, default_version=None):
"""
Gets the source (from) version of a service participating in an upgrade. If there is no
upgrade or the specific service is not participating, this will return None.
:param service_name: the service name to check for, or None to extract it from the command
:param default_version: if the version of the service can't be calculated, this optional
default value is returned
:return: the version that the service is upgrading from or None if there is no upgrade or
the service is not included in the upgrade.
"""
service_summary = _get_service_summary(service_name)
if service_summary is None:
return default_version

return service_summary.source_version


def get_target_version(service_name = None, default_version=None):
"""
Gets the target (to) version of a service participating in an upgrade. If there is no
upgrade or the specific service is not participating, this will return None.
:param service_name: the service name to check for, or None to extract it from the command
:param default_version: if the version of the service can't be calculated, this optional
default value is returned
:return: the version that the service is upgrading to or None if there is no upgrade or
the service is not included in the upgrade.
"""
service_summary = _get_service_summary(service_name)
if service_summary is None:
return default_version

return service_summary.target_version



def get_upgrade_summary():
"""
Gets a summary of an upgrade in progress, including type, direction, orchestration and from/to
repository versions.
"""
config = Script.get_config()
if "upgradeSummary" not in config or not config["upgradeSummary"]:
return None

upgrade_summary = config["upgradeSummary"]
service_summary_dict = {}

service_summary = upgrade_summary["services"]
for service_name, service_summary_json in service_summary.iteritems():
service_summary = UpgradeServiceSummary(service_name = service_name,
source_stack = service_summary_json["sourceStackId"],
source_version = service_summary_json["sourceVersion"],
target_stack = service_summary_json["targetStackId"],
target_version = service_summary_json["targetVersion"])

service_summary_dict[service_name] = service_summary

return UpgradeSummary(type=upgrade_summary["type"], direction=upgrade_summary["direction"],
orchestration=upgrade_summary["orchestration"], is_revert = upgrade_summary["isRevert"],
services = service_summary_dict)


def get_downgrade_from_version(service_name = None):
"""
Gets the downgrade-from-version for the specificed service. If there is no downgrade or
the service isn't participating in the downgrade, then this will return None
:param service_name: the service, or optionally onmitted to infer it from the command.
:return: the downgrade-from-version or None
"""
upgrade_summary = get_upgrade_summary()
if upgrade_summary is None:
return None

if Direction.DOWNGRADE.lower() != upgrade_summary.direction.lower():
return None

service_summary = _get_service_summary(service_name)
if service_summary is None:
return None

return service_summary.source_version


def _get_service_summary(service_name):
"""
Gets the service summary for the upgrade/downgrade for the given service, or None if
the service isn't participating.
:param service_name: the service name
:return: the service summary or None
"""
upgrade_summary = get_upgrade_summary()
if upgrade_summary is None:
return None

if service_name is None:
config = Script.get_config()
service_name = config['serviceName']

service_summary = upgrade_summary.services
if service_name not in service_summary:
return None

return service_summary[service_name]
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@

import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.orm.entities.UpgradeEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
Expand All @@ -42,6 +44,9 @@
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.StackInfo;
import org.apache.ambari.server.state.UpgradeContext;
import org.apache.ambari.server.state.UpgradeContext.UpgradeSummary;
import org.apache.ambari.server.state.UpgradeContextFactory;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,6 +75,9 @@ public class ExecutionCommandWrapper {
@Inject
private Gson gson;

@Inject
private UpgradeContextFactory upgradeContextFactory;

/**
* Used for injecting hooks and common-services into the command.
*/
Expand Down Expand Up @@ -223,8 +231,12 @@ public ExecutionCommand getExecutionCommand() {
Map<String, String> commandParams = executionCommand.getCommandParams();

if (null != repositoryVersion) {
commandParams.put(KeyNames.VERSION, repositoryVersion.getVersion());
executionCommand.getHostLevelParams().put(KeyNames.CURRENT_VERSION, repositoryVersion.getVersion());
// only set the version if it's not set and this is NOT an install
// command
if (!commandParams.containsKey(KeyNames.VERSION)
&& executionCommand.getRoleCommand() != RoleCommand.INSTALL) {
commandParams.put(KeyNames.VERSION, repositoryVersion.getVersion());
}

StackId stackId = repositoryVersion.getStackId();
StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(),
Expand Down Expand Up @@ -255,6 +267,15 @@ public ExecutionCommand getExecutionCommand() {
// we are "loading-late": components that have not yet upgraded in an EU will have the correct versions.
executionCommand.setComponentVersions(cluster);

// provide some basic information about a cluster upgrade if there is one
// in progress
UpgradeEntity upgrade = cluster.getUpgradeInProgress();
if (null != upgrade) {
UpgradeContext upgradeContext = upgradeContextFactory.create(cluster, upgrade);
UpgradeSummary upgradeSummary = upgradeContext.getUpgradeSummary();
executionCommand.setUpgradeSummary(upgradeSummary);
}

} catch (ClusterNotFoundException cnfe) {
// it's possible that there are commands without clusters; in such cases,
// just return the de-serialized command and don't try to read configs
Expand Down
Loading

0 comments on commit 330a61c

Please sign in to comment.