环境与准备
使用conda下载好的flash-atten,cuda和python版本要匹配,安装transformers和datasets库
下载Qwen2.5-Math-1.5B作为基座模型
数据集直接使用原项目中的/data目录下的几个数据集
sft
核心函数
tokenize_prompt_and_output
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def tokenize_prompt_and_output ( prompt_strs: list [str ], output_strs: list [str ], tokenizer: PreTrainedTokenizerBase, ) -> dict [str , Tensor]: prompt_ids = [] output_ids = [] masks = [] for prompt, output in zip (prompt_strs, output_strs): prompt_id = torch.tensor(tokenizer.encode(prompt, add_special_tokens=False )) output_id = torch.tensor(tokenizer.encode(output, add_special_tokens=False )) mask = torch.cat((torch.zeros_like(prompt_id), torch.ones_like(output_id)), dim=0 ) masks.append(mask) prompt_ids.append(prompt_id) output_ids.append(output_id) full_prompt_output_ids = [] full_len = [] for p_id, o_id in zip (prompt_ids, output_ids): full_prompt_output_id = torch.cat((p_id, o_id), dim=0 ) full_prompt_output_ids.append(full_prompt_output_id) full_len.append(len (full_prompt_output_id)) max_len = max (full_len) input_ids = [] labels = [] response_mask = [] for full_prompt_output_id,mask in zip (full_prompt_output_ids, masks): pad_len = max_len - full_prompt_output_id.size(0 ) full_prompt_output_id = torch.nn.functional.pad(full_prompt_output_id,(0 ,pad_len),value=tokenizer.pad_token_id) mask = torch.nn.functional.pad(mask,(0 ,pad_len),value=0 ) input_id = full_prompt_output_id[:-1 ] label = full_prompt_output_id[1 :] response_m = mask[1 :] input_ids.append(input_id) labels.append(label) response_mask.append(response_m) input_ids = torch.stack(input_ids, dim=0 ) labels = torch.stack(labels, dim=0 ) response_mask = torch.stack(response_mask, dim=0 ) return { "input_ids" : input_ids, "labels" : labels, "response_mask" : response_mask }
大模型工作原理就是根据输入的prompt来预测下一个token是什么,这个函数的作用就是将输入的prompt和输出字符串列表转换为模型可以处理的张量格式,包括input_ids、labels和response_mask。其中input_ids是拼接后的输入序列,labels是对应的标签序列(即期望输出),response_mask用于标记响应部分的token,以便在训练时只计算响应部分的损失。
举个例子:
假设我们有一个prompt字符串列表和一个输出字符串列表:
1 2 prompt_strs = ["What is the capital of France"]([11, 122, 138, 145, 15, 90]) output_strs = ["The capital of France is Paris"]([122, 138, 145, 15, 90, 17])
那么经过tokenize_prompt_and_output 函数处理后,我们希望得到:
1 2 input_strs = ["What is the capital of France The capital of France is"]([11, 122, 138, 145, 15, 90, 122, 138, 145, 15, 90]) labels = ["is the capital of France The capital of France is Paris"]([122, 138, 145, 15, 90, 122, 138, 145, 15, 90, 17])
也就是经过了错位处理,
但是,计算机只能处理数字,所以我们需要将这些字符串转换为对应的token id,这就是tokenizer的作用。每个数字表示一个token在词表中的索引。最终,我们得到的input_ids 和labels 都是数字序列,可以直接输入到模型中进行训练。
response_mask 则是一个二值序列,用于标记哪些token是响应部分,以便在计算损失时只关注这些部分。
在上面例子中,“The capital of France is Paris”才是模型真正应该学习的内容,所以response_mask会标记这些token的位置。“What is the capital of France”部分则不参与损失计算。故而response_mask是**[0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]**,表示前5个token是prompt部分,后6个token是响应部分。
不难发现,input_ids,labels和 response_mask的形状应该是一致的。
序列位置 (Index)
0
1
2
3
4
5
6
7
8
9
10
input_ids (输入给模型)
11(What)
122(is)
138(the)
145(capital)
15(of)
90(France)
122(The)
138(capital)
145(of)
15(France)
90(is)
labels (期望的输出)
122(is)
138(the)
145(capital)
15(of)
90(France)
122(The)
138(capital)
145(of)
15(France)
90(is)
17(Paris)
response_mask (是否算 Loss)
0
0
0
0
0
1
1
1
1
1
1
模型在干嘛?
预测 is
预测 the
预测 capital
预测 of
预测 France
预测 The
预测 capital
预测 of
预测 France
预测 is
预测 Paris
现在我们搞清楚了函数的作用,以及输入和输出到底是什么,接下来需要考虑另外一个问题:padding
在将数据喂给大模型时,我们是同时处理多个样本的,这些样本的长度可能不一样。为了让它们能够在同一批次中进行处理,我们需要对它们进行padding,使得它们的长度一致。
所以,我们将prompt和output拼接成prompt_output_full好之后,计算出最长的序列长度,然后对所有的序列进行padding,使得它们的长度都等于这个最大长度。
padding的值通常是tokenizer的pad_token_id,mask的padding补0。这样模型在处理这些padding部分时就不会产生实际的影响。
我们取prompt_output_full的[:-1]作为input_ids,取[1:]作为labels,这样就实现了错位处理,使得模型在预测下一个token时能够正确地学习到输出序列的模式。同时,response_mask也相应地进行错位处理,以确保在计算损失时只关注响应部分的token。
最后,再把这些处理好的input_ids、labels和response_mask堆叠成批次的张量,返回给训练函数使用。
pytorch知识点:
torch.stack与torch.cat的区别:
torch.stack:在指定维度上将输入张量列表堆叠成一个新的张量。输入张量必须具有相同的形状。不改变原来括号的层数
torch.cat:在指定维度上连接输入张量列表。输入张量在连接维度上必须具有相同的形状,但其他维度可以不同。括号层数加一
dim=0表示最外层的括号
torch.nn.functional.pad:用于对张量进行填充操作,可以指定填充的值和填充的维度。
语法: torch.nn.functional.pad(input, pad, mode='constant', value=0)
其中 pad 是一个元组:(左边补几个, 右边补几个, 上面补几个, 下面补几个, …)。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 matrix = torch.ones(2 , 2 ) padded_matrix = F.pad(matrix, (1 , 1 , 0 , 2 ), value=0 ) print (padded_matrix)
compute_entropy逐词熵
1 2 3 4 5 6 7 8 9 10 11 12 def compute_entropy (logits: torch.Tensor ) -> torch.Tensor: """获取 Logits 的熵(即最后一个维度的熵)。""" ''' logits: shape (batch_size, sequence_length, vocab_size) Q: 为什么计算熵的时候不直接使用softmax? A: 若概率很低,则 log(prob) 会是一个很大的负数,导致溢出。而log_softmax内部有数值稳定的处理。 ''' log_probs = torch.log_softmax(logits, dim=-1 ) entropy = -torch.sum (log_probs * torch.exp(log_probs), dim=-1 ) return entropy
大模型最后输出的是一个logits张量,表示每个位置上每个token的预测分布。计算熵的目的是为了衡量模型在每个位置上的不确定性。熵越高,表示模型对下一个token的预测越不确定;熵越低,表示模型对下一个token的预测越确定。
先将logits分布转化为概率:
log_probs = torch.log_softmax(logits, dim=-1)
$$
\log p(x_i) = \log\left(\frac{\exp(z_i)}{\sum_{j=1}^{V} \exp(z_j)}\right)
$$
然后再计算真实的熵:
香农熵的标准定义是所有可能事件概率 $p_i$ 与其信息量 $\log p_i$ 乘积的负总和。对应到大模型的词表(Vocabulary)维度,公式为:
$$
H(X) = - \sum_{i=1}^{V} p(x_i) \log p(x_i)
$$
$V$:词表大小(vocab_size),即所有可能预测的 Token 数量。
$p(x_i)$:模型预测下一个 Token 是 $x_i$ 的概率。
$\log p(x_i)$:该 Token 对应的对数概率。
get_response_log_probs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def get_response_log_probs ( model: torch.nn.Module, input_ids: torch.Tensor, labels: torch.Tensor, return_token_entropy: bool , ) -> dict [str , torch.Tensor | None ]: """ 获取在给定提示下,回答(response)部分的条件对数概率(log-probs), 并可选择性地获取下一个 token 预测的熵。 参数 (Args): model: PreTrainedModel, 用于打分的模型。 input_ids: torch.Tensor, 形状为 (batch_size, sequence_length): 分词后的提示和输出。 labels: torch.Tensor, 形状为 (batch_size, sequence_length): 移位后的 input_ids。 return_token_entropy: bool, 是否返回下一个 token 预测的熵。 返回 (Returns): dict[str, torch.Tensor | None]: "log_probs": torch.Tensor, 形状为 (batch_size, sequence_length): 给定提示下回答的条件对数概率。 注意:我们尚未在此处掩盖(mask)掉对应于提示或填充的 token 索引; 这将在训练循环中完成。 "token_entropy": Optional[torch.Tensor], 形状为 (batch_size, sequence_length): 下一个 token 预测的熵。与 log-probs 一样,我们尚未在此处掩盖掉 对应于提示或填充的 token 索引;这将在训练循环中完成。 """ outputs = model(input_ids=input_ids) logits = outputs.logits logits_flat = logits.view(-1 , logits.size(-1 )) labels_flat = labels.view(-1 ) token_log_probs_flat = -torch.nn.functional.cross_entropy(logits_flat, labels_flat, reduction='none' ) log_probs = token_log_probs_flat.view(logits.size(0 ), logits.size(1 )) token_entropy = None if (return_token_entropy): token_entropy = compute_entropy(logits) return { "log_probs" : log_probs, "token_entropy" : token_entropy }
.view函数作用是改变张量形状,不改变其中数据。
基础玩法:明确指定每一个维度
view 最基本的用法,就是你明确告诉 PyTorch,你想把张量捏成几行几列。
核心法则(黄金定律):变换前后的“元素总数”必须绝对相等!
假设我们有一个包含 6 个元素的一维张量(一条长线):
Python
1 2 3 4 import torch # 形状: [6] (总共 6 个元素) a = torch.tensor([1, 2, 3, 4, 5, 6])
我们可以用 view 把这 6 个元素重新排列组合:
组合 A:变成 2 行 3 列
Python
1 2 3 4 5 6 b = a.view(2, 3) print(b) # 输出: # tensor([[1, 2, 3], # [4, 5, 6]]) # 检查元素总数:2 * 3 = 6,合法!
组合 B:变成 3 行 2 列
Python
1 2 3 4 5 6 7 c = a.view(3, 2) print(c) # 输出: # tensor([[1, 2], # [3, 4], # [5, 6]]) # 检查元素总数:3 * 2 = 6,合法!
注意: 数据在变形时,永远是 “从左到右,从上到下” 按顺序填入新形状的,它不会打乱数字原本的先后顺序。
2. 高阶魔法:-1 (偷懒占位符)
这就是你在大模型代码里最常见到的写法。
很多时候,张量非常大(比如 Batch Size 会随着用户输入动态变化),我们人在写代码时,懒得去或者无法提前算出某一个维度到底该是多少。这时候就可以派 -1 出场了。
-1 的意思是:“PyTorch,前面/后面的维度我已经定好了,剩下的那个维度具体是多少,你帮我用除法算一下!”
还是用刚才那 6 个元素的张量 a 举例:
场景 A:我只想要 2 列,行数你帮我算
Python
1 2 3 4 5 # 告诉 PyTorch:我要变成二维矩阵,第 1 维(列数)是 2,第 0 维(行数)你看着办。 d = a.view(-1, 2) # PyTorch 内部计算:总数 6 / 列数 2 = 行数 3。所以 -1 自动变成了 3。 print(d.shape) # 输出: torch.Size([3, 2])
场景 B:不管原来是几维,全部给我压扁成一维(最常用的展平操作)
Python
1 2 3 4 5 6 7 # 假设我们拿到了刚才的 3x2 矩阵 c # 告诉 PyTorch:把所有元素揉成一条线! e = c.view(-1) # PyTorch 内部计算:总数 6,所以变成长度为 6 的一维张量。 print(e.shape) # 输出: torch.Size([6]) print(e) # 输出: tensor([1, 2, 3, 4, 5, 6])
实战解析:回到你的大模型代码
Python
1 logits_flat = logits.view(-1, logits.size(-1))
假设 logits 的原始形状是 [Batch=2, Seq=3, Vocab=32000](总元素数是 $2 \times 3 \times 32000 = 192000$ )。
logits.size(-1) 拿到的是最后一个维度的大小,也就是 32000。
所以这行代码等价于:logits.view(-1, 32000)。
此时 PyTorch 看到 -1,就会默默帮你计算:$192000 \div 32000 = 6$ 。
最终,3D 的张量就被完美展平为了 2D 的 [6, 32000]。
torch.nn.functional.cross_entropy内部在做什么?
原本的逻辑: Logits $\rightarrow$ 算 Softmax 变概率 $\rightarrow$ 挑出正确答案的概率 $\rightarrow$ 取 Log 算误差。
cross_entropy 内部的逻辑:(算交叉熵损失的)
跳过真实概率: 利用对数公式化简和 LogSumExp 技巧,直接算出所有词的“对数概率”(Log-Prob)。
查字典提取: 看着你的 label(比如正确答案在第 0 位),直接把第 0 位的那个对数概率“拔”出来。
加负号: 在拔出来的数字前面加个负号,变成正数的 Loss 返回给你。
但是我们需要的是每个 token 的 log-probs ,所以再取一个负号就得到了我们想要的结果。
masked_normalize 带掩码归一化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 def masked_normalize ( tensor: torch.Tensor, mask: torch.Tensor, dim: int | None = None , normalize_constant: float = 1.0 , ) -> torch.Tensor: """ 沿维度求和并按常数归一化,仅考虑掩码值为 1 的元素。 参数 (Args): tensor: torch.Tensor, 要求和并归一化的张量。 mask: torch.Tensor, 掩码。我们只考虑掩码值为 1 的元素。 dim: int | None, 归一化之前进行求和的维度。 如果为 None,则对所有维度求和。 normalize_constant: float, 用于归一化的除数常数。 返回 (Returns): torch.Tensor, 归一化后的和,其中被掩盖的元素(mask=0)不参与求和。 """ masked_tensor = tensor * mask.to(tensor.dtype) if dim is None : summed = masked_tensor.sum () else : summed = masked_tensor.sum (dim=dim) normalized = summed / normalize_constant return normalized
sft_microbatch_train_step
此函数作用就是进行一次微批次的训练步骤,计算损失并进行反向传播。它会调用前面定义的函数来获取log_probs和token_entropy,然后根据response_mask来计算最终的损失值,并执行反向传播更新模型参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def sft_microbatch_train_step ( policy_log_probs: torch.Tensor, response_mask: torch.Tensor, gradient_accumulation_steps: int , normalize_constant: float | None = 1.0 , ) -> tuple [torch.Tensor, dict [str , torch.Tensor]]: sft_loss = -policy_log_probs valid_token_count = response_mask.sum ().clamp(min =1e-5 ) mean_token_loss = masked_normalize( tensor=sft_loss, mask=response_mask, dim=None , normalize_constant=valid_token_count ) loss_to_backprop = mean_token_loss / gradient_accumulation_steps if loss_to_backprop.requires_grad: loss_to_backprop.backward() real_loss_value = loss_to_backprop.detach() * gradient_accumulation_steps return loss_to_backprop, {"loss" : real_loss_value}
sft的loss计算核心就是取负对数似然(negative log-likelihood),之前get_response_log_probs得到了每个token的log-probs,
probs就是模型预测下一个token的概率,log-probs就是对数概率。概率在0-1之间,log-probs则是负数,越接近0表示概率越高,越接近负无穷表示概率越低。取负号是因为我们希望最大化概率(最小化负对数概率),所以sft_loss就是-log_probs。
接下来,我们需要计算当前Batch中实际有多少个有效的Output Token,这就是response_mask.sum()的作用。response_mask是一个二值张量,标记了哪些位置是响应部分的token,sum()函数会计算出这些位置的总数。为了防止除以0的情况,我们使用clamp(min=1e-5)来确保valid_token_count至少为1e-5。
然后,我们调用masked_normalize函数来计算平均损失。这个函数会根据response_mask来掩盖掉不参与计算的token,只对有效的token进行求和,并除以有效token的数量来得到每个token的平均损失。
关于梯度累积:
一般来说,批次内的样本数量越多,模型进行梯度更新时的效果越好,但是受限于显存,我们可能无法一次性处理太大的批次。梯度累积的思想就是将一个大批次分成多个小批次(micro-batches),每处理完一个小批次就计算损失并进行反向传播,但不立即更新模型参数,而是等到处理完所有的小批次后再进行一次参数更新。这样就相当于在内存允许的范围内模拟了一个更大的批次。
举个例子:
假设我们想要达到一个批次大小为32的效果,在显存足够的情况下,
我们得到这32个样本梯度应该为
$$
\text{grad} = \frac{1}{32} \sum_{i=1}^{32} \nabla Loss_i
$$
由于显存不足,我们只能每次取4条数据,这样为了达到32的效果,我们要分8次算,
每一个微批次的梯度应该为
$$
\text{grad}j = \frac{1}{4} \sum ^{4} \nabla Loss_i
$$
当我们处理完8个微批次后,我们就得到了8个$\text{grad}_j$,我们需要将它们平均一下,才能得到最终的梯度:
$$
\text{grad} = \frac{1}{8} \sum_{j=1}^{8} \text{grad}_j
$$
两式结合起来,我们可以看到,最终的梯度是所有32个样本的平均梯度:
$$
\text{grad} = \frac{1}{8} \sum_{j=1}^{8} \left( \frac{1}{4} \sum_{i=1}^{4} \nabla Loss_i \right) = \frac{1}{32} \sum_{i=1}^{32} \nabla Loss_i
$$
与将32个样本一次性处理得到的梯度是一样的效果。通过梯度累积,我们可以在显存有限的情况下,模拟出更大的批次效果,从而提升模型的训练效果。
在代码实现中,由于.backward()只管累加,不能缩放,根据微积分的性质,常数乘积的导数,等于导数的常数倍,所以我们将除以8这个操作放在了每个微批次的损失计算中,即 loss_to_backprop = mean_token_loss / gradient_accumulation_steps,这样每个微批次的损失都会被缩放,最终在反向传播时就相当于我们处理了一个更大的批次。
但是实际打印loss时,不要忘了乘以8还原回真实的平均损失值,这样我们才能看到正确的训练损失。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 graph TD %% 定义全局样式和各类型框的样式 classDef default fill:#f9f9f9,stroke:#666,stroke-width:1px,rx:5px,ry:5px; classDef varBox fill:#fff3e0,stroke:#ffb74d,stroke-width:1px,color:#000; classDef textNode fill:none,stroke:none,color:#000,font-weight:bold; classDef plainText fill:none,stroke:none,color:#000; %% 顶部数据输入 Data["文本数据<br>Prompt + Output"] %% 第一阶段:蓝框 subgraph Stage1 [" "] Title1["1. tokenize_prompt_and_output (数据预处理)"]:::textNode I["input_ids (喂给模型)"]:::varBox L["labels (算交叉熵用)"]:::varBox R["response_mask (只对回答算分)"]:::varBox %% 蓝框内部指向 Title1 --> I Title1 --> L Title1 --> R end %% 蓝框背景样式 style Stage1 fill:#e3f2fd,stroke:#90caf9,stroke-width:2px,rx:10px,ry:10px %% 第二阶段:绿框 subgraph Stage2 [" "] Title2["2. get_response_log_probs (前向传播与打分)"]:::textNode E["compute_entropy (辅助函数)"]:::varBox %% 绿框内部双向虚线 Title2 -.-> E E -.-> Title2 end %% 绿框背景样式 style Stage2 fill:#e8f5e9,stroke:#a5d6a7,stroke-width:2px,rx:10px,ry:10px %% 中间过渡文字 LP["policy_log_probs (每个词的自信度)"]:::plainText %% 第三阶段:紫框 subgraph Stage3 [" "] Title3["3. sft_microbatch_train_step (计算最终Loss与反向传播)"]:::textNode M["masked_normalize (辅助函数)"]:::varBox %% 紫框内部单向虚线及文字 Title3 -.->|"(传入 sft_loss 和 response_mask)"| M end %% 紫框背景样式 style Stage3 fill:#f3e5f5,stroke:#ce93d8,stroke-width:2px,rx:10px,ry:10px %% 底部反向传播 BW["Loss.backward() 算梯度"]:::plainText %% 跨层级连线逻辑(组装流水线) Data --> Title1 I --> Title2 L --> Title2 Title2 --> LP LP --> Title3 R --> Title3 Title3 --> BW
sft数据准备
使用gsm8k数据集,r1_zero.prompt。
1 2 3 A conversation between User and Assistant. The User asks a question, and the Assistant solves it. The Assistant first thinks about the reasoning process in the mind and then provides the User with the answer. The reasoning process is enclosed within <think> </think> and answer is enclosed within <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>. User: {question} Assistant: <think>
注意:提示词要和数据集格式吻合,原始数据集使用#### 分隔问题和答案,我们在提示词中使用了 <think>和 <answer>标签来区分思考过程和最终答案,所以我们需要将原始数据集中的####替换为 </think><answer>,并在答案最后添加 </answer>标签。
原始数据集格式:
1 {"question": "Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?", "answer": "Weng earns 12/60 = $<<12/60=0.2>>0.2 per minute.\nWorking 50 minutes, she earned 0.2 x 50 = $<<0.2*50=10>>10.\n#### 10"}
处理后的数据集格式:
1 {"prompt": "A conversation between User and Assistant. The User asks a question, and the Assistant solves it. The Assistant first thinks about the reasoning process in the mind and then provides the User with the answer. The reasoning process is enclosed within <think> </think> and answer is enclosed within <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>.\n\nUser: Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?\n\nAssistant: <think>", "output": "Weng earns 12/60 = $<<12/60=0.2>>0.2 per minute.\nWorking 50 minutes, she earned 0.2 x 50 = $<<0.2*50=10>>10. </think> <answer> 10 </answer>"}
关于数据集的处理,此处不再赘述,主要是使用python的字符串替换功能来完成的。
将数据集格式处理好之后,我们就可以使用pytorch的Dataset和DataLoader来加载数据了。在dataloader中,我们会使用之前定义的tokenize_prompt_and_output函数来将文本数据转换为模型可以处理的张量格式。这样我们就完成了sft数据的准备工作。
有关dataset和dataloader的使用,可以参考其他相关教程,此处不再展开讲解。
sft训练
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 def train (): os.makedirs(OUTPUT_DIR, exist_ok=True ) device = torch.device("cuda" if torch.cuda.is_available() else "cpu" ) logging.basicConfig( format ="%(asctime)s - %(levelname)s - %(message)s" , datefmt="%m/%d/%Y %H:%M:%S" , level=logging.INFO, handlers=[ logging.FileHandler(os.path.join(OUTPUT_DIR, "training.log" )), logging.StreamHandler(sys.stdout) ] ) tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) if tokenizer.pad_token is None : tokenizer.pad_token = tokenizer.eos_token model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, torch_dtype=torch.bfloat16).to(device) model.gradient_checkpointing_enable() train_dataset = SFTDataset(DATA_TRAIN_PATH) data_collator = DataCollator(tokenizer) train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True , collate_fn=data_collator) optimizer = torch.optim.AdamW(model.parameters(), lr=lr) model.train() optimizer.zero_grad() global_step = 0 total_update_steps = max (1 , math.ceil(len (train_dataloader) * EPOCHS / GRAD_ACCUM_STEPS)) warmup_steps = int (0.1 * total_update_steps) scheduler = get_cosine_schedule_with_warmup( optimizer=optimizer, num_warmup_steps=warmup_steps, num_training_steps=total_update_steps ) for epoch in range (EPOCHS): for step, batch in enumerate (tqdm(train_dataloader)): input_ids = batch['input_ids' ].to(device) labels = batch['labels' ].to(device) mask = batch['response_mask' ].to(device) log_probs = get_response_log_probs(model, input_ids, labels, return_token_entropy=False )["log_probs" ] loss,metrics = sft_microbatch_train_step( policy_log_probs=log_probs, response_mask=mask, gradient_accumulation_steps=GRAD_ACCUM_STEPS, ) should_step = ((step + 1 ) % GRAD_ACCUM_STEPS == 0 ) or ((step + 1 ) == len (train_dataloader)) if should_step: torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0 ) optimizer.step() scheduler.step() optimizer.zero_grad() global_step += 1 if global_step > 0 and global_step % 100 == 0 : logger.info(f"Loss: {metrics['loss' ]:.4 f} " ) save_path = os.path.join(OUTPUT_DIR, f"checkpoint-epoch-{epoch + 1 } " ) model.save_pretrained(save_path) tokenizer.save_pretrained(save_path) logger.info(f"Model checkpoint saved after epoch {epoch + 1 } to {save_path} " ) final_save_path = os.path.join(OUTPUT_DIR, "final" ) model.save_pretrained(final_save_path) tokenizer.save_pretrained(final_save_path) logger.info(f"Final model saved to {final_save_path} " ) return final_save_path
准备好以下超参数,路径和部件:
MODEL_PATH:预训练模型的路径
DATA_TRAIN_PATH:训练数据集的路径
BATCH_SIZE:每个批次的样本数量
EPOCHS:训练的轮数
GRAD_ACCUM_STEPS:梯度累积的步数
lr:学习率
OUTPUT_DIR:输出目录,用于保存日志和模型检查点
logger:日志记录器,用于记录训练过程中的信息
初始化好模型、分词器、数据集和优化器。
计算总的更新步数,并设置学习率调度器(scheduler),这里使用了余弦退火调度器,并且设置了预热步数(warmup_steps)。
进入训练循环,遍历每个epoch和每个批次:
将输入数据搬运到设备上(GPU或CPU)。
调用get_response_log_probs函数获取每个token的log-probs。
调用sft_microbatch_train_step函数计算损失并进行反向传播。
根据梯度累积的设置,决定何时进行优化器更新和学习率调度器更新。
每100步记录一次当前的损失值。
每个epoch结束后保存一次模型检查点。
训练完成后,保存最终的模型。
一些小细节:
使用torch.nn.utils.clip_grad_norm_来进行梯度裁剪,防止梯度爆炸。
使用gradient_checkpointing_enable()来启用梯度检查点,以节省显存。
优点:可以节省大约 50% 到 70% 的激活值显存占用。这意味着你可以使用更大的 Batch Size,或者训练更长的序列(Sequence Length),甚至在单卡上微调原本放不下的模型。
缺点:因为在反向传播时需要重新计算一部分前向传播,所以训练速度会变慢大约 20% 到 30%。
模型和数据需要放在同一个设备上(通常是GPU),所以我们使用.to (device)来搬运数据。
学习率调度器的设置:我们计算了总的更新步数,并设置了预热步数为总步数的10%。这样在训练的前10%步数内,学习率会逐渐从0增加到设定的学习率,然后在剩余的90%步数内,学习率会按照余弦退火的方式逐渐降低。
在每个 Python 文件的开头,写上一句 logger = logging.getLogger(__name__)(用当前文件名作为 logger 的名字),然后在代码里统一使用 logger.info ()。这样你的日志系统会非常清晰、可控。
sft评估
golden答案提取
评估使用gsm8k数据集的test部分,r1_zero.prompt作为提示词,由于测试集部分我们并没有处理成 <think>和 <answer>的格式,所以我们需要在评估代码中进行一些适配,确保正确提取出“#### ”后面的答案部分进行评估。
1 2 3 4 5 6 ANS_RE = re.compile (r"####\s*([\-0-9\.,]+)" ) def extract_reference_answer (answer: str ) -> str : match = ANS_RE.search(answer) if match : return match .group(1 ).strip().replace("," , "" ) return "[invalid]"
group() 的用法
match.group(0)(或 match.group()):返回整个正则表达式匹配到的字符串。
例如,如果文本是 “The answer is #### 1,234.56” ,group(0) 会返回 “#### 1,234.56”。
match.group(1):返回第一个括号(即第一个捕获组)匹配到的字符串。
在上面的例子中,group(1) 会返回 “1,234.56”。
vllm推理
在进行推理之前,还是要把prompt和原测试集的question合并成完整的输入文本,然后使用vllm进行推理:
1 2 3 4 5 6 7 8 9 llm = LLM(model=model_path) sampling_params = SamplingParams( temperature=0.0 , max_tokens=1024 , stop=["</answer>" ], include_stop_str_in_output=True , ) outputs = llm.generate(prompts, sampling_params) responses = [o.outputs[0 ].text.strip() for o in outputs]
评估指标
将vllm推理后得到的答案与golden答案进行对比,计算准确率(accuracy),和格式正确率(format accuracy).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 for response, question, raw_answer in zip (responses, questions, answers): gold = extract_reference_answer(raw_answer) reward_dict = r1_zero_reward_fn(response, gold) correct += int (reward_dict.get("answer_reward" , 0.0 ) > 0 ) format_ok += int (reward_dict.get("format_reward" , 0.0 ) > 0 ) eval_details.append({ "question" : question, "response" : response, "gold_answer" : raw_answer, "extracted_gold" : gold, **reward_dict, })
总结
根据文档要求,我们分别取了128,256,512,1024以及全部训练数据进行训练,并在测试集上进行了评估。显然数据集约大,模型的表现越好,准确率和格式正确率也越高。
此外,作业还要求选取数据集里面答案和格式正确的过滤数据集进行对比,但是gms8k数据集的test部分本身就非常干净了,几乎没有格式错误或者答案错误的情况,所以我们就没有进行额外的过滤。不过结论是,如果数据集本身质量不高,进行过滤确实可以提升模型的表现,因为模型不会被错误的样本误导。
GRPO
核心函数
compute_group_normalized_rewards
组归一化奖励函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 def compute_group_normalized_rewards ( reward_fn: Callable , rollout_responses: list [str ], repeated_ground_truths: list [str ], group_size: int , advantage_eps: float , normalize_by_std: bool , ) -> tuple [torch.Tensor,torch.Tensor, dict [str , float ]]: """ 计算每组采样回答(rollout responses)的奖励,并根据组大小进行归一化。 关于 GRPO 的更多信息,请参阅: DeepSeekMath: https://arxiv.org/abs/2402.03300 DeepSeek-R1: https://arxiv.org/abs/2501.12948 参数 (Args): reward_fn: Callable[[str, str], dict[str, float]], 将采样回答与标准答案(ground truths)进行比对评分, 生成一个包含 "reward"(总奖励)、"format_reward"(格式奖励) 和 "answer_reward"(答案准确奖励)键的字典。 rollout_responses: list[str], 策略模型生成的采样回答列表。 该列表的长度为 `rollout_batch_size = n_prompts_per_rollout_batch * group_size`。 repeated_ground_truths: list[str], 样本的标准答案列表。 该列表的长度为 `rollout_batch_size`,因为每个样本的标准答案 都被重复了 `group_size` 次。 group_size: int, 每组的采样数量。 advantage_eps: float, 用于组归一化时防止除以零的 epsilon 值。 normalize_by_std: bool, 是否使用奖励的标准差(std)进行归一化。 返回 (Returns): tuple[torch.Tensor, torch.Tensor, dict[str, float]]: torch.Tensor, 形状为 (rollout_batch_size,): 每个采样回答的组归一化奖励(优势 Advantage)。 torch.Tensor, 形状为 (rollout_batch_size,): 每个采样回答的原始奖励。 dict[str, float]: 采样批次的奖励元数据。 你可以选择在此记录你希望记录的内容(例如奖励的统计数据等)。 """ raw_rewards = [] for response, gt in zip (rollout_responses, repeated_ground_truths): reward_info = reward_fn(response, gt) raw_rewards.append(reward_info["reward" ]) raw_rewards = torch.tensor(raw_rewards, dtype=torch.float32) group_rewards = raw_rewards.view(-1 , group_size) group_means = group_rewards.mean(dim = -1 ,keepdim = True ) group_std = group_rewards.std(dim = -1 , keepdim = True ) advantages = group_rewards - group_means if normalize_by_std: advantages = advantages / (group_std + advantage_eps) advantages = advantages.view(-1 ) meta_data = { "mean_reward" : raw_rewards.mean().item(), "std_reward" : raw_rewards.std().item(), } return advantages, raw_rewards, meta_data
单个样本优势公式如下:
$$
\hat{A}_i = \frac{r_i - \mu}{\sigma + \epsilon}
$$
normalize_by_std用来控制是否使用标准差进行归一化,如果设置为False,那么优势的计算公式就变成了:
$$
\hat{A}_i = r_i - \mu
$$
这样可以防止样本之间太相似导致标准差过小,从而使得优势值过大或者过小,影响训练的稳定性。
compute_naive_policy_gradient_loss
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def compute_naive_policy_gradient_loss ( raw_rewards_or_advantages: torch.Tensor, policy_log_probs: torch.Tensor, ) -> torch.Tensor: """ 使用原始奖励或优势(advantages)计算策略梯度损失。 参数 (Args): raw_rewards_or_advantages: torch.Tensor, 形状为 (batch_size, 1): 每个采样回答的原始奖励或优势。 policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length): 策略模型的对数概率。 返回 (Returns): torch.Tensor, 形状为 (batch_size, sequence_length): 每个 token 的策略梯度损失。 """ per_token_loss = -policy_log_probs * raw_rewards_or_advantages return per_token_loss
获得token级别的loss。对于每个token,我们都有一个对应的log-probability(模型对这个token的自信度),以及一个对应的reward或者advantage(这个token的奖励)。根据策略梯度的原理,我们希望最大化奖励,所以损失函数应该是奖励的负数乘以log-probability。也就是说,如果某个token的奖励很高,并且模型对它非常自信(log-probability接近0),那么这个token的损失就会很小,反之,如果奖励很低或者模型不自信,那么这个token的损失就会很大。
$-A_t \cdot \log p_\theta(o_t | q, o_{<t})$ 。
compute_grpo_clip_loss
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 def compute_grpo_clip_loss ( advantages: torch.Tensor, policy_log_probs: torch.Tensor, old_log_probs: torch.Tensor, cliprange: float , ) -> tuple [torch.Tensor, dict [str , torch.Tensor]]: """ 计算 GRPO-Clip 损失。 参数 (Args): advantages: torch.Tensor, 形状为 (batch_size, 1): 每个采样回答的优势。 policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length): 当前策略模型的对数概率。 old_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length): 旧策略模型的对数概率。 cliprange: float, 用于限制比率(ratio)的裁剪范围。 返回 (Returns): tuple[torch.Tensor, dict[str, torch.Tensor]]: torch.Tensor, 形状为 (batch_size, sequence_length): 每个 token 的 GRPO-Clip 损失。 dict[str, torch.Tensor]: GRPO-Clip 损失的元数据 (通常用于计算裁剪比例 clip fraction)。 """ prob_ratio = torch.exp(policy_log_probs - old_log_probs) uncilp = prob_ratio * advantages clipped_ratio = torch.clamp(prob_ratio, 1.0 - cliprange, 1.0 + cliprange) clipped = clipped_ratio * advantages per_token_loss = -torch.min (uncilp, clipped) metadata = { "uncilp_mean" : uncilp.mean(), "clipped_mean" : clipped.mean(), } return per_token_loss, metadata
ratio的计算公式如下:
$$
r_t(\theta) = \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})}
$$
概率比率 (Probability Ratio):
$r_t(\theta)$$q$ 是你的 Prompt(比如输入的 Text)。
$o_t$ 是当前生成的第 $t$ 个 Token(比如 SQL 中的 SELECT)。
$\pi_\theta$ 是当前正在训练的模型生成这个 Token 的概率。
$\pi_{\theta_{old}}$ 是更新前(或采样时)的旧模型生成这个 Token 的概率。
含义: 如果 $r_t(\theta) > 1$,说明新模型比旧模型更倾向于生成这个 Token;如果 $< 1$,说明新模型不那么倾向于生成它了。这在强化学习里叫重要性采样(Importance Sampling)。
$$
-\min \left( \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})} A_t, \text{clip} \left( \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})}, 1-\epsilon, 1+\epsilon \right) A_t \right)
$$
公式里使用了 $\min(未裁剪值, 裁剪值)$ ,这是一种 悲观约束(Pessimistic Bound) 。我们分两种情况来看它到底有多精妙:
场景 A:当 $A_t > 0$ (这是一个好回答,模型表现不错)
既然是好回答,我们当然希望增加生成它的概率,即让 $r_t(\theta)$ 变大。
如果 $r_t(\theta)$ 从 $1.0$ 涨到了 $1.15$ :一切正常,没有触发裁剪,$1.15 \times A_t$ 。
如果模型过于兴奋,把概率比 $r_t(\theta)$ 暴涨到了 $1.5$ :此时 Clip 机制介入,把它强行按回到 $1+\epsilon$ (比如 $1.2$ )。由于 $\min(1.5 \times A_t, 1.2 \times A_t) = 1.2 \times A_t$ ,最终计算 Loss 时 只奖励到 1.2 倍为止 。
大白话: 既然你做对了,我给你糖吃。但为了防止你骄傲自满偏离轨道(Reward Hacking),即使你表现再好,我一次最多也就只奖励你 20% 的甜头。
场景 B:当 $A_t < 0$ (这是一个烂回答,拖了全组后腿)
既然是烂回答,我们希望降低生成它的概率,即让 $r_t(\theta)$ 变小(趋近于 0)。
如果 $r_t(\theta)$ 降低到了 $0.9$ :正常惩罚。
如果模型为了逃避惩罚,极度恐慌地把 $r_t(\theta)$ 降到了 $0.5$ :Clip 机制介入,变为 $1-\epsilon$ (比如 $0.8$ )。由于 $A_t$ 是负数,$\min(0.5 \times A_t, 0.8 \times A_t)$ 的结果会选择那个 更小的值(也就是负得更多的值,即 $0.5 \times A_t$ ) !
等等,这里好像有个陷阱? 没错!在标准的 PPO 中,$\min$ 函数确保了:如果你想过度偏离 好的方向,我不给你额外的收益(用 Clip 截断);但如果你在烂的方向上走得太远,我会保留那个未裁剪的、更严厉的惩罚值(通过梯度让你赶紧滚回来)。
compute_policy_gradient_loss
这只是一个简单的包装函数,根据是否使用clip来选择计算哪种损失。是为了实验比较使用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def compute_policy_gradient_loss ( policy_log_probs: torch.Tensor, loss_type: str , raw_rewards: torch.Tensor, advantages: torch.Tensor, old_log_probs: torch.Tensor, cliprange: float , ) -> tuple [torch.Tensor, dict [str , torch.Tensor]]: """ 一个包装器(wrapper),用于分发调用上述合适的策略梯度损失函数。 """ if loss_type == "no_baseline" : return compute_naive_policy_gradient_loss(raw_rewards, policy_log_probs), {} elif loss_type == "reinforce_with_baseline" : return compute_naive_policy_gradient_loss(advantages, policy_log_probs), {} elif loss_type == "grpo_clip" : return compute_grpo_clip_loss(advantages, policy_log_probs, old_log_probs, cliprange) else : raise ValueError(f"Unsupported loss type: {loss_type} " )
masked_mean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def masked_mean (tensor: torch.Tensor, mask: torch.Tensor, dim: int | None = None ) -> torch.Tensor: """ 计算张量沿指定维度的均值,仅考虑掩码值为 1 的元素。 参数 (Args): tensor: torch.Tensor, 要计算均值的张量。 mask: torch.Tensor, 掩码。我们只计算掩码值为 1 的元素的均值。 dim: int | None, 要计算均值的维度。 如果为 None,则对所有非掩码元素求和并除以它们的总数。 返回 (Returns): torch.Tensor, 张量沿指定维度的均值(仅考虑掩码值为 1 的元素)。 """ masked_tensor= tensor*mask if dim is None : sum_masked = masked_tensor.sum () count_masked = mask.sum () else : sum_masked = masked_tensor.sum (dim=dim) count_masked = mask.sum (dim=dim) mean_masked = sum_masked / count_masked return mean_masked
grpo_microbatch_train_step
微批次训练步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 def grpo_microbatch_train_step ( policy_log_probs: torch.Tensor, response_mask: torch.Tensor, gradient_accumulation_steps: int , loss_type: Literal ["no_baseline" , "reinforce_with_baseline" , "grpo_clip" ], raw_rewards: torch.Tensor | None = None , advantages: torch.Tensor | None = None , old_log_probs: torch.Tensor | None = None , cliprange: float | None = None , ) -> tuple [torch.Tensor, dict [str , torch.Tensor]]: """ 计算微批次(microbatch)的策略梯度损失并反向传播其梯度。 参数 (Args): policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length): 策略模型的对数概率。 response_mask: torch.Tensor, 形状为 (batch_size, sequence_length): 回答部分的掩码。 gradient_accumulation_steps: int, 梯度累积步数。 loss_type: Literal["no_baseline", "reinforce_with_baseline", "grpo_clip"], 要使用的损失函数类型。 raw_rewards: torch.Tensor | None, 每个采样回答的原始奖励。 当 loss_type="no_baseline" 时需要。 advantages: torch.Tensor | None, 每个采样回答的优势。 当 loss_type 为 "reinforce_with_baseline" 或 "grpo_clip" 时需要。 old_log_probs: torch.Tensor | None, 旧策略模型的对数概率。 当 loss_type="grpo_clip" 时需要。 cliprange: float | None, 比率的裁剪范围。 当 loss_type="grpo_clip" 时需要。 # 注意:此处源码参数列表中未显式定义 constant_normalize_factor,但在文档说明中有提及 # constant_normalize_factor: int | None, 如果我们想对序列维度求和 # 并按此常数因子归一化(如在 Dr. GRPO 中那样),则提供此参数。 返回 (Returns): tuple[torch.Tensor, dict[str, torch.Tensor]]: 策略梯度损失及其元数据。 """ per_token_loss, metadata = compute_policy_gradient_loss( policy_log_probs=policy_log_probs, loss_type=loss_type, raw_rewards=raw_rewards, advantages=advantages, old_log_probs=old_log_probs, cliprange=cliprange, ) ''' Q:为什么我们先计算每个样本的平均损失(沿序列维度),然后再对批次求平均,而不是直接对所有 token 的损失求平均呢? A:如果我们直接对所有 token 的损失求平均,那么每个 token 对最终贡献度是一样的, 这可能会导致训练不稳定,尤其是当序列长度变化较大时。通过先计算每个样本的平均损失, 我们确保每个样本对最终损失的贡献是均等的,无论它们的序列长度如何。这种方法可以提高训练的稳定性和效率。 ''' batch_loss = masked_mean(per_token_loss, response_mask, dim=1 ) loss = batch_loss.mean() backprop_loss = loss / gradient_accumulation_steps if backprop_loss.requires_grad: backprop_loss.backward() return loss, metadata
GRPO训练
整个训练流程大概分为以下几步:
vllm推理:使用vllm对训练数据进行推理,得到模型的回答。
奖励计算:使用奖励函数或者奖励模型对模型的回答进行打分,得到answer_reward和format_reward。
旧概率计算:得到旧模型在训练数据上的log-probs,作为后续计算概率比值的基础。
GRPO微批次训练步骤:使用之前计算的奖励和旧概率,按照GRPO算法进行微批次的训练步骤,计算损失并进行反向传播。
vllm权重更新
作业中说明了不使用KL散度作为约束项,所以实际上实现的是一种特殊的GRPO
超参数详解:
rollout_batch_size:每次进行vllm推理的样本数量
group_size:每个问题生成几个回答,rollout_batch_size//group_size就是每次推理的实际问题数量
train_batch_size:控制一次GRPO更新的有效 batch
gradient_accumulation_steps:梯度累积的大小。微批次训练的步数micro_train_batch_size = train_batch_size // gradient_accumulation_steps
epoch_per_rollout_batch:每次vllm推理后,进行多少轮GRPO训练,=1时是on-policy,>1时是off-policy
cliprange:GRPO算法中的clip范围,控制概率比值的范围,防止过大或者过小导致训练不稳定