早晨之星(黎明之星)
代码语言:javascript复制def morning_star(df, inplace=False):
if not inplace:
df = df.copy()
# 计算三日移动平均线
df['ma3'] = df['close'].rolling(3).mean()
# 计算昨天的收盘价
df['prev_close'] = df['close'].shift(1)
# 找到所有符合条件的早晨之星形态
df['morning_star'] = (df['close'] < df['open']) & (df['close'] > df['open']*1.005) &
(df['open'] > df['prev_close']) & (df['open'].shift(1) > df['close'].shift(1)) &
(df['prev_close'] < df['ma3']) & (df['open'] > df['ma3']) &
(df['volume'] > df['volume'].rolling(5).mean()*1.5)
return df
黄昏之星(夜星)
代码语言:javascript复制def evening_star(df, inplace=False):
if not inplace:
df = df.copy()
# 计算第一天和第二天的趋势
df['trend_1'] = df['close'].shift(-1) - df['open'].shift(-1)
df['trend_2'] = df['open'] - df['close'].shift(-1)
# 判断符合黄昏之星的条件
df['evening_star'] = ((df['close'].shift(-1) > df['open'].shift(-1)) &
(df['open'] > df['close'].shift(-1)) &
(df['close'] < df['open'] - 0.5 * (df['high'].shift(-1) - df['low'].shift(-1))) &
(df['close'] < df['close'].shift(-1)) &
(df['open'] > df['open'].shift(-1)))
# 在原数据帧上进行更新或者返回新的数据帧
if inplace:
return None
else:
return df
倾盆大雨
以下是实现形态学指标【倾盆大雨】的Python函数代码:
代码语言:javascript复制import pandas as pd
def downpour(df, inplace=False):
if not inplace:
df = df.copy()
df['downpour'] = (df['close'] - df['open']) / df['close'].shift(1)
if not inplace:
return df
# 示例用法
data = {
'open': [10, 11, 12, 13],
'high': [11, 12, 13, 14],
'low': [9, 10, 11, 12],
'close': [11, 12, 13, 14],
'volume': [100, 200, 300, 400]
}
df = pd.DataFrame(data)
df = downpour(df)
print(df)
在这个示例中,downpour
函数计算了倾盆大雨指标,并将结果保存在名为downpour
的新列中。你可以将这个函数和示例数据一起运行来查看计算结果。
旭日东升
代码语言:javascript复制import numpy as np
def xuridongsheng(df, inplace=False):
if not inplace:
df = df.copy()
df['MA5'] = df['close'].rolling(window=5).mean()
df['MA10'] = df['close'].rolling(window=10).mean()
df['XRL'] = np.where(df['MA5'] > df['MA10'], 1, 0)
df['XRH'] = np.where(df['MA5'] < df['MA10'], 1, 0)
df['XURDS'] = df['MA5'] * (1 (df['volume'] / df['volume'].shift(1) - 1) * df['XRL'])
if not inplace:
return df
淡友反攻
代码语言:javascript复制def bullish_engulfing(df, inplace=True):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['prev_body'] = abs(df['close'].shift(1) - df['open'].shift(1))
df['bullish_engulfing'] = (df['open'] < df['close']) & (df['open'].shift(1) > df['close'].shift(1)) & (df['close'] > df['open'].shift(1)) & (df['open'] < df['close'].shift(1)) & (df['body'] > df['prev_body'])
if not inplace:
return df
好友反攻
代码语言:javascript复制import pandas as pd
def friends_rebound(df, inplace=False):
if not inplace:
df = df.copy()
# 计算最高价的移动平均值
df['high_ma'] = df['high'].rolling(window=5).mean()
# 计算最低价的移动平均值
df['low_ma'] = df['low'].rolling(window=5).mean()
# 判断是否符合好友反攻条件
df['is_friends_rebound'] = (df['close'] > df['open']) & (df['close'] > df['high_ma']) & (df['open'] > df['low']) & (df['volume'] > df['volume'].rolling(window=5).mean())
if not inplace:
return df
射击之星形态
代码语言:javascript复制import pandas as pd
def star_shooting(df, inplace=False):
if not inplace:
df = df.copy()
# 计算射击之星形态指标
df['body'] = abs(df['open'] - df['close'])
df['wick'] = df['high'] - df[['open', 'close']].max(axis=1)
# 标记射击之星形态
df['shooting_star'] = ((df['body'] < df['body'].shift()) &
(df['body'] < df['body'].shift(-1)) &
(df['wick'] > df['wick'].shift()) &
(df['wick'] > df['wick'].shift(-1)))
# 返回结果
return df
# 示例用法
data = {'open': [10, 11, 9, 10, 8],
'high': [12, 13, 10, 11, 9],
'low': [8, 10, 8, 9, 7],
'close': [11, 10, 9, 8, 8],
'volume': [1000, 1500, 800, 1200, 900]}
df = pd.DataFrame(data)
df = star_shooting(df)
print(df)
墓碑十字线(墓碑线)
代码语言:javascript复制def gravestone_doji(df, inplace=False):
if not inplace:
df = df.copy()
df['is_gravestone_doji'] = ((df['open'] == df['high']) &
(df['close'] == df['low']) &
((df['open'] - df['low']) > 2 * (df['high'] - df['close'])))
return df
顶部尽头线(下山虎)
代码语言:javascript复制def down_tiger(df, inplace=False):
if not inplace:
df = df.copy()
df['lowest_low'] = df['low'].rolling(9).min()
df['highest_close'] = df['close'].rolling(26).max()
df['down_tiger'] = df['lowest_low'] - df['highest_close']
return df
底部尽头线(上山虎)
代码语言:javascript复制def bottom_reversal(df, inplace=False):
if not inplace:
df = df.copy()
# 计算底部尽头线指标
df['low_close'] = df['low'] - df['close'].shift(1)
df['close_low'] = df['close'] - df['low']
df['bottom_reversal'] = (df['low_close'] < 0) & (df['close_low'] > 0)
if not inplace:
return df
双针探底
代码语言:javascript复制def double_bottom(df, inplace=False):
if not inplace:
df = df.copy()
# 计算波谷
df['valley'] = (df['low'] < df['low'].shift(-1)) & (df['low'] < df['low'].shift(1))
# 计算双针探底
df['double_bottom'] = df['valley'] & df['valley'].shift(2) & ~df['valley'].shift(1)
if inplace:
return
else:
return df
吊颈线(上吊线)
代码语言:javascript复制def hanging_man(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['open'] - df['close']) # 实体
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1) # 上影线
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low'] # 下影线
df['is_hanging_man'] = (df['body'] < df['body'].shift(1)) & (df['lower_shadow'] > 2 * df['body']) & (df['lower_shadow'] >= 2 * df['upper_shadow'])
return df
乌云盖顶
代码语言:javascript复制def dark_cloud_cover(df, inplace=False):
if not inplace:
df = df.copy()
df['previous_close'] = df['close'].shift(1)
df['body'] = df['open'] - df['close']
df['shadow'] = (df['high'] - df['low']) / df['close']
df['is_dark_cloud'] = ((df['close'] > df['open']) &
(df['open'] > df['previous_close']) &
(df['open'] > df['high']) &
((df['open'] df['close']) / 2 < df['high']) &
(df['close'] > df['previous_close']) &
(df['body'] > 0.25 * (df['previous_close'] - df['open'])) &
(df['body'] < 0.75 * (df['previous_close'] - df['open'])) &
(df['shadow'] > 0.5))
if not inplace:
return df
曙光初现形态
代码语言:javascript复制import pandas as pd
import numpy as np
def dawn_appear(df, inplace=False):
if not inplace:
df = df.copy()
df['open_close_range'] = abs(df['open'] - df['close']) / df['open']
df['dawn_appear'] = np.where(
(df['open_close_range'] <= 0.01) &
(df['close'] > df['open']) &
(df['close'] > df['high'].shift(1)) &
(df['close'] > df['open'].shift(1)),
True, False
)
return df
# 示例用法
data = {'open': [10, 9, 11, 12, 10],
'high': [12, 11, 12, 13, 11],
'low': [9, 8, 10, 11, 9],
'close': [11, 10, 11, 12, 11],
'volume': [1000, 1200, 800, 1500, 1000]}
df = pd.DataFrame(data)
df = dawn_appear(df)
print(df)
上涨身怀六甲
代码语言:javascript复制import pandas as pd
def up_trend_six_pregnancies(df, inplace=True):
if not inplace:
df = df.copy()
df['open-close'] = df['open'] - df['close'] # 计算开盘价和收盘价的差
df['high-low'] = df['high'] - df['low'] # 计算最高价和最低价的差
df['up_trend'] = (df['open-close'] > 0) & (df['open-close'].shift(1) > 0) # 判断是否处于上涨趋势
df['pregnancy_1'] = df['high-low'].rolling(3).sum() / 3 # 第一持有期
df['pregnancy_2'] = df['high-low'].rolling(6).sum() / 6 # 第二持有期
df['pregnancy_3'] = df['high-low'].rolling(9).sum() / 9 # 第三持有期
df['pregnancy_4'] = df['high-low'].rolling(12).sum() / 12 # 第四持有期
df['pregnancy_5'] = df['high-low'].rolling(18).sum() / 18 # 第五持有期
df['pregnancy_6'] = df['high-low'].rolling(24).sum() / 24 # 第六持有期
if not inplace:
return df
下跌身怀六甲
代码语言:javascript复制def downtrend_six_pregnancies(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open']) # 实体大小
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1) # 上影线
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low'] # 下影线
df['pregnancies'] = ((df['close'] > df['open']) & (df['body'] > df['body'].shift(1)) & (df['body'] > df['body'].shift(2)) &
(df['upper_shadow'] < df['upper_shadow'].shift(1)) & (df['upper_shadow'] < df['upper_shadow'].shift(2)) &
(df['lower_shadow'] < df['lower_shadow'].shift(1)) & (df['lower_shadow'] < df['lower_shadow'].shift(2))).astype(int)
return df
上涨孕十字星
下面是实现形态学指标【上涨孕十字星】的Python函数代码:
代码语言:javascript复制import pandas as pd
def bullish_engulfing(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['is_bullish_engulfing'] = False
for i in range(1, len(df)):
if df['close'][i] > df['open'][i] and df['open'][i-1] > df['close'][i-1] and df['close'][i-1] > df['open'][i] and df['open'][i] < df['close'][i-1] and df['body'][i] > df['body'][i-1]:
df.at[i, 'is_bullish_engulfing'] = True
return df
# 测试
data = {'open': [10, 8, 9, 7, 12],
'high': [12, 9, 10, 8, 13],
'low': [8, 7, 8, 6, 10],
'close': [12, 9, 10, 8, 12],
'volume': [1000, 2000, 1500, 3000, 2500]}
df = pd.DataFrame(data)
df = bullish_engulfing(df)
print(df)
该函数会在数据帧中添加两列:body
表示K线实体长度,is_bullish_engulfing
表示是否符合上涨孕十字星形态。在测试中,我们创建了一个示例数据帧并调用了bullish_engulfing
函数,最终输出数据帧包含了新增的两列。
下跌孕十字星
代码语言:javascript复制import pandas as pd
def down_abandoned_baby(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_down_abandoned_baby'] = (df['body'] < df['body'].shift(1)) & (df['body'].shift(1) > df['body'].shift(2)) & (df['close'].shift(1) > df['open'].shift(1)) & (df['open'] < df['close']) & (df['close'] < df['open'].shift(1)) & (df['low'] > df['high'].shift(1)) & (df['high'] < df['low'].shift(1))
if inplace:
return
else:
return df
上涨孤独十字星
代码语言:javascript复制def upward_lone_cross_star(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_upward_lone_cross_star'] = ((df['body'] < 0.1 * df['open']) &
(df['upper_shadow'] < 0.3 * df['open']) &
(df['lower_shadow'] > 2 * df['body']) &
(df['open'] < df['close']))
if not inplace:
return df
使用示例:
代码语言:javascript复制import pandas as pd
# 创建包含必要列的示例数据
data = {'open': [10, 8, 12, 8],
'high': [11, 9, 13, 9],
'low': [9, 7, 11, 7],
'close': [10, 8, 11, 8],
'volume': [1000, 2000, 1500, 1800]}
df = pd.DataFrame(data)
# 应用函数
upward_lone_cross_star(df, inplace=True)
# 查看结果
print(df)
下跌孤独十字星
代码语言:javascript复制def falling_lone_cross_star(df, inplace=False):
if not inplace:
df = df.copy()
df['is_falling_lone_cross_star'] = False
for i in range(2, len(df)):
if df['close'][i] < df['open'][i] and df['close'][i-1] < df['open'][i-1] and df['close'][i-2] < df['open'][i-2]
and df['close'][i-1] < df['close'][i-2] and df['close'][i-1] < df['open'][i-2]:
df.at[i, 'is_falling_lone_cross_star'] = True
if not inplace:
return df
您可以将这段代码添加到您的量化交易策略中,用于识别下跌孤独十字星形态,从而辅助决策。
上涨Pinbar组合
代码语言:javascript复制def bullish_pinbar(df, inplace=False):
if not inplace:
df = df.copy()
df['body_size'] = abs(df['close'] - df['open'])
df['upper_shadow_size'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow_size'] = df[['open', 'close']].min(axis=1) - df['low']
df['bullish_pinbar'] = (
(df['body_size'] < df['upper_shadow_size']) &
(df['body_size'] < df['lower_shadow_size']) &
(df['close'] > df['open'])
)
return df
下跌Pinbar组合
代码语言:javascript复制def down_pinbar(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open']) # 计算蜡烛实体
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1) # 计算上影线
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low'] # 计算下影线
# 判断下跌Pinbar
df['down_pinbar'] = ((df['close'] < df['open']) & # 实体为下跌
(df['body'] < 0.5 * df['low'] * 0.01) & # 实体小于0.5%的最低价
(df['upper_shadow'] < 0.3 * df['low'] * 0.01) & # 上影线小于0.3%的最低价
(df['lower_shadow'] < 0.3 * df['low'] * 0.01)) # 下影线小于0.3%的最低价
return df
下跌螺旋桨
代码语言:javascript复制import numpy as np
def falling_screwing_propeller(df, inplace=False):
if not inplace:
df = df.copy()
df['Close-Open'] = df['close'] - df['open']
df['High-Low'] = df['high'] - df['low']
df['Body'] = np.abs(df['Close-Open'])
df['Upper Shadow'] = np.where(df['Close-Open'] > 0, df['high'] - df['close'], df['high'] - df['open'])
df['Lower Shadow'] = np.where(df['Close-Open'] > 0, df['open'] - df['low'], df['close'] - df['low'])
if not inplace:
return df
这段代码实现了计算下跌螺旋桨指标的函数,包括对数据帧中的收盘价、开盘价、最高价、最低价等列进行处理,计算出指标所需的各个数据并保存到数据帧中。最后返回更新后的数据帧。
上涨螺旋桨
下面是一个实现上涨螺旋桨指标的Python函数:
代码语言:javascript复制import numpy as np
def rising_screw_propeller(df, inplace=False):
if not inplace:
df = df.copy()
df['HL'] = df['high'] - df['low']
df['HC'] = np.abs(df['high'] - df['close'].shift())
df['LC'] = np.abs(df['low'] - df['close'].shift())
df['TR'] = df[['HC', 'HL', 'LC']].max(axis=1)
df['ATR'] = df['TR'].rolling(window=14).mean()
df['RSPL'] = 100 * (df['close'] - df['close'].shift(1)) / df['ATR']
if not inplace:
return df
# 调用函数
df = rising_screw_propeller(df)
这个函数首先计算了当日的最高价和最低价的差值HL
,以及当日最高价和昨日收盘价的差值HC
、当日最低价和昨日收盘价的差值LC
。然后通过这三个差值计算真实范围TR
,并通过TR的移动平均计算平均真实范围ATR
。最后根据公式计算上涨螺旋桨指标RSPL
。如果inplace
为False,则返回更新后的数据帧df
,否则直接在原数据帧上进行更新。
阴包阳形态
代码语言:javascript复制def candlestick_pattern(df, inplace=False):
if not inplace:
df = df.copy()
df['bearish_engulfing'] = ((df['close'] < df['open']) &
(df['close'].shift(1) > df['open'].shift(1)) &
(df['open'] > df['close'].shift(1)) &
(df['close'] > df['open'].shift(1)))
df['bullish_engulfing'] = ((df['close'] > df['open']) &
(df['close'].shift(1) < df['open'].shift(1)) &
(df['open'] < df['close'].shift(1)) &
(df['close'] < df['open'].shift(1)))
if not inplace:
return df
# 测试
import pandas as pd
data = {'open': [10, 11, 20, 15, 30],
'high': [15, 15, 25, 25, 35],
'low': [5, 10, 18, 12, 25],
'close': [8, 12, 22, 18, 28],
'volume': [100000, 120000, 150000, 110000, 200000]}
df = pd.DataFrame(data)
df = candlestick_pattern(df)
print(df)
阳包阴形态
代码语言:javascript复制def yang_bao_yin(df, inplace=True):
if not inplace:
df = df.copy()
df['yang_bao_yin'] = (df['close'] > df['open']) & (df['close'] < df['open'] * 1.01) & (df['low'] < df['open']) & (df['high'] > df['close'])
if not inplace:
return df
仙人指路
代码语言:javascript复制import pandas as pd
import numpy as np
def williams_ad(df, inplace=False):
if not inplace:
df = df.copy()
high = df['high']
low = df['low']
close = df['close']
volume = df['volume']
ad = np.zeros(len(df))
for i in range(1, len(df)):
ad[i] = ad[i-1] (close[i] - low[i]) - (high[i] - close[i])
df['williams_ad'] = ad
return df
中流砥柱形态
代码语言:javascript复制def morphology_indicator(df, inplace=False):
if not inplace:
df = df.copy()
# 计算中流砥柱形态指标
df['body_height'] = abs(df['close'] - df['open']) # 蜡烛实体高度
df['upper_shadow_height'] = df['high'] - df[['open', 'close']].max(axis=1) # 上影线高度
df['lower_shadow_height'] = df[['open', 'close']].min(axis=1) - df['low'] # 下影线高度
df['morphology'] = (df['upper_shadow_height'] <= 2 * df['body_height']) & (df['lower_shadow_height'] <= 2 * df['body_height'])
if not inplace:
return df
使用示例:
代码语言:javascript复制import pandas as pd
# 创建示例数据
data = {
'open': [10, 15, 12, 20],
'high': [12, 18, 17, 22],
'low': [9, 14, 11, 18],
'close': [11, 16, 13, 21],
'volume': [1000, 1500, 1200, 2000]
}
df = pd.DataFrame(data)
# 计算形态学指标
morphology_indicator(df, inplace=True)
print(df)
单针探底(定海神针)
代码语言:javascript复制import numpy as np
def single_needle_bottom(df, inplace=False):
if not inplace:
df = df.copy()
def is_single_needle_bottom(row):
if row['close'] < row['open'] and row['close'] < row['low'] 0.5 * (row['high'] - row['low']):
return True
else:
return False
df['single_needle_bottom'] = df.apply(is_single_needle_bottom, axis=1)
return df
锤头线(锤子线)
代码语言:javascript复制def hammer_candlestick(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_hammer'] = ((df['body'] / df['body'].rolling(5).mean() < 0.02) &
(df['upper_shadow'] / df['body'] < 0.15) &
(df['lower_shadow'] / df['body'] < 0.15))
return df
看跌(高位倒锤头)流星线
以下是一个可以实现形态学指标【看跌(高位倒锤头)流星线】的Python函数:
代码语言:javascript复制def bearish_inverted_hammer(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open'])
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_bearish_inverted_hammer'] = False
for i in range(len(df)):
if (df.loc[i]['body'] < 0.1 * df.loc[i]['high'] and
df.loc[i]['upper_shadow'] > 2.5 * df.loc[i]['body'] and
df.loc[i]['lower_shadow'] < 0.1 * df.loc[i]['high']):
df.at[i, 'is_bearish_inverted_hammer'] = True
return df
这个函数会计算每日的实体(body)、上影线(upper_shadow)和下影线(lower_shadow),然后检查是否符合看跌(高位倒锤头)流星线的形态。如果符合条件,则将is_bearish_inverted_hammer
列设置为True,表示这一天出现了高位倒锤头形态。最后返回更新后的数据帧df
。
看涨(低位)倒锤头线
代码语言:javascript复制import pandas as pd
def bullish_inverted_hammer(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['open'] - df['close'])
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_bullish_inverted_hammer'] = ((df['close'] > df['open']) &
(df['open'] > df['low']) &
(df['close'] > ((df['high'] df['low']) / 2)) &
(df['body'] < 0.2 * df['low']) &
(df['upper_shadow'] < 2 * df['body']) &
(df['lower_shadow'] > 2 * df['body']))
if not inplace:
return df
上涨镊子线(U形磁铁)
代码语言:javascript复制def u_shaped_magnet(df, inplace=False):
if not inplace:
df = df.copy()
df['u_shaped_magnet'] = 0
for i in range(2, len(df)):
if df['close'][i-2] > df['open'][i-2] and df['close'][i-1] < df['close'][i-2] and df['close'][i] > df['open'][i]:
df.loc[i, 'u_shaped_magnet'] = 1
return df
下跌镊子线(n形磁铁)
代码语言:javascript复制def falling_engulfing(df, inplace=False):
if not inplace:
df = df.copy()
df['body_size'] = abs(df['close'] - df['open'])
df['body_top'] = df[['open', 'close']].max(axis=1)
df['body_bottom'] = df[['open', 'close']].min(axis=1)
df['body_range'] = df['body_top'] - df['body_bottom']
df['prev_body_size'] = df['body_size'].shift(1)
df['prev_body_top'] = df['body_top'].shift(1)
df['prev_body_bottom'] = df['body_bottom'].shift(1)
df['falling_engulfing'] = (df['close'] < df['open']) & (df['open'] > df['prev_body_top']) & (df['close'] < df['prev_body_bottom'])
if not inplace:
return df
三空阴线
代码语言:javascript复制def morphology_indicator(df, inplace=False):
# 计算三空阴线指标
df['close_open'] = df['close'] - df['open']
df['close_low'] = df['close'] - df['low']
df['open_close'] = df['open'] - df['close']
df['three_black_crows'] = (df['close_open'] < 0) & (df['close_low'] < 0) & (df['open_close'] < 0)
if inplace:
return df
else:
return df.copy()
三空阳线
代码语言:javascript复制def three_white_soldiers(df, inplace=False):
if not inplace:
df = df.copy()
df['body_size'] = df['close'] - df['open']
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['is_up_day'] = df['close'] > df['open']
for i in range(2, len(df)):
if df.loc[i-2, 'is_up_day'] and df.loc[i-1, 'is_up_day'] and df.loc[i, 'is_up_day']:
if df.loc[i-2, 'body_size'] <= 0 or df.loc[i-1, 'body_size'] <= 0 or df.loc[i, 'body_size'] <= 0:
continue
if df.loc[i-2, 'lower_shadow'] / df.loc[i-2, 'body_size'] >= 0.5 or df.loc[i-1, 'lower_shadow'] / df.loc[i-1, 'body_size'] >= 0.5 or df.loc[i, 'lower_shadow'] / df.loc[i, 'body_size'] >= 0.5:
continue
if df.loc[i-2, 'close'] <= df.loc[i-1, 'close'] and df.loc[i-1, 'close'] <= df.loc[i, 'close']:
df.loc[i, 'three_white_soldiers'] = 1
df.drop(['body_size', 'upper_shadow', 'lower_shadow', 'is_up_day'], axis=1, inplace=True)
return df
# 调用示例
import pandas as pd
data = {
'open': [10, 11, 12, 13, 14],
'high': [12, 13, 14, 15, 16],
'low': [9, 10, 11, 12, 13],
'close': [11, 12, 13, 14, 15],
'volume': [1000, 1200, 1300, 1100, 1000]
}
df = pd.DataFrame(data)
df = three_white_soldiers(df, inplace=False)
print(df)
红三兵
代码语言:javascript复制def red_three_soldiers(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = df['close'] - df['open']
df['up_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['down_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
df['up_body'] = df['body'].apply(lambda x: x if x > 0 else 0)
df['down_body'] = df['body'].apply(lambda x: -x if x < 0 else 0)
df['red_soldier'] = False
for i in range(2, len(df)):
if df.loc[i-2, 'up_body'] > 0 and df.loc[i-1, 'up_body'] > 0 and df.loc[i, 'up_body'] > 0:
df.loc[i, 'red_soldier'] = True
return df
# 示例用法
import pandas as pd
data = {
'open': [10, 11, 12, 13, 14],
'high': [15, 16, 17, 18, 19],
'low': [9, 10, 11, 12, 13],
'close': [14, 15, 16, 17, 18],
'volume': [1000, 2000, 1500, 1800, 1200]
}
df = pd.DataFrame(data)
df = red_three_soldiers(df)
print(df)
三只乌鸦
代码语言:javascript复制import pandas as pd
def three_black_crows(df, inplace=False):
if not inplace:
df = df.copy()
df['close_shifted'] = df['close'].shift(1)
def is_three_black_crows(row):
if row['close'] < row['open'] and row['close'] < row['close_shifted']:
return True
else:
return False
df['is_three_black_crows'] = df.apply(is_three_black_crows, axis=1)
return df
# 使用示例
data = {'open': [10, 9, 8, 7],
'high': [11, 10, 9, 8],
'low': [9, 8, 7, 6],
'close': [8, 7, 6, 5],
'volume': [100, 200, 150, 180]}
df = pd.DataFrame(data)
df = three_black_crows(df)
print(df)
低位并排阳线
代码语言:javascript复制def low_parallel_up(df, inplace=False):
if not inplace:
df = df.copy()
df['low_parallel_up'] = (df['open'] < df['close']) & (df['low'] < df['open']) & (df['low'] < df['close'])
return df
高位并排阳线
代码语言:javascript复制def high_concurrent_positive(df, inplace=False):
if not inplace:
df = df.copy()
df['high_close'] = df['high'] == df['close']
df['prev_close'] = df['close'].shift(1)
df['prev_high'] = df['high'].shift(1)
df['prev_close_high'] = df['prev_close'] == df['prev_high']
df['high_concurrent_positive'] = df['high_close'] & df['prev_close_high']
if not inplace:
return df
# 示例用法
import pandas as pd
data = {
'open': [10, 15, 20, 18, 25],
'high': [12, 18, 22, 20, 27],
'low': [8, 14, 18, 16, 22],
'close': [11, 17, 21, 19, 26],
'volume': [1000, 2000, 1500, 1800, 1200]
}
df = pd.DataFrame(data)
df = high_concurrent_positive(df)
print(df)
上升三部曲(升势三鸦)
代码语言:javascript复制def upward_three_crows(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = df['open'] - df['close']
df['body_rel'] = df['body'] / (df['high'] - df['low'])
for i in range(2, len(df)):
if df['body'][i] < 0 and df['body'][i-1] < 0 and df['body'][i-2] < 0
and df['body_rel'][i] >= 0.5 and df['body_rel'][i-1] >= 0.5 and df['body_rel'][i-2] >= 0.5:
df.loc[i, 'upward_three_crows'] = 1
else:
df.loc[i, 'upward_three_crows'] = 0
return df
# 示例用法
import pandas as pd
data = {
'open': [10, 9, 8, 7, 8, 7],
'high': [12, 11, 10, 9, 9, 8],
'low': [9, 8, 7, 6, 7, 6],
'close': [9, 8, 7, 6, 7, 6],
'volume': [100, 200, 150, 120, 300, 250]
}
df = pd.DataFrame(data)
df = upward_three_crows(df)
print(df)
下降三部曲(降势三鹤)
代码语言:javascript复制import pandas as pd
def descending_three_methods(df, inplace=False):
if not inplace:
df = df.copy()
df['short_ma'] = df['close'].rolling(window=5).mean() # 计算5日均价
df['long_ma'] = df['close'].rolling(window=20).mean() # 计算20日均价
df['volume_ma'] = df['volume'].rolling(window=5).mean() # 计算5日成交量均值
# 判断下降三部曲条件
df['descending_three_methods'] = (df['short_ma'] < df['long_ma']) & (df['short_ma'].shift(1) > df['long_ma'].shift(1)) & (df['close'] < df['short_ma']) & (df['close'] < df['close'].shift(1)) & (df['volume'] < df['volume_ma'])
if not inplace:
return df
高档五阴线
代码语言:javascript复制import pandas as pd
def high_grade_five_black_crows(df, inplace=False):
if not inplace:
df = df.copy()
# 计算每根K线的实体大小
df['body'] = abs(df['open'] - df['close'])
# 计算每根K线的上影线和下影线
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
# 判断是否为高档五阴线
conditions = (df['body'] / df['open'] < 0.01) & (df['lower_shadow'] / df['open'] < 0.005) &
(df['close'] < df['open']) & (df['close'].shift(1) < df['close'].shift(2)) &
(df['close'].shift(1) < df['close'].shift(3)) & (df['close'].shift(1) < df['close'].shift(4))
df['high_grade_five_black_crows'] = conditions.astype(int)
return df
低档五阳线
代码语言:javascript复制def low_gang_wuyang(df, inplace=False):
if not inplace:
df = df.copy()
df['low_gang_wuyang'] = 0
for i in range(4, len(df)):
if df['close'][i] > df['close'][i-1] and df['close'][i-1] > df['close'][i-2]
and df['close'][i-2] > df['close'][i-3] and df['close'][i-3] > df['close'][i-4]:
df.at[i, 'low_gang_wuyang'] = 1
return df
两黑夹一红
代码语言:javascript复制def two_black_one_red(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open']) # 计算实体大小
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1) # 计算上影线
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low'] # 计算下影线
df['is_black'] = (df['close'] < df['open']).astype(int) # 判断是否为黑蜡烛
df['is_red'] = (df['close'] > df['open']).astype(int) # 判断是否为红蜡烛
df['pattern'] = (df['is_black'] df['is_black'].shift(1) df['is_red']) == 2 # 判断是否为两黑夹一红形态
if not inplace:
return df
这段代码定义了一个名为two_black_one_red
的函数,该函数接受一个数据帧df
,并包含一个inplace
参数用于指示是否原地更新df
。函数计算并添加了实体大小、上影线、下影线、蜡烛颜色、以及两黑夹一红形态的指标到df
中。如果inplace
为False
,则函数会返回更新后的df
;否则将直接在原地更新df
。
两红夹一黑
代码语言:javascript复制def two_red_one_black(df, inplace=False):
if not inplace:
df = df.copy()
df['close_shift1'] = df['close'].shift(1)
df['close_shift2'] = df['close'].shift(2)
df['open_shift1'] = df['open'].shift(1)
df['open_shift2'] = df['open'].shift(2)
df['is_red1'] = df['close'] > df['open']
df['is_red2'] = df['close_shift1'] > df['open_shift1']
df['is_black'] = df['close_shift2'] < df['open_shift2']
df['two_red_one_black'] = ((df['is_red1'] & df['is_red2'] & df['is_black']).shift(-2)).astype(int)
if not inplace:
return df
多方炮
代码语言:javascript复制import pandas as pd
def bull_power(df, inplace=False):
if not inplace:
df = df.copy()
df['bull_power'] = df['high'] - (df['open'] df['close']) / 2
return df
空方炮
代码语言:javascript复制def short_selling_cannon(df, inplace=False):
if not inplace:
df = df.copy()
df['body'] = abs(df['close'] - df['open']) # 实体的大小
df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1) # 上影线
df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low'] # 下影线
df['short_selling_cannon'] = (df['upper_shadow'] > 2 * df['body']) & (df['lower_shadow'] < 0.5 * df['body'])
return df
下降插入线(坠落线)
你可以使用以下代码来实现形态学指标【下降插入线(坠落线)】:
代码语言:javascript复制def descent_insert_line(df, inplace=False):
if not inplace:
df = df.copy()
df['insert_line'] = (df['open'] df['close']) / 2
df['descent_insert_line'] = (df['high'] - df['insert_line']).rolling(window=3).max()
if not inplace:
return df
使用这个函数,你可以对数据帧df
应用形态学指标【下降插入线(坠落线)】,并选择是否原地更新df
。
高开跳空缺口
代码语言:javascript复制def high_open_jump_gap(df, inplace=False):
if not inplace:
df = df.copy()
df['high_open_gap'] = df['open'] - df['close'].shift(1)
return df
低开跳空缺口
代码语言:javascript复制def low_open_gap(df, inplace=False):
if not inplace:
df = df.copy()
# 计算当日开盘价与前一日收盘价的差值
df['open_close_diff'] = df['open'] - df['close'].shift(1)
# 判断是否出现低开跳空缺口
df['low_open_gap'] = df['open_close_diff'] > 0
if not inplace:
return df