最近读了这本书,确实非常的不错,将 GPT 拆解了,虽然只是一个比较早期的 GPT-2
但是五脏俱全。
首先从 Overview
的角度对实现 GPT-2
有一个大致的观感。
Prepare 前置准备
Tokenizing 分词
Tokenizing(分词)是将输入文本拆分成较小的单位(称为Token)的过程,以便模型能够处理和理解这些文本。分词有很多方式,
Word-based(基于词):将文本按单词分割(如空格分隔)。适用于英文,但对中文等无明确分隔的语言效果较差。
Character-based(基于字符):将文本按单个字符分割,适合中文,但会导致序列过长。
Subword-based(基于子词):
目前最常用的方法,如 Byte-Pair Encoding (BPE)、WordPiece 或 SentencePiece。
例如,BPE 会将常用词保留为完整 Token(如 “dog”),而将不常见词拆分为子词(如 “unhappiness” 可能分为 “un”, “##happy”, “##ness”)。
优点是兼顾了词汇表大小和语言覆盖率。
因为本书的重点也不是分词器的实现,在前半段完成了一个 Word By Word 分词器的实现。
1 2 3 preprocessed = re.split(r'([,.:;?_!"()\']|--|\\s)' , raw_text) preprocessed = [item.strip() for item in preprocessed if item.strip()] print (len (preprocessed))
拆分的结果如下
1 2 3 4 ['I' , 'HAD' , 'always' , 'thought' , 'Jack' , 'Gisburn' , 'rather' , 'a' , 'cheap' , 'genius' , '--' , 'though' , 'a' , 'good' , 'fellow' , 'enough' ,'--' , 'so' , 'it' , 'was' , 'no' , 'great' , 'surprise' , 'to' , 'me' , 'to' ,'hear' , 'that' , ',' , 'in' ]
拆分之后的单次,给与一个单次一个 ID
1 2 3 4 5 vocab = {token:integer for integer,token in enumerate (all_words)} for i, item in enumerate (vocab.items()): print (item) if i >= 50 : break
就得到了一个单次的 ID 清单
1 2 3 4 5 6 ('!' , 0) ('"' , 1) ("'" , 2) ... ('Her' , 49) ('Hermia' , 50)
不过在本书中使用的 BPE
的分词手段,Byte-Pair Encoding (BPE) 是一种常用的子词分词算法,广泛应用于大型语言模型(如 GPT、BERT 等)中。BPE 通过迭代合并频繁出现的字符对,构建一个词汇表,将文本分割为子词级别的 Token。 具体的原理可以参考 LLM Course
1 2 3 4 5 6 7 8 9 10 from importlib.metadata import versionimport tiktokentokenizer = tiktoken.get_encoding("gpt2" ) text = ( "Hello, do you like tea? <|endoftext|> In the sunlit terraces" "of someunknownPlace." ) integers = tokenizer.encode(text, allowed_special={"<|endoftext|>" }) print (integers)
Data Sampling 数据采样
因为 LLM 本质是需要训练的,因此需要需要来训练,书中采用了滑动窗口的方案。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import torchfrom torch.utils.data import Dataset, DataLoaderclass GPTDatasetV1 (Dataset ): def __init__ (self, txt, tokenizer, max_length, stride ): self .input_ids = [] self .target_ids = [] token_ids = tokenizer.encode(txt) for i in range (0 , len (token_ids) - max_length, stride): input_chunk = token_ids[i:i + max_length] target_chunk = token_ids[i + 1 : i + max_length + 1 ] self .input_ids.append(torch.tensor(input_chunk)) self .target_ids.append(torch.tensor(target_chunk)) def __len__ (self ): return len (self .input_ids) def __getitem__ (self, idx ): return self .input_ids[idx], self .target_ids[idx]
Token Embeding 词嵌入
将离散的词汇表索引(Token ID)映射为连续的向量表示(嵌入向量)在书中举例,比如我们有一个
1 2 3 4 5 6 7 8 9 10 11 12 13 vocab_size = 6 output_dim = 3 torch.manual_seed(123 ) embedding_layer = torch.nn.Embedding(vocab_size, output_dim) print (embedding_layer.weight)Parameter containing: tensor([[ 0.3374 , -0.1778 , -0.1690 ], [ 0.9178 , 1.5810 , 1.3010 ], [ 1.2753 , -0.2010 , -0.1606 ], [-0.4015 , 0.9666 , -1.1481 ], [-1.1589 , 0.3255 , -0.6315 ], [-2.8400 , -0.7849 , -1.4096 ]], requires_grad=True )
从输出我们就自然明白了,也就是生成了一个数组,包含了 vocab_size 个大小的 output_dim 的 float 值,嵌入层的权重矩阵包含小的随机值。这些值在 LLM 训练期间作为 LLM 优化本身的一部分进行优化。此外,我们可以看到权重矩阵有 6 行 3 列。词汇表中 6 个可能的词元各有一行,3 个嵌入维度各有一列。
作者注
这部分内容在 《集体智慧编程中》 也有大量的涉及,确实可以触类旁通。
Attention 注意力机制
Attention Is All You Need
提出了一个全新的架构
主要解决在一个长文本中,序列数据丢失信息的问题。
书中先介绍了一个不需要训练的注意力算法。假设有这么一个句子, 已经转化为词向量了。
1 2 3 4 5 6 7 8 9 import torchinputs = torch.tensor( [[0.43 , 0.15 , 0.89 ], [0.55 , 0.87 , 0.66 ], [0.57 , 0.85 , 0.64 ], [0.22 , 0.58 , 0.33 ], [0.77 , 0.25 , 0.10 ], [0.05 , 0.80 , 0.55 ]] )
通过下面的方式,计算 W 权重,
1 2 3 4 5 6 7 8 9 10 11 12 attn_scores = torch.empty(6 , 6 ) for i, x_i in enumerate (inputs): for j, x_j in enumerate (inputs): attn_scores[i, j] = torch.dot(x_i, x_j) print (attn_scores)tensor([[0.4421 , 0.5931 , 0.5790 ], [0.4419 , 0.6515 , 0.5683 ], [0.4431 , 0.6496 , 0.5671 ], [0.4304 , 0.6298 , 0.5510 ], [0.4671 , 0.5910 , 0.5266 ], [0.4177 , 0.6503 , 0.5645 ]])
这是一种无法训练的注意力,直接使用 Word 和 Word 之间的点积来表达这个权重。而可训练的注意力机制是本章的重点,也叫做 “scaled dot-product attention (缩放点积注意力)”
最显著的区别是引入了权重矩阵,这些矩阵在模型训练过程中会更新。这些可训练的权重矩阵至关重要,这样模型(具体来说,是模型内部的注意力模块)就能学会生成 “好的” 上下文向量。
这里引入了三个可以训练的矩阵 Q
K
V
,这里还用第二个为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 x_2 = inputs[1 ] d_in = inputs.shape[1 ] d_out = 2 torch.manual_seed(123 ) W_query = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False ) W_key = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False ) W_value = torch.nn.Parameter(torch.rand(d_in, d_out), requires_grad=False ) query_2 = x_2 @ W_query key_2 = x_2 @ W_key value_2 = x_2 @ W_value print (query_2)
使用如下计算出所有的 K 和 V 值
1 2 3 4 keys = inputs @ W_key values = inputs @ W_value print ("keys.shape:" , keys.shape)print ("values.shape:" , values.shape)
计算注意力是通过下图
1 2 3 4 5 6 keys_2 = keys[1 ] attn_score_22 = query_2.dot(keys_2) print (attn_score_22)attn_scores_2 = query_2 @ keys.T print (attn_scores_2)
这样就可以计算出 W 2 2 W_{22} W 2 2 的权重。
通过缩放注意力分数并使用 softmax 函数来计算注意力权重。将注意力分数除以键的嵌入维度的平方根来缩放它们
1 2 3 d_k = keys.shape[-1 ] attn_weights_2 = torch.softmax(attn_scores_2 / d_k**0.5 , dim=-1 ) print (attn_weights_2)
最后一步是计算上下文向量
1 2 context_vec_2 = attn_weights_2 @ values print (context_vec_2)
Implementing self-attention with trainable weights 实现可训练权重实现自注意力
到这里就是一个单步计算逻辑,后面就是要实现一个全逻辑。
上面的内容组合在一起就是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import torch.nn as nnclass SelfAttention_v2 (nn.Module): def __init__ (self, d_in, d_out, qkv_bias=False ): super ().__init__() self .W_query = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_key = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_value = nn.Linear(d_in, d_out, bias=qkv_bias) def forward (self, x ): keys = self .W_key(x) queries = self .W_query(x) values = self .W_value(x) attn_scores = queries @ keys.T attn_weights = torch.softmax( attn_scores / keys.shape[-1 ]**0.5 , dim=-1 ) context_vec = attn_weights @ values return context_vec
Hiding future words with causal attention 用因果注意力隐藏未来的单词
直到现在是会计算全部的输入序列,但是在预测序列中的下一个令牌时,您希望自注意力机制仅考虑出现在当前位置之前的令牌。因果注意力,也称为掩码注意力,是自注意力的一种专门形式。
使用 trill
生成掩码
1 2 3 4 5 6 7 8 context_length = attn_scores.shape[0 ] mask_simple = torch.tril(torch.ones(context_length, context_length)) masked_simple = attn_weights*mask_simple row_sums = masked_simple.sum (dim=-1 , keepdim=True ) masked_simple_norm = masked_simple / row_sums print (masked_simple_norm)
书中还提到了一个技巧
1 2 3 4 mask = torch.triu(torch.ones(context_length, context_length), diagonal=1 ) masked = attn_scores.masked_fill(mask.bool (), -torch.inf) attn_weights = torch.softmax(masked / keys.shape[-1 ]**0.5 , dim=1 ) print (attn_weights)
在 Transformer 架构中,包括 GPT 等模型,注意力机制中的 dropout 通常在两个特定时间应用:在计算注意力权重之后或在将注意力权重应用于值向量之后。在这里,我们将在计算注意力权重后应用 dropout 掩码,如图 3.22 所示,因为在实践中这是更常见的变体。
这样就得到了一个更进一步的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 batch = torch.stack((inputs, inputs), dim=0 ) print (batch.shape) class CausalAttention (nn.Module): def __init__ (self, d_in, d_out, context_length, dropout, qkv_bias=False ): super ().__init__() self .d_out = d_out self .W_query = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_key = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_value = nn.Linear(d_in, d_out, bias=qkv_bias) self .dropout = nn.Dropout(dropout) self .register_buffer( 'mask' , torch.triu(torch.ones(context_length, context_length), diagonal=1 ) ) def forward (self, x ): b, num_tokens, d_in = x.shape keys = self .W_key(x) queries = self .W_query(x) values = self .W_value(x) attn_scores = queries @ keys.transpose(1 , 2 ) attn_scores.masked_fill_( self .mask.bool ()[:num_tokens, :num_tokens], -torch.inf) attn_weights = torch.softmax( attn_scores / keys.shape[-1 ]**0.5 , dim=-1 ) attn_weights = self .dropout(attn_weights) context_vec = attn_weights @ values return context_vec
Extending single-head attention to multi-head attention 将单头注意力扩展到多头注意力
最简单使用一个数组就可以构建一个多头注意力
1 2 3 4 5 6 7 8 9 10 11 12 13 class MultiHeadAttentionWrapper (nn.Module): def __init__ (self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False ): super ().__init__() self .heads = nn.ModuleList( [CausalAttention( d_in, d_out, context_length, dropout, qkv_bias ) for _ in range (num_heads)] ) def forward (self, x ): return torch.cat([head(x) for head in self .heads], dim=-1 )
不过书后面提到一个新方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class MultiHeadAttention (nn.Module): def __init__ (self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False ): super ().__init__() assert (d_out % num_heads == 0 ), \ "d_out must be divisible by num_heads" self .d_out = d_out self .num_heads = num_heads self .head_dim = d_out // num_heads self .W_query = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_key = nn.Linear(d_in, d_out, bias=qkv_bias) self .W_value = nn.Linear(d_in, d_out, bias=qkv_bias) self .out_proj = nn.Linear(d_out, d_out) self .dropout = nn.Dropout(dropout) self .register_buffer( "mask" , torch.triu(torch.ones(context_length, context_length), diagonal=1 ) ) def forward (self, x ): b, num_tokens, d_in = x.shape keys = self .W_key(x) queries = self .W_query(x) values = self .W_value(x) keys = keys.view(b, num_tokens, self .num_heads, self .head_dim) values = values.view(b, num_tokens, self .num_heads, self .head_dim) queries = queries.view( b, num_tokens, self .num_heads, self .head_dim ) keys = keys.transpose(1 , 2 ) queries = queries.transpose(1 , 2 ) values = values.transpose(1 , 2 ) attn_scores = queries @ keys.transpose(2 , 3 ) mask_bool = self .mask.bool ()[:num_tokens, :num_tokens] attn_scores.masked_fill_(mask_bool, -torch.inf) attn_weights = torch.softmax( attn_scores / keys.shape[-1 ]**0.5 , dim=-1 ) attn_weights = self .dropout(attn_weights) context_vec = (attn_weights @ values).transpose(1 , 2 ) context_vec = context_vec.contiguous().view( b, num_tokens, self .d_out ) context_vec = self .out_proj(context_vec) return context_vec
使用
Implementing a GPT model 实现 GPT 模型
我们 理解了注意力机制之后就可以开始编写 LLM 模型了
LLM architecture LLM 架构
在一个由一个 2,048 × 2,048 维的权重矩阵(或张量)表示的神经网络层中,该矩阵的每个元素都是一个参数。由于有 2,048 行和 2,048 列,该层的参数总数是 2,048 乘以 2,048,等于 4,194,304 个参数。
准备一个基础数据
1 2 3 4 5 6 7 8 9 GPT_CONFIG_124M = { "vocab_size" : 50257 , "context_length" : 1024 , "emb_dim" : 768 , "n_heads" : 12 , "n_layers" : 12 , "drop_rate" : 0.1 , "qkv_bias" : False }
实现一个大框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class DummyGPTModel (nn.Module): def __init__ (self, cfg ): super ().__init__() self .tok_emb = nn.Embedding(cfg["vocab_size" ], cfg["emb_dim" ]) self .pos_emb = nn.Embedding(cfg["context_length" ], cfg["emb_dim" ]) self .drop_emb = nn.Dropout(cfg["drop_rate" ]) self .trf_blocks = nn.Sequential( *[DummyTransformerBlock(cfg) for _ in range (cfg["n_layers" ])] ) self .final_norm = LayerNorm(cfg["emb_dim" ]) self .out_head = nn.Linear( cfg["emb_dim" ], cfg["vocab_size" ], bias=False ) def forward (self, in_idx ): batch_size, seq_len = in_idx.shape tok_embeds = self .tok_emb(in_idx) pos_embeds = self .pos_emb( torch.arange(seq_len, device=in_idx.device) ) x = tok_embeds + pos_embeds x = self .drop_emb(x) x = self .trf_blocks(x) x = self .final_norm(x) logits = self .out_head(x) return logits
使用许多层的深度神经网络有时可能具有挑战性,因为会出现梯度消失或梯度爆炸等问题。这些问题会导致训练动态不稳定,并使网络难以有效地调整其权重,这意味着学习过程难以找到一组参数(权重),以使神经网络最小化损失函数。换句话说,网络难以学习数据中的潜在模式,以至于无法使其做出准确的预测或决策。
1 2 3 4 5 6 7 8 9 10 11 12 class LayerNorm (nn.Module): def __init__ (self, emb_dim ): super ().__init__() self .eps = 1e-5 self .scale = nn.Parameter(torch.ones(emb_dim)) self .shift = nn.Parameter(torch.zeros(emb_dim)) def forward (self, x ): mean = x.mean(dim=-1 , keepdim=True ) var = x.var(dim=-1 , keepdim=True , unbiased=False ) norm_x = (x - mean) / torch.sqrt(var + self .eps) return self .scale * norm_x + self .shift
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class TransformerBlock (nn.Module): def __init__ (self, cfg ): super ().__init__() self .att = MultiHeadAttention( d_in=cfg["emb_dim" ], d_out=cfg["emb_dim" ], context_length=cfg["context_length" ], num_heads=cfg["n_heads" ], dropout=cfg["drop_rate" ], qkv_bias=cfg["qkv_bias" ]) self .ff = FeedForward(cfg) self .norm1 = LayerNorm(cfg["emb_dim" ]) self .norm2 = LayerNorm(cfg["emb_dim" ]) self .drop_shortcut = nn.Dropout(cfg["drop_rate" ]) def forward (self, x ): shortcut = x x = self .norm1(x) x = self .att(x) x = self .drop_shortcut(x) x = x + shortcut shortcut = x x = self .norm2(x) x = self .ff(x) x = self .drop_shortcut(x) x = x + shortcut return x class FeedForward (nn.Module): def __init__ (self, cfg ): super ().__init__() self .layers = nn.Sequential( nn.Linear(cfg["emb_dim" ], 4 * cfg["emb_dim" ]), GELU(), nn.Linear(4 * cfg["emb_dim" ], cfg["emb_dim" ]), ) def forward (self, x ): return self .layers(x)
Pretraining on unlabeled data 预训练
这部分比较简单了,所有的 NN 都需要一个损失函数进行判断,这里使用了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def calc_loss_loader (data_loader, model, device, num_batches=None ): total_loss = 0. if len (data_loader) == 0 : return float ("nan" ) elif num_batches is None : num_batches = len (data_loader) else : num_batches = min (num_batches, len (data_loader)) for i, (input_batch, target_batch) in enumerate (data_loader): if i < num_batches: loss = calc_loss_batch( input_batch, target_batch, model, device ) total_loss += loss.item() else : break return total_loss / num_batches
附录