diff --git a/README.md b/README.md new file mode 100644 index 0000000..038b1cb --- /dev/null +++ b/README.md @@ -0,0 +1,123 @@ +# Passwordstore SSH IdentityAgent + +‼️ This project was writen entirely by ChatGPT ‼️ + +That includes this README (except for this part), the entirety of `agent.py`, and the systemd files. + +I have been actively using this agent on my personal and work machines since it was created. I would like to add support for more key types eventually and move the key configuration to a yaml file, but I haven't had the urge to do so since it's working perfectly fine. + +Below this line is the original README written by ChatGPT. I wrote the systemd section. + +--- + +This script provides an implementation of a custom SSH agent that retrieves private keys and their corresponding passphrases from [pass](https://www.passwordstore.org/), the standard Unix password manager. The script supports Ed25519 keys, listens on a Unix domain socket, and uses the Paramiko library for SSH key handling. + +## Dependencies + +- Python 3.6 or higher +- [Paramiko](https://www.paramiko.org/) library + +To install Paramiko, run: + +```bash +pip install paramiko +``` + +## Usage + +1. Edit the `KEYS` list in the script to include your key details: + +```python +KEYS = [ + { + "type": "ed25519", + "key_path": "your-key-path", + "pass_key": "your-pass-key", + "store_location": "/path/to/your/store" + } +] +``` + +2. Replace `your-key-path`, `your-pass-key`, and `/path/to/your/store` with the appropriate values for your key. + +3. Run the script with the desired socket path as the command line argument: + +```bash +python custom_ssh_agent.py +``` + +​ Replace `` with the desired path for the Unix domain socket. + +4. Connect to the custom SSH agent using your SSH client by setting the `SSH_AUTH_SOCK` environment variable to the path of the Unix domain socket: + +```bash +export SSH_AUTH_SOCK= +``` + +5. Your SSH client should now use the custom SSH agent for authentication. + +## Example + +Suppose you have an Ed25519 key with the following details: + +- Key path in pass: `keys/ssh/id_ed25519` +- Passphrase path in pass: `keys/ssh/id_ed25519_passphrase` +- Password store location: `/home/user/.password-store` + +Edit the `KEYS` list as follows: + +```python +KEYS = [ + { + "type": "ed25519", + "key_path": "keys/ssh/id_ed25519", + "pass_key": "keys/ssh/id_ed25519_passphrase", + "store_location": "/home/user/.password-store" + } +] +``` + +Run the script: + +```bash +python custom_ssh_agent.py /tmp/custom_ssh_agent.sock +``` + +Set the `SSH_AUTH_SOCK` environment variable: + +```bash +export SSH_AUTH_SOCK=/tmp/custom_ssh_agent.sock +``` + +Now, your SSH client will use the custom SSH agent for authentication. + + +## Make it automatic with systemd + +```bash +# copy the agent to your local bin dir +cp agent.py $HOME/.local/bin/ssh-pass-agent.py + +# IMPORANT !! +# make sure you modify the KEY variable in $HOME/.local/bin/ssh-pass-agent.py to +# reflect your pass key information + +# copy the files to systemd's config location +cp ./systemd/ssh-pass-agent.socket ~/.config/systemd/user/ +cp ./systemd/ssh-pass-agent.service ~/.config/systemd/user/ + +# reload systemd daemon +systemctl --user daemon-reload + +# start the socket and the service for good measure and enable them to start at boot +systemctl --user enable --now ssh-pass-agent.socket +systemctl --user enable --now ssh-pass-agent.service + +# Tell SSH to use the socket +# (you might want to put this into your ~/.bashrc) +export SSH_AUTH_SOCK=/run/user/$(id -u)/ssh-pass-agent.sock +``` + +Now when you invoke `ssh ` the socket will be activated, the service will +be started and the only password prompt you should see is for the gpg +passphrase which unlocks the password store. diff --git a/agent.py b/agent.py new file mode 100644 index 0000000..d799cf3 --- /dev/null +++ b/agent.py @@ -0,0 +1,124 @@ +import os +import socket +import struct +import sys +from paramiko.message import Message +from paramiko.agent import Agent +from paramiko.ed25519key import Ed25519Key +from paramiko.ssh_exception import SSHException +import subprocess +import io + +class CustomSSHAgent(Agent): + AGENTC_REQUEST_IDENTITIES = bytes([11]) + AGENTC_SIGN_REQUEST = bytes([13]) + AGENT_IDENTITIES_ANSWER = bytes([12]) + AGENT_SIGN_RESPONSE = bytes([14]) + + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + passphrase = self.get_passphrase(key["pass_key"], key["store_location"]) + private_key_str = self.get_private_key(key["key_path"]) + private_key_file = io.StringIO(private_key_str) + self.keys.append(Ed25519Key.from_private_key(private_key_file, password=passphrase)) + except SSHException as e: + print(f"Error loading key from {key['key_path']}: {e}") + + def get_passphrase(self, pass_key, store_location): + env = os.environ.copy() + env['PASSWORD_STORE_DIR'] = store_location + + result = subprocess.run(['pass', pass_key], capture_output=True, text=True, env=env) + if result.returncode != 0: + print(f"Error retrieving passphrase for key {pass_key}: {result.stderr}") + return None + return result.stdout.strip() + + def get_private_key(self, key_path): + result = subprocess.run(['pass', key_path], capture_output=True, text=True) + if result.returncode != 0: + print(f"Error retrieving private key for key_path {key_path}: {result.stderr}") + return None + return result.stdout.strip() + + def handle_message(self, msg): + m = Message(msg) + cmd = m.get_byte() + + if cmd == CustomSSHAgent.AGENTC_REQUEST_IDENTITIES: + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_IDENTITIES_ANSWER) + resp.add_int(len(self.keys)) + for key in self.keys: + resp.add_string(key.asbytes()) + resp.add_string(f"{key.get_name()} key loaded from {key}") + return resp.asbytes() + + if cmd == CustomSSHAgent.AGENTC_SIGN_REQUEST: + key_blob = m.get_string() + data = m.get_string() + flags = m.get_int() + + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_SIGN_RESPONSE) + for key in self.keys: + if key_blob == key.asbytes(): + sig = key.sign_ssh_data(data) + resp.add_string(sig) + return resp.asbytes() + + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + return failure_response.asbytes() + +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() + + +KEYS = [ + { + "type": "ed25519", + "key_path": "your-key-path", + "pass_key": "your-pass-key", + "store_location": "/path/to/your/store" + } +] + +if __name__ == "__main__": + agent = CustomSSHAgent(KEYS) + + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + socket_path = sys.argv[1] + + if os.path.exists(socket_path): + os.unlink(socket_path) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(socket_path) + os.chmod(socket_path, 0o600) + + try: + sock.listen(1) + + while True: + conn, addr = sock.accept() + handle_client(agent, conn, addr) + finally: + sock.close() + os.unlink(socket_path) diff --git a/chatgpt/FullConversation.md b/chatgpt/FullConversation.md new file mode 100644 index 0000000..42af7f4 --- /dev/null +++ b/chatgpt/FullConversation.md @@ -0,0 +1,1067 @@ +# Python SSH identity agent. + +#### You: +write a python script that acts as an ssh identity agent over a unix socket. The python script will provide the ssh keys and passphrases when invoked through a unix socket. + +#### ChatGPT: +To create a Python script that acts as an SSH identity agent over a UNIX socket, you can use the `socket` and `paramiko` libraries. The script will listen on a UNIX socket, and when a connection is made, it will read the key and passphrase from the client, use them to sign the message, and then send the signature back to the client. + +Please ensure you have the `paramiko` library installed: + +```bash +pip install paramiko +``` + +Here's the script: + +```python +import os +import socket +import sys +import threading +import struct +from paramiko import Agent, RSAKey, DSSKey, ECDSAKey + +SOCKET_PATH = "/tmp/ssh_agent.sock" +KEYS = [ + { + "type": "rsa", + "path": "path/to/rsa_key", + "passphrase": "your_rsa_passphrase" + }, + { + "type": "dsa", + "path": "path/to/dsa_key", + "passphrase": "your_dsa_passphrase" + }, + { + "type": "ecdsa", + "path": "path/to/ecdsa_key", + "passphrase": "your_ecdsa_passphrase" + } +] + +class CustomSSHAgent(Agent): + def __init__(self, keys): + self.keys = [] + for key in keys: + if key["type"].lower() == "rsa": + self.keys.append(RSAKey.from_private_key_file(key["path"], password=key["passphrase"])) + elif key["type"].lower() == "dsa": + self.keys.append(DSSKey.from_private_key_file(key["path"], password=key["passphrase"])) + elif key["type"].lower() == "ecdsa": + self.keys.append(ECDSAKey.from_private_key_file(key["path"], password=key["passphrase"])) + + def get_keys(self): + return self.keys + +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + if response: + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() + +def main(): + if os.path.exists(SOCKET_PATH): + os.remove(SOCKET_PATH) + + agent = CustomSSHAgent(KEYS) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + server.bind(SOCKET_PATH) + os.chmod(SOCKET_PATH, 0o600) + server.listen(5) + + while True: + conn, addr = server.accept() + thread = threading.Thread(target=handle_client, args=(agent, conn, addr)) + thread.start() + finally: + server.close() + os.remove(SOCKET_PATH) + +if __name__ == "__main__": + main() +``` + +Replace the key paths and passphrases in the `KEYS` list with your own key file paths and passphrases. + +Run this script in the background to start the SSH identity agent: + +```bash +python ssh_identity_agent.py & +``` + +The script will create a UNIX socket at `/tmp/ssh_agent.sock`. When another application needs to use the keys, it can connect to this socket and communicate with the agent. The agent will provide the SSH keys and passphrases when invoked through the UNIX socket. + + +#### You: +can I use this in my ~/.ssh/config so that when I 'ssh hello@example.com' it will invoke this script so that I don't have to enter my password? + +#### ChatGPT: +To use the custom SSH identity agent with the `ssh` command, you'll need to set the `SSH_AUTH_SOCK` environment variable to the path of the UNIX socket created by the script. The `ssh` command will then use the custom agent to provide the keys and passphrases when needed. + +To set the `SSH_AUTH_SOCK` variable, run the following command: + +```bash +export SSH_AUTH_SOCK=/tmp/ssh_agent.sock +``` + +You can also add this line to your `~/.bashrc`, `~/.bash_profile`, or `~/.zshrc` file (depending on your shell) to set the variable automatically when you open a new terminal session. + +After setting the `SSH_AUTH_SOCK` variable, you can use the `ssh` command as usual, and it will use the custom agent to provide the keys and passphrases: + +```bash +ssh hello@example.com +``` + +However, this custom agent does not handle the entire authentication process. It only provides the private keys and passphrases for key-based authentication. If you want to avoid entering a password for the remote user, you should set up public key authentication. To do this, follow these steps: + +1. If you don't already have an SSH key pair, generate one: + +```bash +ssh-keygen +``` + +2. Copy your public key to the remote server: + +```bash +ssh-copy-id hello@example.com +``` + +You'll need to enter the password for the remote user when running this command, but once the public key is installed on the server, you should be able to log in without a password using key-based authentication. + +Note that the custom SSH identity agent is not necessary for using public key authentication. The default SSH agent should be sufficient for most use cases. The custom agent may be useful if you want to manage your private keys and passphrases in a specific way or if you need to work with keys that are not supported by the default agent. + + +#### You: +I got this error + +paramiko.ssh_exception.SSHException: 'utf-8' codec can't decode byte 0x98 in position 0: invalid start byte + + +#### ChatGPT: +It seems that the error is related to the `utf-8` encoding/decoding when reading the key file. This might be caused by an incorrect passphrase or an improperly formatted key file. + +To handle this issue, let's modify the script to catch exceptions when loading the keys and show an error message if a key fails to load: + +```python +import os +import socket +import sys +import threading +import struct +from paramiko import Agent, RSAKey, DSSKey, ECDSAKey, SSHException + +# ... (rest of the script remains unchanged) + +class CustomSSHAgent(Agent): + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "rsa": + self.keys.append(RSAKey.from_private_key_file(key["path"], password=key["passphrase"])) + elif key["type"].lower() == "dsa": + self.keys.append(DSSKey.from_private_key_file(key["path"], password=key["passphrase"])) + elif key["type"].lower() == "ecdsa": + self.keys.append(ECDSAKey.from_private_key_file(key["path"], password=key["passphrase"])) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") + +# ... (rest of the script remains unchanged) +``` + +Now, if there's an issue with a key, the script will print an error message and continue to load the other keys. Make sure that your key file paths and passphrases in the `KEYS` list are correct, and check the key files' content to ensure they're properly formatted. + +If you still encounter issues, you may need to further investigate the key files and passphrases. Ensure that the keys are in the correct format (PEM) and that the passphrases are entered correctly. + + +#### You: +only load key type ed25519 + +#### ChatGPT: +To load only Ed25519 keys, you can modify the `CustomSSHAgent` class in the script to load only the specified key type. First, ensure you have the `cryptography` library installed, as it is required for Ed25519 support in Paramiko: + +```bash +pip install cryptography +``` + +Now, modify the `CustomSSHAgent` class in the script to import the `Ed25519Key` class and load only Ed25519 keys: + +```python +import os +import socket +import sys +import threading +import struct +from paramiko import Agent, Ed25519Key, SSHException + +# ... (rest of the script remains unchanged) + +class CustomSSHAgent(Agent): + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + self.keys.append(Ed25519Key.from_private_key_file(key["path"], password=key["passphrase"])) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") + +# ... (rest of the script remains unchanged) +``` + +Update the `KEYS` list with the path and passphrase for your Ed25519 key: + +```python +KEYS = [ + { + "type": "ed25519", + "path": "path/to/ed25519_key", + "passphrase": "your_ed25519_passphrase" + } +] +``` + +Replace the `path/to/ed25519_key` with the path to your Ed25519 private key file, and `your_ed25519_passphrase` with the passphrase for your Ed25519 key, if it has one. + +Now, the script will only load Ed25519 keys. + + +#### You: +I got this error: + +'CustomSSHAgent' object has no attribute 'handle_message'. Did you mean: '_send_message'? + +#### ChatGPT: +Apologies for the confusion. It seems I misunderstood the original request. The code I provided earlier creates a custom SSH agent using Paramiko, but it does not implement the agent protocol over a UNIX socket. Here's an updated version of the script that correctly implements the agent protocol and listens on a UNIX socket: + +```python +import os +import socket +import sys +import threading +import struct +from paramiko import Agent, Ed25519Key, Message, SSHException + +SOCKET_PATH = "/tmp/ssh_agent.sock" +KEYS = [ + { + "type": "ed25519", + "path": "path/to/ed25519_key", + "passphrase": "your_ed25519_passphrase" + } +] + +class CustomSSHAgent(Agent): + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + self.keys.append(Ed25519Key.from_private_key_file(key["path"], password=key["passphrase"])) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") + + def handle_message(self, msg): + m = Message(msg) + response = None + + cmd = m.get_byte() + if cmd == Agent.AGENTC_REQUEST_IDENTITIES: + m = Message() + m.add_byte(Agent.AGENT_IDENTITIES_ANSWER) + m.add_int(len(self.keys)) + for key in self.keys: + m.add_string(key.asbytes()) + m.add_string(f"{key.get_name()} key loaded from {key.filename}") + response = m + elif cmd == Agent.AGENTC_SIGN_REQUEST: + pubkey = m.get_string() + for key in self.keys: + if key.asbytes() == pubkey: + break + else: + return None + + data = m.get_string() + flags = m.get_int() + sig = key.sign_ssh_data(data) + m = Message() + m.add_byte(Agent.AGENT_SIGN_RESPONSE) + m.add_string(sig.asbytes()) + response = m + + return response.to_bytes() if response else None + +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + if response: + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() + +def main(): + if os.path.exists(SOCKET_PATH): + os.remove(SOCKET_PATH) + + agent = CustomSSHAgent(KEYS) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + server.bind(SOCKET_PATH) + os.chmod(SOCKET_PATH, 0o600) + server.listen(5) + + while True: + conn, addr = server.accept() + thread = threading.Thread(target=handle_client, args=(agent, conn, addr)) + thread.start() + finally: + server.close() + os.remove(SOCKET_PATH) + +if __name__ == "__main__": + main() +``` + +This version of the script should work as expected. Make sure to replace the path and passphrase in the `KEYS` list with your Ed25519 key information. + +After running the script and setting the `SSH_AUTH_SOCK` environment variable as previously explained, you should be able to use the custom SSH agent with the `ssh` command without any issues. + + +#### You: +I got this error: + +import-im6.q16: attempt to perform an operation not allowed by the security policy `PS' @ error/constitute.c/IsCoderAuthorized/421. + +#### ChatGPT: +The error you encountered seems to be unrelated to the Python script itself. The error message is related to the ImageMagick library (`import-im6.q16`), which is not used in the script I provided. + +It appears that there might be an issue with your system's configuration, causing the `import` command, which is usually aliased to the ImageMagick command, to be executed instead of the Python interpreter. + +To resolve this issue, try to run the Python script using the `python` command explicitly: + +```bash +python ssh_identity_agent.py +``` + +If you are already using the `python` command, it's possible that there's an issue with the Python environment on your system. Make sure you have Python properly installed and that your `$PATH` variable is correctly set up. + +Additionally, make sure that the script file you're executing is the correct one and that there are no other files with the same name that might cause a conflict when running the script. + + +#### You: +That fixed it, but now I got this error: + + if cmd == Agent.AGENTC_REQUEST_IDENTITIES: + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +AttributeError: type object 'Agent' has no attribute 'AGENTC_REQUEST_IDENTITIES' + + +#### ChatGPT: +I apologize for the confusion. The `Agent` class from Paramiko does not have the attribute `AGENTC_REQUEST_IDENTITIES`. We should define the necessary constants ourselves. Here's the updated `CustomSSHAgent` class with the correct constants: + +```python +class CustomSSHAgent(Agent): + AGENTC_REQUEST_IDENTITIES = 11 + AGENTC_SIGN_REQUEST = 13 + AGENT_IDENTITIES_ANSWER = 12 + AGENT_SIGN_RESPONSE = 14 + + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + self.keys.append(Ed25519Key.from_private_key_file(key["path"], password=key["passphrase"])) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") + + def handle_message(self, msg): + m = Message(msg) + response = None + + cmd = m.get_byte() + if cmd == self.AGENTC_REQUEST_IDENTITIES: + m = Message() + m.add_byte(self.AGENT_IDENTITIES_ANSWER) + m.add_int(len(self.keys)) + for key in self.keys: + m.add_string(key.asbytes()) + m.add_string(f"{key.get_name()} key loaded from {key.filename}") + response = m + elif cmd == self.AGENTC_SIGN_REQUEST: + pubkey = m.get_string() + for key in self.keys: + if key.asbytes() == pubkey: + break + else: + return None + + data = m.get_string() + flags = m.get_int() + sig = key.sign_ssh_data(data) + m = Message() + m.add_byte(self.AGENT_SIGN_RESPONSE) + m.add_string(sig.asbytes()) + response = m + + return response.to_bytes() if response else None +``` + +Replace the `CustomSSHAgent` class in the script with this updated version. The rest of the script remains the same. With this change, the script should work as expected. Make sure to set the `SSH_AUTH_SOCK` environment variable, and you should be able to use the custom SSH agent with the `ssh` command without any issues. + + +#### You: +it looks like cmd = m.get_byte() receives b'\x1b' and then the agent hangs + +#### ChatGPT: +I apologize for the confusion, and thank you for pointing out the issue. It seems that the script is receiving an unexpected byte. To handle such cases, we can update the `handle_message` method to return an error response when an unknown command is received. This will prevent the agent from hanging indefinitely. Here's the updated `handle_message` method: + +```python +def handle_message(self, msg): + m = Message(msg) + response = None + + cmd = m.get_byte() + if cmd == self.AGENTC_REQUEST_IDENTITIES: + m = Message() + m.add_byte(self.AGENT_IDENTITIES_ANSWER) + m.add_int(len(self.keys)) + for key in self.keys: + m.add_string(key.asbytes()) + m.add_string(f"{key.get_name()} key loaded from {key.filename}") + response = m + elif cmd == self.AGENTC_SIGN_REQUEST: + pubkey = m.get_string() + for key in self.keys: + if key.asbytes() == pubkey: + break + else: + return None + + data = m.get_string() + flags = m.get_int() + sig = key.sign_ssh_data(data) + m = Message() + m.add_byte(self.AGENT_SIGN_RESPONSE) + m.add_string(sig.asbytes()) + response = m + else: + print(f"Unknown command received: {cmd}") + return None + + return response.to_bytes() if response else None +``` + +Replace the existing `handle_message` method in the `CustomSSHAgent` class with this updated version. With this change, the script should handle unexpected commands more gracefully and log them as "Unknown command received." This should prevent the agent from hanging when an unexpected command is received. + + +#### You: +use short circuit returns instead of setting a single "response" variable and waiting until the end to return + +#### ChatGPT: +Certainly! Here's the updated `handle_message` method that uses short-circuit returns: + +```python +def handle_message(self, msg): + m = Message(msg) + cmd = m.get_byte() + + if cmd == self.AGENTC_REQUEST_IDENTITIES: + m = Message() + m.add_byte(self.AGENT_IDENTITIES_ANSWER) + m.add_int(len(self.keys)) + for key in self.keys: + m.add_string(key.asbytes()) + m.add_string(f"{key.get_name()} key loaded from {key.filename}") + return m.to_bytes() + + elif cmd == self.AGENTC_SIGN_REQUEST: + pubkey = m.get_string() + for key in self.keys: + if key.asbytes() == pubkey: + break + else: + return None + + data = m.get_string() + flags = m.get_int() + sig = key.sign_ssh_data(data) + m = Message() + m.add_byte(self.AGENT_SIGN_RESPONSE) + m.add_string(sig.asbytes()) + return m.to_bytes() + + else: + print(f"Unknown command received: {cmd}") + return None +``` + +This version of the `handle_message` method uses short-circuit returns, eliminating the need for the `response` variable. Replace the existing `handle_message` method in the `CustomSSHAgent` class with this updated version. The rest of the script remains the same. + + +#### You: +what command starts with the byte b'\x1b'? why can't this handle that? + +#### ChatGPT: +The byte `b'\x1b'` represents the ASCII escape character (ESC). It is not a valid SSH agent command according to the SSH agent protocol. The SSH agent protocol expects specific commands, such as request identities or sign request, to communicate with the agent. + +It's likely that the received escape character is not an actual command but an issue with some other program using the agent or the agent socket. It could also be due to the terminal or shell interpreting the character as an escape sequence. The script we provided is designed to handle standard SSH agent protocol commands. + +By updating the `handle_message` method in my previous response to print a message about the "Unknown command received," the script can log when an unexpected command is received, which can help with debugging. However, it's essential to investigate why the escape character is being sent to the agent in the first place. + +To further debug this issue, you might want to examine the other programs that might be using the agent or agent socket, or check your terminal/shell configuration to see if there's any setting that might cause this behavior. + + +#### You: +can handle_client tell the client that the message wasn't processed if the response is None so that the client doesn't hang waiting for a response? + +#### ChatGPT: +Yes, you can modify the `handle_client` function to send a failure response when the response from `handle_message` is `None`. One way to do this is to send a message with a custom failure code. Here's the updated `handle_client` function: + +```python +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + if response is None: + failure_response = Message() + failure_response.add_byte(255) # Custom failure code + response = failure_response.to_bytes() + + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() +``` + +In this updated version of the `handle_client` function, if `response` is `None`, we create a new `Message` object with a custom failure code (255), convert it to bytes, and send it back to the client. This should inform the client that the message was not processed, and it should not hang waiting for a response. + + +#### You: +'failure_response.add_byte(255)' caused this error: TypeError: a bytes-like object is required, not 'int' + + +#### ChatGPT: +My apologies for the oversight. In Paramiko's `Message` class, the `add_byte` method expects a bytes-like object rather than an int. To resolve this issue, we can simply convert the int to a bytes object using the `bytes()` function. Here's the corrected version of that part: + +```python +if response is None: + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + response = failure_response.to_bytes() +``` + +Replace the relevant part in the `handle_client` function with the corrected version. This should resolve the error, and the script should now be able to send a failure response when necessary. + + +#### You: + response = failure_response.to_bytes() + ^^^^^^^^^^^^^^^^^^^^^^^^^ +AttributeError: 'Message' object has no attribute 'to_bytes'. Did you mean: 'add_bytes'? + + +#### ChatGPT: +Apologies for the mistake. The `Message` object indeed doesn't have a `to_bytes` method. Instead, you can use the `asbytes` method. Here's the corrected part of the `handle_client` function: + +```python +if response is None: + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + response = failure_response.asbytes() +``` + +Replace the relevant part in the `handle_client` function with this corrected version. This should resolve the error, and the script should be able to send a failure response when necessary. + + +#### You: + AGENTC_REQUEST_IDENTITIES = 11 + AGENTC_SIGN_REQUEST = 13 + AGENT_IDENTITIES_ANSWER = 12 + AGENT_SIGN_RESPONSE = 14 + +These are ints and the code wants bytes. + +#### ChatGPT: +You're right. The constants need to be in bytes format to match the expected input for the `Message` object. Here's the updated version of the constants in the `CustomSSHAgent` class: + +```python +class CustomSSHAgent(Agent): + AGENTC_REQUEST_IDENTITIES = bytes([11]) + AGENTC_SIGN_REQUEST = bytes([13]) + AGENT_IDENTITIES_ANSWER = bytes([12]) + AGENT_SIGN_RESPONSE = bytes([14]) + ... +``` + +Replace the existing constants in the `CustomSSHAgent` class with these updated versions. With these changes, the constants will be in the correct format, and the script should work as expected. + + +#### You: +Okay that works now. Now instead of providing the passphrase in the KEYS object directly use a unique key called "pass_key" that is a string. That pass_key should be used to execute a subprocess and get the passphrase. The command should be 'pass '. The passphrase will be printed to stdin. + +#### ChatGPT: +To achieve this, you can modify the `CustomSSHAgent` class to execute the `pass ` command using the `subprocess` module. Here's the updated version of the `CustomSSHAgent` class: + +```python +import subprocess + +class CustomSSHAgent(Agent): + AGENTC_REQUEST_IDENTITIES = bytes([11]) + AGENTC_SIGN_REQUEST = bytes([13]) + AGENT_IDENTITIES_ANSWER = bytes([12]) + AGENT_SIGN_RESPONSE = bytes([14]) + + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + passphrase = self.get_passphrase(key["pass_key"]) + self.keys.append(Ed25519Key.from_private_key_file(key["path"], password=passphrase)) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") + + def get_passphrase(self, pass_key): + result = subprocess.run(['pass', pass_key], capture_output=True, text=True) + if result.returncode != 0: + print(f"Error retrieving passphrase for key {pass_key}: {result.stderr}") + return None + return result.stdout.strip() + + ... +``` + +In this version, a new method `get_passphrase` is added to the `CustomSSHAgent` class. It takes a `pass_key` as an argument and runs the `pass ` command using the `subprocess.run()` function. It then returns the passphrase by extracting it from the `stdout` of the command. + +Make sure to update your `KEYS` variable to include the `"pass_key"` field instead of the `"passphrase"` field: + +```python +KEYS = [ + { + "type": "ed25519", + "path": "/path/to/your/id_ed25519", + "pass_key": "your-pass-key" + } +] +``` + +Replace the `CustomSSHAgent` class in your script with the updated version, and update your `KEYS` variable accordingly. This should now work as expected, fetching the passphrase using the `pass ` command. + + +#### You: +Let me define a file path with the pass_key called store_location. pass the value as an env var to the subprocess + +#### ChatGPT: +To pass the `store_location` as an environment variable to the subprocess, you can create a copy of the current environment and add the `store_location` variable to it. Here's the updated version of the `get_passphrase` method in the `CustomSSHAgent` class: + +```python +import os + +class CustomSSHAgent(Agent): + ... + + def get_passphrase(self, pass_key, store_location): + env = os.environ.copy() + env['STORE_LOCATION'] = store_location + + result = subprocess.run(['pass', pass_key], capture_output=True, text=True, env=env) + if result.returncode != 0: + print(f"Error retrieving passphrase for key {pass_key}: {result.stderr}") + return None + return result.stdout.strip() +``` + +In this version, the `get_passphrase` method takes an additional `store_location` argument. It creates a copy of the current environment using `os.environ.copy()`, adds the `store_location` variable to it, and passes the updated environment to the `subprocess.run()` function using the `env` parameter. + +Now you'll need to update your `KEYS` variable to include the `"store_location"` field: + +```python +KEYS = [ + { + "type": "ed25519", + "path": "/path/to/your/id_ed25519", + "pass_key": "your-pass-key", + "store_location": "/path/to/your/store" + } +] +``` + +And update the `__init__` method of the `CustomSSHAgent` class to pass the `store_location` to the `get_passphrase` method: + +```python +def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + passphrase = self.get_passphrase(key["pass_key"], key["store_location"]) + self.keys.append(Ed25519Key.from_private_key_file(key["path"], password=passphrase)) + except SSHException as e: + print(f"Error loading key from {key['path']}: {e}") +``` + +Replace the relevant parts in your script with the updated versions. This should now pass the `store_location` as an environment variable to the subprocess, and your script should work as expected. + + +#### You: +instead of providing an ssh key path in the KEYS object let me provide key_path. Use the value of key_path to call another subprocess to get the raw ssh key value. The subprocess should execute 'pass ' and the private ssh key will be printed to stdout to be loaded + +#### ChatGPT: +To fetch the private SSH key using the `pass ` command, you can add a new method `get_private_key` in the `CustomSSHAgent` class to run the subprocess and return the private key. Then, you can use the `from_private_key` method to load the key from the string. Here's the updated version of the `CustomSSHAgent` class: + +```python +import io + +class CustomSSHAgent(Agent): + ... + + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + passphrase = self.get_passphrase(key["pass_key"], key["store_location"]) + private_key_str = self.get_private_key(key["key_path"]) + private_key_file = io.StringIO(private_key_str) + self.keys.append(Ed25519Key.from_private_key(private_key_file, password=passphrase)) + except SSHException as e: + print(f"Error loading key from {key['key_path']}: {e}") + + ... + + def get_private_key(self, key_path): + result = subprocess.run(['pass', key_path], capture_output=True, text=True) + if result.returncode != 0: + print(f"Error retrieving private key for key_path {key_path}: {result.stderr}") + return None + return result.stdout.strip() +``` + +In this version, the `get_private_key` method takes a `key_path` as an argument and runs the `pass ` command using the `subprocess.run()` function. It then returns the private key by extracting it from the `stdout` of the command. + +Now you'll need to update your `KEYS` variable to include the `"key_path"` field instead of the `"path"` field: + +```python +KEYS = [ + { + "type": "ed25519", + "key_path": "your-key-path", + "pass_key": "your-pass-key", + "store_location": "/path/to/your/store" + } +] +``` + +Replace the relevant parts in your script with the updated versions. This should now fetch the private SSH key using the `pass ` command, and your script should work as expected. + + +#### You: +Chante the STORE_LOCATION env var to PASSWORD_STORE_DIR and print the full python file now. + +#### ChatGPT: +Here's the full Python script with the changes to use the `PASSWORD_STORE_DIR` environment variable: + +```python +import os +import socket +import struct +import sys +from paramiko.message import Message +from paramiko.agent import Agent +from paramiko.ed25519key import Ed25519Key +from paramiko.ssh_exception import SSHException +import subprocess +import io + +class CustomSSHAgent(Agent): + AGENTC_REQUEST_IDENTITIES = bytes([11]) + AGENTC_SIGN_REQUEST = bytes([13]) + AGENT_IDENTITIES_ANSWER = bytes([12]) + AGENT_SIGN_RESPONSE = bytes([14]) + + def __init__(self, keys): + self.keys = [] + for key in keys: + try: + if key["type"].lower() == "ed25519": + passphrase = self.get_passphrase(key["pass_key"], key["store_location"]) + private_key_str = self.get_private_key(key["key_path"]) + private_key_file = io.StringIO(private_key_str) + self.keys.append(Ed25519Key.from_private_key(private_key_file, password=passphrase)) + except SSHException as e: + print(f"Error loading key from {key['key_path']}: {e}") + + def get_passphrase(self, pass_key, store_location): + env = os.environ.copy() + env['PASSWORD_STORE_DIR'] = store_location + + result = subprocess.run(['pass', pass_key], capture_output=True, text=True, env=env) + if result.returncode != 0: + print(f"Error retrieving passphrase for key {pass_key}: {result.stderr}") + return None + return result.stdout.strip() + + def get_private_key(self, key_path): + result = subprocess.run(['pass', key_path], capture_output=True, text=True) + if result.returncode != 0: + print(f"Error retrieving private key for key_path {key_path}: {result.stderr}") + return None + return result.stdout.strip() + + def handle_message(self, msg): + m = Message(msg) + cmd = m.get_byte() + + if cmd == CustomSSHAgent.AGENTC_REQUEST_IDENTITIES: + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_IDENTITIES_ANSWER) + resp.add_int(len(self.keys)) + for key in self.keys: + resp.add_string(key.asbytes()) + resp.add_string(f"{key.get_name()} key loaded from {key}") + return resp + + if cmd == CustomSSHAgent.AGENTC_SIGN_REQUEST: + key_blob = m.get_string() + data = m.get_string() + flags = m.get_int() + + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_SIGN_RESPONSE) + for key in self.keys: + if key_blob == key.asbytes(): + sig = key.sign_ssh_data(data) + resp.add_string(sig) + return resp + return None + + print("Unknown command received:", cmd) + return None + +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + if response is None: + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + response = failure_response.asbytes() + + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() + +KEYS = [ + { + "type": "ed25519", + "key_path": "your-key-path +``` + + +#### You: +The last part of the message after 'def handle_client' was cut short. What's the rest of it? + +#### ChatGPT: +Apologies for the truncation. Here's the full script starting from the `handle_client` function: + +```python +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + if response is None: + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + response = failure_response.asbytes() + + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() + +KEYS = [ + { + "type": "ed25519", + "key_path": "your-key-path", + "pass_key": "your-pass-key", + "store_location": "/path/to/your/store" + } +] + +if __name__ == "__main__": + agent = CustomSSHAgent(KEYS) + + if len(sys.argv) != 2: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + + socket_path = sys.argv[1] + + if os.path.exists(socket_path): + os.unlink(socket_path) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(socket_path) + os.chmod(socket_path, 0o600) + + try: + sock.listen(1) + + while True: + conn, addr = sock.accept() + handle_client(agent, conn, addr) + finally: + sock.close() + os.unlink(socket_path) +``` + +This should complete the Python script, and you can use it as an SSH identity agent over a Unix socket. + + +#### You: + conn.send(struct.pack(">I", len(response)) + response) +produces this error: + +TypeError: object of type 'Message' has no len() + + +#### ChatGPT: +Apologies for the oversight. You need to convert the `response` message to bytes before sending it. To do this, simply call the `asbytes()` method of the `response` object. Here's the corrected line in the `handle_client` function: + +```python +conn.send(struct.pack(">I", len(response.asbytes())) + response.asbytes()) +``` + +Replace the existing line in the `handle_client` function with the corrected one, and the script should work as expected without any errors. + + +#### You: +move the failure_response out of handle_client and into agent.handle_message so that agent.handle_message is responsible even for failure cases + +#### ChatGPT: +You can modify the `handle_message` method in the `CustomSSHAgent` class to return the failure response directly. Here's the updated version of the `handle_message` method: + +```python +def handle_message(self, msg): + m = Message(msg) + cmd = m.get_byte() + + if cmd == CustomSSHAgent.AGENTC_REQUEST_IDENTITIES: + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_IDENTITIES_ANSWER) + resp.add_int(len(self.keys)) + for key in self.keys: + resp.add_string(key.asbytes()) + resp.add_string(f"{key.get_name()} key loaded from {key}") + return resp + + if cmd == CustomSSHAgent.AGENTC_SIGN_REQUEST: + key_blob = m.get_string() + data = m.get_string() + flags = m.get_int() + + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_SIGN_RESPONSE) + for key in self.keys: + if key_blob == key.asbytes(): + sig = key.sign_ssh_data(data) + resp.add_string(sig) + return resp + + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + return failure_response.asbytes() +``` + +And here's the updated `handle_client` function: + +```python +def handle_client(agent, conn, addr): + try: + while True: + msg = conn.recv(4) + if len(msg) == 0: + break + msg_len = struct.unpack(">I", msg)[0] + msg = conn.recv(msg_len) + response = agent.handle_message(msg) + conn.send(struct.pack(">I", len(response)) + response) + finally: + conn.close() +``` + +With these changes, the `handle_message` method in the `CustomSSHAgent` class is responsible for generating the failure response. Replace the relevant parts in your script with the updated versions, and the script should work as expected. + + +#### You: +handle_message inconsistently either returns bytes or the whole response object + +#### ChatGPT: +You're right. The `handle_message` method should consistently return the response object as bytes. Let's modify the method to return the `resp` object as bytes when the message is handled. Here's the updated version of the `handle_message` method: + +```python +def handle_message(self, msg): + m = Message(msg) + cmd = m.get_byte() + + if cmd == CustomSSHAgent.AGENTC_REQUEST_IDENTITIES: + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_IDENTITIES_ANSWER) + resp.add_int(len(self.keys)) + for key in self.keys: + resp.add_string(key.asbytes()) + resp.add_string(f"{key.get_name()} key loaded from {key}") + return resp.asbytes() + + if cmd == CustomSSHAgent.AGENTC_SIGN_REQUEST: + key_blob = m.get_string() + data = m.get_string() + flags = m.get_int() + + resp = Message() + resp.add_byte(CustomSSHAgent.AGENT_SIGN_RESPONSE) + for key in self.keys: + if key_blob == key.asbytes(): + sig = key.sign_ssh_data(data) + resp.add_string(sig) + return resp.asbytes() + + failure_response = Message() + failure_response.add_byte(bytes([255])) # Custom failure code, converted to bytes + return failure_response.asbytes() +``` + +Now, the `handle_message` method consistently returns bytes. Replace the relevant parts in your script with the updated version, and the script should work as expected without any inconsistencies. diff --git a/chatgpt/FullConversation.pdf b/chatgpt/FullConversation.pdf new file mode 100644 index 0000000..55df99f Binary files /dev/null and b/chatgpt/FullConversation.pdf differ diff --git a/systemd/ssh-pass-agent.service b/systemd/ssh-pass-agent.service new file mode 100644 index 0000000..dd3effa --- /dev/null +++ b/systemd/ssh-pass-agent.service @@ -0,0 +1,11 @@ +[Unit] +Description=SSH Agent that uses pass as a backing store +Requires=ssh-pass-agent.socket + +[Service] +Type=simple +ExecStart=/usr/bin/python3 %h/.config/ssh-pass-agent.py %t/ssh-pass-agent.sock +Restart=on-failure + +[Install] +Also=ssh-pass-agent.socket diff --git a/systemd/ssh-pass-agent.socket b/systemd/ssh-pass-agent.socket new file mode 100644 index 0000000..e3e7555 --- /dev/null +++ b/systemd/ssh-pass-agent.socket @@ -0,0 +1,9 @@ +[Unit] +Description=SSH Agent Socket for backing pass store + +[Socket] +ListenStream=%t/ssh-pass-agent.sock +RemoveOnStop=yes + +[Install] +WantedBy=sockets.target