RotaryEmbedding
代码语言:javascript
复制# 旋转位置嵌入,应用于每一层 Q 和 K
class RotaryEmbedding(nn.Module):
def __init__(self, dim, rope_ratio=1, original_impl=False, device=None, dtype=None):
super().__init__()
# 除法项定义
inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2, device=device).to(dtype=dtype) / dim))
self.register_buffer("inv_freq", inv_freq)
# d,嵌入维度
self.dim = dim
# (未知)
self.original_impl = original_impl
# 旋转比例
self.rope_ratio = rope_ratio
def forward_impl(
self, seq_len: int, n_elem: int, dtype: torch.dtype, device: torch.device, base: int = 10000
):
"""Enhanced Transformer with Rotary Position Embedding.
Derived from: https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/labml_nn/
transformers/rope/__init__.py. MIT License:
https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/license.
"""
# $Theta = {theta_i = 10000^{frac{2(i-1)}{d}}, i in [1, 2, ..., frac{d}{2}]}$
base = base * self.rope_ratio
# 嵌入空间中每个二维子空间的旋转角度
theta = 1.0 / (base ** (torch.arange(0, n_elem, 2, dtype=torch.float, device=device) / n_elem))
# 序列 ID,0 ~ SeqLen - 1 的一维数组
seq_idx = torch.arange(seq_len, dtype=torch.float, device=device)
# 二者的每个元素相乘,得到序列角度
# 尺寸为 [SeqLen, HeadSize // 2]
# idx_theta[i, d] == i * theta[d]
idx_theta = torch.outer(seq_idx, theta).float()
# 计算序列角度的余弦和正弦,并按最后一维堆叠
# 尺寸为 [SeqLen, HeadSize // 2, 2]
# cache[i, d] 是第 i 个向量第 d 个子空间的余弦和正弦值
cache = torch.stack([torch.cos(idx_theta), torch.sin(idx_theta)], dim=-1)
# this is to mimic the behaviour of complex32, else we will get different results
if dtype in (torch.float16, torch.bfloat16, torch.int8):
cache = cache.bfloat16() if dtype == torch.bfloat16 else cache.half()
return cache
def forward(self, max_seq_len, offset=0):
return self.forward_impl(
max_seq_len, self.dim, dtype=self.inv_freq.dtype, device=self.inv_freq.device
)
@torch.jit.script
def apply_rotary_pos_emb(x: torch.Tensor, rope_cache: torch.Tensor) -> torch.Tensor:
# 输入:[SeqLen, BatchSize, NHead, HeadSize]
# rope:[MaxSeqLen, HeadSize // 2, 2]
sq, b, np, hn = x.size(0), x.size(1), x.size(2), x.size(3)
# HeadSize
rot_dim = rope_cache.shape[-2] * 2
# 如果 X 嵌入维度超过了 HeadSize,将其分为两部分,只处理 HeadSize 之内的部分
x, x_pass = x[..., :rot_dim], x[..., rot_dim:]
# rope 截断到 SeqLen 长度
rope_cache = rope_cache[:sq]
# 拆分 X 的最后一维,使元素两个一组,[SeqLen, BatchSize, NHead, HeadSize // 2, 2]
xshaped = x.reshape(sq, -1, np, rot_dim // 2, 2)
# 再 rope 第二维插两个 1,[SeqLen, 1, 1, HeadSize // 2, 2]
rope_cache = rope_cache.view(sq, -1, 1, xshaped.size(3), 2)
# 执行旋转编码
# xshaped[..., 0]:二维子空间 x0
# xshaped[..., 1]:二位子空间 y0
# xshaped[..., 0]:cosθ
# rope_cache[..., 1]:sinθ
# x = cosθ * x0 - sinθ * y0
# y = sinθ * x0 cosθ * y0
x_out2 = torch.stack(
[
xshaped[..., 0] * rope_cache[..., 0] - xshaped[..., 1] * rope_cache[..., 1],
xshaped[..., 1] * rope_cache[..., 0] xshaped[..., 0] * rope_cache[..., 1],
],
-1,
)
# 变形为 [SeqLen, BatchSize, NHead, HeadSize]
x_out2 = x_out2.flatten(3)
# 将 HeadSize 之外的部分合并回来
return torch.cat((x_out2, x_pass), dim=-1)
Embedding
代码语言:javascript
复制class Embedding(torch.nn.Module):
"""Language model embeddings."""
def __init__(self, config: ChatGLMConfig, device=None):
super(Embedding, self).__init__()
# HidSize:隐藏状态每个向量的维度
self.hidden_size = config.hidden_size
# 嵌入层,用于将单词ID转成向量,尺寸 [VocabSize, HidSize]
self.word_embeddings = nn.Embedding(
config.padded_vocab_size,
self.hidden_size,
dtype=config.torch_dtype,
device=device
)
# 控制残差连接是否是 FP32
self.fp32_residual_connection = config.fp32_residual_connection
def forward(self, input_ids):
# 输入是单词 ID,[BatchSize, SeqLen]
# 将单词 ID 传入嵌入层,得到单词向量,作为初始隐藏状态
# [BatchSize, SeqLen, HidSize]
words_embeddings = self.word_embeddings(input_ids)
embeddings = words_embeddings
# 交换初始隐藏状态前两维,[SeqLen, BatchSize, HidSize]
embeddings = embeddings.transpose(0, 1).contiguous()
# 如果设置了 FP32,将其转换为 FP32
if self.fp32_residual_connection:
embeddings = embeddings.float()
return embeddings
ChatGLMForConditionalGeneration
代码语言:javascript
复制class ChatGLMForConditionalGeneration(ChatGLMPreTrainedModel):
def __init__(self, config: ChatGLMConfig, empty_init=True, device=None):
super().__init__(config)
# MaxSeqLen
self.max_sequence_length = config.max_length
# 前面的 TFM
self.transformer = ChatGLMModel(config, empty_init=empty_init, device=device)
self.config = config
self.quantized = False
# 如果指定了量化位数则执行量化
if self.config.quantization_bit:
self.quantize(self.config.quantization_bit, empty_init=True)
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
past_key_values: Optional[Tuple[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
return_last_logit: Optional[bool] = False,
):
# 初始化`use_cache`,指定是否返回 KVCache
use_cache = use_cache if use_cache is not None else self.config.use_cache
# 初始化`return_dict`,指定返回字典还是元组
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# 单词 ID:[BatchSize, SeqLen]
# 将单词 ID 等东西传入 TFM
transformer_outputs = self.transformer(
input_ids=input_ids,
position_ids=position_ids,
attention_mask=attention_mask,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# 得到最终隐藏状态,[SeqLen, BatchSize, HidSize]
hidden_states = transformer_outputs[0]
# 如果只返回最后一个 logit, 只取隐藏状态的最后一个
if return_last_logit:
hidden_states = hidden_states[-1:]
# 将隐藏状态传入输出层得到 logits,[SeqLen, BatchSize, VocabSize]
lm_logits = self.transformer.output_layer(hidden_states)
# 交换前两维,[BatchSize, SeqLen, HidSize]
lm_logits = lm_logits.transpose(0, 1).contiguous()
loss = None
# 如果指定了标签,计算损失
if labels is not None:
lm_logits = lm_logits.to(torch.float32)
# 截断 logits 的最后一个元素和标签的第一个元素
# 因为需要让单词 #1 的 logits 拟合标签 #2
# logits: A B C D E (F)
# 标签: (A) B C D E F
shift_logits = lm_logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# 计算交叉熵,并忽略标签 -100
loss_fct = CrossEntropyLoss(ignore_index=-100)
# logits 变形为 [BatchSize * (DeqLen - 1), VocabSize]
# 标签变形为 [BatchSize * (DeqLen - 1)]
loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
'''
这段逻辑类似于:
mask = shift_labels != -100
shift_labels = shift_labels[mask]
shift_logits = shift_logits[mask]
shift_onehot = torch.nn.functional.one_hot(shift_labels, shift_logits.size(-1))
shift_probs = torch.softmax(shift_logits, -1)
loss = - (shift_onehot * torch.log(shift_probs)).sum(-1).mean()
'''
lm_logits = lm_logits.to(hidden_states.dtype)
loss = loss.to(hidden_states.dtype)
# 如果指定不返回字典,将损失,logits 和其他东西打包成元组返回
if not return_dict:
output = (lm_logits,) transformer_outputs[1:]
return ((loss,) output) if loss is not None else output
# 否则返回字典
return CausalLMOutputWithPast(
loss=loss,
logits=lm_logits,
past_key_values=transformer_outputs.past_key_values,
hidden_states=transformer_outputs.hidden_states,
attentions=transformer_outputs.attentions,
)