LLMs from scratch

image

最近读了这本书,确实非常的不错,将 GPT 拆解了,虽然只是一个比较早期的 GPT-2 但是五脏俱全。

首先从 Overview 的角度对实现 GPT-2 有一个大致的观感。

image

Prepare 前置准备

Tokenizing 分词

Tokenizing(分词)是将输入文本拆分成较小的单位(称为Token)的过程,以便模型能够处理和理解这些文本。分词有很多方式,

  • Word-based(基于词):将文本按单词分割(如空格分隔)。适用于英文,但对中文等无明确分隔的语言效果较差。
  • Character-based(基于字符):将文本按单个字符分割,适合中文,但会导致序列过长。
  • Subword-based(基于子词):
    • 目前最常用的方法,如 Byte-Pair Encoding (BPE)、WordPiece 或 SentencePiece。
    • 例如,BPE 会将常用词保留为完整 Token(如 “dog”),而将不常见词拆分为子词(如 “unhappiness” 可能分为 “un”, “##happy”, “##ness”)。
    • 优点是兼顾了词汇表大小和语言覆盖率。

因为本书的重点也不是分词器的实现,在前半段完成了一个 Word By Word 分词器的实现。

1
2
3
preprocessed = re.split(r'([,.:;?_!"()\']|--|\\s)', raw_text)
preprocessed = [item.strip() for item in preprocessed if item.strip()]
print(len(preprocessed))

拆分的结果如下

1
2
3
4
['I', 'HAD', 'always', 'thought', 'Jack', 'Gisburn', 'rather', 'a',
'cheap', 'genius', '--', 'though', 'a', 'good', 'fellow', 'enough',
'--', 'so', 'it', 'was', 'no', 'great', 'surprise', 'to', 'me', 'to',
'hear', 'that', ',', 'in']

拆分之后的单次,给与一个单次一个 ID

1
2
3
4
5
vocab = {token:integer for integer,token in enumerate(all_words)}
for i, item in enumerate(vocab.items()):
print(item)
if i >= 50:
break

就得到了一个单次的 ID 清单

1
2
3
4
5
6
('!', 0)
('"', 1)
("'", 2)
...
('Her', 49)
('Hermia', 50)

不过在本书中使用的 BPE 的分词手段,Byte-Pair Encoding (BPE) 是一种常用的子词分词算法,广泛应用于大型语言模型(如 GPT、BERT 等)中。BPE 通过迭代合并频繁出现的字符对,构建一个词汇表,将文本分割为子词级别的 Token。 具体的原理可以参考 LLM Course

1
2
3
4
5
6
7
8
9
10
from importlib.metadata import version
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")
text = (
"Hello, do you like tea? <|endoftext|> In the sunlit terraces"
"of someunknownPlace."
)
integers = tokenizer.encode(text, allowed_special={"<|endoftext|>"})
print(integers)

Data Sampling 数据采样

因为 LLM 本质是需要训练的,因此需要需要来训练,书中采用了滑动窗口的方案。

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

import torch
from torch.utils.data import Dataset, DataLoader
class GPTDatasetV1(Dataset):
def __init__(self, txt, tokenizer, max_length, stride):
self.input_ids = []
self.target_ids = []

token_ids = tokenizer.encode(txt) #1 使用 BPE 进行分词

for i in range(0, len(token_ids) - max_length, stride): #2 使用滑动窗口来进行数据切分
input_chunk = token_ids[i:i + max_length]
target_chunk = token_ids[i + 1: i + max_length + 1]
self.input_ids.append(torch.tensor(input_chunk))
self.target_ids.append(torch.tensor(target_chunk))

def __len__(self): #3 返回长度,Dataset 的抽象类需要实现2个函数,__len__ 和 __getitem__
return len(self.input_ids)

def __getitem__(self, idx): #4 返回输入,和期望的输出
return self.input_ids[idx], self.target_ids[idx]

Token Embeding 词嵌入

将离散的词汇表索引(Token ID)映射为连续的向量表示(嵌入向量)在书中举例,比如我们有一个

1
2
3
4
5
6
7
8
9
10
11
12
13
vocab_size = 6 # 6 个单词的小词汇表
output_dim = 3 # 希望创建大小为 3 的嵌入
torch.manual_seed(123)
embedding_layer = torch.nn.Embedding(vocab_size, output_dim)
print(embedding_layer.weight)

Parameter containing:
tensor([[ 0.3374, -0.1778, -0.1690],
[ 0.9178, 1.5810, 1.3010],
[ 1.2753, -0.2010, -0.1606],
[-0.4015, 0.9666, -1.1481],
[-1.1589, 0.3255, -0.6315],
[-2.8400, -0.7849, -1.4096]], requires_grad=True)

从输出我们就自然明白了,也就是生成了一个数组,包含了 vocab_size 个大小的 output_dim 的 float 值,嵌入层的权重矩阵包含小的随机值。这些值在 LLM 训练期间作为 LLM 优化本身的一部分进行优化。此外,我们可以看到权重矩阵有 6 行 3 列。词汇表中 6 个可能的词元各有一行,3 个嵌入维度各有一列。

这部分内容在 《集体智慧编程中》 也有大量的涉及,确实可以触类旁通。

Attention 注意力机制

Attention Is All You Need

提出了一个全新的架构

image

主要解决在一个长文本中,序列数据丢失信息的问题。

没太看懂数学含义

Attending to different parts of the input with self-attention 分步计算自注意力机制

书中先介绍了一个不需要训练的注意力算法。假设有这么一个句子, 已经转化为词向量了。

1
2
3
4
5
6
7
8
9
import torch
inputs = torch.tensor(
[[0.43, 0.15, 0.89], # Your (x^1)
[0.55, 0.87, 0.66], # journey (x^2)
[0.57, 0.85, 0.64], # starts (x^3)
[0.22, 0.58, 0.33], # with (x^4)
[0.77, 0.25, 0.10], # one (x^5)
[0.05, 0.80, 0.55]] # step (x^6)
)

通过下面的方式,计算 W 权重,

1
2
3
4
5
6
7
8
9
10
11
12
attn_scores = torch.empty(6, 6)
for i, x_i in enumerate(inputs):
for j, x_j in enumerate(inputs):
attn_scores[i, j] = torch.dot(x_i, x_j)
print(attn_scores)

tensor([[0.4421, 0.5931, 0.5790],
[0.4419, 0.6515, 0.5683],
[0.4431, 0.6496, 0.5671],
[0.4304, 0.6298, 0.5510],
[0.4671, 0.5910, 0.5266],
[0.4177, 0.6503, 0.5645]])

这是一种无法训练的注意力,直接使用 Word 和 Word 之间的点积来表达这个权重。而可训练的注意力机制是本章的重点,也叫做 “scaled dot-product attention (缩放点积注意力)”

最显著的区别是引入了权重矩阵,这些矩阵在模型训练过程中会更新。这些可训练的权重矩阵至关重要,这样模型(具体来说,是模型内部的注意力模块)就能学会生成 “好的” 上下文向量。

这里引入了三个可以训练的矩阵 Q K V,这里还用第二个为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
x_2 = inputs[1]     #1 第二个输入
d_in = inputs.shape[1] #2 输入的规模
d_out = 2 #3 输出的规模

# 生成 Q,K,V 三个矩阵
torch.manual_seed(123)
W_query = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False)
W_key = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False)
W_value = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False)

# 计算 Q K V 三个值
query_2 = x_2 @ W_query
key_2 = x_2 @ W_key
value_2 = x_2 @ W_value
print(query_2)

使用如下计算出所有的 K 和 V 值

1
2
3
4
keys = inputs @ W_key 
values = inputs @ W_value
print("keys.shape:", keys.shape)
print("values.shape:", values.shape)

计算注意力是通过下图

image

1
2
3
4
5
6
keys_2 = keys[1]             #1 计算 W_22 的权重
attn_score_22 = query_2.dot(keys_2)
print(attn_score_22)

attn_scores_2 = query_2 @ keys.T #2 计算所有的权重
print(attn_scores_2)

这样就可以计算出 W22W_{22} 的权重。

通过缩放注意力分数并使用 softmax 函数来计算注意力权重。将注意力分数除以键的嵌入维度的平方根来缩放它们

1
2
3
d_k = keys.shape[-1] # 维度
attn_weights_2 = torch.softmax(attn_scores_2 / d_k**0.5, dim=-1)
print(attn_weights_2)

最后一步是计算上下文向量

1
2
context_vec_2 = attn_weights_2 @ values
print(context_vec_2)

Implementing self-attention with trainable weights 实现可训练权重实现自注意力

到这里就是一个单步计算逻辑,后面就是要实现一个全逻辑。
上面的内容组合在一起就是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import torch.nn as nn
class SelfAttention_v2(nn.Module):
def __init__(self, d_in, d_out, qkv_bias=False):
super().__init__()
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

def forward(self, x):
keys = self.W_key(x)
queries = self.W_query(x)
values = self.W_value(x)
attn_scores = queries @ keys.T
attn_weights = torch.softmax(
attn_scores / keys.shape[-1]**0.5, dim=-1
)
context_vec = attn_weights @ values
return context_vec

Hiding future words with causal attention 用因果注意力隐藏未来的单词

直到现在是会计算全部的输入序列,但是在预测序列中的下一个令牌时,您希望自注意力机制仅考虑出现在当前位置之前的令牌。因果注意力,也称为掩码注意力,是自注意力的一种专门形式。

image

使用 trill 生成掩码

1
2
3
4
5
6
7
8
context_length = attn_scores.shape[0]
mask_simple = torch.tril(torch.ones(context_length, context_length))

masked_simple = attn_weights*mask_simple # 将全局权重进行掩码计算

row_sums = masked_simple.sum(dim=-1, keepdim=True)
masked_simple_norm = masked_simple / row_sums
print(masked_simple_norm)

书中还提到了一个技巧

1
2
3
4
mask = torch.triu(torch.ones(context_length, context_length), diagonal=1)
masked = attn_scores.masked_fill(mask.bool(), -torch.inf)
attn_weights = torch.softmax(masked / keys.shape[-1]**0.5, dim=1)
print(attn_weights)

在 Transformer 架构中,包括 GPT 等模型,注意力机制中的 dropout 通常在两个特定时间应用:在计算注意力权重之后或在将注意力权重应用于值向量之后。在这里,我们将在计算注意力权重后应用 dropout 掩码,如图 3.22 所示,因为在实践中这是更常见的变体。

image

这样就得到了一个更进一步的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
batch = torch.stack((inputs, inputs), dim=0)
print(batch.shape) #1 输入是这样的,一个批次,有多个输入

class CausalAttention(nn.Module):
def __init__(self, d_in, d_out, context_length,
dropout, qkv_bias=False):
super().__init__()
self.d_out = d_out
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias) # Q 矩阵
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias) # K 矩阵
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias) # V 矩阵
self.dropout = nn.Dropout(dropout) #1 Dropout 丢弃层
self.register_buffer(
'mask',
torch.triu(torch.ones(context_length, context_length),
diagonal=1)
) #2 掩码

def forward(self, x):
b, num_tokens, d_in = x.shape #3 第一个是 batch 的数量,第二个是 token 数量, 输入规模
keys = self.W_key(x)
queries = self.W_query(x)
values = self.W_value(x)

attn_scores = queries @ keys.transpose(1, 2) # 因为计算的时候是有 Batch 的,置换了一下将 batch 放置于第一位置
attn_scores.masked_fill_( #4 掩码
self.mask.bool()[:num_tokens, :num_tokens], -torch.inf)
attn_weights = torch.softmax(
attn_scores / keys.shape[-1]**0.5, dim=-1
)
attn_weights = self.dropout(attn_weights) # 应用丢弃层

context_vec = attn_weights @ values
return context_vec

Extending single-head attention to multi-head attention 将单头注意力扩展到多头注意力

最简单使用一个数组就可以构建一个多头注意力

1
2
3
4
5
6
7
8
9
10
11
12
13
class MultiHeadAttentionWrapper(nn.Module):
def __init__(self, d_in, d_out, context_length,
dropout, num_heads, qkv_bias=False):
super().__init__()
self.heads = nn.ModuleList(
[CausalAttention(
d_in, d_out, context_length, dropout, qkv_bias
)
for _ in range(num_heads)]
)

def forward(self, x):
return torch.cat([head(x) for head in self.heads], dim=-1)

不过书后面提到一个新方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class MultiHeadAttention(nn.Module):
# 几乎都一样
def __init__(self, d_in, d_out,
context_length, dropout, num_heads, qkv_bias=False):
super().__init__()
assert (d_out % num_heads == 0), \
"d_out must be divisible by num_heads"

self.d_out = d_out
self.num_heads = num_heads # 这里多了 头数量
self.head_dim = d_out // num_heads #1 降低投影维度以匹配所需的输出维度
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
self.out_proj = nn.Linear(d_out, d_out) #2
self.dropout = nn.Dropout(dropout)
self.register_buffer(
"mask",
torch.triu(torch.ones(context_length, context_length),
diagonal=1)
)

def forward(self, x):
b, num_tokens, d_in = x.shape
keys = self.W_key(x) #3 Tensor shape: (b, num_tokens, d_out)
queries = self.W_query(x) #3 Tensor shape: (b, num_tokens, d_out)
values = self.W_value(x) #3 Tensor shape: (b, num_tokens, d_out)

#4 过添加一个 num_heads 维度来隐式地分割矩阵。然后我们展开最后一个维度:(b, num_tokens, d_out) -> (b, num_tokens, num_heads, head_dim)
keys = keys.view(b, num_tokens, self.num_heads, self.head_dim)
values = values.view(b, num_tokens, self.num_heads, self.head_dim)
queries = queries.view(
b, num_tokens, self.num_heads, self.head_dim
)

keys = keys.transpose(1, 2) #5 从形状 (b, num_tokens, num_heads, head_dim) 转置为 (b, num_heads, num_tokens, head_dim)
queries = queries.transpose(1, 2) #5 从形状 (b, num_tokens, num_heads, head_dim) 转置为 (b, num_heads, num_tokens, head_dim)
values = values.transpose(1, 2) #5 从形状 (b, num_tokens, num_heads, head_dim) 转置为 (b, num_heads, num_tokens, head_dim)

attn_scores = queries @ keys.transpose(2, 3) #6 为每个头计算点积
mask_bool = self.mask.bool()[:num_tokens, :num_tokens] #7 掩码截断为令牌数量

attn_scores.masked_fill_(mask_bool, -torch.inf) #8 使用掩码来填充注意力分数

attn_weights = torch.softmax(
attn_scores / keys.shape[-1]**0.5, dim=-1)
attn_weights = self.dropout(attn_weights)

context_vec = (attn_weights @ values).transpose(1, 2)
#10 合并头,其中 self.d_out = self.num_heads * self.head_dim
context_vec = context_vec.contiguous().view(
b, num_tokens, self.d_out
)
context_vec = self.out_proj(context_vec) #11 添加一个可选的线性投影
return context_vec

image 使用

Implementing a GPT model 实现 GPT 模型

我们 理解了注意力机制之后就可以开始编写 LLM 模型了

LLM architecture LLM 架构

image

在一个由一个 2,048 × 2,048 维的权重矩阵(或张量)表示的神经网络层中,该矩阵的每个元素都是一个参数。由于有 2,048 行和 2,048 列,该层的参数总数是 2,048 乘以 2,048,等于 4,194,304 个参数。

准备一个基础数据

1
2
3
4
5
6
7
8
9
GPT_CONFIG_124M = {
"vocab_size": 50257, # 词汇量大小,根据分词之后的 Size 决定
"context_length": 1024, # 上下文大小
"emb_dim": 768, # 嵌入层维度
"n_heads": 12, # 注意力头数量
"n_layers": 12, # 一共多少层
"drop_rate": 0.1, # Drop 比例
"qkv_bias": False # QKV Bias,确定是否在 Linear 层的多头注意力机制中为查询、键和值的计算包含一个偏差向量。,后面解释
}

实现一个大框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class DummyGPTModel(nn.Module):
def __init__(self, cfg):
super().__init__()
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"]) # token -> embeding 的神经网络
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"]) # context -> embeding 的神经网络,位置嵌入
self.drop_emb = nn.Dropout(cfg["drop_rate"])
self.trf_blocks = nn.Sequential( # 按顺序执行的 NN
*[DummyTransformerBlock(cfg) # 抽象一个Transformer块
for _ in range(cfg["n_layers"])] # N层
) # 1
self.final_norm = LayerNorm(cfg["emb_dim"]) # 归一化层
self.out_head = nn.Linear(
cfg["emb_dim"], cfg["vocab_size"], bias=False
)

def forward(self, in_idx):
batch_size, seq_len = in_idx.shape
tok_embeds = self.tok_emb(in_idx) # 将输入变成 Token Embeding
pos_embeds = self.pos_emb(
torch.arange(seq_len, device=in_idx.device)
)
x = tok_embeds + pos_embeds # 词嵌入 + 位置嵌入 = 嵌入
x = self.drop_emb(x) # 丢弃层
x = self.trf_blocks(x) # TF 层
x = self.final_norm(x) # 归一化
logits = self.out_head(x) # 输出层
return logits

TransformerBlock Transformer块

使用许多层的深度神经网络有时可能具有挑战性,因为会出现梯度消失或梯度爆炸等问题。这些问题会导致训练动态不稳定,并使网络难以有效地调整其权重,这意味着学习过程难以找到一组参数(权重),以使神经网络最小化损失函数。换句话说,网络难以学习数据中的潜在模式,以至于无法使其做出准确的预测或决策。

1
2
3
4
5
6
7
8
9
10
11
12
class LayerNorm(nn.Module):
def __init__(self, emb_dim):
super().__init__()
self.eps = 1e-5
self.scale = nn.Parameter(torch.ones(emb_dim))
self.shift = nn.Parameter(torch.zeros(emb_dim))

def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)
norm_x = (x - mean) / torch.sqrt(var + self.eps)
return self.scale * norm_x + self.shift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class TransformerBlock(nn.Module):
def __init__(self, cfg):
super().__init__()
# 多重注意力
self.att = MultiHeadAttention(
d_in=cfg["emb_dim"],
d_out=cfg["emb_dim"],
context_length=cfg["context_length"],
num_heads=cfg["n_heads"],
dropout=cfg["drop_rate"],
qkv_bias=cfg["qkv_bias"])
# 前馈网络
self.ff = FeedForward(cfg)
# 归一化层
self.norm1 = LayerNorm(cfg["emb_dim"])
self.norm2 = LayerNorm(cfg["emb_dim"])
# 快捷链接层
self.drop_shortcut = nn.Dropout(cfg["drop_rate"])

def forward(self, x):
shortcut = x
x = self.norm1(x) # 归一化
x = self.att(x) # 多头注意力
x = self.drop_shortcut(x) # 丢弃
x = x + shortcut

shortcut = x
x = self.norm2(x) # 归一化
x = self.ff(x) # 前馈
x = self.drop_shortcut(x) # 丢弃
x = x + shortcut
return x

class FeedForward(nn.Module):
def __init__(self, cfg):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
GELU(),
nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
)

def forward(self, x):
return self.layers(x)

Pretraining on unlabeled data 预训练

这部分比较简单了,所有的 NN 都需要一个损失函数进行判断,这里使用了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def calc_loss_loader(data_loader, model, device, num_batches=None):
total_loss = 0.
if len(data_loader) == 0:
return float("nan")
elif num_batches is None:
num_batches = len(data_loader)
else:
num_batches = min(num_batches, len(data_loader))
for i, (input_batch, target_batch) in enumerate(data_loader):
if i < num_batches:
loss = calc_loss_batch(
input_batch, target_batch, model, device
)
total_loss += loss.item()
else:
break
return total_loss / num_batches

附录