Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tests/__pycache__
tmp
todo
.DS_Store
test.py
92 changes: 68 additions & 24 deletions airos/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(
self.username = username
self.password = password

self.api_version: int = 8

parsed_host = urlparse(host)
scheme = (
parsed_host.scheme
Expand All @@ -74,11 +76,13 @@ def __init__(
self.current_csrf_token: str | None = None

# Mostly 8.x API endpoints, login/status are the same in 6.x
self._login_urls = {
"default": f"{self.base_url}/api/auth",
"v6_alternative": f"{self.base_url}/login.cgi",
}
self._login_url = f"{self.base_url}/api/auth"
self._status_cgi_url = f"{self.base_url}/status.cgi"

# Presumed 6.x XM only endpoint
self._v6_xm_login_url = f"{self.base_url}/login.cgi"
self._v6_form_url = "/index.cgi"

# Presumed 8.x only endpoints
self._stakick_cgi_url = f"{self.base_url}/stakick.cgi"
self._provmode_url = f"{self.base_url}/api/provmode"
Expand All @@ -88,6 +92,8 @@ def __init__(
self._download_progress_url = f"{self.base_url}/api/fw/download-progress"
self._install_url = f"{self.base_url}/fwflash.cgi"

self._login_urls = [self._login_url, self._v6_xm_login_url]

@staticmethod
def derived_wireless_data(
derived: dict[str, Any], response: dict[str, Any]
Expand Down Expand Up @@ -204,7 +210,8 @@ def _get_authenticated_headers(
headers["X-CSRF-ID"] = self._csrf_id

if self._auth_cookie: # pragma: no cover
headers["Cookie"] = f"AIROS_{self._auth_cookie}"
# headers["Cookie"] = f"AIROS_{self._auth_cookie}"
headers["Cookie"] = self._auth_cookie

return headers

Expand All @@ -218,7 +225,8 @@ def _store_auth_data(self, response: aiohttp.ClientResponse) -> None:
cookie.load(set_cookie)
for key, morsel in cookie.items():
if key.startswith("AIROS_"):
self._auth_cookie = morsel.key[6:] + "=" + morsel.value
# self._auth_cookie = morsel.key[6:] + "=" + morsel.value
self._auth_cookie = f"{morsel.key}={morsel.value}"
break

async def _request_json(
Expand All @@ -243,7 +251,7 @@ async def _request_json(
request_headers.update(headers)

try:
if url not in self._login_urls.values() and not self.connected:
if url not in self._login_urls and not self.connected:
_LOGGER.error("Not connected, login first")
raise AirOSDeviceConnectionError from None

Expand All @@ -259,7 +267,7 @@ async def _request_json(
_LOGGER.debug("Successfully fetched JSON from %s", url)

# If this is the login request, we need to store the new auth data
if url in self._login_urls.values():
if url in self._login_urls:
self._store_auth_data(response)
self.connected = True

Expand All @@ -283,32 +291,68 @@ async def _request_json(
_LOGGER.warning("Request to %s was cancelled", url)
raise

async def _login_v6(self) -> None:
"""Login to airOS v6 (XM) devices."""
# Handle session cookie from login url
async with self.session.request(
"GET",
self._v6_xm_login_url,
allow_redirects=False,
) as response:
session_cookie = next(
(c for n, c in response.cookies.items() if n.startswith("AIROS")), None
)
if not session_cookie:
raise AirOSDeviceConnectionError("No session cookie received.")
self._auth_cookie = f"{session_cookie.key}={session_cookie.value}"

# Handle login expecting 302 redirect
payload = {
"username": self.username,
"password": self.password,
"uri": self._v6_form_url,
}
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Origin": self.base_url,
"Referer": self._v6_xm_login_url,
"Cookie": self._auth_cookie,
}
async with self.session.request(
"POST",
self._v6_xm_login_url,
data=payload,
headers=headers,
allow_redirects=False,
) as response:
if response.status != 302:
raise AirOSConnectionAuthenticationError("Login failed.")

# Activate session by accessing the form URL
headers = {"Referer": self._v6_xm_login_url, "Cookie": self._auth_cookie}
async with self.session.request(
"GET",
f"{self.base_url}{self._v6_form_url}",
headers=headers,
allow_redirects=True,
) as response:
if "login.cgi" in str(response.url):
raise AirOSConnectionAuthenticationError("Session activation failed.")
self.connected = True
self.api_version = 6

async def login(self) -> None:
"""Login to AirOS device."""
payload = {"username": self.username, "password": self.password}
try:
await self._request_json(
"POST", self._login_urls["default"], json_data=payload
)
await self._request_json("POST", self._login_url, json_data=payload)
except AirOSUrlNotFoundError:
pass # Try next URL
await self._login_v6()
except AirOSConnectionSetupError as err:
raise AirOSConnectionSetupError("Failed to login to AirOS device") from err
else:
return

try: # Alternative URL
await self._request_json(
"POST",
self._login_urls["v6_alternative"],
form_data=payload,
ct_form=True,
)
except AirOSConnectionSetupError as err:
raise AirOSConnectionSetupError(
"Failed to login to default and alternate AirOS device urls"
) from err

async def status(self) -> AirOSDataModel:
"""Retrieve status from the device."""
response = await self._request_json(
Expand Down
52 changes: 51 additions & 1 deletion tests/test_stations6.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import os
from typing import Any
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch

import aiofiles
import pytest
Expand Down Expand Up @@ -65,3 +65,53 @@ async def test_ap_object(
cookie = SimpleCookie()
cookie["session_id"] = "test-cookie"
cookie["AIROS_TOKEN"] = "abc123"


@pytest.mark.asyncio
async def test_login_v6_flow() -> None:
"""Test AirOS v6 XM login flow with manual cookie handling."""

# Create a mock session
session = MagicMock()

# Mock response for GET /login.cgi
get_login_response = MagicMock()
get_login_response.__aenter__.return_value = get_login_response
get_login_response.status = 200
get_login_response.cookies = {
"AIROS_ABC123": MagicMock(key="AIROS_ABC123", value="xyz789")
}

# Mock response for POST /login.cgi
post_login_response = MagicMock()
post_login_response.__aenter__.return_value = post_login_response
post_login_response.status = 302

# Mock response for GET /index.cgi
get_index_response = MagicMock()
get_index_response.__aenter__.return_value = get_index_response
get_index_response.status = 200
get_index_response.url = "http://192.168.1.3/index.cgi"

# Set side effects for session.request
session.request.side_effect = [
get_login_response,
post_login_response,
get_index_response,
]

# Create device
airos6_device = AirOS6(
host="http://192.168.1.3",
username="ubnt",
password="ubnt",
session=session,
)

await airos6_device._login_v6() # noqa: SLF001

# Assertions
assert airos6_device.connected is True
assert airos6_device.api_version == 6
assert airos6_device._auth_cookie == "AIROS_ABC123=xyz789" # noqa: SLF001
assert session.request.call_count == 3