【Quant102】50 个形态学指标的 Pandas 代码

2024-05-24 17:49:46 浏览数 (3)

早晨之星(黎明之星)

代码语言:javascript复制
def morning_star(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    # 计算三日移动平均线
    df['ma3'] = df['close'].rolling(3).mean()
    
    # 计算昨天的收盘价
    df['prev_close'] = df['close'].shift(1)

    # 找到所有符合条件的早晨之星形态
    df['morning_star'] = (df['close'] < df['open']) & (df['close'] > df['open']*1.005) & 
                        (df['open'] > df['prev_close']) & (df['open'].shift(1) > df['close'].shift(1)) & 
                        (df['prev_close'] < df['ma3']) & (df['open'] > df['ma3']) & 
                        (df['volume'] > df['volume'].rolling(5).mean()*1.5)

    return df

黄昏之星(夜星)

代码语言:javascript复制
def evening_star(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算第一天和第二天的趋势
    df['trend_1'] = df['close'].shift(-1) - df['open'].shift(-1)
    df['trend_2'] = df['open'] - df['close'].shift(-1)
    
    # 判断符合黄昏之星的条件
    df['evening_star'] = ((df['close'].shift(-1) > df['open'].shift(-1)) & 
                          (df['open'] > df['close'].shift(-1)) & 
                          (df['close'] < df['open'] - 0.5 * (df['high'].shift(-1) - df['low'].shift(-1))) & 
                          (df['close'] < df['close'].shift(-1)) & 
                          (df['open'] > df['open'].shift(-1)))
    
    # 在原数据帧上进行更新或者返回新的数据帧
    if inplace:
        return None
    else:
        return df

倾盆大雨

以下是实现形态学指标【倾盆大雨】的Python函数代码:

代码语言:javascript复制
import pandas as pd

def downpour(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['downpour'] = (df['close'] - df['open']) / df['close'].shift(1)
    
    if not inplace:
        return df

# 示例用法
data = {
    'open': [10, 11, 12, 13],
    'high': [11, 12, 13, 14],
    'low': [9, 10, 11, 12],
    'close': [11, 12, 13, 14],
    'volume': [100, 200, 300, 400]
}
df = pd.DataFrame(data)

df = downpour(df)
print(df)

在这个示例中,downpour函数计算了倾盆大雨指标,并将结果保存在名为downpour的新列中。你可以将这个函数和示例数据一起运行来查看计算结果。

旭日东升

代码语言:javascript复制
import numpy as np

def xuridongsheng(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['MA5'] = df['close'].rolling(window=5).mean()
    df['MA10'] = df['close'].rolling(window=10).mean()
    
    df['XRL'] = np.where(df['MA5'] > df['MA10'], 1, 0)
    df['XRH'] = np.where(df['MA5'] < df['MA10'], 1, 0)
    
    df['XURDS'] = df['MA5'] * (1   (df['volume'] / df['volume'].shift(1) - 1) * df['XRL'])
    
    if not inplace:
        return df

淡友反攻

代码语言:javascript复制
def bullish_engulfing(df, inplace=True):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open']) 
    df['prev_body'] = abs(df['close'].shift(1) - df['open'].shift(1))

    df['bullish_engulfing'] = (df['open'] < df['close']) & (df['open'].shift(1) > df['close'].shift(1)) & (df['close'] > df['open'].shift(1)) & (df['open'] < df['close'].shift(1)) & (df['body'] > df['prev_body'])
    
    if not inplace:
        return df

好友反攻

代码语言:javascript复制
import pandas as pd

def friends_rebound(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算最高价的移动平均值
    df['high_ma'] = df['high'].rolling(window=5).mean()
    # 计算最低价的移动平均值
    df['low_ma'] = df['low'].rolling(window=5).mean()

    # 判断是否符合好友反攻条件
    df['is_friends_rebound'] = (df['close'] > df['open']) & (df['close'] > df['high_ma']) & (df['open'] > df['low']) & (df['volume'] > df['volume'].rolling(window=5).mean())

    if not inplace:
        return df

射击之星形态

代码语言:javascript复制
import pandas as pd

def star_shooting(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算射击之星形态指标
    df['body'] = abs(df['open'] - df['close'])
    df['wick'] = df['high'] - df[['open', 'close']].max(axis=1)
    
    # 标记射击之星形态
    df['shooting_star'] = ((df['body'] < df['body'].shift()) &
                           (df['body'] < df['body'].shift(-1)) &
                           (df['wick'] > df['wick'].shift()) &
                           (df['wick'] > df['wick'].shift(-1)))
    
    # 返回结果
    return df

# 示例用法
data = {'open': [10, 11, 9, 10, 8],
        'high': [12, 13, 10, 11, 9],
        'low': [8, 10, 8, 9, 7],
        'close': [11, 10, 9, 8, 8],
        'volume': [1000, 1500, 800, 1200, 900]}
df = pd.DataFrame(data)
df = star_shooting(df)
print(df)

墓碑十字线(墓碑线)

代码语言:javascript复制
def gravestone_doji(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['is_gravestone_doji'] = ((df['open'] == df['high']) & 
                                (df['close'] == df['low']) &
                                ((df['open'] - df['low']) > 2 * (df['high'] - df['close'])))
    
    return df

顶部尽头线(下山虎)

代码语言:javascript复制
def down_tiger(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['lowest_low'] = df['low'].rolling(9).min()
    df['highest_close'] = df['close'].rolling(26).max()
    
    df['down_tiger'] = df['lowest_low'] - df['highest_close']
    
    return df

底部尽头线(上山虎)

代码语言:javascript复制
def bottom_reversal(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算底部尽头线指标
    df['low_close'] = df['low'] - df['close'].shift(1)
    df['close_low'] = df['close'] - df['low']
    df['bottom_reversal'] = (df['low_close'] < 0) & (df['close_low'] > 0)

    if not inplace:
        return df

双针探底

代码语言:javascript复制
def double_bottom(df, inplace=False):
    if not inplace:
        df = df.copy()

    # 计算波谷
    df['valley'] = (df['low'] < df['low'].shift(-1)) & (df['low'] < df['low'].shift(1))

    # 计算双针探底
    df['double_bottom'] = df['valley'] & df['valley'].shift(2) & ~df['valley'].shift(1)

    if inplace:
        return
    else:
        return df

吊颈线(上吊线)

代码语言:javascript复制
def hanging_man(df, inplace=False):
    if not inplace:
        df = df.copy()
    df['body'] = abs(df['open'] - df['close'])  # 实体
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    df['is_hanging_man'] = (df['body'] < df['body'].shift(1)) & (df['lower_shadow'] > 2 * df['body']) & (df['lower_shadow'] >= 2 * df['upper_shadow'])

    return df

乌云盖顶

代码语言:javascript复制
def dark_cloud_cover(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['previous_close'] = df['close'].shift(1)
    df['body'] = df['open'] - df['close']
    df['shadow'] = (df['high'] - df['low']) / df['close']
    
    df['is_dark_cloud'] = ((df['close'] > df['open']) & 
                           (df['open'] > df['previous_close']) & 
                           (df['open'] > df['high']) &
                           ((df['open']   df['close']) / 2 < df['high']) & 
                           (df['close'] > df['previous_close']) &
                           (df['body'] > 0.25 * (df['previous_close'] - df['open'])) &
                           (df['body'] < 0.75 * (df['previous_close'] - df['open'])) &
                           (df['shadow'] > 0.5))
    
    if not inplace:
        return df

曙光初现形态

代码语言:javascript复制
import pandas as pd
import numpy as np

def dawn_appear(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['open_close_range'] = abs(df['open'] - df['close']) / df['open']
    
    df['dawn_appear'] = np.where(
        (df['open_close_range'] <= 0.01) & 
        (df['close'] > df['open']) & 
        (df['close'] > df['high'].shift(1)) & 
        (df['close'] > df['open'].shift(1)), 
        True, False
    )
    
    return df

# 示例用法
data = {'open': [10, 9, 11, 12, 10],
        'high': [12, 11, 12, 13, 11],
        'low': [9, 8, 10, 11, 9],
        'close': [11, 10, 11, 12, 11],
        'volume': [1000, 1200, 800, 1500, 1000]}

df = pd.DataFrame(data)
df = dawn_appear(df)
print(df)

上涨身怀六甲

代码语言:javascript复制
import pandas as pd

def up_trend_six_pregnancies(df, inplace=True):
    if not inplace:
        df = df.copy()

    df['open-close'] = df['open'] - df['close']  # 计算开盘价和收盘价的差
    df['high-low'] = df['high'] - df['low']  # 计算最高价和最低价的差
    df['up_trend'] = (df['open-close'] > 0) & (df['open-close'].shift(1) > 0)  # 判断是否处于上涨趋势
    df['pregnancy_1'] = df['high-low'].rolling(3).sum() / 3  # 第一持有期
    df['pregnancy_2'] = df['high-low'].rolling(6).sum() / 6  # 第二持有期
    df['pregnancy_3'] = df['high-low'].rolling(9).sum() / 9  # 第三持有期
    df['pregnancy_4'] = df['high-low'].rolling(12).sum() / 12  # 第四持有期
    df['pregnancy_5'] = df['high-low'].rolling(18).sum() / 18  # 第五持有期
    df['pregnancy_6'] = df['high-low'].rolling(24).sum() / 24  # 第六持有期

    if not inplace:
        return df

下跌身怀六甲

代码语言:javascript复制
def downtrend_six_pregnancies(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    df['body'] = abs(df['close'] - df['open'])  # 实体大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    df['pregnancies'] = ((df['close'] > df['open']) & (df['body'] > df['body'].shift(1)) & (df['body'] > df['body'].shift(2)) &
                         (df['upper_shadow'] < df['upper_shadow'].shift(1)) & (df['upper_shadow'] < df['upper_shadow'].shift(2)) &
                         (df['lower_shadow'] < df['lower_shadow'].shift(1)) & (df['lower_shadow'] < df['lower_shadow'].shift(2))).astype(int)
    
    return df

上涨孕十字星

下面是实现形态学指标【上涨孕十字星】的Python函数代码:

代码语言:javascript复制
import pandas as pd

def bullish_engulfing(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])
    df['is_bullish_engulfing'] = False
    
    for i in range(1, len(df)):
        if df['close'][i] > df['open'][i] and df['open'][i-1] > df['close'][i-1] and df['close'][i-1] > df['open'][i] and df['open'][i] < df['close'][i-1] and df['body'][i] > df['body'][i-1]:
            df.at[i, 'is_bullish_engulfing'] = True
    
    return df

# 测试
data = {'open': [10, 8, 9, 7, 12],
        'high': [12, 9, 10, 8, 13],
        'low': [8, 7, 8, 6, 10],
        'close': [12, 9, 10, 8, 12],
        'volume': [1000, 2000, 1500, 3000, 2500]}
df = pd.DataFrame(data)

df = bullish_engulfing(df)
print(df)

该函数会在数据帧中添加两列:body表示K线实体长度,is_bullish_engulfing表示是否符合上涨孕十字星形态。在测试中,我们创建了一个示例数据帧并调用了bullish_engulfing函数,最终输出数据帧包含了新增的两列。

下跌孕十字星

代码语言:javascript复制
import pandas as pd

def down_abandoned_baby(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_down_abandoned_baby'] = (df['body'] < df['body'].shift(1)) & (df['body'].shift(1) > df['body'].shift(2)) & (df['close'].shift(1) > df['open'].shift(1)) & (df['open'] < df['close']) & (df['close'] < df['open'].shift(1)) & (df['low'] > df['high'].shift(1)) & (df['high'] < df['low'].shift(1))

    if inplace:
        return
    else:
        return df

上涨孤独十字星

代码语言:javascript复制
def upward_lone_cross_star(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_upward_lone_cross_star'] = ((df['body'] < 0.1 * df['open']) & 
                                       (df['upper_shadow'] < 0.3 * df['open']) &
                                       (df['lower_shadow'] > 2 * df['body']) & 
                                       (df['open'] < df['close']))

    if not inplace:
        return df

使用示例:

代码语言:javascript复制
import pandas as pd
# 创建包含必要列的示例数据
data = {'open': [10, 8, 12, 8],
        'high': [11, 9, 13, 9],
        'low': [9, 7, 11, 7],
        'close': [10, 8, 11, 8],
        'volume': [1000, 2000, 1500, 1800]}
df = pd.DataFrame(data)

# 应用函数
upward_lone_cross_star(df, inplace=True)

# 查看结果
print(df)

下跌孤独十字星

代码语言:javascript复制
def falling_lone_cross_star(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['is_falling_lone_cross_star'] = False
    
    for i in range(2, len(df)):
        if df['close'][i] < df['open'][i] and df['close'][i-1] < df['open'][i-1] and df['close'][i-2] < df['open'][i-2] 
            and df['close'][i-1] < df['close'][i-2] and df['close'][i-1] < df['open'][i-2]:
            
            df.at[i, 'is_falling_lone_cross_star'] = True
    
    if not inplace:
        return df

您可以将这段代码添加到您的量化交易策略中,用于识别下跌孤独十字星形态,从而辅助决策。

上涨Pinbar组合

代码语言:javascript复制
def bullish_pinbar(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = abs(df['close'] - df['open'])
    df['upper_shadow_size'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow_size'] = df[['open', 'close']].min(axis=1) - df['low']
    
    df['bullish_pinbar'] = (
        (df['body_size'] < df['upper_shadow_size']) & 
        (df['body_size'] < df['lower_shadow_size']) & 
        (df['close'] > df['open'])
    )
    
    return df

下跌Pinbar组合

代码语言:javascript复制
def down_pinbar(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])  # 计算蜡烛实体
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 计算上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 计算下影线
    
    # 判断下跌Pinbar
    df['down_pinbar'] = ((df['close'] < df['open']) &  # 实体为下跌
                         (df['body'] < 0.5 * df['low'] * 0.01) &  # 实体小于0.5%的最低价
                         (df['upper_shadow'] < 0.3 * df['low'] * 0.01) &  # 上影线小于0.3%的最低价
                         (df['lower_shadow'] < 0.3 * df['low'] * 0.01))  # 下影线小于0.3%的最低价
    
    return df

下跌螺旋桨

代码语言:javascript复制
import numpy as np

def falling_screwing_propeller(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['Close-Open'] = df['close'] - df['open']
    df['High-Low'] = df['high'] - df['low']

    df['Body'] = np.abs(df['Close-Open'])
    df['Upper Shadow'] = np.where(df['Close-Open'] > 0, df['high'] - df['close'], df['high'] - df['open'])
    df['Lower Shadow'] = np.where(df['Close-Open'] > 0, df['open'] - df['low'], df['close'] - df['low'])
    
    if not inplace:
        return df

这段代码实现了计算下跌螺旋桨指标的函数,包括对数据帧中的收盘价、开盘价、最高价、最低价等列进行处理,计算出指标所需的各个数据并保存到数据帧中。最后返回更新后的数据帧。

上涨螺旋桨

下面是一个实现上涨螺旋桨指标的Python函数:

代码语言:javascript复制
import numpy as np

def rising_screw_propeller(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['HL'] = df['high'] - df['low']
    df['HC'] = np.abs(df['high'] - df['close'].shift())
    df['LC'] = np.abs(df['low'] - df['close'].shift())

    df['TR'] = df[['HC', 'HL', 'LC']].max(axis=1)
    df['ATR'] = df['TR'].rolling(window=14).mean()

    df['RSPL'] = 100 * (df['close'] - df['close'].shift(1)) / df['ATR']

    if not inplace:
        return df

# 调用函数
df = rising_screw_propeller(df)

这个函数首先计算了当日的最高价和最低价的差值HL,以及当日最高价和昨日收盘价的差值HC、当日最低价和昨日收盘价的差值LC。然后通过这三个差值计算真实范围TR,并通过TR的移动平均计算平均真实范围ATR。最后根据公式计算上涨螺旋桨指标RSPL。如果inplace为False,则返回更新后的数据帧df,否则直接在原数据帧上进行更新。

阴包阳形态

代码语言:javascript复制
def candlestick_pattern(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['bearish_engulfing'] = ((df['close'] < df['open']) & 
                                (df['close'].shift(1) > df['open'].shift(1)) & 
                                (df['open'] > df['close'].shift(1)) & 
                                (df['close'] > df['open'].shift(1)))
    
    df['bullish_engulfing'] = ((df['close'] > df['open']) & 
                                (df['close'].shift(1) < df['open'].shift(1)) & 
                                (df['open'] < df['close'].shift(1)) & 
                                (df['close'] < df['open'].shift(1)))
    
    if not inplace:
        return df

# 测试
import pandas as pd

data = {'open': [10, 11, 20, 15, 30],
        'high': [15, 15, 25, 25, 35],
        'low': [5, 10, 18, 12, 25],
        'close': [8, 12, 22, 18, 28],
        'volume': [100000, 120000, 150000, 110000, 200000]}

df = pd.DataFrame(data)

df = candlestick_pattern(df)

print(df)

阳包阴形态

代码语言:javascript复制
def yang_bao_yin(df, inplace=True):
    if not inplace:
        df = df.copy()

    df['yang_bao_yin'] = (df['close'] > df['open']) & (df['close'] < df['open'] * 1.01) & (df['low'] < df['open']) & (df['high'] > df['close'])

    if not inplace:
        return df

仙人指路

代码语言:javascript复制
import pandas as pd
import numpy as np

def williams_ad(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    high = df['high']
    low = df['low']
    close = df['close']
    volume = df['volume']

    ad = np.zeros(len(df))
    for i in range(1, len(df)):
        ad[i] = ad[i-1]   (close[i] - low[i]) - (high[i] - close[i])

    df['williams_ad'] = ad
    return df

中流砥柱形态

代码语言:javascript复制
def morphology_indicator(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算中流砥柱形态指标
    df['body_height'] = abs(df['close'] - df['open'])  # 蜡烛实体高度
    df['upper_shadow_height'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线高度
    df['lower_shadow_height'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线高度
    df['morphology'] = (df['upper_shadow_height'] <= 2 * df['body_height']) & (df['lower_shadow_height'] <= 2 * df['body_height'])
    
    if not inplace:
        return df

使用示例:

代码语言:javascript复制
import pandas as pd

# 创建示例数据
data = {
    'open': [10, 15, 12, 20],
    'high': [12, 18, 17, 22],
    'low': [9, 14, 11, 18],
    'close': [11, 16, 13, 21],
    'volume': [1000, 1500, 1200, 2000]
}
df = pd.DataFrame(data)

# 计算形态学指标
morphology_indicator(df, inplace=True)

print(df)

单针探底(定海神针)

代码语言:javascript复制
import numpy as np

def single_needle_bottom(df, inplace=False):
    if not inplace:
        df = df.copy()

    def is_single_needle_bottom(row):
        if row['close'] < row['open'] and row['close'] < row['low']   0.5 * (row['high'] - row['low']):
            return True
        else:
            return False

    df['single_needle_bottom'] = df.apply(is_single_needle_bottom, axis=1)

    return df

锤头线(锤子线)

代码语言:javascript复制
def hammer_candlestick(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
  
    df['is_hammer'] = ((df['body'] / df['body'].rolling(5).mean() < 0.02) & 
                      (df['upper_shadow'] / df['body'] < 0.15) &
                      (df['lower_shadow'] / df['body'] < 0.15))
  
    return df

看跌(高位倒锤头)流星线

以下是一个可以实现形态学指标【看跌(高位倒锤头)流星线】的Python函数:

代码语言:javascript复制
def bearish_inverted_hammer(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_bearish_inverted_hammer'] = False
    for i in range(len(df)):
        if (df.loc[i]['body'] < 0.1 * df.loc[i]['high'] and
            df.loc[i]['upper_shadow'] > 2.5 * df.loc[i]['body'] and
            df.loc[i]['lower_shadow'] < 0.1 * df.loc[i]['high']):
            df.at[i, 'is_bearish_inverted_hammer'] = True

    return df

这个函数会计算每日的实体(body)、上影线(upper_shadow)和下影线(lower_shadow),然后检查是否符合看跌(高位倒锤头)流星线的形态。如果符合条件,则将is_bearish_inverted_hammer列设置为True,表示这一天出现了高位倒锤头形态。最后返回更新后的数据帧df

看涨(低位)倒锤头线

代码语言:javascript复制
import pandas as pd

def bullish_inverted_hammer(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['open'] - df['close'])
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['is_bullish_inverted_hammer'] = ((df['close'] > df['open']) & 
                                         (df['open'] > df['low']) &
                                         (df['close'] > ((df['high']   df['low']) / 2)) &
                                         (df['body'] < 0.2 * df['low']) &
                                         (df['upper_shadow'] < 2 * df['body']) &
                                         (df['lower_shadow'] > 2 * df['body']))

    if not inplace:
        return df

上涨镊子线(U形磁铁)

代码语言:javascript复制
def u_shaped_magnet(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['u_shaped_magnet'] = 0
    
    for i in range(2, len(df)):
        if df['close'][i-2] > df['open'][i-2] and df['close'][i-1] < df['close'][i-2] and df['close'][i] > df['open'][i]:
            df.loc[i, 'u_shaped_magnet'] = 1
            
    return df

下跌镊子线(n形磁铁)

代码语言:javascript复制
def falling_engulfing(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = abs(df['close'] - df['open'])
    df['body_top'] = df[['open', 'close']].max(axis=1)
    df['body_bottom'] = df[['open', 'close']].min(axis=1)
    df['body_range'] = df['body_top'] - df['body_bottom']
    
    df['prev_body_size'] = df['body_size'].shift(1)
    df['prev_body_top'] = df['body_top'].shift(1)
    df['prev_body_bottom'] = df['body_bottom'].shift(1)
    
    df['falling_engulfing'] = (df['close'] < df['open']) & (df['open'] > df['prev_body_top']) & (df['close'] < df['prev_body_bottom'])
    
    if not inplace:
        return df

三空阴线

代码语言:javascript复制
def morphology_indicator(df, inplace=False):
    # 计算三空阴线指标
    df['close_open'] = df['close'] - df['open']
    df['close_low'] = df['close'] - df['low']
    df['open_close'] = df['open'] - df['close']
    
    df['three_black_crows'] = (df['close_open'] < 0) & (df['close_low'] < 0) & (df['open_close'] < 0)
    
    if inplace:
        return df
    else:
        return df.copy()

三空阳线

代码语言:javascript复制
def three_white_soldiers(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body_size'] = df['close'] - df['open']
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
    
    df['is_up_day'] = df['close'] > df['open']
    
    for i in range(2, len(df)):
        if df.loc[i-2, 'is_up_day'] and df.loc[i-1, 'is_up_day'] and df.loc[i, 'is_up_day']:
            if df.loc[i-2, 'body_size'] <= 0 or df.loc[i-1, 'body_size'] <= 0 or df.loc[i, 'body_size'] <= 0:
                continue
            if df.loc[i-2, 'lower_shadow'] / df.loc[i-2, 'body_size'] >= 0.5 or df.loc[i-1, 'lower_shadow'] / df.loc[i-1, 'body_size'] >= 0.5 or df.loc[i, 'lower_shadow'] / df.loc[i, 'body_size'] >= 0.5:
                continue
            if df.loc[i-2, 'close'] <= df.loc[i-1, 'close'] and df.loc[i-1, 'close'] <= df.loc[i, 'close']:
                df.loc[i, 'three_white_soldiers'] = 1
    
    df.drop(['body_size', 'upper_shadow', 'lower_shadow', 'is_up_day'], axis=1, inplace=True)
    
    return df

# 调用示例
import pandas as pd

data = {
    'open': [10, 11, 12, 13, 14],
    'high': [12, 13, 14, 15, 16],
    'low': [9, 10, 11, 12, 13],
    'close': [11, 12, 13, 14, 15],
    'volume': [1000, 1200, 1300, 1100, 1000]
}
df = pd.DataFrame(data)
df = three_white_soldiers(df, inplace=False)
print(df)

红三兵

代码语言:javascript复制
def red_three_soldiers(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = df['close'] - df['open']
    df['up_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['down_shadow'] = df[['open', 'close']].min(axis=1) - df['low']

    df['up_body'] = df['body'].apply(lambda x: x if x > 0 else 0)
    df['down_body'] = df['body'].apply(lambda x: -x if x < 0 else 0)

    df['red_soldier'] = False

    for i in range(2, len(df)):
        if df.loc[i-2, 'up_body'] > 0 and df.loc[i-1, 'up_body'] > 0 and df.loc[i, 'up_body'] > 0:
            df.loc[i, 'red_soldier'] = True

    return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 11, 12, 13, 14],
    'high': [15, 16, 17, 18, 19],
    'low': [9, 10, 11, 12, 13],
    'close': [14, 15, 16, 17, 18],
    'volume': [1000, 2000, 1500, 1800, 1200]
}

df = pd.DataFrame(data)

df = red_three_soldiers(df)
print(df)

三只乌鸦

代码语言:javascript复制
import pandas as pd

def three_black_crows(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['close_shifted'] = df['close'].shift(1)
    
    def is_three_black_crows(row):
        if row['close'] < row['open'] and row['close'] < row['close_shifted']:
            return True
        else:
            return False
    
    df['is_three_black_crows'] = df.apply(is_three_black_crows, axis=1)
    
    return df

# 使用示例
data = {'open': [10, 9, 8, 7],
        'high': [11, 10, 9, 8],
        'low': [9, 8, 7, 6],
        'close': [8, 7, 6, 5],
        'volume': [100, 200, 150, 180]}
df = pd.DataFrame(data)

df = three_black_crows(df)
print(df)

低位并排阳线

代码语言:javascript复制
def low_parallel_up(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['low_parallel_up'] = (df['open'] < df['close']) & (df['low'] < df['open']) & (df['low'] < df['close'])

    return df

高位并排阳线

代码语言:javascript复制
def high_concurrent_positive(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['high_close'] = df['high'] == df['close']
    df['prev_close'] = df['close'].shift(1)
    df['prev_high'] = df['high'].shift(1)
    df['prev_close_high'] = df['prev_close'] == df['prev_high']
    df['high_concurrent_positive'] = df['high_close'] & df['prev_close_high']

    if not inplace:
        return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 15, 20, 18, 25],
    'high': [12, 18, 22, 20, 27],
    'low': [8, 14, 18, 16, 22],
    'close': [11, 17, 21, 19, 26],
    'volume': [1000, 2000, 1500, 1800, 1200]
}

df = pd.DataFrame(data)

df = high_concurrent_positive(df)
print(df)

上升三部曲(升势三鸦)

代码语言:javascript复制
def upward_three_crows(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = df['open'] - df['close']
    df['body_rel'] = df['body'] / (df['high'] - df['low'])
    
    for i in range(2, len(df)):
        if df['body'][i] < 0 and df['body'][i-1] < 0 and df['body'][i-2] < 0 
        and df['body_rel'][i] >= 0.5 and df['body_rel'][i-1] >= 0.5 and df['body_rel'][i-2] >= 0.5:
            df.loc[i, 'upward_three_crows'] = 1
        else:
            df.loc[i, 'upward_three_crows'] = 0
    
    return df

# 示例用法
import pandas as pd

data = {
    'open': [10, 9, 8, 7, 8, 7],
    'high': [12, 11, 10, 9, 9, 8],
    'low': [9, 8, 7, 6, 7, 6],
    'close': [9, 8, 7, 6, 7, 6],
    'volume': [100, 200, 150, 120, 300, 250]
}

df = pd.DataFrame(data)
df = upward_three_crows(df)
print(df)

下降三部曲(降势三鹤)

代码语言:javascript复制
import pandas as pd

def descending_three_methods(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['short_ma'] = df['close'].rolling(window=5).mean()  # 计算5日均价
    df['long_ma'] = df['close'].rolling(window=20).mean()  # 计算20日均价
    df['volume_ma'] = df['volume'].rolling(window=5).mean()  # 计算5日成交量均值

    # 判断下降三部曲条件
    df['descending_three_methods'] = (df['short_ma'] < df['long_ma']) & (df['short_ma'].shift(1) > df['long_ma'].shift(1)) & (df['close'] < df['short_ma']) & (df['close'] < df['close'].shift(1)) & (df['volume'] < df['volume_ma'])

    if not inplace:
        return df

高档五阴线

代码语言:javascript复制
import pandas as pd

def high_grade_five_black_crows(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算每根K线的实体大小
    df['body'] = abs(df['open'] - df['close'])
    
    # 计算每根K线的上影线和下影线
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']
    
    # 判断是否为高档五阴线
    conditions = (df['body'] / df['open'] < 0.01) & (df['lower_shadow'] / df['open'] < 0.005) & 
                 (df['close'] < df['open']) & (df['close'].shift(1) < df['close'].shift(2)) & 
                 (df['close'].shift(1) < df['close'].shift(3)) & (df['close'].shift(1) < df['close'].shift(4))

    df['high_grade_five_black_crows'] = conditions.astype(int)
    
    return df

低档五阳线

代码语言:javascript复制
def low_gang_wuyang(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['low_gang_wuyang'] = 0
    for i in range(4, len(df)):
        if df['close'][i] > df['close'][i-1] and df['close'][i-1] > df['close'][i-2] 
        and df['close'][i-2] > df['close'][i-3] and df['close'][i-3] > df['close'][i-4]:
            df.at[i, 'low_gang_wuyang'] = 1
    
    return df

两黑夹一红

代码语言:javascript复制
def two_black_one_red(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['body'] = abs(df['close'] - df['open'])  # 计算实体大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 计算上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 计算下影线

    df['is_black'] = (df['close'] < df['open']).astype(int)  # 判断是否为黑蜡烛
    df['is_red'] = (df['close'] > df['open']).astype(int)  # 判断是否为红蜡烛

    df['pattern'] = (df['is_black']   df['is_black'].shift(1)   df['is_red']) == 2  # 判断是否为两黑夹一红形态

    if not inplace:
        return df

这段代码定义了一个名为two_black_one_red的函数,该函数接受一个数据帧df,并包含一个inplace参数用于指示是否原地更新df。函数计算并添加了实体大小、上影线、下影线、蜡烛颜色、以及两黑夹一红形态的指标到df中。如果inplaceFalse,则函数会返回更新后的df;否则将直接在原地更新df

两红夹一黑

代码语言:javascript复制
def two_red_one_black(df, inplace=False):
    if not inplace:
        df = df.copy()

    df['close_shift1'] = df['close'].shift(1)
    df['close_shift2'] = df['close'].shift(2)
    df['open_shift1'] = df['open'].shift(1)
    df['open_shift2'] = df['open'].shift(2)
    
    df['is_red1'] = df['close'] > df['open']
    df['is_red2'] = df['close_shift1'] > df['open_shift1']
    df['is_black'] = df['close_shift2'] < df['open_shift2']

    df['two_red_one_black'] = ((df['is_red1'] & df['is_red2'] & df['is_black']).shift(-2)).astype(int)

    if not inplace:
        return df

多方炮

代码语言:javascript复制
import pandas as pd

def bull_power(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['bull_power'] = df['high'] - (df['open']   df['close']) / 2
    
    return df

空方炮

代码语言:javascript复制
def short_selling_cannon(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    df['body'] = abs(df['close'] - df['open'])  # 实体的大小
    df['upper_shadow'] = df['high'] - df[['open', 'close']].max(axis=1)  # 上影线
    df['lower_shadow'] = df[['open', 'close']].min(axis=1) - df['low']  # 下影线
    
    df['short_selling_cannon'] = (df['upper_shadow'] > 2 * df['body']) & (df['lower_shadow'] < 0.5 * df['body'])

    return df

下降插入线(坠落线)

你可以使用以下代码来实现形态学指标【下降插入线(坠落线)】:

代码语言:javascript复制
def descent_insert_line(df, inplace=False):
    if not inplace:
        df = df.copy()
    df['insert_line'] = (df['open']   df['close']) / 2
    df['descent_insert_line'] = (df['high'] - df['insert_line']).rolling(window=3).max()
    if not inplace:
        return df

使用这个函数,你可以对数据帧df应用形态学指标【下降插入线(坠落线)】,并选择是否原地更新df

高开跳空缺口

代码语言:javascript复制
def high_open_jump_gap(df, inplace=False):
    if not inplace:
        df = df.copy()
        
    df['high_open_gap'] = df['open'] - df['close'].shift(1)
    
    return df

低开跳空缺口

代码语言:javascript复制
def low_open_gap(df, inplace=False):
    if not inplace:
        df = df.copy()
    
    # 计算当日开盘价与前一日收盘价的差值
    df['open_close_diff'] = df['open'] - df['close'].shift(1)
    
    # 判断是否出现低开跳空缺口
    df['low_open_gap'] = df['open_close_diff'] > 0
    
    if not inplace:
        return df

1 人点赞