forked from mpquant/MyTT
-
Notifications
You must be signed in to change notification settings - Fork 2
/
MyTT_python2.py
217 lines (158 loc) · 9.63 KB
/
MyTT_python2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# MyTT 麦语言-通达信-同花顺指标实现(python2版本) https://github.com/mpquant/MyTT
# 此版本针对python2的老版本pandas使用 2021-10-22日更新
import numpy as np; import pandas as pd
#------------------ 0级:核心工具函数 python2 --------------------------------------------
def RD(N,D=3): return np.round(N,D) #四舍五入取3位小数
def RET(S,N=1): return np.array(S)[-N] #返回序列倒数第N个值,默认返回最后一个
def ABS(S): return np.abs(S) #返回N的绝对值
def MAX(S1,S2): return np.maximum(S1,S2) #序列max
def MIN(S1,S2): return np.minimum(S1,S2) #序列min
def MA(S,N): #求序列的N日平均值,返回序列
return pd.rolling_mean(S,N)
def REF(S, N=1): #对序列整体下移动N,返回序列(shift后会产生NAN)
return pd.Series(S).shift(N).values
def DIFF(S, N=1): #前一个值减后一个值,前面会产生nan
return pd.Series(S).diff(N).values #np.diff(S)直接删除nan,会少一行
def STD(S,N): #求序列的N日标准差,返回序列
return pd.rolling_std(S,N,ddof=0)
def IF(S_BOOL,S_TRUE,S_FALSE): #序列布尔判断 res=S_TRUE if S_BOOL==True else S_FALSE
return np.where(S_BOOL, S_TRUE, S_FALSE)
def SUM(S, N): #对序列求N天累计和,返回序列 N=0对序列所有依次求和
return pd.rolling_sum(S,N) if N>0 else pd.Series(S).cumsum()
def HHV(S,N): # HHV(C, 5) # 最近5天收盘最高价
return pd.rolling_max(S,N)
def LLV(S,N): # LLV(C, 5) # 最近5天收盘最低价
return pd.rolling_min(S,N)
def EMA(S,N): #指数移动平均,为了精度 S>4*N EMA至少需要120周期 alpha=2/(span+1)
return pd.ewma(S,span=N,adjust=False)
def SMA(S, N, M=1): #中国式的SMA,至少需要120周期才精确 (雪球180周期) alpha=1/(1+com)
return pd.ewma(S,com=((N-M)*1.0)/M,adjust=True)
def DMA(S, A): #求S的动态移动平均,A作平滑因子 (此为核心函数,非指标)
return pd.ewma(S,com=1.0/A-1,adjust=False)
def AVEDEV(S,N): #平均绝对偏差 (序列与其平均值的绝对差的平均值)
return pd.rolling_apply(S,N,lambda x: (np.abs(x - x.mean())).mean())
def SLOPE(S,N,RS=False): #返S序列N周期回线性回归斜率 (默认只返回斜率,不返回整个直线序列)
M=pd.Series(S[-N:]); poly = np.polyfit(M.index, M.values,deg=1); Y=np.polyval(poly, M.index);
if RS: return Y[1]-Y[0],Y
return Y[1]-Y[0]
#------------------ 1级:应用层函数(通过0级核心函数实现) ----------------------------------
def COUNT(S_BOOL, N): # COUNT(CLOSE>O, N): 最近N天满足S_BOO的天数 True的天数
return SUM(S_BOOL,N)
def EVERY(S_BOOL, N): # EVERY(CLOSE>O, 5) 最近N天是否都是True
R=SUM(S_BOOL, N)
return IF(R==N, True, False)
def LAST(S_BOOL, A, B): #从前A日到前B日一直满足S_BOOL条件
if A<B: A=B #要求A>B 例:LAST(CLOSE>OPEN,5,3) 5天前到3天前是否都收阳线
return S_BOOL[-A:-B].sum()==(A-B) #返回单个布尔值
def EXIST(S_BOOL, N=5): # EXIST(CLOSE>3010, N=5) n日内是否存在一天大于3000点
R=SUM(S_BOOL,N)
return IF(R>0, True ,False)
def FORCAST(S,N): #返S序列N周期回线性回归后的预测值
K,Y=SLOPE(S,N,RS=True)
return Y[-1]+K
def CROSS(S1,S2): #判断向上金叉穿越 CROSS(MA(C,5),MA(C,10)) 判断向下死叉穿越 CROSS(MA(C,10),MA(C,5))
CROSS_BOOL=IF(S1>S2, True ,False)
return (COUNT(CROSS_BOOL>0,2)==1)*CROSS_BOOL #上穿:昨天0 今天1 下穿:昨天1 今天0
#------------------ 2级:技术指标函数(全部通过0级,1级函数实现) ------------------------------
def MACD(CLOSE,SHORT=12,LONG=26,M=9): # EMA的关系,S取120日,和雪球小数点2位相同
DIF = EMA(CLOSE,SHORT)-EMA(CLOSE,LONG);
DEA = EMA(DIF,M); MACD=(DIF-DEA)*2
return RD(DIF),RD(DEA),RD(MACD)
def KDJ(CLOSE,HIGH,LOW, N=9,M1=3,M2=3): # KDJ指标
RSV = (CLOSE - LLV(LOW, N)) / (HHV(HIGH, N) - LLV(LOW, N)) * 100
K = EMA(RSV, (M1*2-1)); D = EMA(K,(M2*2-1)); J=K*3-D*2
return K, D, J
def RSI(CLOSE, N=24):
DIF = CLOSE-REF(CLOSE,1)
return RD(SMA(MAX(DIF,0), N) / SMA(ABS(DIF), N) * 100)
def WR(CLOSE, HIGH, LOW, N=10, N1=6): #W&R 威廉指标
WR = (HHV(HIGH, N) - CLOSE) / (HHV(HIGH, N) - LLV(LOW, N)) * 100
WR1 = (HHV(HIGH, N1) - CLOSE) / (HHV(HIGH, N1) - LLV(LOW, N1)) * 100
return RD(WR), RD(WR1)
def BIAS(CLOSE,L1=6, L2=12, L3=24): # BIAS乖离率
BIAS1 = (CLOSE - MA(CLOSE, L1)) / MA(CLOSE, L1) * 100
BIAS2 = (CLOSE - MA(CLOSE, L2)) / MA(CLOSE, L2) * 100
BIAS3 = (CLOSE - MA(CLOSE, L3)) / MA(CLOSE, L3) * 100
return RD(BIAS1), RD(BIAS2), RD(BIAS3)
def BOLL(CLOSE,N=20, P=2): #BOLL指标,布林带
MID = MA(CLOSE, N);
UPPER = MID + STD(CLOSE, N) * P
LOWER = MID - STD(CLOSE, N) * P
return RD(UPPER), RD(MID), RD(LOWER)
def PSY(CLOSE,N=12, M=6):
PSY=COUNT(CLOSE>REF(CLOSE,1),N)/N*100
PSYMA=MA(PSY,M)
return RD(PSY),RD(PSYMA)
def CCI(CLOSE,HIGH,LOW,N=14):
TP=(HIGH+LOW+CLOSE)/3
return (TP-MA(TP,N))/(0.015*AVEDEV(TP,N))
def ATR(CLOSE,HIGH,LOW, N=20): #真实波动N日平均值
TR = MAX(MAX((HIGH - LOW), ABS(REF(CLOSE, 1) - HIGH)), ABS(REF(CLOSE, 1) - LOW))
return MA(TR, N)
def BBI(CLOSE,M1=3,M2=6,M3=12,M4=20): #BBI多空指标
return (MA(CLOSE,M1)+MA(CLOSE,M2)+MA(CLOSE,M3)+MA(CLOSE,M4))/4
def DMI(CLOSE,HIGH,LOW,M1=14,M2=6): #动向指标:结果和同花顺,通达信完全一致
TR = SUM(MAX(MAX(HIGH - LOW, ABS(HIGH - REF(CLOSE, 1))), ABS(LOW - REF(CLOSE, 1))), M1)
HD = HIGH - REF(HIGH, 1); LD = REF(LOW, 1) - LOW
DMP = SUM(IF((HD > 0) & (HD > LD), HD, 0), M1)
DMM = SUM(IF((LD > 0) & (LD > HD), LD, 0), M1)
PDI = DMP * 100 / TR; MDI = DMM * 100 / TR
ADX = MA(ABS(MDI - PDI) / (PDI + MDI) * 100, M2)
ADXR = (ADX + REF(ADX, M2)) / 2
return PDI, MDI, ADX, ADXR
def TAQ(HIGH,LOW,N): #唐安奇通道(海龟)交易指标,大道至简,能穿越牛熊
UP=HHV(HIGH,N); DOWN=LLV(LOW,N); MID=(UP+DOWN)/2
return UP,MID,DOWN
def KTN(CLOSE,HIGH,LOW,N=20,M=10): #肯特纳交易通道, N选20日,ATR选10日
MID=EMA((HIGH+LOW+CLOSE)/3,N)
ATRN=ATR(CLOSE,HIGH,LOW,M)
UPPER=MID+2*ATRN; LOWER=MID-2*ATRN
return UPPER,MID,LOWER
def TRIX(CLOSE,M1=12, M2=20): #三重指数平滑平均线
TR = EMA(EMA(EMA(CLOSE, M1), M1), M1)
TRIX = (TR - REF(TR, 1)) / REF(TR, 1) * 100
TRMA = MA(TRIX, M2)
return TRIX, TRMA
def VR(CLOSE,VOL,M1=26): #VR容量比率
LC = REF(CLOSE, 1)
return SUM(IF(CLOSE > LC, VOL, 0), M1) / SUM(IF(CLOSE <= LC, VOL, 0), M1) * 100
def EMV(HIGH,LOW,VOL,N=14,M=9): #简易波动指标
VOLUME=MA(VOL,N)/VOL; MID=100*(HIGH+LOW-REF(HIGH+LOW,1))/(HIGH+LOW)
EMV=MA(MID*VOLUME*(HIGH-LOW)/MA(HIGH-LOW,N),N); MAEMV=MA(EMV,M)
return EMV,MAEMV
def DPO(CLOSE,M1=20, M2=10, M3=6): #区间震荡线
DPO = CLOSE - REF(MA(CLOSE, M1), M2); MADPO = MA(DPO, M3)
return DPO, MADPO
def BRAR(OPEN,CLOSE,HIGH,LOW,M1=26): #BRAR-ARBR 情绪指标
AR = SUM(HIGH - OPEN, M1) / SUM(OPEN - LOW, M1) * 100
BR = SUM(MAX(0, HIGH - REF(CLOSE, 1)), M1) / SUM(MAX(0, REF(CLOSE, 1) - LOW), M1) * 100
return AR, BR
def DFMA(CLOSE,N1=10,N2=50,M=10): #平行线差指标
DIF=MA(CLOSE,N1)-MA(CLOSE,N2); DIFMA=MA(DIF,M) #通达信指标叫DMA 同花顺叫新DMA
return DIF,DIFMA
def MTM(CLOSE,N=12,M=6): #动量指标
MTM=CLOSE-REF(CLOSE,N); MTMMA=MA(MTM,M)
return MTM,MTMMA
def MASS(HIGH,LOW,N1=9,N2=25,M=6): # 梅斯线
MASS=SUM(MA(HIGH-LOW,N1)/MA(MA(HIGH-LOW,N1),N1),N2)
MA_MASS=MA(MASS,M)
return MASS,MA_MASS
def ROC(CLOSE,N=12,M=6): #变动率指标
ROC=100*(CLOSE-REF(CLOSE,N))/REF(CLOSE,N); MAROC=MA(ROC,M)
return ROC,MAROC
def EXPMA(CLOSE,N1=12,N2=50): #EMA指数平均数指标
return EMA(CLOSE,N1),EMA(CLOSE,N2);
def OBV(CLOSE,VOL): #能量潮指标
return SUM(IF(CLOSE>REF(CLOSE,1),VOL,IF(CLOSE<REF(CLOSE,1),-VOL,0)),0)/10000
def MFI(CLOSE,HIGH,LOW,VOL,N=14): #MFI指标是成交量的RSI指标
TYP = (HIGH + LOW + CLOSE)/3
V1=SUM(IF(TYP>REF(TYP,1),TYP*VOL,0),N)/SUM(IF(TYP<REF(TYP,1),TYP*VOL,0),N)
return 100-(100/(1+V1))
def ASI(OPEN,CLOSE,HIGH,LOW,M1=26,M2=10): #振动升降指标
LC=REF(CLOSE,1); AA=ABS(HIGH-LC); BB=ABS(LOW-LC);
CC=ABS(HIGH-REF(LOW,1)); DD=ABS(LC-REF(OPEN,1));
R=IF( (AA>BB) & (AA>CC),AA+BB/2+DD/4,IF( (BB>CC) & (BB>AA),BB+AA/2+DD/4,CC+DD/4));
X=(CLOSE-LC+(CLOSE-OPEN)/2+LC-REF(OPEN,1));
SI=16*X/R*MAX(AA,BB); ASI=SUM(SI,M1); ASIT=MA(ASI,M2);
return ASI,ASIT
#望大家能提交更多指标和函数 https://github.com/mpquant/MyTT