forked from wps1112/pytdx_backup
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper.py
132 lines (104 loc) · 2.92 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# coding=utf-8
import struct
import six
#### XXX: 分析了一下,貌似是类似utf-8的编码方式保存有符号数字
def get_price(data, pos):
pos_byte = 6
bdata = indexbytes(data, pos)
intdata = bdata & 0x3f
if bdata & 0x40:
sign = True
else:
sign = False
if bdata & 0x80:
while True:
pos += 1
bdata = indexbytes(data, pos)
intdata += (bdata & 0x7f) << pos_byte
pos_byte += 7
if bdata & 0x80:
pass
else:
break
pos += 1
if sign:
intdata = -intdata
return intdata, pos
def get_volume(ivol):
logpoint = ivol >> (8 * 3)
hheax = ivol >> (8 * 3); # [3]
hleax = (ivol >> (8 * 2)) & 0xff; # [2]
lheax = (ivol >> 8) & 0xff; # [1]
lleax = ivol & 0xff; # [0]
dbl_1 = 1.0
dbl_2 = 2.0
dbl_128 = 128.0
dwEcx = logpoint * 2 - 0x7f;
dwEdx = logpoint * 2 - 0x86;
dwEsi = logpoint * 2 - 0x8e;
dwEax = logpoint * 2 - 0x96;
if dwEcx < 0:
tmpEax = - dwEcx
else:
tmpEax = dwEcx
dbl_xmm6 = 0.0
dbl_xmm6 = pow(2.0, tmpEax)
if dwEcx < 0:
dbl_xmm6 = 1.0 / dbl_xmm6
dbl_xmm4 = 0
if hleax > 0x80:
tmpdbl_xmm3 = 0.0
tmpdbl_xmm1 = 0.0
dwtmpeax = dwEdx + 1
tmpdbl_xmm3 = pow(2.0, dwtmpeax)
dbl_xmm0 = pow(2.0, dwEdx) * 128.0
dbl_xmm0 += (hleax & 0x7f) * tmpdbl_xmm3
dbl_xmm4 = dbl_xmm0
else:
dbl_xmm0 = 0.0
if dwEdx >= 0:
dbl_xmm0 = pow(2.0, dwEdx) * hleax
else:
dbl_xmm0 = (1 / pow(2.0, dwEdx)) * hleax
dbl_xmm4 = dbl_xmm0
dbl_xmm3 = pow(2.0, dwEsi) * lheax
dbl_xmm1 = pow(2.0, dwEax) * lleax
if hleax & 0x80:
dbl_xmm3 *= 2.0
dbl_xmm1 *= 2.0
dbl_ret = dbl_xmm6 + dbl_xmm4 + dbl_xmm3 + dbl_xmm1
return dbl_ret
def get_datetime(category, buffer, pos):
year = 0
month = 0
day = 0
hour = 15
minute = 0
if category < 4 or category == 7 or category == 8:
(zipday, tminutes) = struct.unpack("<HH", buffer[pos: pos + 4])
year = (zipday >> 11) + 2004
month = int((zipday % 2048) / 100)
day = (zipday % 2048) % 100
hour = int(tminutes / 60)
minute = tminutes % 60
else:
(zipday,) = struct.unpack("<I", buffer[pos: pos + 4])
year = int(zipday / 10000);
month = int((zipday % 10000) / 100)
day = zipday % 100
pos += 4
return year, month, day, hour, minute, pos
def get_time(buffer, pos):
(tminutes, ) = struct.unpack("<H", buffer[pos: pos + 2])
hour = int(tminutes / 60)
minute = tminutes % 60
pos += 2
return hour, minute, pos
def indexbytes(data, pos):
if six.PY2:
if type(data) is bytearray:
return data[pos]
else:
return six.indexbytes(data, pos)
else:
return data[pos]