滑动均值和标准差
为了更好地利用向量化来加速,滑动窗口使用 `np.lib.stride_tricks.sliding_window_view(x, win)` 提取,它会返回所有以 `x[i]` 开头并且长度为 `win` 的数组组成的数组。
def rolling(x, win):
    """Return every length-``win`` window of ``x``, front-padded with NaN rows.

    Row ``i`` of the result holds ``x[i - win + 1 : i + 1]``. The leading
    ``win - 1`` rows, for which no complete window exists, are filled with
    NaN so that the result has exactly one row per element of ``x``.

    Args:
        x: 1-D sequence or array of numbers.
        win: Window length.

    Returns:
        Array of shape ``(len(x), win)``.

    Raises:
        ValueError: Propagated from ``sliding_window_view`` when ``win`` is
            larger than ``len(x)``.
    """
    x = np.asarray(x)
    windows = np.lib.stride_tricks.sliding_window_view(x, win)
    # One NaN row for every position that has no complete trailing window.
    n_missing = len(x) - len(windows)
    pad = np.full((n_missing, win), np.nan)
    return np.vstack([pad, windows])
def rolling_mean(x, win):
    """Return the trailing moving average of ``x`` over ``win`` samples.

    Each output element is the mean of one row produced by ``rolling``, so
    the result has the same length as ``x`` and the NaN-padded leading rows
    yield NaN.

    Args:
        x: 1-D sequence or array of numbers.
        win: Window length.

    Returns:
        1-D array of window means.
    """
    return rolling(x, win).mean(axis=-1)
def rolling_std(x, win):
    """Return the trailing moving standard deviation of ``x``.

    Each output element is the standard deviation of one row produced by
    ``rolling``. NumPy's default ``ddof=0`` is used, i.e. the population
    standard deviation. NaN-padded leading rows yield NaN.

    Args:
        x: 1-D sequence or array of numbers.
        win: Window length.

    Returns:
        1-D array of window standard deviations.
    """
    return rolling(x, win).std(axis=-1)
布林带
def bollinger(close, win=10, nstd=2):
    """Compute Bollinger Bands for a closing-price series.

    Args:
        close: 1-D sequence or array of closing prices.
        win: Window length for the moving mean and standard deviation.
        nstd: Band half-width, in standard deviations.

    Returns:
        Tuple ``(upper, means, lower)`` of 1-D arrays, where
        ``upper = means + nstd * stds`` and ``lower = means - nstd * stds``.
    """
    means = rolling_mean(close, win)
    stds = rolling_std(close, win)
    band = nstd * stds
    upper = means + band
    lower = means - band
    return upper, means, lower
指数滑动均值
这是原始实现:
# Exponential smoothing, reference (loop) implementation:
#     y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]
def exp_smooth_naive(x, alpha):
    """Exponentially smooth ``x`` with an explicit Python loop.

    Implements ``y[0] = x[0]`` and
    ``y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]``.

    Args:
        x: 1-D sequence or array of numbers. It is not modified.
        alpha: Weight given to the newest sample.

    Returns:
        1-D float array with the same length as ``x``.
    """
    # Copy into a float array: an integer dtype would silently truncate the
    # smoothed values on assignment, and a plain list has no ``copy`` that
    # yields an array.
    y = np.array(x, dtype=float)
    for i in range(1, len(y)):
        # y[i] still holds x[i] here; y[i - 1] is already smoothed.
        y[i] = y[i] * alpha + y[i - 1] * (1 - alpha)
    return y
原始公式是递归的,需要改成通项才能向量化,这是推导过程:
y[0] = x[0] = init
y[t] = alpha * x[t] + (1-alpha) * y[t-1]
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + (1-alpha) ** 2 * y[t-2]
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + (1-alpha) ** t * init
     = alpha * x[t] + (1-alpha) * alpha * x[t-1] + ... + alpha * (1-alpha) ** t * init + (1-alpha) ** (t+1) * init
     = Σ(alpha * (1-alpha) ** i * x[t-i]; i: 0 -> t) + (1-alpha) ** (t+1) * init
corr[i] = alpha * (1-alpha) ** i
supl[t] = (1-alpha) ** (t+1) * init
y[t] = Σ(corr[i] * x[t-i]; i: 0 -> t) + supl[t]
y = conv(corr, x) + supl
这就完成了向量化,因为 NumPy 或者 PyTorch 都针对卷积做了特殊优化。
def exp_smooth_vec(x, alpha):
    """Exponentially smooth ``x`` using a convolution instead of a loop.

    Uses the closed form of ``y[t] = alpha * x[t] + (1 - alpha) * y[t - 1]``
    with ``y[0] = x[0]``::

        y[t] = sum(corr[i] * x[t - i] for i in 0..t) + supl[t]
        corr[i] = alpha * (1 - alpha) ** i
        supl[t] = (1 - alpha) ** (t + 1) * x[0]

    Args:
        x: 1-D sequence or array of numbers.
        alpha: Weight given to the newest sample.

    Returns:
        1-D float array with the same length as ``x``.
    """
    x = np.asarray(x, dtype=float)
    n = len(x)
    if n == 0:
        # There is no x[0] to seed the recurrence with.
        return x.copy()
    init = x[0]
    idx = np.arange(0, n)
    corr = alpha * (1 - alpha) ** idx
    # Correction term: the seed y[0] = x[0] carries weight (1 - alpha) ** t,
    # of which the convolution only supplies alpha * (1 - alpha) ** t.
    supl = (1 - alpha) ** (idx + 1) * init
    # The first n outputs of the full convolution are the causal sums.
    y = np.convolve(corr, x, 'full')[:n] + supl
    return y
# Default exponential smoother: an alias of the vectorized implementation.
exp_smooth = exp_smooth_vec
def rolling_ema(x, win):
    """Return the exponential moving average of ``x`` for a span of ``win``.

    The smoothing factor is ``alpha = 2 / (win + 1)``; the smoothing itself
    is delegated to ``exp_smooth``.

    Args:
        x: 1-D sequence or array of numbers.
        win: EMA span used to derive ``alpha``.

    Returns:
        The array returned by ``exp_smooth(x, alpha)``.
    """
    x = np.asarray(x)
    alpha = 2 / (win + 1.0)
    return exp_smooth(x, alpha)
MACD
def macd(close, fast_win=12, slow_win=26, sig_win=9):
    """Compute the MACD indicator for a closing-price series.

    Args:
        close: 1-D sequence or array of closing prices.
        fast_win: Span of the fast EMA.
        slow_win: Span of the slow EMA.
        sig_win: Span of the EMA applied to DIF to obtain the signal line.

    Returns:
        Tuple ``(macd_, dif, dea)`` of 1-D arrays:
        ``dif`` is fast EMA minus slow EMA, ``dea`` is the EMA of ``dif``,
        and ``macd_`` is the histogram ``2 * (dif - dea)``.
    """
    fast = rolling_ema(close, fast_win)
    slow = rolling_ema(close, slow_win)
    dif = fast - slow
    dea = rolling_ema(dif, sig_win)
    # The histogram is twice the DIF/DEA gap. Without the parentheses,
    # operator precedence would compute dif - 2 * dea instead.
    macd_ = (dif - dea) * 2
    return macd_, dif, dea
RSI
def rsi(close, win=3):
    """Compute the Relative Strength Index from windowed gain/loss sums.

    ``RS`` is the sum of price rises over the last ``win`` changes divided
    by the sum of price falls (as positive magnitudes) over the same window,
    and ``RSI = 100 - 100 / (1 + RS)``.

    Args:
        close: 1-D sequence or array of closing prices.
        win: Number of consecutive price changes per window.

    Returns:
        1-D array with the same length as ``close``. The first ``win``
        elements are NaN: one because the first price has no change, and
        ``win - 1`` because those positions lack a complete window.
    """
    change = np.diff(close)
    gains = np.where(change > 0, change, 0)
    # Losses must be magnitudes; keeping them negative would make RS
    # negative and push RSI outside the 0-100 range.
    losses = np.where(change < 0, -change, 0)
    sum_up = rolling(gains, win).sum(-1)
    sum_down = rolling(losses, win).sum(-1)
    # Guard against division by zero when a window contains no losses.
    eps = 1e-12
    rs = sum_up / (sum_down + eps)
    rsi_ = 100 - 100 / (1 + rs)
    # np.diff dropped one element; realign with ``close``.
    return np.hstack([[np.nan], rsi_])
KDJ
def kdj(close, low, high, n=9):
    """Compute the KDJ stochastic indicator.

    ``RSV = (close - lowest_low_n) / (highest_high_n - lowest_low_n) * 100``
    is smoothed once to obtain K and again to obtain D, both seeded with 50;
    ``J = 3 * K - 2 * D``.

    Args:
        close: 1-D sequence or array of closing prices.
        low: 1-D sequence or array of period lows, same length as ``close``.
        high: 1-D sequence or array of period highs, same length as ``close``.
        n: Look-back window for the highest high and lowest low.

    Returns:
        Tuple ``(k, d, j)`` of 1-D arrays with the same length as ``close``,
        front-padded with NaN.
    """
    close = np.asarray(close, dtype=float)
    hn = rolling(high, n).max(-1)
    ln = rolling(low, n).min(-1)
    rsv = (close - ln) / (hn - ln) * 100
    # Drop undefined RSV values (the NaN warm-up rows, plus any 0/0 from a
    # window whose high equals its low).
    rsv = rsv[~np.isnan(rsv)]
    # Seed value 50 precedes the first real RSV sample.
    rsv = np.hstack([[50], rsv])
    # NOTE(review): this puts weight 2/3 on the newest RSV/K sample. The
    # commonly quoted KDJ recurrence is K = 2/3 * K_prev + 1/3 * RSV, which
    # would correspond to alpha = 1/3 here -- confirm the intended convention.
    k = exp_smooth(rsv, 2/3)
    d = exp_smooth(k, 2/3)
    j = 3 * k - 2 * d
    # Re-align with ``close`` by front-padding whatever length was lost.
    pad = np.full(len(close) - len(k), np.nan)
    k = np.hstack([pad, k])
    d = np.hstack([pad, d])
    j = np.hstack([pad, j])
    return k, d, j
OBV
def obv(close, vol):
    """Compute On-Balance Volume.

    Each period's volume is added when the close rose, subtracted when it
    fell, and ignored when it was unchanged. The first period has no previous
    close and is counted as a rise, so the series starts at ``vol[0]``.

    Args:
        close: 1-D sequence or array of closing prices.
        vol: 1-D sequence or array of volumes, same length as ``close``.

    Returns:
        1-D array holding the running signed-volume total.
    """
    direction = np.sign(np.diff(close))
    # Prepend +1 so the first period's volume is included.
    sig = np.hstack([[1], direction])
    obv_ = np.cumsum(np.asarray(vol) * sig)
    return obv_