Skip to content

Commit

Permalink
fix engine bug
Browse files Browse the repository at this point in the history
  • Loading branch information
walker8088 committed Sep 13, 2024
1 parent 324b06d commit d1a0cdc
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 118 deletions.
5 changes: 3 additions & 2 deletions src/cchess/__main__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import os

import sys
import argparse
import pathlib

from .game import Game

def print_game(game):

print('\n=====================================')
print(game.info)
ame.print_init_board()
game.print_init_board()
print('-------------------------------------')
if game.annote:
print(game.annote)
Expand Down
180 changes: 91 additions & 89 deletions src/cchess/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import os
import re
import os
import time
import enum
import logging
Expand All @@ -33,39 +32,29 @@
#-----------------------------------------------------#
def parse_engine_info_to_dict(s):
result = {}
current_key = None
for part in s.split():
if current_key is None and part != 'score': #, 'lowerbound', 'higherbound']:
current_key = part

elif current_key is not None and part == 'score':
current_key
continue

elif current_key is not None:
result[current_key] = part
current_key = None

# 注意:这个简单的实现没有处理'cp'后面没有值的情况,或者字符串格式不正确的情况
# 在实际应用中,你可能需要添加更多的错误检查和处理逻辑

# 由于'cp'后面通常跟着值,但在这个例子中我们将其视为值的一部分,
# 所以我们不需要对'cp'进行特殊处理(除了上面的跳过逻辑)
# 但是,如果'cp'后面没有值,或者格式经常变化,你可能需要更复杂的逻辑

# 示例中'cp'已经隐含地作为'score'值的一部分处理了,所以不需要额外操作
# 但如果'cp'应该作为一个独立的键或标记存在,你需要调整逻辑来适应

# 返回结果
current_key = None
info = s.split()
for index, part in enumerate(info):
if part in ['info','cp']: #略过这两个关键字,不影响分析结果
continue
elif part == 'pv': ##遇到pv就是到尾了,剩下的都是招法
result['moves'] = info[index+1:]
break

if current_key is None: #TODO, 'lowerbound', 'higherbound']:
current_key = part
#替换key
if current_key == 'bestmove':
current_key = 'move'
else:
if part == 'mate': # score mate 这样的字符串,后滑一个关键字
current_key = part
continue
else:
result[current_key] = part
current_key = None

return result

# 示例字符串
#s = "depth 1 seldepth 1 multipv 1 score cp -58 nodes 28 nps 14000 hashfull 0 tbhits 0 time 2 pv f5c5"
# 调用函数并打印结果
# 注意:这个实现不会将'cp'作为独立的键,而是将其视为'score'值的一部分
#result = parse_special_string_to_dict(s)
# 由于'cp'被视为'score'值的一部分,这里我们不会直接看到'cp',但'-58'将是'score'的值
#print(result) # 输出可能不包括'cp'作为独立键,但'score'的值将是'-58'

#-----------------------------------------------------#
#Engine status
Expand Down Expand Up @@ -102,6 +91,49 @@ def __init__(self, exec_path=''):
def init_cmd(self):
return ""

def load(self, engine_path):

self.engine_exec_path = engine_path

try:
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.process = subprocess.Popen(self.engine_exec_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
startupinfo=startupinfo,
cwd=Path(
self.engine_exec_path).parent,
text=True)
except Exception as e:
logger.warning(f"load engine {engine_path} ERROR: {e}")
self.engine_status = EngineStatus.ERROR
return False

time.sleep(0.5)

self.pin = self.process.stdin
self.pout = self.process.stdout
self.perr = self.process.stderr
self.engine_out_queque = Queue()
self.engine_status = EngineStatus.BOOTING

#self._send_cmd('test')

if not self._send_cmd(self.init_cmd()):
self.engine_status = EngineStatus.ERROR
return False

self.start()

#while self.engine_status == EngineStatus.BOOTING:
# self.handle_msg_once()

return True


def run(self):

self.running = True
Expand Down Expand Up @@ -141,7 +173,6 @@ def handle_msg_once(self):
elif resp_id == "option":
self.options.append(output)
if resp_id == self.ok_resp():
#print("GOT", resp_id)
self.engine_status = EngineStatus.READY
move_info["action"] = 'ready'

Expand All @@ -160,72 +191,42 @@ def handle_msg_once(self):
else:
move_info["action"] = 'bestmove'
resp_dict = parse_engine_info_to_dict(output)
move_info['move'] = resp_dict.pop('bestmove')
move_info.update(resp_dict)

move_key = move_info['move']
if move_key in self.score_dict:
for key in ['score', 'mate', 'moves', 'seldepth', 'time']:
if key in self.score_dict[move_key]:
move_info[key] = self.score_dict[move_key][key]

elif resp_id == 'info' and out_list[1] == "depth":
#info depth 6 score 4 pv b0c2 b9c7 c3c4 h9i7 c2d4 h7e7
#info depth 1 seldepth 1 multipv 1 score cp -58 nodes 28 nps 14000 hashfull 0 tbhits 0 time 2 pv f5c5

move_info['action'] = 'info_move'
move_info["move"] = []

pv_index = output.find(' pv ')
if pv_index > 0:
move_info["move"] = output[pv_index+4:].split(' ')

resp_dict = parse_engine_info_to_dict(output[5:pv_index+1])
if 'cp' in resp_dict:
move_info['score'] = resp_dict.pop('cp')
resp_dict = parse_engine_info_to_dict(output)
move_info.update(resp_dict)

if 'moves' in move_info:
move_key = move_info['moves'][0]
self.score_dict[move_key] = move_info

if len(move_info) > 0:
self.move_queue.put(move_info)

return True

def wait_for_ready(self, timeout = 10):

start_time = time.time()

def load(self, engine_path):

self.engine_exec_path = engine_path

try:
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.process = subprocess.Popen(self.engine_exec_path,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
startupinfo=startupinfo,
cwd=Path(
self.engine_exec_path).parent,
text=True)
except Exception as e:
logger.warning(f"load engine {engine_path} ERROR: {e}")
self.engine_status = EngineStatus.ERROR
return False

time.sleep(0.5)

self.pin = self.process.stdin
self.pout = self.process.stdout
self.perr = self.process.stderr
self.engine_out_queque = Queue()
self.engine_status = EngineStatus.BOOTING

#self._send_cmd('test')

if not self._send_cmd(self.init_cmd()):
self.engine_status = EngineStatus.ERROR
return False

self.start()

#while self.engine_status == EngineStatus.BOOTING:
# self.handle_msg_once()

return True

while True:
self.handle_msg_once()
if self.engine_status == EngineStatus.READY:
return True

if time.time() - start_time > timeout:
return False
time.sleep(0.2)

def quit(self):
self._send_cmd("quit")
time.sleep(0.2)
Expand All @@ -252,6 +253,7 @@ def go_from(self, fen, params={}):

self.last_fen = fen
self.last_go = go_cmd
self.score_dict = {}

return True

Expand Down
1 change: 1 addition & 0 deletions src/cchess/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import datetime as dt

from .common import FULL_INIT_FEN
from .board import ChessBoard

# 比赛结果
UNKNOWN, RED_WIN, BLACK_WIN, PEACE = range(4)
Expand Down
13 changes: 6 additions & 7 deletions src/cchess/read_cbr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''

import sys
import struct

from .exception import CChessException
Expand Down Expand Up @@ -256,13 +255,13 @@ def read_from_cbl(file_name):
book_buffer = contents[index:]
try:
game = read_from_cbr_buffer(book_buffer)
game.info['index'] = game_index
lib_info['games'].append(game)
game_index += 1
#print(game.info)
if game is not None:
game.info['index'] = game_index
lib_info['games'].append(game)
game_index += 1
except Exception as e:
#print(file_name, count, index, len(contents), e)
pass
raise Exception(f'{count}, {index}, {len(contents)}, {e}')

count += 1
index += 4096
#print(count, game.info)
Expand Down
2 changes: 0 additions & 2 deletions src/cchess/read_xqf.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,6 @@ def __read_steps(buff_decoder, version, keys, game, parent_move, board):

#-----------------------------------------------------#
def read_from_xqf(full_file_name, read_annotation=True):
#避免循环导入
from .game import Game

with open(full_file_name, "rb") as f:
contents = f.read()
Expand Down
29 changes: 11 additions & 18 deletions tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ def test_ucci(self):
assert self.engine.load("eleeye") is False

assert self.engine.load("..\\Engine\\eleeye\\eleeye.exe") is True

for i in range(30):
self.engine.handle_msg_once()
time.sleep(0.2)
if self.engine.engine_status == EngineStatus.READY:
break

self.engine.wait_for_ready()
assert self.engine.engine_status == EngineStatus.READY

fen, moves, result = load_move_txt(Path("data", "ucci_test1_move.txt"))
Expand All @@ -89,7 +83,7 @@ def test_ucci(self):
#print(output)
action = output['action']
if action == 'bestmove':
#print(output)
print(output)
p_from, p_to = iccs2pos(output["move"])
move_txt = board.move(p_from, p_to).to_text()
print(move_txt)
Expand All @@ -98,7 +92,7 @@ def test_ucci(self):
board.next_turn()
break
elif action == 'dead':
print(output)
#print(output)
if board.move_player == cchess.RED:
assert result == S_BLACK_WIN
else:
Expand All @@ -122,13 +116,7 @@ def setup_method(self):

ret = self.engine.load("..\\Engine\\pikafish_230408\\pikafish.exe")
assert ret is True

for i in range(30):
self.engine.handle_msg_once()
time.sleep(0.2)
if self.engine.engine_status == EngineStatus.READY:
break

self.engine.wait_for_ready()
assert self.engine.engine_status == EngineStatus.READY

def teardown_method(self):
Expand Down Expand Up @@ -161,10 +149,11 @@ def test_uci_endgame(self):
board.next_turn()
break
elif action == 'info_move':
print(output)
#print(output)
pass

elif action == 'dead':
print(output)
#print(output)

if board.move_player == cchess.RED:
assert result == S_BLACK_WIN
Expand Down Expand Up @@ -202,6 +191,10 @@ def test_uci_game(self):
print(move.to_text())
board.next_turn()
break
elif action == 'info_move':
#print(output)
pass

self.engine.stop_thinking()

self.engine.quit()
Expand Down

0 comments on commit d1a0cdc

Please sign in to comment.