diff --git a/chess/engine.py b/chess/engine.py index 01ee08b3e..9a7a0aa8a 100644 --- a/chess/engine.py +++ b/chess/engine.py @@ -10,14 +10,12 @@ import enum import logging import math -import warnings import shlex import subprocess import sys import threading import time import typing -import os import re import chess @@ -1305,66 +1303,47 @@ def start(self, engine: UciProtocol) -> None: engine.send_line("uci") def line_received(self, engine: UciProtocol, line: str) -> None: - if line == "uciok" and not self.result.done(): + token, remaining = _next_token(line) + if line.strip() == "uciok" and not self.result.done(): engine.initialized = True self.result.set_result(None) self.set_finished() - elif line.startswith("option "): - self._option(engine, line.split(" ", 1)[1]) - elif line.startswith("id "): - self._id(engine, line.split(" ", 1)[1]) + elif token == "option": + self._option(engine, remaining) + elif token == "id": + self._id(engine, remaining) def _option(self, engine: UciProtocol, arg: str) -> None: current_parameter = None - - name: List[str] = [] - type: List[str] = [] - default: List[str] = [] - min = None - max = None - current_var = None + option_parts: dict[str, str] = {k: "" for k in ["name", "type", "default", "min", "max"]} var = [] - for token in arg.strip().split(" "): - if token == "name" and not name: - current_parameter = "name" - elif token == "type" and not type: - current_parameter = "type" - elif token == "default" and not default: - current_parameter = "default" - elif token == "min" and min is None: - current_parameter = "min" - elif token == "max" and max is None: - current_parameter = "max" - elif token == "var": - current_parameter = "var" - if current_var is not None: - var.append(" ".join(current_var)) - current_var = [] - elif current_parameter == "name": - name.append(token) - elif current_parameter == "type": - type.append(token) - elif current_parameter == "default": - default.append(token) + parameters = list(option_parts.keys()) + ['var'] + option_regex = fr"\s*({'|'.join(parameters)})\s*" + for token in re.split(option_regex, arg.strip()): + if token == "var" or (token in option_parts and not option_parts[token]): + current_parameter = token elif current_parameter == "var": - current_var.append(token) - elif current_parameter == "min": - try: - min = int(token) - except ValueError: - LOGGER.exception("Exception parsing option min") - elif current_parameter == "max": - try: - max = int(token) - except ValueError: - LOGGER.exception("Exception parsing option max") + var.append(token) + elif current_parameter: + option_parts[current_parameter] = token - if current_var is not None: - var.append(" ".join(current_var)) + def parse_min_max_value(option_parts: dict[str, str], which: Literal["min", "max"]) -> Optional[int]: + try: + number = option_parts[which] + return int(number) if number else None + except ValueError: + LOGGER.exception(f"Exception parsing option {which}") + return None - without_default = Option(" ".join(name), " ".join(type), None, min, max, var) - option = Option(without_default.name, without_default.type, without_default.parse(" ".join(default)), min, max, var) + name = option_parts["name"] + type = option_parts["type"] + default = option_parts["default"] + min = parse_min_max_value(option_parts, "min") + max = parse_min_max_value(option_parts, "max") + + without_default = Option(name, type, None, min, max, var) + option = Option(without_default.name, without_default.type, without_default.parse(default), min, max, var) engine.options[option.name] = option if option.default is not None: @@ -1373,8 +1352,8 @@ def _option(self, engine: UciProtocol, arg: str) -> None: engine.target_config[option.name] = option.default def _id(self, engine: UciProtocol, arg: str) -> None: - key, value = arg.split(" ", 1) - engine.id[key] = value + key, value = _next_token(arg) + engine.id[key] = value.strip() return await self.communicate(UciInitializeCommand) @@ -1408,7 +1387,7 @@ def start(self, engine: UciProtocol) -> None: engine._isready() def line_received(self, engine: UciProtocol, line: str) -> None: - if line == "readyok": + if line.strip() == "readyok": self.result.set_result(None) self.set_finished() else: @@ -1589,11 +1568,12 @@ def start(self, engine: UciProtocol) -> None: self._readyok(engine) def line_received(self, engine: UciProtocol, line: str) -> None: - if line.startswith("info "): - self._info(engine, line.split(" ", 1)[1]) - elif line.startswith("bestmove "): - self._bestmove(engine, line.split(" ", 1)[1]) - elif line == "readyok" and self.sent_isready: + token, remaining = _next_token(line) + if token == "info": + self._info(engine, remaining) + elif token == "bestmove": + self._bestmove(engine, remaining) + elif line.strip() == "readyok" and self.sent_isready: self._readyok(engine) else: LOGGER.warning("%s: Unexpected engine output: %r", engine, line) @@ -1681,11 +1661,12 @@ def start(self, engine: UciProtocol) -> None: self._readyok(engine) def line_received(self, engine: UciProtocol, line: str) -> None: - if line.startswith("info "): - self._info(engine, line.split(" ", 1)[1]) - elif line.startswith("bestmove "): - self._bestmove(engine, line.split(" ", 1)[1]) - elif line == "readyok" and self.sent_isready: + token, remaining = _next_token(line) + if token == "info": + self._info(engine, remaining) + elif token == "bestmove": + self._bestmove(engine, remaining) + elif line.strip() == "readyok" and self.sent_isready: self._readyok(engine) else: LOGGER.warning("%s: Unexpected engine output: %r", engine, line) @@ -1730,39 +1711,56 @@ async def quit(self) -> None: UCI_REGEX = re.compile(r"^[a-h][1-8][a-h][1-8][pnbrqk]?|[PNBRQK]@[a-h][1-8]|0000\Z") +def _create_variation_line(root_board: chess.Board, line: str) -> tuple[list[chess.Move], str]: + board = root_board.copy(stack=False) + currline: list[chess.Move] = [] + while True: + next_move, remaining_line_after_move = _next_token(line) + if UCI_REGEX.match(next_move): + currline.append(board.push_uci(next_move)) + line = remaining_line_after_move + else: + return currline, line + + def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL) -> InfoDict: info: InfoDict = {} if not selector: return info - tokens = arg.split(" ") - while tokens: - parameter = tokens.pop(0) + remaining_line = arg + while remaining_line: + parameter, remaining_line = _next_token(remaining_line) if parameter == "string": - info["string"] = " ".join(tokens) + info["string"] = remaining_line break elif parameter in ["depth", "seldepth", "nodes", "multipv", "currmovenumber", "hashfull", "nps", "tbhits", "cpuload"]: try: - info[parameter] = int(tokens.pop(0)) # type: ignore + number, remaining_line = _next_token(remaining_line) + info[parameter] = int(number) # type: ignore except (ValueError, IndexError): LOGGER.error("Exception parsing %s from info: %r", parameter, arg) elif parameter == "time": try: - info["time"] = int(tokens.pop(0)) / 1000.0 + time_ms, remaining_line = _next_token(remaining_line) + info["time"] = int(time_ms) / 1000.0 except (ValueError, IndexError): LOGGER.error("Exception parsing %s from info: %r", parameter, arg) elif parameter == "ebf": try: - info["ebf"] = float(tokens.pop(0)) + number, remaining_line = _next_token(remaining_line) + info["ebf"] = float(number) except (ValueError, IndexError): LOGGER.error("Exception parsing %s from info: %r", parameter, arg) elif parameter == "score" and selector & INFO_SCORE: try: - kind = tokens.pop(0) - value = tokens.pop(0) - if tokens and tokens[0] in ["lowerbound", "upperbound"]: - info[tokens.pop(0)] = True # type: ignore + kind, remaining_line = _next_token(remaining_line) + value, remaining_line = _next_token(remaining_line) + token, remaining_after_token = _next_token(remaining_line) + if token in ["lowerbound", "upperbound"]: + info[token] = True # type: ignore + remaining_line = remaining_after_token if kind == "cp": info["score"] = PovScore(Cp(int(value)), root_board.turn) elif kind == "mate": @@ -1773,7 +1771,8 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL LOGGER.error("Exception parsing score from info: %r", arg) elif parameter == "currmove": try: - info["currmove"] = chess.Move.from_uci(tokens.pop(0)) + current_move, remaining_line = _next_token(remaining_line) + info["currmove"] = chess.Move.from_uci(current_move) except (ValueError, IndexError): LOGGER.error("Exception parsing currmove from info: %r", arg) elif parameter == "currline" and selector & INFO_CURRLINE: @@ -1781,13 +1780,10 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL if "currline" not in info: info["currline"] = {} - cpunr = int(tokens.pop(0)) - currline: List[chess.Move] = [] + cpunr_text, remaining_line = _next_token(remaining_line) + cpunr = int(cpunr_text) + currline, remaining_line = _create_variation_line(root_board, remaining_line) info["currline"][cpunr] = currline - - board = root_board.copy(stack=False) - while tokens and UCI_REGEX.match(tokens[0]): - currline.append(board.push_uci(tokens.pop(0))) except (ValueError, IndexError): LOGGER.error("Exception parsing currline from info: %r, position at root: %s", arg, root_board.fen()) elif parameter == "refutation" and selector & INFO_REFUTATION: @@ -1796,27 +1792,25 @@ def _parse_uci_info(arg: str, root_board: chess.Board, selector: Info = INFO_ALL info["refutation"] = {} board = root_board.copy(stack=False) - refuted = board.push_uci(tokens.pop(0)) + refuted_text, remaining_line = _next_token(remaining_line) + refuted = board.push_uci(refuted_text) - refuted_by: List[chess.Move] = [] + refuted_by, remaining_line = _create_variation_line(board, remaining_line) info["refutation"][refuted] = refuted_by - - while tokens and UCI_REGEX.match(tokens[0]): - refuted_by.append(board.push_uci(tokens.pop(0))) except (ValueError, IndexError): LOGGER.error("Exception parsing refutation from info: %r, position at root: %s", arg, root_board.fen()) elif parameter == "pv" and selector & INFO_PV: try: - pv: List[chess.Move] = [] + pv, remaining_line = _create_variation_line(root_board, remaining_line) info["pv"] = pv - board = root_board.copy(stack=False) - while tokens and UCI_REGEX.match(tokens[0]): - pv.append(board.push_uci(tokens.pop(0))) except (ValueError, IndexError): LOGGER.error("Exception parsing pv from info: %r, position at root: %s", arg, root_board.fen()) elif parameter == "wdl": try: - info["wdl"] = PovWdl(Wdl(int(tokens.pop(0)), int(tokens.pop(0)), int(tokens.pop(0))), root_board.turn) + wins, remaining_line = _next_token(remaining_line) + draws, remaining_line = _next_token(remaining_line) + losses, remaining_line = _next_token(remaining_line) + info["wdl"] = PovWdl(Wdl(int(wins), int(draws), int(losses)), root_board.turn) except (ValueError, IndexError): LOGGER.error("Exception parsing wdl from info: %r", arg) @@ -1946,10 +1940,11 @@ def timeout(self, engine: XBoardProtocol) -> None: self.end(engine) def line_received(self, engine: XBoardProtocol, line: str) -> None: - if line.startswith("#"): + token, remaining = _next_token(line) + if token.startswith("#"): pass - elif line.startswith("feature "): - self._feature(engine, line.split(" ", 1)[1]) + elif token == "feature": + self._feature(engine, remaining) elif XBOARD_ERROR_REGEX.match(line): raise EngineError(line) @@ -2161,32 +2156,35 @@ def start(self, engine: XBoardProtocol) -> None: engine.send_line("go") def line_received(self, engine: XBoardProtocol, line: str) -> None: - if line.startswith("move "): - self._move(engine, line.split(" ", 1)[1]) - elif line.startswith("Hint: "): - self._hint(engine, line.split(" ", 1)[1]) - elif line == self.pong_after_move: - if not self.result.done(): - self.result.set_result(self.play_result) - if not ponder: + token, remaining = _next_token(line) + if token == "move": + self._move(engine, remaining.strip()) + elif token == "Hint:": + self._hint(engine, remaining.strip()) + elif token == "pong": + pong_line = f"{token} {remaining.strip()}" + if pong_line == self.pong_after_move: + if not self.result.done(): + self.result.set_result(self.play_result) + if not ponder: + self.set_finished() + elif pong_line == self.pong_after_ponder: + if not self.result.done(): + self.result.set_result(self.play_result) self.set_finished() - elif line == self.pong_after_ponder: - if not self.result.done(): - self.result.set_result(self.play_result) - self.set_finished() - elif line == "offer draw": + elif f"{token} {remaining.strip()}" == "offer draw": if not self.result.done(): self.play_result.draw_offered = True self._ping_after_move(engine) - elif line == "resign": + elif line.strip() == "resign": if not self.result.done(): self.play_result.resigned = True self._ping_after_move(engine) - elif line.startswith("1-0") or line.startswith("0-1") or line.startswith("1/2-1/2"): + elif token in ["1-0", "0-1", "1/2-1/2"]: if "resign" in line and not self.result.done(): self.play_result.resigned = True self._ping_after_move(engine) - elif line.startswith("#"): + elif token.startswith("#"): pass elif XBOARD_ERROR_REGEX.match(line): engine.first_game = True # Board state might no longer be in sync @@ -2293,11 +2291,12 @@ def start(self, engine: XBoardProtocol) -> None: self.time_limit_handle = None def line_received(self, engine: XBoardProtocol, line: str) -> None: - if line.startswith("#"): + token, remaining = _next_token(line) + if token.startswith("#"): pass elif len(line.split()) >= 4 and line.lstrip()[0].isdigit(): self._post(engine, line) - elif line == self.final_pong: + elif f"{token} {remaining.strip()}" == self.final_pong: self.end(engine) elif XBOARD_ERROR_REGEX.match(line): engine.first_game = True # Board state might no longer be in sync @@ -2553,6 +2552,23 @@ def _parse_xboard_post(line: str, root_board: chess.Board, selector: Info = INFO return info +def _next_token(line: str) -> tuple[str, str]: + """Get the next token in a whitespace-delimited line of text. + + The result is returned as a 2-part tuple of strings. + + If the input line is empty or all whitespace, then the result is two + empty strings. + + If the input line is not empty and not completely whitespace, then + the first element of the returned tuple is a single word with + leading and trailing whitespace removed. The second element is the + unchanged rest of the line.""" + + parts = line.split(maxsplit=1) + return (parts[0] if parts else "", parts[1] if len(parts) == 2 else "") + + class BestMove: """Returned by :func:`chess.engine.AnalysisResult.wait()`."""