Skip to content

Commit

Permalink
Merge pull request ansible#12744 from ansible/feature-mesh-scaling
Browse files Browse the repository at this point in the history
[feature] Ability to add execution nodes at runtime
  • Loading branch information
TheRealHaoLiu authored Sep 26, 2022
2 parents a66b27e + 2dcb127 commit 9c2185c
Show file tree
Hide file tree
Showing 79 changed files with 3,377 additions and 676 deletions.
2 changes: 2 additions & 0 deletions .yamllint
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ ignore: |
awx/ui/test/e2e/tests/smoke-vars.yml
awx/ui/node_modules
tools/docker-compose/_sources
# django template files
awx/api/templates/instance_install_bundle/**

extends: default

Expand Down
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ recursive-include awx *.po
recursive-include awx *.mo
recursive-include awx/static *
recursive-include awx/templates *.html
recursive-include awx/api/templates *.md *.html
recursive-include awx/api/templates *.md *.html *.yml
recursive-include awx/ui/build *.html
recursive-include awx/ui/build *
recursive-include awx/playbooks *.yml
Expand Down
130 changes: 91 additions & 39 deletions awx/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4859,7 +4859,7 @@ def validate(self, attrs):
class InstanceLinkSerializer(BaseSerializer):
class Meta:
model = InstanceLink
fields = ('source', 'target')
fields = ('source', 'target', 'link_state')

source = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
target = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
Expand All @@ -4868,63 +4868,74 @@ class Meta:
class InstanceNodeSerializer(BaseSerializer):
class Meta:
model = Instance
fields = ('id', 'hostname', 'node_type', 'node_state')

node_state = serializers.SerializerMethodField()

def get_node_state(self, obj):
if not obj.enabled:
return "disabled"
return "error" if obj.errors else "healthy"
fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled')


class InstanceSerializer(BaseSerializer):
show_capabilities = ['edit']

consumed_capacity = serializers.SerializerMethodField()
percent_capacity_remaining = serializers.SerializerMethodField()
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance'), read_only=True)
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)

class Meta:
model = Instance
read_only_fields = ('uuid', 'hostname', 'version', 'node_type')
read_only_fields = ('ip_address', 'uuid', 'version')
fields = (
"id",
"type",
"url",
"related",
"uuid",
"hostname",
"created",
"modified",
"last_seen",
"last_health_check",
"errors",
'id',
'hostname',
'type',
'url',
'related',
'summary_fields',
'uuid',
'created',
'modified',
'last_seen',
'last_health_check',
'errors',
'capacity_adjustment',
"version",
"capacity",
"consumed_capacity",
"percent_capacity_remaining",
"jobs_running",
"jobs_total",
"cpu",
"memory",
"cpu_capacity",
"mem_capacity",
"enabled",
"managed_by_policy",
"node_type",
'version',
'capacity',
'consumed_capacity',
'percent_capacity_remaining',
'jobs_running',
'jobs_total',
'cpu',
'memory',
'cpu_capacity',
'mem_capacity',
'enabled',
'managed_by_policy',
'node_type',
'node_state',
'ip_address',
'listener_port',
)
extra_kwargs = {'node_type': {'initial': 'execution'}, 'node_state': {'initial': 'installed'}}

def get_related(self, obj):
res = super(InstanceSerializer, self).get_related(obj)
res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk})
res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk})
if settings.IS_K8S and obj.node_type in (Instance.Types.EXECUTION,):
res['install_bundle'] = self.reverse('api:instance_install_bundle', kwargs={'pk': obj.pk})
res['peers'] = self.reverse('api:instance_peers_list', kwargs={"pk": obj.pk})
if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor:
if obj.node_type != 'hop':
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
return res

def get_summary_fields(self, obj):
summary = super().get_summary_fields(obj)

# use this handle to distinguish between a listView and a detailView
if self.is_detail_view:
summary['links'] = InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source').filter(source=obj), many=True).data

return summary

def get_consumed_capacity(self, obj):
return obj.consumed_capacity

Expand All @@ -4934,10 +4945,51 @@ def get_percent_capacity_remaining(self, obj):
else:
return float("{0:.2f}".format(((float(obj.capacity) - float(obj.consumed_capacity)) / (float(obj.capacity))) * 100))

def validate(self, attrs):
if self.instance.node_type == 'hop':
raise serializers.ValidationError(_('Hop node instances may not be changed.'))
return attrs
def validate(self, data):
if self.instance:
if self.instance.node_type == Instance.Types.HOP:
raise serializers.ValidationError("Hop node instances may not be changed.")
else:
if not settings.IS_K8S:
raise serializers.ValidationError("Can only create instances on Kubernetes or OpenShift.")
return data

def validate_node_type(self, value):
if not self.instance:
if value not in (Instance.Types.EXECUTION,):
raise serializers.ValidationError("Can only create execution nodes.")
else:
if self.instance.node_type != value:
raise serializers.ValidationError("Cannot change node type.")

return value

def validate_node_state(self, value):
if self.instance:
if value != self.instance.node_state:
if not settings.IS_K8S:
raise serializers.ValidationError("Can only change the state on Kubernetes or OpenShift.")
if value != Instance.States.DEPROVISIONING:
raise serializers.ValidationError("Can only change instances to the 'deprovisioning' state.")
if self.instance.node_type not in (Instance.Types.EXECUTION,):
raise serializers.ValidationError("Can only deprovision execution nodes.")
else:
if value and value != Instance.States.INSTALLED:
raise serializers.ValidationError("Can only create instances in the 'installed' state.")

return value

def validate_hostname(self, value):
if self.instance and self.instance.hostname != value:
raise serializers.ValidationError("Cannot change hostname.")

return value

def validate_listener_port(self, value):
if self.instance and self.instance.listener_port != value:
raise serializers.ValidationError("Cannot change listener port.")

return value


class InstanceHealthCheckSerializer(BaseSerializer):
Expand Down
18 changes: 18 additions & 0 deletions awx/api/templates/instance_install_bundle/install_receptor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{% verbatim %}
---
- hosts: all
become: yes
tasks:
- name: Create the receptor user
user:
name: "{{ receptor_user }}"
shell: /bin/bash
- name: Enable Copr repo for Receptor
command: dnf copr enable ansible-awx/receptor -y
- import_role:
name: ansible.receptor.setup
- name: Install ansible-runner
pip:
name: ansible-runner
executable: pip3.9
{% endverbatim %}
28 changes: 28 additions & 0 deletions awx/api/templates/instance_install_bundle/inventory.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
all:
hosts:
remote-execution:
ansible_host: {{ instance.hostname }}
ansible_user: <username> # user provided
ansible_ssh_private_key_file: ~/.ssh/id_rsa
receptor_verify: true
receptor_tls: true
receptor_work_commands:
ansible-runner:
command: ansible-runner
params: worker
allowruntimeparams: true
verifysignature: true
custom_worksign_public_keyfile: receptor/work-public-key.pem
custom_tls_certfile: receptor/tls/receptor.crt
custom_tls_keyfile: receptor/tls/receptor.key
custom_ca_certfile: receptor/tls/ca/receptor-ca.crt
receptor_user: awx
receptor_group: awx
receptor_protocol: 'tcp'
receptor_listener: true
receptor_port: {{ instance.listener_port }}
receptor_dependencies:
- podman
- crun
- python39-pip
6 changes: 6 additions & 0 deletions awx/api/templates/instance_install_bundle/requirements.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
collections:
- name: ansible.receptor
source: https://github.com/ansible/receptor-collection/
type: git
version: 0.1.1
12 changes: 11 additions & 1 deletion awx/api/urls/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@

from django.urls import re_path

from awx.api.views import InstanceList, InstanceDetail, InstanceUnifiedJobsList, InstanceInstanceGroupsList, InstanceHealthCheck
from awx.api.views import (
InstanceList,
InstanceDetail,
InstanceUnifiedJobsList,
InstanceInstanceGroupsList,
InstanceHealthCheck,
InstanceInstallBundle,
InstancePeersList,
)


urls = [
Expand All @@ -12,6 +20,8 @@
re_path(r'^(?P<pk>[0-9]+)/jobs/$', InstanceUnifiedJobsList.as_view(), name='instance_unified_jobs_list'),
re_path(r'^(?P<pk>[0-9]+)/instance_groups/$', InstanceInstanceGroupsList.as_view(), name='instance_instance_groups_list'),
re_path(r'^(?P<pk>[0-9]+)/health_check/$', InstanceHealthCheck.as_view(), name='instance_health_check'),
re_path(r'^(?P<pk>[0-9]+)/peers/$', InstancePeersList.as_view(), name='instance_peers_list'),
re_path(r'^(?P<pk>[0-9]+)/install_bundle/$', InstanceInstallBundle.as_view(), name='instance_install_bundle'),
]

__all__ = ['urls']
77 changes: 32 additions & 45 deletions awx/api/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@
UnifiedJobDeletionMixin,
NoTruncateMixin,
)
from awx.api.views.instance_install_bundle import InstanceInstallBundle # noqa
from awx.api.views.inventory import ( # noqa
InventoryList,
InventoryDetail,
InventoryUpdateEventsList,
InventoryList,
InventoryDetail,
InventoryActivityStreamList,
InventoryInstanceGroupsList,
InventoryAccessList,
InventoryObjectRolesList,
InventoryJobTemplateList,
InventoryLabelList,
InventoryCopy,
)
from awx.api.views.mesh_visualizer import MeshVisualizer # noqa
from awx.api.views.organization import ( # noqa
OrganizationList,
OrganizationDetail,
Expand All @@ -145,21 +161,6 @@
OrganizationAccessList,
OrganizationObjectRolesList,
)
from awx.api.views.inventory import ( # noqa
InventoryList,
InventoryDetail,
InventoryUpdateEventsList,
InventoryList,
InventoryDetail,
InventoryActivityStreamList,
InventoryInstanceGroupsList,
InventoryAccessList,
InventoryObjectRolesList,
InventoryJobTemplateList,
InventoryLabelList,
InventoryCopy,
)
from awx.api.views.mesh_visualizer import MeshVisualizer # noqa
from awx.api.views.root import ( # noqa
ApiRootView,
ApiOAuthAuthorizationRootView,
Expand All @@ -174,7 +175,6 @@
from awx.api.pagination import UnifiedJobEventPagination
from awx.main.utils import set_environ


logger = logging.getLogger('awx.api.views')


Expand Down Expand Up @@ -359,7 +359,7 @@ def get(self, request, format=None):
return Response(dashboard_data)


class InstanceList(ListAPIView):
class InstanceList(ListCreateAPIView):

name = _("Instances")
model = models.Instance
Expand Down Expand Up @@ -398,6 +398,17 @@ def get_queryset(self):
return qs


class InstancePeersList(SubListAPIView):

name = _("Instance Peers")
parent_model = models.Instance
model = models.Instance
serializer_class = serializers.InstanceSerializer
parent_access = 'read'
search_fields = {'hostname'}
relationship = 'peers'


class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAttachDetachAPIView):

name = _("Instance's Instance Groups")
Expand Down Expand Up @@ -440,40 +451,16 @@ def get(self, request, *args, **kwargs):

def post(self, request, *args, **kwargs):
obj = self.get_object()

# Note: hop nodes are already excluded by the get_queryset method
if obj.node_type == 'execution':
from awx.main.tasks.system import execution_node_health_check

runner_data = execution_node_health_check(obj.hostname)
obj.refresh_from_db()
data = self.get_serializer(data=request.data).to_representation(obj)
# Add in some extra unsaved fields
for extra_field in ('transmit_timing', 'run_timing'):
if extra_field in runner_data:
data[extra_field] = runner_data[extra_field]
execution_node_health_check.apply_async([obj.hostname])
else:
from awx.main.tasks.system import cluster_node_health_check

if settings.CLUSTER_HOST_ID == obj.hostname:
cluster_node_health_check(obj.hostname)
else:
cluster_node_health_check.apply_async([obj.hostname], queue=obj.hostname)
start_time = time.time()
prior_check_time = obj.last_health_check
while time.time() - start_time < 50.0:
obj.refresh_from_db(fields=['last_health_check'])
if obj.last_health_check != prior_check_time:
break
if time.time() - start_time < 1.0:
time.sleep(0.1)
else:
time.sleep(1.0)
else:
obj.mark_offline(errors=_('Health check initiated by user determined this instance to be unresponsive'))
obj.refresh_from_db()
data = self.get_serializer(data=request.data).to_representation(obj)

return Response(data, status=status.HTTP_200_OK)
cluster_node_health_check.apply_async([obj.hostname], queue=obj.hostname)
return Response(dict(msg=f"Health check is running for {obj.hostname}."), status=status.HTTP_200_OK)


class InstanceGroupList(ListCreateAPIView):
Expand Down
Loading

0 comments on commit 9c2185c

Please sign in to comment.