Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
logger = logging.getLogger('clusterdock.{}'.format(__name__))


class ParcelNotFoundError(Exception):
    """Raised when no parcel matching a requested product and version is found on the cluster."""
    pass


class ClouderaManagerParcel:
def __init__(self,
cluster,
Expand Down Expand Up @@ -88,6 +92,13 @@ def condition():
for parcel in self.cluster.parcels:
if parcel.product == self.product and parcel.version == self.version:
break
else:
detected_parcels = ', '.join('{}-{}'.format(parcel.product, parcel.version)
for parcel in self.cluster.parcels)
raise ParcelNotFoundError('Could not find parcel (product = {}, version = {}). '
'Detected parcels: {}.'.format(self.product,
self.version,
detected_parcels))
logger.debug('%s parcel is in stage %s ...', self.product, parcel.stage)
return parcel.stage == stage

Expand Down
68 changes: 68 additions & 0 deletions images/build_images
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python3
# Copyright 2017 StreamSets Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import subprocess
from pathlib import Path

logging.basicConfig(level=logging.WARNING)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# URL/name templates parameterized on the SDC version being built.
CSD_URL_TEMPLATE = 'http://archives.streamsets.com/datacollector/{0}/csd/STREAMSETS-{0}.jar'
IMAGE_NAME_TEMPLATE = 'streamsets/clusterdock:topology_cdh-streamsets_datacollector-{}'
PARCEL_DIRECTORY_TEMPLATE = '/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-{}'
PARCEL_URL_TEMPLATE = 'http://archives.streamsets.com/datacollector/{0}/parcel/STREAMSETS_DATACOLLECTOR-{0}-el6.parcel'


def main():
    """Build (and optionally push) one Docker image per requested SDC version.

    For each version on the command line, runs `docker build` against the
    sibling ``sdc`` directory with CSD/parcel build args derived from the
    version; with ``--push``, pushes the resulting image. ``--dry-run`` logs
    the commands without executing them.
    """
    parser = argparse.ArgumentParser(description=('Build Docker images with StreamSets '
                                                  'Data Collector parcels for clusterdock'),
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('versions', metavar='ver', nargs='+')
    parser.add_argument('-p', '--push', help='Push Docker images after building',
                        action='store_true')
    parser.add_argument('--dry-run', help="Don't actually do the `docker build`",
                        action='store_true')
    args = parser.parse_args()

    image_folder = Path(Path(__file__).parent, 'sdc').resolve()

    if args.dry_run:
        logger.info('Doing dry-run of tool ...')

    for sdc_version in args.versions:
        image_name = IMAGE_NAME_TEMPLATE.format(sdc_version)
        csd_url = CSD_URL_TEMPLATE.format(sdc_version)
        parcel_directory = PARCEL_DIRECTORY_TEMPLATE.format(sdc_version)
        parcel_url = PARCEL_URL_TEMPLATE.format(sdc_version)

        # Pass an argument list with shell=False so that version strings from
        # the command line can never be interpreted by a shell.
        build_cmd = ['docker', 'build', '-t', image_name,
                     '--build-arg', 'CSD_URL={}'.format(csd_url),
                     '--build-arg', 'PARCEL_DIRECTORY={}'.format(parcel_directory),
                     '--build-arg', 'PARCEL_URL={}'.format(parcel_url),
                     str(image_folder)]
        logger.debug('Running Docker build command (%s) ...', ' '.join(build_cmd))
        if not args.dry_run:
            subprocess.run(build_cmd, check=True)

        if args.push:
            push_cmd = ['docker', 'push', image_name]
            # Fixed log-message typo (was 'Dockerp ush').
            logger.debug('Running Docker push command (%s) ...', ' '.join(push_cmd))
            if not args.dry_run:
                subprocess.run(push_cmd, check=True)


if __name__ == '__main__':
    main()
38 changes: 38 additions & 0 deletions images/sdc/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM alpine:latest

# MAINTAINER is deprecated in favor of a LABEL.
LABEL maintainer="Dima Spivak <dima@spivak.ch>"

# Defaults target SDC 2.7.2.0; build_images overrides these per version.
ARG CSD_DIRECTORY=/opt/cloudera/csd
ARG CSD_URL=http://archives.streamsets.com/datacollector/2.7.2.0/csd/STREAMSETS-2.7.2.0.jar

ARG PARCEL_DIRECTORY=/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR-2.7.2.0
ARG PARCEL_URL=http://archives.streamsets.com/datacollector/2.7.2.0/parcel/STREAMSETS_DATACOLLECTOR-2.7.2.0-el6.parcel

# busybox tar can't extract the parcel; install GNU tar.
RUN apk --no-cache add tar

# Download and unpack the parcel into its parent directory, then drop the archive.
RUN wget -O /parcel.tgz ${PARCEL_URL} && \
    mkdir -p "$(dirname ${PARCEL_DIRECTORY})" && \
    tar xf /parcel.tgz -C "$(dirname ${PARCEL_DIRECTORY})" && \
    rm /parcel.tgz

# See http://community.cloudera.com/t5/Cloudera-Manager-Installation/Stop-CM-undistributing-my-parcel/m-p/61402.
RUN touch "${PARCEL_DIRECTORY}/.dont_delete"

VOLUME ${PARCEL_DIRECTORY}

# Fetch the CSD jar alongside the parcel.
RUN mkdir -p "${CSD_DIRECTORY}" && \
    wget -O "${CSD_DIRECTORY}/$(basename ${CSD_URL})" "${CSD_URL}"

VOLUME ${CSD_DIRECTORY}

# Data-only image: exit immediately when run.
CMD ["/bin/true"]
115 changes: 111 additions & 4 deletions start.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
from clusterdock.models import Cluster, client, Node
from clusterdock.utils import nested_get, wait_for_condition

from .cm import ClouderaManagerDeployment
from topology_cdh import cm

DEFAULT_NAMESPACE = 'cloudera'
DEFAULT_NAMESPACE = 'streamsets'

CM_PORT = 7180
HUE_PORT = 8888
SDC_USER_ID = 20159

CM_AGENT_CONFIG_FILE_PATH = '/etc/cloudera-scm-agent/config.ini'
CM_PRINCIPAL_PASSWORD = 'cldadmin'
Expand All @@ -51,6 +52,9 @@
'1.3.0': '5.11.0',
'1.4.0': '5.12.0'}

SDC_PARCEL_REPO_URL = 'https://archives.streamsets.com/datacollector/{}/parcel/'
SDC_PORT = 18630

logger = logging.getLogger('clusterdock.{}'.format(__name__))


Expand All @@ -75,6 +79,9 @@ def main(args):
ports = [{CM_PORT: CM_PORT} if args.predictable else CM_PORT,
{HUE_PORT: HUE_PORT} if args.predictable else HUE_PORT]

if args.sdc_version:
ports.append({SDC_PORT: SDC_PORT} if args.predictable else SDC_PORT)

primary_node = Node(hostname=args.primary_node[0], group='primary',
image=primary_node_image, ports=ports,
healthcheck=cm_server_healthcheck)
Expand All @@ -84,11 +91,21 @@ def main(args):

if args.java:
java_image = '{}/{}/clusterdock:cdh_{}'.format(args.registry,
args.namespace,
args.namespace or DEFAULT_NAMESPACE,
args.java)
for node in nodes:
node.volumes.append(java_image)

if args.sdc_version:
sdc_parcel_image = ('{}/{}/clusterdock:topology_cdh-'
'streamsets_datacollector-{}').format(args.registry,
args.namespace
or DEFAULT_NAMESPACE,
args.sdc_version)
logger.debug('Adding SDC parcel image %s to CM nodes ...', sdc_parcel_image)
for node in nodes:
node.volumes.append(sdc_parcel_image)

if args.kerberos:
kerberos_config_host_dir = os.path.expanduser(args.kerberos_config_directory)
volumes = [{kerberos_config_host_dir: KERBEROS_CONFIG_CONTAINER_DIR}]
Expand All @@ -105,6 +122,13 @@ def main(args):
# Keep track of whether to suppress DEBUG-level output in commands.
quiet = not args.verbose

# Create 'sdc' user with requested UID
logger.info('Creating sdc user with uid {}'.format(SDC_USER_ID))
for node in nodes:
node.execute('userdel -r sdc', quiet=quiet)
node.execute('groupadd -g {} sdc'.format(SDC_USER_ID), quiet=quiet)
node.execute('adduser -u {} -g sdc sdc'.format(SDC_USER_ID), quiet=quiet)

if args.kerberos:
cluster.kdc_node = kdc_node
_configure_kdc(cluster, args.kerberos_principals, quiet=quiet)
Expand Down Expand Up @@ -158,7 +182,7 @@ def cm_server_not_dead(primary_node):
logger.info('Cloudera Manager server is now reachable at %s', server_url)

# The work we need to do through CM itself begins here...
deployment = ClouderaManagerDeployment(server_url)
deployment = cm.ClouderaManagerDeployment(server_url)
cm_cluster = deployment.cluster(DEFAULT_CLUSTER_NAME)
cdh_parcel = next(parcel for parcel in cm_cluster.parcels
if parcel.product == 'CDH' and parcel.stage in ('ACTIVATING',
Expand Down Expand Up @@ -211,6 +235,33 @@ def cm_server_not_dead(primary_node):
logger.info('Updating CM server configurations ...')
deployment.update_cm_config(configs={'manages_parcels': True})

if args.sdc_version:
product = 'STREAMSETS_DATACOLLECTOR'
sdc_parcel = cm_cluster.parcel(product=product, version=args.sdc_version)

# After we set CM's "Manage Parcels" config to False, the SDC parcel becomes
# undistributed. After this, we may need to add the SDC parcel repo URL in order
# to be able to re-activate it.
try:
sdc_parcel.wait_for_stage('AVAILABLE_REMOTELY')
except cm.ParcelNotFoundError:
for config in deployment.get_cm_config():
if config['name'] == 'REMOTE_PARCEL_REPO_URLS':
break
else:
raise Exception('Failed to find remote parcel repo URLs configuration.')
parcel_repo_urls = config['value']

sdc_parcel_repo_url = SDC_PARCEL_REPO_URL.format(args.sdc_version)
logger.debug('Adding SDC parcel repo URL (%s) ...', sdc_parcel_repo_url)
remote_parcel_repo_urls = '{},{}'.format(parcel_repo_urls, sdc_parcel_repo_url)
deployment.update_cm_config({'REMOTE_PARCEL_REPO_URLS': remote_parcel_repo_urls})

logger.debug('Refreshing parcel repos ...')
deployment.refresh_parcel_repos()

sdc_parcel.download().distribute().activate()

if args.include_services:
if args.exclude_services:
raise ValueError('Cannot pass both --include-services and --exclude-services.')
Expand Down Expand Up @@ -276,6 +327,7 @@ def cm_server_not_dead(primary_node):
logger.info('Configure Cloudera Manager for Kerberos ...')
_configure_cm_for_kerberos(deployment, cluster)

_configure_for_streamsets_before_start(deployment, cluster_name=DEFAULT_CLUSTER_NAME)
logger.info('Deploying client config ...')
cm_cluster.deploy_client_config()

Expand All @@ -288,6 +340,9 @@ def cm_server_not_dead(primary_node):
logger.info('Validating service health ...')
_validate_service_health(deployment=deployment, cluster_name=DEFAULT_CLUSTER_NAME)

_configure_for_streamsets_after_start(deployment=deployment,
cluster_name=DEFAULT_CLUSTER_NAME,
cluster=cluster, quiet=not args.verbose)

def _configure_kdc(cluster, kerberos_principals, quiet):
kdc_node = cluster.kdc_node
Expand Down Expand Up @@ -595,6 +650,17 @@ def _configure_kudu(deployment, cluster, kudu_version):
configs)


def _configure_sdc(deployment, cluster, sdc_version):
    """Add a StreamSets service with a Data Collector role on the primary node.

    NOTE(review): sdc_version is accepted but not used by this function.
    """
    logger.info('Adding StreamSets service to cluster (%s) ...', DEFAULT_CLUSTER_NAME)
    service_definition = {'name': 'streamsets',
                          'type': 'STREAMSETS',
                          'displayName': 'StreamSets',
                          'roles': [{'type': 'DATACOLLECTOR',
                                     'hostRef': {'hostId': cluster.primary_node.host_id}}]}
    deployment.create_cluster_services(cluster_name=DEFAULT_CLUSTER_NAME,
                                       services=[service_definition])


def _set_cm_server_java_home(node, java_home):
command = 'echo "export JAVA_HOME={}" >> {}'.format(java_home, CM_SERVER_ETC_DEFAULT)
logger.info('Setting JAVA_HOME to %s in %s ...',
Expand Down Expand Up @@ -784,3 +850,44 @@ def failure(timeout):
wait_for_condition(condition=condition, condition_args=[deployment, cluster_name],
time_between_checks=3, timeout=600, time_to_success=30,
success=success, failure=failure)



def _configure_for_streamsets_before_start(deployment, cluster_name):
    """Let the 'sdc' user proxy as any user from any host via the HDFS core-site safety valve."""
    logger.info('Adding HDFS proxy user ...')
    proxy_user_xml = ('<property>'
                      '<name>hadoop.proxyuser.sdc.hosts</name>'
                      '<value>*</value>'
                      '</property>'
                      '<property>'
                      '<name>hadoop.proxyuser.sdc.users</name>'
                      '<value>*</value>'
                      '</property>')
    # Only the first HDFS service (if any) is configured, matching the
    # original loop-and-break behavior.
    hdfs_service = next((service
                         for service in deployment.get_cluster_services(cluster_name=cluster_name)
                         if service['type'] == 'HDFS'),
                        None)
    if hdfs_service is not None:
        deployment.update_service_config(cluster_name=cluster_name,
                                         service_name=hdfs_service['name'],
                                         configs={'core_site_safety_valve': proxy_user_xml})


def _configure_for_streamsets_after_start(deployment, cluster_name, cluster, quiet):
    """Do post-start StreamSets setup on the cluster.

    If the cluster runs Solr, generate a schemaless instance directory, point
    its default search field at ``id``, and create a one-shard
    ``sample_collection`` from it.

    :param deployment: Cloudera Manager deployment wrapper.
    :param cluster_name: Name of the CM cluster to inspect.
    :param cluster: clusterdock Cluster (used to run commands on the primary node).
    :param quiet: Suppress DEBUG-level command output when True.
    """
    # Fixed: previously queried DEFAULT_CLUSTER_NAME, silently ignoring the
    # cluster_name argument.
    cluster_service_types = {service['type']
                             for service
                             in deployment.get_cluster_services(cluster_name=cluster_name)}

    if 'SOLR' in cluster_service_types:
        SOLR_CONFIG_FILE_PATH = '/root/sample_collection_solr_configs/conf/solrconfig.xml'

        logger.info('Creating sample schemaless collection for Solr ...')
        cluster.primary_node.execute('solrctl instancedir --generate '
                                     '/root/sample_collection_solr_configs -schemaless', quiet=quiet)
        # Un-comment the default search field and change it from 'text' to 'id'.
        solr_config = cluster.primary_node.get_file(SOLR_CONFIG_FILE_PATH)
        cluster.primary_node.put_file(SOLR_CONFIG_FILE_PATH,
                                      re.sub(r'<!--<(str name="df")>text<(/str)>-->',
                                             r'<\1>id<\2>',
                                             solr_config))
        cluster.primary_node.execute('solrctl instancedir --create sample_collection '
                                     '/root/sample_collection_solr_configs', quiet=quiet)
        cluster.primary_node.execute('solrctl collection --create sample_collection '
                                     '-s 1 -c sample_collection', quiet=quiet)
4 changes: 4 additions & 0 deletions topology.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ start args:
help: If specified, a comma-separated list of service types to include in the CDH cluster
metavar: svc1,svc2,...
--java:
default: jdk1.8.0_131
help: If specified, a Java version to use in place of the default value present on the cluster
metavar: ver
--kafka-version:
Expand All @@ -70,3 +71,6 @@ start args:
--predictable:
action: store_true
help: If specified, attempt to expose container ports to the same port number on the host
--sdc-version:
help: If specified, the version of StreamSets Data Collector to install
metavar: ver