Skip to content

Commit

Permalink
pam: adding support to manage pam modules; pam_access and pam_faillock
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Lavu committed Nov 22, 2023
1 parent b7ae078 commit 51dafae
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/guides/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ How to guides
check-sssd-functionality
ssh-client
sss_override
pam
testing-authentication
testing-autofs
testing-identity
Expand Down
58 changes: 58 additions & 0 deletions docs/guides/pam.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
Pluggable Authentication Modules / PAM
######################################

Class :class:`sssd_test_framework.utils.pam.PAMUtils` provides
an API to manage PAM module configuration. Currently pam_access and pam_faillock is supported.

pam_access
==========
A module for logdaemon style login access control. This is managed by /etc/security/access.conf.

.. code-block:: python
:caption: Example PAM Access usage
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_example(client: Client, provider: GenericProvider):
# Add users
provider.user("user-1").add()
provider.user("user-2").add()
# Add access rules
access = client.pam.access()
access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"},
{"access": "-", "user": "user-2", "origin": "ALL"}])
client.sssd.authselect.enable_feature(["with-pamaccess"])
client.sssd.domain["use_fully_qualified_names"] = "False"
client.sssd.start()
assert client.auth.ssh.password("user-1", "Secret123")
assert not client.auth.ssh.password("user-2", "Secret123")
pam_faillock
============
A module that counts authentication failures. This is configured in /etc/security/faillock.conf.

.. code-block:: python
:caption: Example PAM Faillock usage
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_example(client: Client, provider: GenericProvider):
# Add user
provider.user("user-1").add()
faillock = client.pam.faillock()
faillock.config_set({"deny": "3", "unlock_time": "300"})
client.sssd.common.pam(["with-faillock"])
client.sssd.start()
assert client.auth.ssh.password("user-1", "Secret123")
for i in range(3):
client.auth.ssh.password("user-1", "BadSecret123")
assert not client.auth.ssh.password("user-1", "Secret123")
# Reset user lockout
client.tools.faillock(["--user", "user-1", "--reset"])
assert client.auth.ssh.password("user-1", "Secret123")
6 changes: 6 additions & 0 deletions sssd_test_framework/roles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..utils.authentication import AuthenticationUtils
from ..utils.authselect import AuthselectUtils
from ..utils.ldap import LDAPUtils
from ..utils.pam import PAMUtils
from ..utils.tools import LinuxToolsUtils

HostType = TypeVar("HostType", bound=BaseHost)
Expand Down Expand Up @@ -139,6 +140,11 @@ def __init__(self, *args, **kwargs) -> None:
Authentication helpers.
"""

self.pam: PAMUtils = PAMUtils(self.host, self.fs)
"""
Configuring various PAM modules.
"""


class BaseLinuxLDAPRole(BaseLinuxRole[LDAPHostType]):
"""
Expand Down
259 changes: 259 additions & 0 deletions sssd_test_framework/utils/pam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
""""PAM Tools."""

from __future__ import annotations

import re

from pytest_mh import MultihostHost, MultihostUtility
from pytest_mh.utils.fs import LinuxFileSystem

__all__ = [
"PAMUtils",
"PAMAccess",
"PAMFaillock",
]


class PAMUtils(MultihostUtility[MultihostHost]):
"""
Configuring various PAM modules
"""

def __init__(self, host: MultihostHost, fs: LinuxFileSystem) -> None:
"""
:param host: Remote host instance
:type host: MultihostHost
:param fs: Linux file system instance
:type fs: LinuxFileSystem
"""
super().__init__(host)

self.fs: LinuxFileSystem = fs

def access(self, file: str = "/etc/security/access.conf") -> PAMAccess:
"""
:param file: PAM Access file name.
:type file: str
:return: PAM Access object
:rtype: PAMAccess
"""
return PAMAccess(self, file, self.host, self.fs)

def faillock(self, file: str = "/etc/security/faillock.conf") -> PAMFaillock:
"""
:param file: PAM Faillock file name.
:type file: str
:return: PAM Faillock object
:rtype: PAMFaillock
"""
return PAMFaillock(self, file, self.host, self.fs)


class PAMAccess(PAMUtils):
"""
Management of PAM Access on the client host.
.. code-block:: python
:caption: Example usage
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_example(client: Client, provider: GenericProvider):
# Add users
provider.user("user-1").add()
provider.user("user-2").add()
# Add rule to permit "user-1" and deny "user-2"
access = client.pam.access()
access.config_set([{"access": "+", "user": "user-1", "origin": "ALL"},
{"access": "-", "user": "user-2", "origin": "ALL"}])
client.sssd.authselect.enable_feature(["with-pamaccess"])
client.sssd.start()
# Check the results
assert client.auth.ssh.password("user-1", "Secret123")
assert not client.auth.ssh.password("user-2", "Secret123")
"""

def __init__(self, util: PAMUtils, file: str, host: MultihostHost, fs: LinuxFileSystem) -> None:
"""
:param util: PAMUtils utility object
:type util: PAMUtils
:param file: Configuration file
:type file: str
:param host: Multihost object
:type host: MultihostHost
:param fs: LinuxFileSystem object
:type fs: LinuxFileSystem
:param file: File name of access file
:type file: str, optional
"""
super().__init__(host, fs)
self._changed: bool = False

self.util: PAMUtils = util
self.file: str = file
self.path: str = "/files" + self.file
self.args: str = f'--noautoload --transform "Access.lns incl {self.file}"'
self.cmd: str = ""

def setup_when_used(self) -> None:
super().setup_when_used()
self.fs.backup(self.file)

def config_read(self) -> str:
"""
Read access file as Augeas tree.
:return: PAM access configuration
:rtype: str
"""
self.util.logger.info(f"Reading {self.file} and parsing as Augeas tree")
result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}")

return result.stdout

def config_delete(self, value: list[dict[str, str]]) -> None:
"""
Delete access configuration.
:param value: Configuration.
:type value: list[dict[str, str]]
:return: None
"""
if value is None:
raise ValueError("No data!")

index = 1
for i in self.util.host.ssh.run(f"augtool {self.args} match {self.path}/*").stdout_lines:
node = re.sub("\\d", str(index), i.split("=")[0].strip())
leaf = self.util.host.ssh.run(f"augtool {self.args} match {node}/*").stdout_lines
access = i.split("=")[1].strip()
user = leaf[0].split("=")[1].strip()
origin = leaf[1].split("=")[1].strip()
match = {"access": access, "user": user, "origin": origin}
for y in value:
if match == y:
self.util.logger.info(f"Deleting node in Augeas tree {self.file}")
self.util.host.ssh.run(f"augtool {self.args} --autosave rm {node}")
else:
index = +index

def config_set(self, value: list[dict[str, str]]) -> None:
"""
Configure access configuration file.
:param value: Access rule
:type value: list[list[str]]
:return: None
"""
if value is None:
raise ValueError("No data!")

count = 1
for i in value:
self.cmd = self.cmd + f"set {self.path}/access[{count}] " + i["access"] + "\n"
self.cmd = self.cmd + f"set {self.path}/access[{count}]/user " + i["user"] + "\n"
self.cmd = self.cmd + f"set {self.path}/access[{count}]/origin " + i["origin"] + "\n"
count = +count

self.util.host.ssh.run(f"augtool --echo {self.args}", input=f"{self.cmd} save\n")


class PAMFaillock(PAMUtils):
"""
Management of PAM Faillock on the client host.
.. code-block:: python
:caption: Example usage
@pytest.mark.topology(KnownTopologyGroup.AnyProvider)
def test_example(client: Client, provider: GenericProvider):
# Add user
provider.user("user-1").add()
# Setup faillock
faillock = client.pam.faillock()
faillock.config_set({"deny": "3", "unlock_time": "300"})
client.sssd.common.pam(["with-faillock"])
# Start SSSD
client.sssd.start()
# Check the results
assert client.auth.ssh.password("user-1", "Secret123")
# Three failed login attempts
for i in range(3):
assert not client.auth.ssh.password("user-1", "bad_password")
assert not client.auth.ssh.password("user-1", "Secret123")
# Reset user lockout
client.pam.faillock("user-1").reset
assert client.auth.ssh.password("user-1", "Secret123")
"""

def __init__(self, util: PAMUtils, file: str, host: MultihostHost, fs: LinuxFileSystem) -> None:
"""
:param util: PAMUtils object
:type util: PAMUtils
:param file: Configuration file
:type file: str
:param host: MultihostHost object
:type host: MultihostHost
:param fs: LinuxFileSystem object
:type fs: LinuxFileSystem
:param file: Faillock configuration file
:type file: str, optional
"""
super().__init__(host, fs)
self._changed: bool = False

self.util: PAMUtils = util
self.file: str = file
self.path: str = "/files" + self.file
self.args: str = f'--noautoload --transform "Simplevars.lns incl {self.file}"'
self.cmd: str = ""

def setup_when_used(self) -> None:
super().setup_when_used()
self.fs.backup(self.file)

def config_read(self) -> str:
"""
Read faillock configuration as augeas tree.
:return: PAM access configuration
:rtype: str
"""
self.util.logger.info(f"Reading {self.file} and parsing as Augeas tree")
result = self.util.host.ssh.run(f"augtool {self.args} print {self.path}").stdout

return result

def config_delete(self, value: dict[str, str]) -> None:
"""
Delete faillock configuration.
:param value: Configuration.
:type value: dict[str, str]
:return: None
"""
if value is None:
raise ValueError("No data!")

self.util.logger.info(f"Deleting node in Augeas tree in {self.file}")
for k, v in value.items():
self.util.host.ssh.run(f"augtool {self.args} --autosave rm {self.path}/{k} {v}")

def config_set(self, value: dict[str, str]) -> None:
"""
Set faillock configuration.
:param value: Configuration parameter(s) and value(s).
:type value: dict[str, str]
:return: None
"""
if value is None:
raise ValueError("No data!")

for k, v in value.items():
self.cmd = self.cmd + f"set {self.path}/{k} {v}\n"

self.util.host.ssh.run(f"augtool --echo {self.args}", input=f"{self.cmd}save\n")
16 changes: 16 additions & 0 deletions sssd_test_framework/utils/sssd.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,3 +864,19 @@ def proxy(

self.sssd.dom(domain).clear()
self.sssd.dom(domain).update(options)

def pam(self, features: list[str] | None = None) -> None:
"""
Configure SSSD with pam.
#. Select authselect sssd profile
#. Enable pam responder in sssd profile
:param features: list of authselect features
:type features: list[str], optional
"""
if features is None:
features = []

self.sssd.authselect.select("sssd", features)
self.sssd.enable_responder("pam")
13 changes: 13 additions & 0 deletions sssd_test_framework/utils/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,19 @@ def dnf(self, args: list[Any] | None = None) -> SSHProcessResult:

return command

def faillock(self, args: list[Any]) -> SSHProcessResult:
"""
Execute faillock command.
:param args: Arguments to ``faillock``
:type args: list[Any]
:return: SSH Process result
:rtype: SSHProcessResult
"""
if args is None:
args = []

return self.host.ssh.exec(["faillock", *args])

def teardown(self):
"""
Revert all changes.
Expand Down

0 comments on commit 51dafae

Please sign in to comment.