Skip to content

Commit

Permalink
Add RBAC tests for cert rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronyeeski authored and sangeethah committed Feb 6, 2020
1 parent fd731a2 commit be03a4a
Showing 1 changed file with 58 additions and 5 deletions.
63 changes: 58 additions & 5 deletions tests/validation/tests/v3_api/test_cert_rotation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,70 @@
import pytest
import datetime
import time
import os
import ast
from .common import rbac_get_user_token_by_role
from .common import get_client_for_token
from .common import get_user_client_and_cluster
from .common import validate_cluster_state
from .common import get_etcd_nodes
from .common import wait_for_nodes_to_become_active
from .common import wait_for_node_status
from rancher import ApiError


# Globals
# Master list of all certs
ALL_CERTS = ["kube-apiserver", "kube-controller-manager",
"kube-node", "kube-proxy", "kube-scheduler",
"kube-etcd", "kube-ca"]
TEST_RBAC = ast.literal_eval(os.environ.get('RANCHER_TEST_RBAC', "False"))
if_test_rbac = pytest.mark.skipif(TEST_RBAC is False,
reason='rbac tests are skipped')
CLUSTER_NAME = os.environ.get("RANCHER_CLUSTER_NAME", "")
# here are all supported roles for RBAC testing
CLUSTER_MEMBER = "cluster-member"
CLUSTER_OWNER = "cluster-owner"
PROJECT_MEMBER = "project-member"
PROJECT_OWNER = "project-owner"
PROJECT_READ_ONLY = "read-only"
rbac_data = {
"project": None,
"namespace": None,
"workload": None,
"p_unshared": None,
"ns_unshared": None,
"wl_unshared": None,
"users": {
CLUSTER_OWNER: {},
CLUSTER_MEMBER: {},
PROJECT_OWNER: {},
PROJECT_MEMBER: {},
PROJECT_READ_ONLY: {},
}
}
# --------------------- rbac test -----------------------
@if_test_rbac
@pytest.mark.parametrize("role", [CLUSTER_MEMBER,
PROJECT_MEMBER, PROJECT_OWNER,
PROJECT_READ_ONLY, CLUSTER_OWNER])
def test_rbac_cert_rotation(role):
user_token = rbac_get_user_token_by_role(role)
user_client = get_client_for_token(user_token)
user_cluster = user_client.list_cluster(name=CLUSTER_NAME).data[0]
if role == CLUSTER_OWNER:
now = datetime.datetime.now()
user_cluster.rotateCertificates()
changed = ALL_CERTS.copy()
changed.remove("kube-ca")
client, cluster = get_user_client_and_cluster()
validate_cluster_state(client, cluster,
intermediate_state="updating")
certs2 = get_certs()
compare_changed(certs2, now, changed)
return None
with pytest.raises(ApiError) as e:
user_cluster.rotateCertificates()
assert e.value.error.status == 403
assert e.value.error.code == 'PermissionDenied'


def test_rotate_all_certs():
Expand Down Expand Up @@ -117,11 +170,11 @@ def rotate_certs(service=""):
client, cluster = get_user_client_and_cluster()
if service:
if service == "kube-ca":
rotate = cluster.rotateCertificates(caCertificates=True)
cluster.rotateCertificates(caCertificates=True)
else:
rotate = cluster.rotateCertificates(services=service)
cluster.rotateCertificates(services=service)
else:
rotate = cluster.rotateCertificates()
cluster.rotateCertificates()


def rotate_and_compare(unchanged, changed, service=""):
Expand Down

0 comments on commit be03a4a

Please sign in to comment.