6 changes: 6 additions & 0 deletions DEVELOPING.md
@@ -93,6 +93,12 @@ $ devcontainer exec --container-id renku-data-services_devcontainer-data_service
```
The devcontainer contains Postgres, SpiceDB, the correct Python environment and other useful development tools.

Note that the docker-in-docker devcontainer feature requires the `iptable_nat` kernel module
on Linux, and some distributions (such as Fedora) do not load this module by default.
If you have trouble running docker-in-docker, running `sudo modprobe iptable_nat`
on the host can resolve the problem. To load the module automatically at every boot,
run `echo "iptable_nat" | sudo tee /etc/modules-load.d/iptables-nat.conf`.

### Developing with nix

When using [nix](https://nixos.org/explore/), a development
24 changes: 16 additions & 8 deletions bases/renku_data_services/data_api/dependencies.py
@@ -41,10 +41,8 @@
)
from renku_data_services.git.gitlab import DummyGitlabAPI, EmptyGitlabAPI, GitlabAPI
from renku_data_services.k8s.clients import (
DummyCoreClient,
DummySchedulingClient,
K8sClusterClientsPool,
K8sCoreClient,
K8sResourceQuotaClient,
K8sSchedulingClient,
)
from renku_data_services.k8s.config import KubeConfigEnv
@@ -235,13 +233,24 @@ def from_env(cls) -> DependencyManager:
encryption_key=config.secrets.encryption_key,
oauth_client_factory=oauth_http_client_factory,
)
k8s_db_cache = K8sDbCache(config.db.async_session_maker)
default_kubeconfig = KubeConfigEnv()
client = K8sClusterClientsPool(
lambda: get_clusters(
kube_conf_root_dir=config.k8s_config_root,
default_kubeconfig=default_kubeconfig,
cluster_repo=cluster_repo,
cache=k8s_db_cache,
kinds_to_cache=[AMALTHEA_SESSION_GVK, JUPYTER_SESSION_GVK, BUILD_RUN_GVK, TASK_RUN_GVK],
),
)
quota_repo = QuotaRepository(
K8sResourceQuotaClient(client), K8sSchedulingClient(client), namespace=config.k8s_namespace
)

if config.dummy_stores:
authenticator = DummyAuthenticator()
gitlab_authenticator = DummyAuthenticator()
quota_repo = QuotaRepository(
DummyCoreClient({}, {}), DummySchedulingClient({}), namespace=config.k8s_namespace
)
user_always_exists = os.environ.get("DUMMY_USERSTORE_USER_ALWAYS_EXISTS", "true").lower() == "true"
user_store = DummyUserStore(user_always_exists=user_always_exists)
gitlab_client = DummyGitlabAPI()
@@ -253,7 +262,6 @@ def from_env(cls) -> DependencyManager:
git_provider_helper: GitProviderHelperProto = DummyGitProviderHelper()
else:
git_provider_helper = GitProviderHelper.create(connected_services_repo, config.enable_internal_gitlab)
quota_repo = QuotaRepository(K8sCoreClient(), K8sSchedulingClient(), namespace=config.k8s_namespace)
assert config.keycloak is not None

authenticator = KeycloakAuthenticator.new(config.keycloak)
@@ -276,7 +284,7 @@ def from_env(cls) -> DependencyManager:
default_kubeconfig = KubeConfigEnv()
shipwright_client = ShipwrightClient(
client=K8sClusterClientsPool(
get_clusters(
lambda: get_clusters(
kube_conf_root_dir=config.k8s_config_root,
default_kubeconfig=default_kubeconfig,
cluster_repo=cluster_repo,
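
The change from `get_clusters(...)` to `lambda: get_clusters(...)` hands `K8sClusterClientsPool` a zero-argument factory instead of a pre-built cluster collection. Below is a minimal, self-contained sketch of that pattern; the `K8sClusterClientsPool` and `get_clusters` here are simplified stand-ins for illustration only, and the assumption that the pool evaluates the factory lazily on first use is inferred from the lambda, not confirmed by the diff.

```python
from collections.abc import Callable, Iterable


class K8sClusterClientsPool:
    """Simplified stand-in for the real pool; it stores the factory and calls it on demand."""

    def __init__(self, cluster_factory: Callable[[], Iterable[str]]) -> None:
        self._cluster_factory = cluster_factory
        self._clusters: list[str] | None = None

    def clusters(self) -> list[str]:
        # The factory only runs the first time clusters are actually needed.
        if self._clusters is None:
            self._clusters = list(self._cluster_factory())
        return self._clusters


def get_clusters(kube_conf_root_dir: str) -> Iterable[str]:
    """Stand-in for the real helper that discovers clusters from kubeconfig files."""
    print(f"scanning {kube_conf_root_dir} for kubeconfigs")
    return ["default-cluster"]


# Passing a lambda defers cluster discovery until the pool first asks for it.
pool = K8sClusterClientsPool(lambda: get_clusters(kube_conf_root_dir="/secrets/kube_configs"))
print(pool.clusters())
```

If the pool behaves this way, `DependencyManager.from_env` only wires up the factory, and the actual kubeconfig scan happens later, when the pool is first used.
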
2 changes: 1 addition & 1 deletion bases/renku_data_services/k8s_cache/dependencies.py
@@ -64,7 +64,7 @@ def quota_repo(self) -> QuotaRepository:
# the resource class and pool information for metrics. We don't need quota information for metrics at all
# so we use the dummy client for quotas here as we don't actually access k8s, just the db.
self._quota_repo = QuotaRepository(
DummyCoreClient({}, {}), DummySchedulingClient({}), namespace=self.config.k8s.renku_namespace
DummyCoreClient(), DummySchedulingClient(), namespace=self.config.k8s.renku_namespace
)
return self._quota_repo

22 changes: 9 additions & 13 deletions bases/renku_data_services/secrets_storage_api/dependencies.py
@@ -11,7 +11,6 @@
from renku_data_services.crc.db import ClusterRepository
from renku_data_services.k8s.client_interfaces import SecretClient
from renku_data_services.k8s.clients import (
DummyCoreClient,
K8sClusterClientsPool,
K8sSecretClient,
)
@@ -45,24 +44,21 @@ def from_env(cls) -> DependencyManager:
secret_client: SecretClient
config = Config.from_env()
cluster_repo = ClusterRepository(session_maker=config.db.async_session_maker)
default_kubeconfig = KubeConfigEnv()
client = K8sClusterClientsPool(
lambda: get_clusters(
kube_conf_root_dir=os.environ.get("K8S_CONFIGS_ROOT", "/secrets/kube_configs"),
default_kubeconfig=default_kubeconfig,
cluster_repo=cluster_repo,
)
)
secret_client = K8sSecretClient(client)

if config.dummy_stores:
authenticator = DummyAuthenticator()
secret_client = DummyCoreClient({}, {})
else:
assert config.keycloak is not None
authenticator = KeycloakAuthenticator.new(config.keycloak)
default_kubeconfig = KubeConfigEnv()

secret_client = K8sSecretClient(
K8sClusterClientsPool(
get_clusters(
kube_conf_root_dir=os.environ.get("K8S_CONFIGS_ROOT", "/secrets/kube_configs"),
default_kubeconfig=default_kubeconfig,
cluster_repo=cluster_repo,
)
)
)

return cls(
config=config,
62 changes: 38 additions & 24 deletions components/renku_data_services/crc/db.py
@@ -21,11 +21,13 @@
import renku_data_services.base_models as base_models
from renku_data_services import errors
from renku_data_services.base_models import RESET
from renku_data_services.base_models.core import ResetType
from renku_data_services.crc import models
from renku_data_services.crc import orm as schemas
from renku_data_services.crc.core import validate_resource_class_update, validate_resource_pool_update
from renku_data_services.crc.models import ClusterPatch, ClusterSettings, SavedClusterSettings, SessionProtocol
from renku_data_services.crc.orm import ClusterORM
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER
from renku_data_services.k8s.db import QuotaRepository
from renku_data_services.users.db import UserRepo

@@ -169,7 +171,7 @@ async def initialize(self, async_connection_url: str, rp: models.UnsavedResource
session.add(orm)

async def get_resource_pools(
self, api_user: base_models.APIUser, id: Optional[int] = None, name: Optional[str] = None
self, api_user: base_models.APIUser, id: int | None = None, name: Optional[str] = None
) -> list[models.ResourcePool]:
"""Get resource pools from database."""
async with self.session_maker() as session:
@@ -188,7 +190,7 @@ async def get_resource_pools(
orms = res.scalars().all()
output: list[models.ResourcePool] = []
for rp in orms:
quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id()) if rp.quota else None
output.append(rp.dump(quota))
return output

@@ -211,7 +213,7 @@ async def get_resource_pool_from_class(
raise errors.MissingResourceError(
message=f"Could not find the resource pool where a class with ID {resource_class_id} exists."
)
quota = self.quotas_repo.get_quota(orm.quota) if orm.quota else None
quota = await self.quotas_repo.get_quota(orm.quota, orm.get_cluster_id()) if orm.quota else None
return orm.dump(quota)

async def get_default_resource_pool(self) -> models.ResourcePool:
@@ -227,7 +229,7 @@ async def get_default_resource_pool(self) -> models.ResourcePool:
raise errors.ProgrammingError(
message="Could not find the default resource pool, but this has to exist."
)
quota = self.quotas_repo.get_quota(res.quota) if res.quota else None
quota = await self.quotas_repo.get_quota(res.quota, res.get_cluster_id()) if res.quota else None
return res.dump(quota)

async def get_default_resource_class(self) -> models.ResourceClass:
@@ -277,10 +279,11 @@ async def filter_resource_pools(
# NOTE: The line below ensures that the right users can access the right resources, do not remove.
stmt = _resource_pool_access_control(api_user, stmt)
res = await session.execute(stmt)
return [
i.dump(quota=self.quotas_repo.get_quota(i.quota), class_match_criteria=criteria)
for i in res.scalars().all()
]
output: list[models.ResourcePool] = []
for rp in res.scalars().all():
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id())
output.append(rp.dump(quota, criteria))
return output

@_only_admins
async def insert_resource_pool(
@@ -293,8 +296,9 @@ async def insert_resource_pool(
cluster = await self.__cluster_repo.select(cluster_id=new_resource_pool.cluster_id)

quota = None
cluster_id = DEFAULT_K8S_CLUSTER if cluster is None else cluster.id
if new_resource_pool.quota is not None:
quota = self.quotas_repo.create_quota(new_quota=new_resource_pool.quota)
quota = await self.quotas_repo.create_quota(new_quota=new_resource_pool.quota, cluster_id=cluster_id)

async with self.session_maker() as session, session.begin():
resource_pool = schemas.ResourcePoolORM.from_unsaved_model(
@@ -377,7 +381,7 @@ async def insert_resource_class(
raise errors.ValidationError(
message="There can only be one default resource class per resource pool."
)
quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id()) if rp.quota else None
if quota and not quota.is_resource_class_compatible(new_resource_class):
raise errors.ValidationError(
message="The resource class {resource_class} is not compatible with the quota {quota}."
@@ -403,7 +407,7 @@ async def update_resource_pool(
rp = res.one_or_none()
if rp is None:
raise errors.MissingResourceError(message=f"Resource pool with id {resource_pool_id} cannot be found")
quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id()) if rp.quota else None

validate_resource_pool_update(existing=rp.dump(quota=quota), update=update)

@@ -426,15 +430,21 @@ async def update_resource_pool(
if update.platform is not None:
rp.platform = update.platform

if update.cluster_id is RESET:
rp.cluster_id = None
elif update.cluster_id is not None:
cluster = await self.__cluster_repo.select(update.cluster_id)
rp.cluster_id = cluster.id
match (update.cluster_id, rp.cluster_id):
case ResetType.Reset, x if x is not None:
raise errors.ValidationError(
message="Resetting the cluster of an existing resource pool is not supported."
)
case ULID(), ULID():
if update.cluster_id != rp.cluster_id:
raise errors.ValidationError(
message="Changing the cluster of an existing resource pool is not supported."
)

cluster_id = rp.get_cluster_id()
if update.quota is RESET and rp.quota:
# Remove the existing quota
self.quotas_repo.delete_quota(name=rp.quota)
await self.quotas_repo.delete_quota(name=rp.quota, cluster_id=cluster_id)
elif isinstance(update.quota, models.QuotaPatch) and rp.quota is None:
# Create a new quota, the `update.quota` object has already been validated
assert update.quota.cpu is not None
@@ -445,7 +455,7 @@ async def update_resource_pool(
memory=update.quota.memory,
gpu=update.quota.gpu,
)
quota = self.quotas_repo.create_quota(new_quota=new_quota)
quota = await self.quotas_repo.create_quota(new_quota=new_quota, cluster_id=cluster_id)
rp.quota = quota.id
elif isinstance(update.quota, models.QuotaPatch):
assert rp.quota is not None
@@ -458,7 +468,7 @@ async def update_resource_pool(
gpu_kind=update.quota.gpu_kind if update.quota.gpu_kind is not None else quota.gpu_kind,
id=quota.id,
)
quota = self.quotas_repo.update_quota(quota=updated_quota)
quota = await self.quotas_repo.update_quota(quota=updated_quota, cluster_id=cluster_id)
rp.quota = quota.id

new_classes_coroutines = []
@@ -499,7 +509,7 @@ async def delete_resource_pool(self, api_user: base_models.APIUser, id: int) ->
raise errors.ValidationError(message="The default resource pool cannot be deleted.")
await session.delete(rp)
if rp.quota:
self.quotas_repo.delete_quota(rp.quota)
await self.quotas_repo.delete_quota(rp.quota, rp.get_cluster_id())
return None

@_only_admins
@@ -612,7 +622,7 @@ async def update_resource_class(
await session.refresh(cls)

cls_model = cls.dump()
quota = self.quotas_repo.get_quota(cls_model.quota) if cls_model.quota else None
quota = (
await self.quotas_repo.get_quota(cls_model.quota, cls.resource_pool.get_cluster_id())
if cls_model.quota
else None
)
if quota and not quota.is_resource_class_compatible(cls_model):
raise errors.ValidationError(
message=f"The resource class {cls_model} is not compatible with the quota {quota}"
@@ -722,7 +736,7 @@ async def update_quota(
message=f"The quota {new_quota} is not compatible with the resource class {rc}."
)

return self.quotas_repo.update_quota(quota=new_quota)
return await self.quotas_repo.update_quota(quota=new_quota, cluster_id=rp.get_cluster_id())


@dataclass
@@ -824,7 +838,7 @@ async def get_user_resource_pools(
rps: Sequence[schemas.ResourcePoolORM] = res.scalars().all()
output: list[models.ResourcePool] = []
for rp in rps:
quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id()) if rp.quota else None
output.append(rp.dump(quota))
return output

@@ -878,7 +892,7 @@ async def update_user_resource_pools(
user.resource_pools = list(rps_to_add)
output: list[models.ResourcePool] = []
for rp in rps_to_add:
quota = self.quotas_repo.get_quota(rp.quota) if rp.quota else None
quota = await self.quotas_repo.get_quota(rp.quota, rp.get_cluster_id()) if rp.quota else None
output.append(rp.dump(quota))
return output

8 changes: 7 additions & 1 deletion components/renku_data_services/crc/models.py
@@ -9,7 +9,7 @@

from renku_data_services import errors
from renku_data_services.base_models import ResetType
from renku_data_services.k8s.constants import ClusterId
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
from renku_data_services.notebooks.cr_amalthea_session import TlsSecret


@@ -336,6 +336,12 @@ def get_default_resource_class(self) -> ResourceClass | None:
return rc
return None

def get_cluster_id(self) -> ClusterId:
"""Get the ID of the cluster the resource pool refers to."""
if self.cluster is None:
return DEFAULT_K8S_CLUSTER
return self.cluster.id


@dataclass(frozen=True, eq=True, kw_only=True)
class ResourcePoolPatch:
8 changes: 7 additions & 1 deletion components/renku_data_services/crc/orm.py
@@ -16,7 +16,7 @@
from renku_data_services.crc import models
from renku_data_services.crc.models import ClusterSettings, SavedClusterSettings, SessionProtocol
from renku_data_services.errors import errors
from renku_data_services.k8s.constants import ClusterId
from renku_data_services.k8s.constants import DEFAULT_K8S_CLUSTER, ClusterId
from renku_data_services.utils.sqlalchemy import ULIDType

logger = logging.getLogger(__name__)
@@ -332,6 +332,12 @@ def _dump_remote(self) -> models.RemoteConfigurationFirecrest | None:
{**self.remote_json, "provider_id": self.remote_provider_id}
)

def get_cluster_id(self) -> ClusterId:
"""Get the ID of the cluster the resource pool refers to."""
if self.cluster_id is None:
return DEFAULT_K8S_CLUSTER
return ClusterId(self.cluster_id)


class TolerationORM(BaseORM):
"""The key for a K8s toleration used to schedule loads on tainted nodes."""
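
Taken together, the `crc` changes move quota lookups from a synchronous, single-cluster call to an async call scoped by the pool's cluster ID, with `get_cluster_id()` falling back to `DEFAULT_K8S_CLUSTER` when no cluster is set. The sketch below illustrates that shape with hypothetical stand-in classes; the names and the placeholder default value are illustrative, not the repository's real implementations.

```python
import asyncio
from dataclasses import dataclass, field

DEFAULT_K8S_CLUSTER = "renkulab"  # placeholder value; the real constant lives in k8s.constants


@dataclass
class Quota:
    name: str
    cpu: float
    memory: int


@dataclass
class ResourcePool:
    quota: str | None = None
    cluster_id: str | None = None

    def get_cluster_id(self) -> str:
        # Mirrors the new model/ORM helper: fall back to the default cluster when unset.
        return self.cluster_id if self.cluster_id is not None else DEFAULT_K8S_CLUSTER


@dataclass
class QuotaRepository:
    # Quotas are keyed per cluster, hence the (cluster_id, quota_name) key.
    quotas: dict[tuple[str, str], Quota] = field(default_factory=dict)

    async def get_quota(self, name: str, cluster_id: str) -> Quota | None:
        return self.quotas.get((cluster_id, name))


async def main() -> None:
    repo = QuotaRepository({(DEFAULT_K8S_CLUSTER, "small"): Quota("small", cpu=1.0, memory=2)})
    pool = ResourcePool(quota="small")  # no explicit cluster, so the default cluster is used
    quota = await repo.get_quota(pool.quota, pool.get_cluster_id()) if pool.quota else None
    print(quota)


asyncio.run(main())
```

The diff applies the same pattern to quota creation, update, and deletion, which is why `update_resource_pool`, `delete_resource_pool`, and `update_quota` now pass `rp.get_cluster_id()` through as well.
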