forked from alibaba/tidevice
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_usbmux.py
133 lines (116 loc) · 4.45 KB
/
_usbmux.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# coding: utf-8
# created: codeskyblue 2020/06
import os
import platform
import plistlib
import pprint
import sys
import typing
import uuid
from typing import Optional, Union
from ._types import DeviceInfo, ConnectionType
from ._proto import PROGRAM_NAME, UsbmuxReplyCode
from ._safe_socket import PlistSocket
from .exceptions import * # pragma warning disables S2208
class Usbmux:
    """Client for the usbmuxd service.

    Every request opens its own PlistSocket to the configured address and
    tags it with a per-instance counter that increases by one each time.
    """

    def __init__(self, address: Optional[Union[str, tuple]] = None):
        """
        Args:
            address: a socket path (str) or an ``(ip, port)`` tuple. When
                None, the default endpoint for the current OS is used.

        Raises:
            EnvironmentError: address is None and the OS is neither posix
                nor nt.
        """
        if address is None:
            address = self._default_address()
        self.__address = address
        self.__tag = 0

    @staticmethod
    def _default_address(os_name: Optional[str] = None) -> Union[str, tuple]:
        """Return the default usbmuxd endpoint for ``os_name`` (defaults to os.name)."""
        if os_name is None:
            os_name = os.name
        if os_name == "posix":  # linux or darwin
            return "/var/run/usbmuxd"
        if os_name == "nt":  # windows
            return ('127.0.0.1', 27015)
        # Build ONE message string: OSError interprets two positional
        # arguments as (errno, strerror), which rendered the old message as
        # "[Errno Unsupported system:] <platform>".
        raise EnvironmentError(f"Unsupported system: {sys.platform}")

    @property
    def address(self) -> str:
        """The endpoint as text: the socket path itself, or ``"ip:port"``."""
        if isinstance(self.__address, str):
            return self.__address
        ip, port = self.__address
        return f"{ip}:{port}"

    def _next_tag(self) -> int:
        """Increment the per-instance tag counter and return the new value."""
        self.__tag += 1
        return self.__tag

    def create_connection(self) -> "PlistSocket":
        """Open a new PlistSocket to the configured address with a fresh tag."""
        return PlistSocket(self.__address, self._next_tag())

    def send_recv(self, payload: dict) -> dict:
        """Send one packet on a new connection and return the single reply.

        Raises:
            MuxReplyError: the reply carries a non-zero 'Number' field.
        """
        with self.create_connection() as s:
            s.send_packet(payload)
            recv_data = s.recv_packet()
            self._check(recv_data)
            return recv_data

    def device_list(self) -> "typing.List[DeviceInfo]":
        """
        Return one DeviceInfo per UDID, covering both USB and NETWORK devices.

        When a UDID is reported more than once, a NETWORK entry never
        replaces an entry that was already recorded; any other later entry
        overwrites the earlier one.

        Data processing example:
        {'DeviceList': [{'DeviceID': 37,
                         'MessageType': 'Attached',
                         'Properties': {'ConnectionSpeed': 480000000,
                                        'ConnectionType': 'USB',
                                        'DeviceID': 37,
                                        'LocationID': 341966848,
                                        'ProductID': 4776,
                                        'SerialNumber': '539c5fffb18f2be0bf7f771d68f7c327fb68d2d9',
                                        'UDID': '539c5fffb18f2be0bf7f771d68f7c327fb68d2d9',
                                        'USBSerialNumber': '539c5fffb18f2be0bf7f771d68f7c327fb68d2d9'}}]}
        """
        payload = {
            "MessageType": "ListDevices",  # required
            "ClientVersionString": "libusbmuxd 1.1.0",
            "ProgName": PROGRAM_NAME,
            "kLibUSBMuxVersion": 3,
            # "ProcessID": 0, # Xcode send it processID
        }
        data = self.send_recv(payload)
        result = {}  # udid -> DeviceInfo, in first-seen order
        for item in data['DeviceList']:
            prop = item['Properties']
            info = DeviceInfo()
            info.udid = prop['SerialNumber']
            info.device_id = prop['DeviceID']
            info.conn_type = prop['ConnectionType']
            # NOTE(review): conn_type holds the raw string from the reply;
            # confirm it compares equal to ConnectionType.NETWORK as intended.
            if info.udid in result and info.conn_type == ConnectionType.NETWORK:
                continue
            result[info.udid] = info
        return list(result.values())

    def device_udid_list(self) -> list:
        """Return the UDIDs of the devices reported by device_list()."""
        return [d.udid for d in self.device_list()]

    def _check(self, data: dict):
        """Raise MuxReplyError when the reply has a non-zero 'Number' field."""
        if 'Number' in data and data['Number'] != 0:
            raise MuxReplyError(data['Number'])

    def read_system_BUID(self) -> str:
        """Return the 'BUID' field of the ReadBUID reply.

        Original author's note: the BUID is always the same.
        """
        data = self.send_recv({
            'ClientVersionString': 'libusbmuxd 1.1.0',
            'MessageType': 'ReadBUID',
            'ProgName': PROGRAM_NAME,
            'kLibUSBMuxVersion': 3
        })
        return data['BUID']

    def _gen_host_id(self):
        """Return an upper-case UUID3 string derived from this machine's hostname."""
        hostname = platform.node()
        hostid = uuid.uuid3(uuid.NAMESPACE_DNS, hostname)
        return str(hostid).upper()

    def watch_device(self) -> typing.Iterator[dict]:
        """
        Subscribe with a 'Listen' request and yield every packet that follows.

        The generator never finishes on its own; the connection is held by
        a ``with`` block for as long as the generator is alive.

        Return iterator of data as follows
        - {'DeviceID': 59, 'MessageType': 'Detached'}
        - {'DeviceID': 59, 'MessageType': 'Attached', 'Properties': {
            'ConnectionSpeed': 100,
            'ConnectionType': 'USB',
            'DeviceID': 59,
            'LocationID': 341966848, 'ProductID': 4776,
            'SerialNumber': 'xxx.xxx', 'USBSerialNumber': 'xxxx..xxx'}}

        Raises:
            MuxReplyError: the reply to the 'Listen' request carries a
                non-zero 'Number' field.
        """
        with self.create_connection() as s:
            s.send_packet({
                'ClientVersionString': 'qt4i-usbmuxd',
                'MessageType': 'Listen',
                'ProgName': 'tcprelay'
            })
            data = s.recv_packet()
            self._check(data)

            while True:
                # NOTE(review): header_size=16 is passed only for these
                # event packets; its meaning is defined by PlistSocket.
                data = s.recv_packet(header_size=16)
                yield data