Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 130 additions & 98 deletions appium/webdriver/appium_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import subprocess as sp
import sys
import time
from typing import Any, List, Optional, Union
from typing import Any, List, Optional, Set

import urllib3
from selenium.webdriver.remote.remote_connection import urllib3

DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 4723
Expand All @@ -29,6 +29,14 @@
DEFAULT_BASE_PATH = '/'


class AppiumServiceError(RuntimeError):
pass


class AppiumStartupError(RuntimeError):
pass


def find_executable(executable: str) -> Optional[str]:
path = os.environ['PATH']
paths = path.split(os.pathsep)
Expand All @@ -47,97 +55,110 @@ def find_executable(executable: str) -> Optional[str]:
return None


def poll_url(host: str, port: int, path: str, timeout_ms: int) -> bool:
time_started_sec = time.time()
conn = urllib3.PoolManager(timeout=1.0)
while time.time() < time_started_sec + timeout_ms / 1000.0:
def get_node() -> str:
result = find_executable('node')
if result is None:
raise AppiumServiceError(
'NodeJS main executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_npm() -> str:
result = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm')
if result is None:
raise AppiumServiceError(
'Node Package Manager executable cannot be found. Make sure it is installed and present in PATH'
)
return result


def get_main_script(node: Optional[str], npm: Optional[str]) -> str:
result: Optional[str] = None
npm_path = npm or get_npm()
for args in [['root', '-g'], ['root']]:
try:
modules_root = sp.check_output([npm_path] + args).strip().decode('utf-8')
if os.path.exists(os.path.join(modules_root, MAIN_SCRIPT_PATH)):
result = os.path.join(modules_root, MAIN_SCRIPT_PATH)
break
except sp.CalledProcessError:
continue
if result is None:
node_path = node or get_node()
try:
resp = conn.request('HEAD', f'http://{host}:{port}{path}')
if resp.status < 400:
return True
except urllib3.exceptions.HTTPError:
pass
time.sleep(1.0)
return False
result = (
sp.check_output([node_path, '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))'])
.decode('utf-8')
.strip()
)
except sp.CalledProcessError as e:
raise AppiumServiceError(e.output) from e
return result


class AppiumServiceError(RuntimeError):
pass
def parse_arg_value(args: List[str], arg_names: Set[str], default: str) -> str:
for idx, arg in enumerate(args):
if arg in arg_names and idx < len(args) - 1:
return args[idx + 1]
return default


def parse_port(args: List[str]) -> int:
return int(parse_arg_value(args, {'--port', '-p'}, str(DEFAULT_PORT)))


def parse_base_path(args: List[str]) -> str:
return parse_arg_value(args, {'--base-path', '-pa'}, DEFAULT_BASE_PATH)


def parse_host(args: List[str]) -> str:
return parse_arg_value(args, {'--address', '-a'}, DEFAULT_HOST)


def make_status_url(args: List[str]) -> str:
base_path = parse_base_path(args)
return STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"/+$", "", base_path)}{STATUS_URL}'


class AppiumService:
def __init__(self) -> None:
self._process: Optional[sp.Popen] = None
self._cmd: Optional[List] = None

def _get_node(self) -> str:
if not hasattr(self, '_node_executable'):
self._node_executable = find_executable('node')
if self._node_executable is None:
raise AppiumServiceError(
'NodeJS main executable cannot be found. ' + 'Make sure it is installed and present in PATH'
)
return self._node_executable

def _get_npm(self) -> str:
if not hasattr(self, '_npm_executable'):
self._npm_executable = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm')
if self._npm_executable is None:
raise AppiumServiceError(
'Node Package Manager executable cannot be found. ' + 'Make sure it is installed and present in PATH'
)
return self._npm_executable

def _get_main_script(self) -> Union[str, bytes]:
if not hasattr(self, '_main_script'):
for args in [['root', '-g'], ['root']]:
try:
modules_root = sp.check_output([self._get_npm()] + args).strip().decode('utf-8')
if os.path.exists(os.path.join(modules_root, MAIN_SCRIPT_PATH)):
self._main_script: Union[str, bytes] = os.path.join(modules_root, MAIN_SCRIPT_PATH)
break
except sp.CalledProcessError:
continue
if not hasattr(self, '_main_script'):
try:
self._main_script = sp.check_output(
[self._get_node(), '-e', f'console.log(require.resolve("{MAIN_SCRIPT_PATH}"))']
).strip()
except sp.CalledProcessError as e:
raise AppiumServiceError(e.output) from e
return self._main_script

@staticmethod
def _parse_port(args: List[str]) -> int:
for idx, arg in enumerate(args or []):
if arg in ('--port', '-p') and idx < len(args) - 1:
return int(args[idx + 1])
return DEFAULT_PORT

@staticmethod
def _parse_base_path(args: List[str]) -> str:
for idx, arg in enumerate(args or []):
if arg in ('--base-path', '-pa') and idx < len(args) - 1:
return args[idx + 1]
return DEFAULT_BASE_PATH

@staticmethod
def _parse_host(args: List[str]) -> str:
for idx, arg in enumerate(args or []):
if arg in ('--address', '-a') and idx < len(args) - 1:
return args[idx + 1]
return DEFAULT_HOST
self._cmd: Optional[List[str]] = None

def _poll_status(self, host: str, port: int, path: str, timeout_ms: int) -> bool:
time_started_sec = time.time()
conn = urllib3.PoolManager(timeout=1.0)
while time.time() < time_started_sec + timeout_ms / 1000.0:
if not self.is_running:
raise AppiumStartupError()
# noinspection PyUnresolvedReferences
try:
resp = conn.request('HEAD', f'http://{host}:{port}{path}')
if resp.status < 400:
return True
except urllib3.exceptions.HTTPError:
pass
time.sleep(1.0)
return False

def start(self, **kwargs: Any) -> sp.Popen:
"""Starts Appium service with given arguments.

If you use the service to start Appium 1.x
then consider providing ['--base-path', '/wd/hub'] arguments. By default,
the service assumes Appium server listens on '/' path, which is the default path
for Appium 2.

The service will be forcefully restarted if it is already running.

Keyword Args:
env (dict): Environment variables mapping. The default system environment,
which is inherited from the parent process, is assigned by default.
node (str): The full path to the main NodeJS executable. The service will try
to retrieve it automatically by default.
to retrieve it automatically if not provided.
npm (str): The full path to the Node Package Manager (npm) script. The service will try
to retrieve it automatically if not provided.
stdout (int): Check the documentation for subprocess.Popen for more details.
The default value is subprocess.DEVNULL on Windows and subprocess.PIPE on other platforms.
stderr (int): Check the documentation for subprocess.Popen for more details.
Expand All @@ -146,7 +167,7 @@ def start(self, **kwargs: Any) -> sp.Popen:
for HTTP connections. If set to zero or a negative number then no wait will be applied.
60000 ms by default.
main_script (str): The full path to the main Appium executable
(usually located at build/lib/main.js). If this is not set
(usually located at build/lib/main.js). If not set
then the service tries to detect the path automatically.
args (str): List of Appium arguments (all must be strings). Check
https://appium.io/docs/en/writing-running-appium/server-args/ for more details
Expand All @@ -160,27 +181,38 @@ def start(self, **kwargs: Any) -> sp.Popen:
self.stop()

env = kwargs['env'] if 'env' in kwargs else None
node = kwargs['node'] if 'node' in kwargs else self._get_node()
node: str = kwargs.get('node') or get_node()
npm: str = kwargs.get('npm') or get_npm()
main_script: str = kwargs.get('main_script') or get_main_script(node, npm)
# A workaround for https://github.com/appium/python-client/issues/534
default_std = sp.DEVNULL if sys.platform == 'win32' else sp.PIPE
stdout = kwargs['stdout'] if 'stdout' in kwargs else default_std
stderr = kwargs['stderr'] if 'stderr' in kwargs else default_std
timeout_ms = int(kwargs['timeout_ms']) if 'timeout_ms' in kwargs else STARTUP_TIMEOUT_MS
main_script = kwargs['main_script'] if 'main_script' in kwargs else self._get_main_script()
args = [node, main_script]
args: List[str] = [node, main_script]
if 'args' in kwargs:
args.extend(kwargs['args'])
self._cmd = args
self._process = sp.Popen(args=args, stdout=stdout, stderr=stderr, env=env)
host = self._parse_host(args)
port = self._parse_port(args)
error_msg: Optional[str] = None
base_path = self._parse_base_path(args)
status_url_path = (
STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"[/]+$", "", base_path)}{STATUS_URL}'
startup_failure_msg = (
'Appium server process is unable to start. Make sure proper values have been '
f'provided to \'node\' ({node}), \'npm\' ({npm}) and \'main_script\' ({main_script}) '
f'method arguments.'
)
if not self.is_running or (timeout_ms > 0 and not poll_url(host, port, status_url_path, timeout_ms)):
error_msg = f'Appium has failed to start on {host}:{port} within {timeout_ms}ms timeout'
if timeout_ms > 0:
status_url_path = make_status_url(args)
try:
if not self._poll_status(parse_host(args), parse_port(args), status_url_path, timeout_ms):
error_msg = (
f'Appium server has started but is not listening on {status_url_path} '
f'within {timeout_ms}ms timeout. Make sure proper values have been provided '
f'to --base-path, --address and --port process arguments.'
)
except AppiumStartupError:
error_msg = startup_failure_msg
elif not self.is_running:
error_msg = startup_failure_msg
if error_msg is not None:
if stderr == sp.PIPE and self._process.stderr is not None:
err_output = self._process.stderr.read()
Expand All @@ -201,7 +233,8 @@ def stop(self) -> bool:
"""
is_terminated = False
if self.is_running:
self._process.terminate() # type: ignore
assert self._process
self._process.terminate()
is_terminated = True
self._process = None
self._cmd = None
Expand All @@ -214,28 +247,27 @@ def is_running(self) -> bool:
Returns:
bool: `True` if the service is running
"""
return self._process is not None and self._process.poll() is None
return self._process is not None and self._cmd is not None and self._process.poll() is None

@property
def is_listening(self) -> bool:
"""Check if the service is listening on the given/default host/port.

The fact, that the service is running, does not always mean it is listening.
the default host/port values can be customized by providing --address/--port
command line arguments while starting the service.
The default host/port/base path values can be customized by providing
--address/--port/--base-path command line arguments while starting the service.

Returns:
bool: `True` if the service is running and listening on the given/default host/port
"""
if not self.is_running or self._cmd is None:
if not self.is_running:
return False

assert self._cmd
try:
return self._poll_status(parse_host(self._cmd), parse_port(self._cmd), make_status_url(self._cmd), 1000)
except AppiumStartupError:
return False
host = self._parse_host(self._cmd)
port = self._parse_port(self._cmd)
base_path = self._parse_base_path(self._cmd)
status_url_path = (
STATUS_URL if base_path == DEFAULT_BASE_PATH else f'{re.sub(r"[/]+$", "", base_path)}{STATUS_URL}'
)
return self.is_running and poll_url(host, port, status_url_path, 1000)


if __name__ == '__main__':
Expand Down