来源 | CSDN博客作者 | 蒋含竹责编 | 徐威龙
利用循环神经网络RNN可以做各种连续性数据的预测,其中生成古诗词是一件非常有趣的事,特此分享我的学习经验。
先来几首藏头诗吧 ^_^
代码语言:javascript复制宁静致远
宁随古峰一里乡,静在门林满树通。致有旧人身自住,远花不似水花中。
风起云涌
风山一夕月,起落鸟纷纷。云散生何处,涌深千尺村。
春夏秋冬
春来空树柳微时,夏火遥愁独寂寥。秋上北陵村未苦,冬来寒向入楼僧。
另外,我的实现参考了这篇博客,非常感谢这位博主的无私奉献!
https://blog.csdn.net/aaronjny/article/details/103806954
导包
代码语言:javascript复制import math
import re
import numpy as np
import tensorflow as tf
from collections import Counter
数据预处理
2.1 原始数据
- 原始数据(百度网盘: poetry.txt 提取码: b2pp)
内容示例如下
代码语言:javascript复制过老子庙:仙居怀圣德,灵庙肃神心。草合人踪断,尘浓鸟迹深。流沙丹灶没,关路紫烟沉。独伤千载后,空馀松柏林。
途次陕州:境出三秦外,途分二陕中。山川入虞虢,风俗限西东。树古棠阴在,耕余让畔空。鸣笳从此去,行见洛阳宫。
野次喜雪:拂曙辟行宫,寒皋野望通。每云低远岫,飞雪舞长空。赋象恒依物,萦回屡逐风。为知勤恤意,先此示年丰。
送贺知章归四明:遗荣期入道,辞老竟抽簪。岂不惜贤达,其如高尚心。寰中得秘要,方外散幽襟。独有青门饯,群僚怅别深。
轩游宫十五夜:行迈离秦国,巡方赴洛师。路逢三五夜,春色暗中期。关外长河转,宫中淑气迟。歌钟对明月,不减旧游时。
我们的原始数据poetry.txt中,每一行是一首诗,按":"符号分隔为诗的标题、内容,其中还有逗号、句号。
2.2 数据预处理
首先,因为我们想训练的是写诗的内容,因此等下训练的时候只需要诗的内容即可。
另外,我们的数据中可能存在部分符号的问题,例如中英文符号混用、每行存在多个冒号、数据中存在其他符号等问题,因此我们需要对数据进行清洗。
代码语言:javascript复制# 数据路径
DATA_PATH = './datasets/poetry.txt'
# 单行诗最大长度
MAX_LEN = 64
# 禁用的字符,拥有以下符号的诗将被忽略
DISALLOWED_WORDS = ['(', ')', '(', ')', '__', '《', '》', '【', '】', '[', ']']
# 一首诗(一行)对应一个列表的元素
poetry = []
# 按行读取数据 poetry.txt
with open(DATA_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 遍历处理每一条数据
for line in lines:
# 利用正则表达式拆分标题和内容
fields = re.split(r"[::]", line)
# 跳过异常数据
if len(fields) != 2:
continue
# 得到诗词内容(后面不需要标题)
content = fields[1]
# 跳过内容过长的诗词
if len(content) > MAX_LEN - 2:
continue
# 跳过存在禁用符的诗词
if any(word in content for word in DISALLOWED_WORDS):
continue
poetry.append(content.replace('n', '')) # 最后要记得删除换行符
接着,我们来打印几首处理后的诗看看
代码语言:javascript复制for i in range(0, 5):
print(poetry[i])
代码语言:javascript复制系马宫槐老,持杯店菊黄。故交今不见,流恨满川光。
世间何事不潸然,得失人情命不延。适向蔡家厅上饮,回头已见一千年。
只领千馀骑,长驱碛邑间。云州多警急,雪夜度关山。石响铃声远,天寒弓力悭。秦楼休怅望,不日凯歌还。
今日花前饮,甘心醉数杯。但愁花有语,不为老人开。
秋来吟更苦,半咽半随风。禅客心应乱,愁人耳愿聋。雨晴烟树里,日晚古城中。远思应难尽,谁当与我同。
现在,我们需要对诗句进行分词,不过考虑到为了最后生成的诗的长度的整齐性,以及便利性,我们在这里按单个字符进行拆分。(你也可以使用专业的分词工具,例如jieba、hanlp等)
并且,我们还需要统计一下词频,删除掉出现次数较低的词
代码语言:javascript复制# 最小词频
MIN_WORD_FREQUENCY = 8
# 统计词频,利用Counter可以直接按单个字符进行统计词频
counter = Counter()
for line in poetry:
counter.update(line)
# 过滤掉低词频的词
tokens = [token for token, count in counter.items() if count >= MIN_WORD_FREQUENCY]
看看我们的词频统计结果如何
代码语言:javascript复制i = 0
for token, count in counter.items():
if i >= 5:
break;
print(token, "->",count)
i = 1
代码语言:javascript复制寒 -> 2627
随 -> 1039
穷 -> 487
律 -> 119
变 -> 286
除此之外,还有几个点需要我们考虑。
- 需要用2个符号分别表示一首诗的起始点、结束点。这样我们的神经网络才能由训练得知什么时候写完一首诗。
- 需要一个字符来代表所有未知的字符。因为我们的数据去除了低频词,并且我们的文本不可能包含全世界所有的字符,因此需要一个字符来表示未知字符。
- 需要一个字符来填充诗词,以保证诗词的长度统一。因为单个批次内训练的数据特征长度必须一致。
因此,我们需要设置几个特殊字符
代码语言:javascript复制# 补上特殊词标记:填充字符标记、未知词标记、开始标记、结束标记
tokens = ["[PAD]", "[NONE]", "[START]", "[END]"] tokens
最后,我们需要对生成的所有词进行编号,方便后面进行转码
代码语言:javascript复制# 映射: 词 -> 编号
word_idx = {}
# 映射: 编号 -> 词
idx_word = {}
for idx, word in enumerate(tokens):
word_idx[word] = idx
idx_word[idx] = word
注意:因为后面我们要构建一个Tokenizer,在其内部实现该结构,此处的代码可以不用管
2.3 构建Tokenizer
构建一个Tokenizer,用于实现编号与词之间、编号列表与词列表之间的转换
其代码如下
代码语言:javascript复制class Tokenizer:
"""
分词器
"""
def __init__(self, tokens):
# 词汇表大小
self.dict_size = len(tokens)
# 生成映射关系
self.token_id = {} # 映射: 词 -> 编号
self.id_token = {} # 映射: 编号 -> 词
for idx, word in enumerate(tokens):
self.token_id[word] = idx
self.id_token[idx] = word
# 各个特殊标记的编号id,方便其他地方使用
self.start_id = self.token_id["[START]"]
self.end_id = self.token_id["[END]"]
self.none_id = self.token_id["[NONE]"]
self.pad_id = self.token_id["[PAD]"]
def id_to_token(self, token_id):
"""
编号 -> 词
"""
return self.id_token.get(token_id)
def token_to_id(self, token):
"""
词 -> 编号
"""
return self.token_id.get(token, self.none_id)
def encode(self, tokens):
"""
词列表 -> [START]编号 编号列表 [END]编号
"""
token_ids = [self.start_id, ] # 起始标记
# 遍历,词转编号
for token in tokens:
token_ids.append(self.token_to_id(token))
token_ids.append(self.end_id) # 结束标记
return token_ids
def decode(self, token_ids):
"""
编号列表 -> 词列表(去掉起始、结束标记)
"""
# 起始、结束标记
flag_tokens = {"[START]", "[END]"}
tokens = []
for idx in token_ids:
token = self.id_to_token(idx)
# 跳过起始、结束标记
if token not in flag_tokens:
tokens.append(token)
return tokens
初始化 Tokenizer
代码语言:javascript复制tokenizer = Tokenizer(tokens)
2.4 构建PoetryDataSet
放了方便后面按批次抽取数据训练模型,因此我们还需要构建一个数据生成器。这样TensorFlow在训练模型时会之间从该数据生成器抽取数据。
另外,我们抽取的原始数据还需要进行转码,才能喂给模型进行训练,该部分也封装在PoetryDataSet中
其代码如下
代码语言:javascript复制class PoetryDataSet:
"""
古诗数据集生成器
"""
def __init__(self, data, tokenizer, batch_size):
# 数据集
self.data = data
self.total_size = len(self.data)
# 分词器,用于词转编号
self.tokenizer = tokenizer
# 每批数据量
self.batch_size = BATCH_SIZE
# 每个epoch迭代的步数
self.steps = int(math.floor(len(self.data) / self.batch_size))
def pad_line(self, line, length, padding=None):
"""
对齐单行数据
"""
if padding is None:
padding = self.tokenizer.pad_id
padding_length = length - len(line)
if padding_length > 0:
return line [padding] * padding_length
else:
return line[:length]
def __len__(self):
return self.steps
def __iter__(self):
# 打乱数据
np.random.shuffle(self.data)
# 迭代一个epoch,每次yield一个batch
for start in range(0, self.total_size, self.batch_size):
end = min(start self.batch_size, self.total_size)
data = self.data[start:end]
max_length = max(map(len, data))
batch_data = []
for str_line in data:
# 对每一行诗词进行编码、并补齐padding
encode_line = self.tokenizer.encode(str_line)
pad_encode_line = self.pad_line(encode_line, max_length 2) # 加2是因为tokenizer.encode会添加START和END
batch_data.append(pad_encode_line)
batch_data = np.array(batch_data)
# yield 特征、标签
yield batch_data[:, :-1], batch_data[:, 1:]
def generator(self):
while True:
yield from self.__iter__()
生成的特征、标签的示例如下(实际是编号,此处做了转换)
代码语言:javascript复制特征:[START]我有辞乡剑,玉锋堪截云。襄阳走马客,意气自生春。朝嫌剑花净,暮嫌剑光冷。能持剑向人,不解持照身。[END][PAD][PAD][PAD]
标签:我有辞乡剑,玉锋堪截云。襄阳走马客,意气自生春。朝嫌剑花净,暮嫌剑光冷。能持剑向人,不解持照身。[END][PAD][PAD][PAD][PAD]
初始化 PoetryDataSet
代码语言:javascript复制dataset = PoetryDataSet(poetry, tokenizer, BATCH_SIZE)
模型的构建与训练
3.1 构建模型
现在我们可以开始构建RNN模型了,因为模型层与层之间是顺序的,因此我们可以采用Sequential快速构建模型。
模型如下
代码语言:javascript复制model = tf.keras.Sequential([
# 词嵌入层
tf.keras.layers.Embedding(input_dim=tokenizer.dict_size, output_dim=150),
# 第一个LSTM层
tf.keras.layers.LSTM(150, dropout=0.5, return_sequences=True),
# 第二个LSTM层
tf.keras.layers.LSTM(150, dropout=0.5, return_sequences=True),
# 利用TimeDistributed对每个时间步的输出都做Dense操作(softmax激活)
tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(tokenizer.dict_size, activation='softmax')),
])
模型总览
代码语言:javascript复制model.summary()
代码语言:javascript复制Model: "sequential_2"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
embedding_2 (Embedding) (None, None, 150) 515100
_________________________________________________________________
lstm_4 (LSTM) (None, None, 150) 180600
_________________________________________________________________
lstm_5 (LSTM) (None, None, 150) 180600
_________________________________________________________________
time_distributed_2 (TimeDist (None, None, 3434) 518534
=================================================================
Total params: 1,394,834
Trainable params: 1,394,834
Non-trainable params: 0
_________________________________________________________________
进行模型编译(选择优化器、损失函数)
代码语言:javascript复制model.compile(
optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.sparse_categorical_crossentropy
)
注意:因为我们的标签是非one_hot形式的,因此需要选择sparse_categorical_crossentropy 。当然你也可以利用tf.one_hot(标签, size)进行转换,然后使用categorical_crossentropy。
3.2 训练模型
开始训练模型
代码语言:javascript复制model.fit(
dataset.generator(),
steps_per_epoch=dataset.steps,
epochs=10
)
代码语言:javascript复制Train for 767 steps
Epoch 1/10
767/767 [==============================] - 34s 44ms/step - loss: 4.8892
Epoch 2/10
767/767 [==============================] - 31s 41ms/step - loss: 4.2494
Epoch 3/10
767/767 [==============================] - 31s 40ms/step - loss: 4.1113
Epoch 4/10
767/767 [==============================] - 31s 40ms/step - loss: 3.9864
Epoch 5/10
767/767 [==============================] - 31s 40ms/step - loss: 3.8660
Epoch 6/10
767/767 [==============================] - 31s 40ms/step - loss: 3.7879
Epoch 7/10
767/767 [==============================] - 31s 40ms/step - loss: 3.7339
Epoch 8/10
767/767 [==============================] - 31s 40ms/step - loss: 3.6826
Epoch 9/10
767/767 [==============================] - 31s 40ms/step - loss: 3.6275
Epoch 10/10
767/767 [==============================] - 31s 40ms/step - loss: 3.5999
预测
4.1 预测单个词
模型对于数据的预测结果是概率分布
代码语言:javascript复制# 需要先将词转为编号
token_ids = [tokenizer.token_to_id(word) for word in ["月", "光", "静", "谧"]]
# 进行预测
result = model.predict([token_ids ,])
print(result)
代码语言:javascript复制[[[2.0809230e-04 9.3881181e-03 5.5695949e-07 ... 5.6030808e-06
8.5241054e-06 2.0507096e-06]
[7.6916285e-06 6.1246334e-03 1.8850582e-08 ... 4.8418292e-06
2.8483141e-06 5.3288642e-07]
[5.0856406e-06 3.1365673e-03 1.9067786e-08 ... 4.5156207e-06
1.0479171e-05 9.7814757e-07]
[7.1793047e-06 2.2729969e-02 2.0391434e-08 ... 2.0609916e-06
2.2420336e-06 2.1413473e-06]]]
每次预测其实是根据一个序列预测一个新的词,我们需要词的多样化,因此可以按预测结果的概率分布进行抽样。代码如下
代码语言:javascript复制def predict(model, token_ids):
"""
在概率值为前100的词中选取一个词(按概率分布的方式)
:return: 一个词的编号(不包含[PAD][NONE][START])
"""
# 预测各个词的概率分布
# -1 表示只要对最新的词的预测
# 3: 表示不要前面几个标记符
_probas = model.predict([token_ids, ])[0, -1, 3:]
# 按概率降序,取前100
p_args = _probas.argsort()[-100:][::-1] # 此时拿到的是索引
p = _probas[p_args] # 根据索引找到具体的概率值
p = p / sum(p) # 归一
# 按概率抽取一个
target_index = np.random.choice(len(p), p=p)
# 前面预测时删除了前几个标记符,因此编号要补上3位,才是实际在tokenizer词典中的编号
return p_args[target_index] 3
我们随便来对一个序列进行循环预测试试
代码语言:javascript复制token_ids = tokenizer.encode("清风明月")[:-1]
while len(token_ids) < 13:
# 预测词的编号
target = predict(model, token_ids)
# 保存结果
token_ids.append(target)
# 到达END
if target == tokenizer.end_id:
break
print("".join(tokenizer.decode(token_ids)))
代码语言:javascript复制清风明月夜,晚色北堂残。
至此,基本的预测已经完成。后面只需要设置一些规则,就可以实现随机生成一首诗、生成一首藏头诗的功能
4.2 随机生成一首诗、自动续写诗词
代码如下
代码语言:javascript复制def generate_random_poem(tokenizer, model, text=""):
"""
随机生成一首诗
:param tokenizer: 分词器
:param model: 古诗模型
:param text: 古诗的起始字符串,默认为空
:return: 一首古诗的字符串
"""
# 将初始字符串转成token_ids,并去掉结束标记[END]
token_ids = tokenizer.encode(text)[:-1]
while len(token_ids) < MAX_LEN:
# 预测词的编号
target = predict(model, token_ids)
# 保存结果
token_ids.append(target)
# 到达END
if target == tokenizer.end_id:
break
return "".join(tokenizer.decode(token_ids))
随意测试几次
代码语言:javascript复制for i in range(5):
print(generate_random_poem(tokenizer, model))
代码语言:javascript复制江亭路断暮,归去见芳洲。惆怅门中去,心年少地深。夜期深木静,水落夕阳深。秋去人南雨,凄头望海中。
洛陌江阳宫下树,玉门宫夜似东云。今更已长逢醉士,一明先语似相春。
春山风半夜初归,万岁空声去去过。自惜秦生犹送酒,何人无计不安稀。
何处东陵路,无年已复还。晓莺逢半急,潮望月云稀。暗影通三度,烟沙水鸟深。当年相忆望,何处问渔家。
清夜向阳阁,一风看北宫。雨分红蕊草,红杏药茶行。野石翻山远,猿晴不独天。谁知一山下,飞首却悠悠。
给一首诗的开头,让它自己续写
代码语言:javascript复制print(generate_random_poem(tokenizer, model, "春眠不觉晓,"))
print(generate_random_poem(tokenizer, model, "白日依山尽,"))
print(generate_random_poem(tokenizer, model, "秦时明月汉时关,"))
代码语言:javascript复制春眠不觉晓,坐住树深空。风月飘犹晓,春多出水流。
白日依山尽,相逢独水声。唯疑见心意,一老泪鸣归。落晚南游客,吟猿见柳寒。何堪看暮望,还见有军情。
秦时明月汉时关,欲望时恩不道心。莫忆旧乡僧雁在,始堪曾在牡苓流。
4.2 生成一首藏头诗
代码如下
代码语言:javascript复制def generate_acrostic_poem(tokenizer, model, heads):
"""
生成一首藏头诗
:param tokenizer: 分词器
:param model: 古诗模型
:param heads: 藏头诗的头
:return: 一首古诗的字符串
"""
# token_ids,只包含[START]编号
token_ids = [tokenizer.start_id, ]
# 逗号和句号标记编号
punctuation_ids = {tokenizer.token_to_id(","), tokenizer.token_to_id("。")}
content = []
# 为每一个head生成一句诗
for head in heads:
content.append(head)
# head转为编号id,放入列表,用于预测
token_ids.append(tokenizer.token_to_id(head))
# 开始生成一句诗
target = -1;
while target not in punctuation_ids: # 遇到逗号、句号,说明本句结束,开始下一句
# 预测词的编号
target = predict(model, token_ids)
# 因为可能预测到END,所以加个判断
if target > 3:
# 保存结果到token_ids中,下一次预测还要用
token_ids.append(target)
content.append(tokenizer.id_to_token(target))
return "".join(content)
随意测试几次
代码语言:javascript复制print(generate_acrostic_poem(tokenizer, model, heads="上善若水"))
print(generate_acrostic_poem(tokenizer, model, heads="明月清风"))
print(generate_acrostic_poem(tokenizer, model, heads="点个赞吧"))
代码语言:javascript复制上亭清色望,善地半烟霞。若辨从秋日,水花清上清。
明夕远多尽,月生开雨明。清山看楚雪,风色水堂钟。
点阁风空雪,个枝时未开。赞君初合泪,吧石似春风。
4.3 如何生成一首押韵诗?
看了前面生成随机诗、藏头诗的代码,其实你应该知道我们对于生成的诗的每个词是可以控制。
那么我们在选取每句最后一个字时,只需要换一个预测方法即可。
之前我们使用predict是选取概率值为前100的,现在你只需要从预测的概率分布中过滤出与前面句式押韵的词,然后从中随机抽取一个字,即可生成押韵的诗句!^_^
其他
如果你需要在训练时,每个epoch都打印一下训练效果,或者想保存loss最小的模型,你可以在训练时添加Callback,例如:
代码语言:javascript复制class ShowSaveCallback(tf.keras.callbacks.Callback):
def __init__(self):
super().__init__()
# 给一个初始最大值
self.loss = float("inf")
def on_epoch_end(self, epoch, logs=None):
# 保留损失最低的模型
if logs['loss'] <= self.loss:
self.loss = logs['loss']
model.save("./rnn_model.h5")
# 查看一下本次训练的效果
print()
for i in range(5):
print(generate_random_poem(tokenizer, model))
# 开始训练
model.fit(
dataset.generator(),
steps_per_epoch=dataset.steps,
epochs=10,
callbacks=[ShowSaveCallback()]
)
加载训练好的模型
代码语言:javascript复制model = tf.keras.models.load_model("./rnn_model.h5")
# 后面就可以继续进行预测了
代码语言:javascript复制【end】
◆有奖征文◆
推荐阅读
2020年,5种将死的编程语言检测、量化、追踪新冠病毒,基于深度学习的自动CT图像分析有多靠谱?GitHub 接连封杀开源项目惹众怒,CEO 亲自道歉!智能合约编写之 Solidity 的设计模式低学历、文科出身,我如何从月薪不到 3000 逆袭为大厂高薪程序员?从提取层、处理层、基础结构入手,带你了解Spark和Kafka!你点的每个“在看”,我都认真当成了AI