环境与准备

  • 使用conda下载好的flash-atten,cuda和python版本要匹配,安装transformers和datasets库
  • 下载Qwen2.5-Math-1.5B作为基座模型
  • 数据集直接使用原项目中的/data目录下的几个数据集

sft

核心函数

tokenize_prompt_and_output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def tokenize_prompt_and_output(
prompt_strs: list[str],
output_strs: list[str],
tokenizer: PreTrainedTokenizerBase,
) -> dict[str, Tensor]:
# 该函数将提示和输出字符串列表转换为模型输入张量,包括 input_ids、labels 和 response_mask。
prompt_ids = []
output_ids = []
masks = []
for prompt, output in zip(prompt_strs, output_strs):
prompt_id = torch.tensor(tokenizer.encode(prompt, add_special_tokens=False))
output_id = torch.tensor(tokenizer.encode(output, add_special_tokens=False))
mask = torch.cat((torch.zeros_like(prompt_id), torch.ones_like(output_id)), dim=0)
masks.append(mask)
prompt_ids.append(prompt_id)
output_ids.append(output_id)
# 将 prompt_ids 和 output_ids 拼接成 input_ids
full_prompt_output_ids = []
full_len = []
for p_id, o_id in zip(prompt_ids, output_ids):
full_prompt_output_id = torch.cat((p_id, o_id), dim=0)
full_prompt_output_ids.append(full_prompt_output_id)
full_len.append(len(full_prompt_output_id))
max_len = max(full_len)

input_ids = []
labels = []
response_mask = []
for full_prompt_output_id,mask in zip(full_prompt_output_ids, masks):
pad_len = max_len - full_prompt_output_id.size(0)
full_prompt_output_id = torch.nn.functional.pad(full_prompt_output_id,(0,pad_len),value=tokenizer.pad_token_id)
mask = torch.nn.functional.pad(mask,(0,pad_len),value=0)
input_id = full_prompt_output_id[:-1]
label = full_prompt_output_id[1:]
response_m = mask[1:]
input_ids.append(input_id)
labels.append(label)
response_mask.append(response_m)
input_ids = torch.stack(input_ids, dim=0)
labels = torch.stack(labels, dim=0)
response_mask = torch.stack(response_mask, dim=0)

return {
"input_ids" : input_ids,
"labels" : labels,
"response_mask" : response_mask
}

大模型工作原理就是根据输入的prompt来预测下一个token是什么,这个函数的作用就是将输入的prompt和输出字符串列表转换为模型可以处理的张量格式,包括input_ids、labels和response_mask。其中input_ids是拼接后的输入序列,labels是对应的标签序列(即期望输出),response_mask用于标记响应部分的token,以便在训练时只计算响应部分的损失。

举个例子:
假设我们有一个prompt字符串列表和一个输出字符串列表:

1
2
prompt_strs = ["What is the capital of France"]([11, 122, 138, 145, 15, 90])
output_strs = ["The capital of France is Paris"]([122, 138, 145, 15, 90, 17])

那么经过tokenize_prompt_and_output函数处理后,我们希望得到:

1
2
input_strs = ["What is the capital of France The capital of France is"]([11, 122, 138, 145, 15, 90, 122, 138, 145, 15, 90])
labels = ["is the capital of France The capital of France is Paris"]([122, 138, 145, 15, 90, 122, 138, 145, 15, 90, 17])

也就是经过了错位处理,
但是,计算机只能处理数字,所以我们需要将这些字符串转换为对应的token id,这就是tokenizer的作用。每个数字表示一个token在词表中的索引。最终,我们得到的input_idslabels都是数字序列,可以直接输入到模型中进行训练。
response_mask则是一个二值序列,用于标记哪些token是响应部分,以便在计算损失时只关注这些部分。
在上面例子中,“The capital of France is Paris”才是模型真正应该学习的内容,所以response_mask会标记这些token的位置。“What is the capital of France”部分则不参与损失计算。故而response_mask是**[0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]**,表示前5个token是prompt部分,后6个token是响应部分。

不难发现,input_idslabelsresponse_mask的形状应该是一致的。

序列位置 (Index) 0 1 2 3 4 5 6 7 8 9 10
input_ids(输入给模型) 11(What) 122(is) 138(the) 145(capital) 15(of) 90(France) 122(The) 138(capital) 145(of) 15(France) 90(is)
labels(期望的输出) 122(is) 138(the) 145(capital) 15(of) 90(France) 122(The) 138(capital) 145(of) 15(France) 90(is) 17(Paris)
response_mask(是否算 Loss) 0 0 0 0 0 1 1 1 1 1 1
模型在干嘛? 预测 is 预测 the 预测 capital 预测 of 预测 France 预测 The 预测 capital 预测 of 预测 France 预测 is 预测 Paris

现在我们搞清楚了函数的作用,以及输入和输出到底是什么,接下来需要考虑另外一个问题:padding
在将数据喂给大模型时,我们是同时处理多个样本的,这些样本的长度可能不一样。为了让它们能够在同一批次中进行处理,我们需要对它们进行padding,使得它们的长度一致。
所以,我们将prompt和output拼接成prompt_output_full好之后,计算出最长的序列长度,然后对所有的序列进行padding,使得它们的长度都等于这个最大长度。
padding的值通常是tokenizer的pad_token_id,mask的padding补0。这样模型在处理这些padding部分时就不会产生实际的影响。
我们取prompt_output_full的[:-1]作为input_ids,取[1:]作为labels,这样就实现了错位处理,使得模型在预测下一个token时能够正确地学习到输出序列的模式。同时,response_mask也相应地进行错位处理,以确保在计算损失时只关注响应部分的token。
最后,再把这些处理好的input_ids、labels和response_mask堆叠成批次的张量,返回给训练函数使用。

pytorch知识点:

  • torch.stack与torch.cat的区别:
    • torch.stack:在指定维度上将输入张量列表堆叠成一个新的张量。输入张量必须具有相同的形状。不改变原来括号的层数
    • torch.cat:在指定维度上连接输入张量列表。输入张量在连接维度上必须具有相同的形状,但其他维度可以不同。括号层数加一
    • dim=0表示最外层的括号
  • torch.nn.functional.pad:用于对张量进行填充操作,可以指定填充的值和填充的维度。
    • 语法: torch.nn.functional.pad(input, pad, mode='constant', value=0)
      其中 pad 是一个元组:(左边补几个, 右边补几个, 上面补几个, 下面补几个, …)。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      matrix = torch.ones(2, 2)  # 一个 2x2 的全 1 矩阵
      # tensor([[1., 1.],
      # [1., 1.]])

      # pad = (1, 1, 0, 2) 的解析顺序:
      # 1. 看最后一条边(宽度/列):左边补 1 个,右边补 1 个
      # 2. 看倒数第二条边(高度/行):上面补 0 个,下面补 2 个
      padded_matrix = F.pad(matrix, (1, 1, 0, 2), value=0)

      print(padded_matrix)
      # 输出:
      # tensor([[0., 1., 1., 0.],
      # [0., 1., 1., 0.],
      # [0., 0., 0., 0.],
      # [0., 0., 0., 0.]])

compute_entropy逐词熵

1
2
3
4
5
6
7
8
9
10
11
12
def compute_entropy(logits: torch.Tensor) -> torch.Tensor:
"""获取 Logits 的熵(即最后一个维度的熵)。"""
'''
logits: shape (batch_size, sequence_length, vocab_size)
Q:
为什么计算熵的时候不直接使用softmax?
A:
若概率很低,则 log(prob) 会是一个很大的负数,导致溢出。而log_softmax内部有数值稳定的处理。
'''
log_probs = torch.log_softmax(logits, dim=-1)
entropy = -torch.sum(log_probs * torch.exp(log_probs), dim=-1)
return entropy

大模型最后输出的是一个logits张量,表示每个位置上每个token的预测分布。计算熵的目的是为了衡量模型在每个位置上的不确定性。熵越高,表示模型对下一个token的预测越不确定;熵越低,表示模型对下一个token的预测越确定。

先将logits分布转化为概率:

log_probs = torch.log_softmax(logits, dim=-1)

$$
\log p(x_i) = \log\left(\frac{\exp(z_i)}{\sum_{j=1}^{V} \exp(z_j)}\right)
$$

然后再计算真实的熵:
香农熵的标准定义是所有可能事件概率 $p_i$ 与其信息量 $\log p_i$ 乘积的负总和。对应到大模型的词表(Vocabulary)维度,公式为:

$$
H(X) = - \sum_{i=1}^{V} p(x_i) \log p(x_i)
$$

$V$:词表大小(vocab_size),即所有可能预测的 Token 数量。

$p(x_i)$:模型预测下一个 Token 是 $x_i$ 的概率。

$\log p(x_i)$:该 Token 对应的对数概率。

get_response_log_probs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def get_response_log_probs(
model: torch.nn.Module,
input_ids: torch.Tensor,
labels: torch.Tensor,
return_token_entropy: bool,
) -> dict[str, torch.Tensor | None]:
"""
获取在给定提示下,回答(response)部分的条件对数概率(log-probs),
并可选择性地获取下一个 token 预测的熵。

参数 (Args):
model: PreTrainedModel, 用于打分的模型。
input_ids: torch.Tensor, 形状为 (batch_size, sequence_length):
分词后的提示和输出。
labels: torch.Tensor, 形状为 (batch_size, sequence_length):
移位后的 input_ids。
return_token_entropy: bool, 是否返回下一个 token 预测的熵。

返回 (Returns):
dict[str, torch.Tensor | None]:
"log_probs": torch.Tensor, 形状为 (batch_size, sequence_length):
给定提示下回答的条件对数概率。
注意:我们尚未在此处掩盖(mask)掉对应于提示或填充的 token 索引;
这将在训练循环中完成。
"token_entropy": Optional[torch.Tensor], 形状为 (batch_size, sequence_length):
下一个 token 预测的熵。与 log-probs 一样,我们尚未在此处掩盖掉
对应于提示或填充的 token 索引;这将在训练循环中完成。
"""
# 前向传播得到logits
outputs = model(input_ids=input_ids)
logits = outputs.logits # shape: (batch_size, sequence_length, vocab_size)

# 优化显存: 使用 cross_entropy 直接计算 log_probs,避免创建 (B, L, V) 的 all_log_probs
# log_prob(token) = -cross_entropy(logits, token)
logits_flat = logits.view(-1, logits.size(-1))
labels_flat = labels.view(-1)
token_log_probs_flat = -torch.nn.functional.cross_entropy(logits_flat, labels_flat, reduction='none')# reduction='none'表示不对结果进行任何聚合,直接返回每个元素的损失值,这样我们就得到了每个token的log-probs,而不是一个平均值。
log_probs = token_log_probs_flat.view(logits.size(0), logits.size(1))
# 计算熵(如果需要)
token_entropy = None
if(return_token_entropy):
token_entropy = compute_entropy(logits) # shape: (batch_size, sequence_length)

return{
"log_probs": log_probs,
"token_entropy": token_entropy
}

.view函数作用是改变张量形状,不改变其中数据。

  1. 基础玩法:明确指定每一个维度

view 最基本的用法,就是你明确告诉 PyTorch,你想把张量捏成几行几列。

核心法则(黄金定律):变换前后的“元素总数”必须绝对相等!

假设我们有一个包含 6 个元素的一维张量(一条长线):

Python

1
2
3
4
import torch

# 形状: [6] (总共 6 个元素)
a = torch.tensor([1, 2, 3, 4, 5, 6])

我们可以用 view 把这 6 个元素重新排列组合:

组合 A:变成 2 行 3 列

Python

1
2
3
4
5
6
b = a.view(2, 3)
print(b)
# 输出:
# tensor([[1, 2, 3],
# [4, 5, 6]])
# 检查元素总数:2 * 3 = 6,合法!

组合 B:变成 3 行 2 列

Python

1
2
3
4
5
6
7
c = a.view(3, 2)
print(c)
# 输出:
# tensor([[1, 2],
# [3, 4],
# [5, 6]])
# 检查元素总数:3 * 2 = 6,合法!

注意: 数据在变形时,永远是 “从左到右,从上到下” 按顺序填入新形状的,它不会打乱数字原本的先后顺序。

2. 高阶魔法:-1 (偷懒占位符)

这就是你在大模型代码里最常见到的写法。

很多时候,张量非常大(比如 Batch Size 会随着用户输入动态变化),我们人在写代码时,懒得去或者无法提前算出某一个维度到底该是多少。这时候就可以派 -1 出场了。

-1 的意思是:“PyTorch,前面/后面的维度我已经定好了,剩下的那个维度具体是多少,你帮我用除法算一下!”

还是用刚才那 6 个元素的张量 a 举例:

场景 A:我只想要 2 列,行数你帮我算

Python

1
2
3
4
5
# 告诉 PyTorch:我要变成二维矩阵,第 1 维(列数)是 2,第 0 维(行数)你看着办。
d = a.view(-1, 2)

# PyTorch 内部计算:总数 6 / 列数 2 = 行数 3。所以 -1 自动变成了 3。
print(d.shape) # 输出: torch.Size([3, 2])

场景 B:不管原来是几维,全部给我压扁成一维(最常用的展平操作)

Python

1
2
3
4
5
6
7
# 假设我们拿到了刚才的 3x2 矩阵 c
# 告诉 PyTorch:把所有元素揉成一条线!
e = c.view(-1)

# PyTorch 内部计算:总数 6,所以变成长度为 6 的一维张量。
print(e.shape) # 输出: torch.Size([6])
print(e) # 输出: tensor([1, 2, 3, 4, 5, 6])

  1. 实战解析:回到你的大模型代码

Python

1
logits_flat = logits.view(-1, logits.size(-1))

假设 logits 的原始形状是 [Batch=2, Seq=3, Vocab=32000](总元素数是 $2 \times 3 \times 32000 = 192000$)。

  • logits.size(-1) 拿到的是最后一个维度的大小,也就是 32000
  • 所以这行代码等价于:logits.view(-1, 32000)
  • 此时 PyTorch 看到 -1,就会默默帮你计算:$192000 \div 32000 = 6$
  • 最终,3D 的张量就被完美展平为了 2D 的 [6, 32000]

torch.nn.functional.cross_entropy内部在做什么?

  1. 原本的逻辑: Logits $\rightarrow$ 算 Softmax 变概率 $\rightarrow$ 挑出正确答案的概率 $\rightarrow$ 取 Log 算误差。
  2. cross_entropy 内部的逻辑:(算交叉熵损失的)
    • 跳过真实概率: 利用对数公式化简和 LogSumExp 技巧,直接算出所有词的“对数概率”(Log-Prob)。
    • 查字典提取: 看着你的 label(比如正确答案在第 0 位),直接把第 0 位的那个对数概率“拔”出来。
    • 加负号: 在拔出来的数字前面加个负号,变成正数的 Loss 返回给你。
  3. 但是我们需要的是每个 token 的 log-probs,所以再取一个负号就得到了我们想要的结果。

masked_normalize 带掩码归一化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def masked_normalize(
tensor: torch.Tensor,
mask: torch.Tensor,
dim: int | None = None,
normalize_constant: float = 1.0,
) -> torch.Tensor:
"""
沿维度求和并按常数归一化,仅考虑掩码值为 1 的元素。

参数 (Args):
tensor: torch.Tensor, 要求和并归一化的张量。
mask: torch.Tensor, 掩码。我们只考虑掩码值为 1 的元素。
dim: int | None, 归一化之前进行求和的维度。
如果为 None,则对所有维度求和。
normalize_constant: float, 用于归一化的除数常数。

返回 (Returns):
torch.Tensor, 归一化后的和,其中被掩盖的元素(mask=0)不参与求和。
"""
# 矩阵乘法是multi或者@,*是逐元素相乘
masked_tensor = tensor * mask.to(tensor.dtype) #掩码为0则不计算
if dim is None:
summed = masked_tensor.sum()
else:
summed = masked_tensor.sum(dim=dim)
normalized = summed / normalize_constant
return normalized

sft_microbatch_train_step

此函数作用就是进行一次微批次的训练步骤,计算损失并进行反向传播。它会调用前面定义的函数来获取log_probs和token_entropy,然后根据response_mask来计算最终的损失值,并执行反向传播更新模型参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def sft_microbatch_train_step(
policy_log_probs: torch.Tensor,
response_mask: torch.Tensor,
gradient_accumulation_steps: int,
normalize_constant: float | None = 1.0,
) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:

# 1. 取负对数似然
sft_loss = -policy_log_probs

# 2. 计算当前 Batch 中实际有多少个有效的 Output Token
valid_token_count = response_mask.sum().clamp(min=1e-5)#clamp的作用是将输入张量中的元素限制在一个指定的范围内,这里我们设置最小值为1e-5,防止除以0的情况发生。

# 3. 优雅地调用 masked_normalize 完成掩码、求和与求平均
mean_token_loss = masked_normalize(
tensor=sft_loss,
mask=response_mask,
dim=None, # None 表示对整个 Batch 所有元素求和
normalize_constant=valid_token_count # 传入有效 Token 数量作为分母
)

# 4. 除以梯度累积步数,准备反向传播
loss_to_backprop = mean_token_loss / gradient_accumulation_steps

if loss_to_backprop.requires_grad:
loss_to_backprop.backward()

# 5. 还原出未经梯度累积缩放的真实平均 Loss,detach()的作用是将loss_to_backprop从计算图中分离出来,使其不再参与梯度计算,这样我们就可以安全地进行数值操作而不会影响反向传播的过程。
real_loss_value = loss_to_backprop.detach() * gradient_accumulation_steps

return loss_to_backprop, {"loss": real_loss_value}

sft的loss计算核心就是取负对数似然(negative log-likelihood),之前get_response_log_probs得到了每个token的log-probs,
probs就是模型预测下一个token的概率,log-probs就是对数概率。概率在0-1之间,log-probs则是负数,越接近0表示概率越高,越接近负无穷表示概率越低。取负号是因为我们希望最大化概率(最小化负对数概率),所以sft_loss就是-log_probs。

接下来,我们需要计算当前Batch中实际有多少个有效的Output Token,这就是response_mask.sum()的作用。response_mask是一个二值张量,标记了哪些位置是响应部分的token,sum()函数会计算出这些位置的总数。为了防止除以0的情况,我们使用clamp(min=1e-5)来确保valid_token_count至少为1e-5。

然后,我们调用masked_normalize函数来计算平均损失。这个函数会根据response_mask来掩盖掉不参与计算的token,只对有效的token进行求和,并除以有效token的数量来得到每个token的平均损失。

关于梯度累积:
一般来说,批次内的样本数量越多,模型进行梯度更新时的效果越好,但是受限于显存,我们可能无法一次性处理太大的批次。梯度累积的思想就是将一个大批次分成多个小批次(micro-batches),每处理完一个小批次就计算损失并进行反向传播,但不立即更新模型参数,而是等到处理完所有的小批次后再进行一次参数更新。这样就相当于在内存允许的范围内模拟了一个更大的批次。
举个例子:
假设我们想要达到一个批次大小为32的效果,在显存足够的情况下,
我们得到这32个样本梯度应该为

$$
\text{grad} = \frac{1}{32} \sum_{i=1}^{32} \nabla Loss_i
$$

由于显存不足,我们只能每次取4条数据,这样为了达到32的效果,我们要分8次算,
每一个微批次的梯度应该为

$$
\text{grad}j = \frac{1}{4} \sum^{4} \nabla Loss_i
$$

当我们处理完8个微批次后,我们就得到了8个$\text{grad}_j$,我们需要将它们平均一下,才能得到最终的梯度:

$$
\text{grad} = \frac{1}{8} \sum_{j=1}^{8} \text{grad}_j
$$

两式结合起来,我们可以看到,最终的梯度是所有32个样本的平均梯度:

$$
\text{grad} = \frac{1}{8} \sum_{j=1}^{8} \left( \frac{1}{4} \sum_{i=1}^{4} \nabla Loss_i \right) = \frac{1}{32} \sum_{i=1}^{32} \nabla Loss_i
$$

与将32个样本一次性处理得到的梯度是一样的效果。通过梯度累积,我们可以在显存有限的情况下,模拟出更大的批次效果,从而提升模型的训练效果。

在代码实现中,由于.backward()只管累加,不能缩放,根据微积分的性质,常数乘积的导数,等于导数的常数倍,所以我们将除以8这个操作放在了每个微批次的损失计算中,即 loss_to_backprop = mean_token_loss / gradient_accumulation_steps,这样每个微批次的损失都会被缩放,最终在反向传播时就相当于我们处理了一个更大的批次。

但是实际打印loss时,不要忘了乘以8还原回真实的平均损失值,这样我们才能看到正确的训练损失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
graph TD
%% 定义全局样式和各类型框的样式
classDef default fill:#f9f9f9,stroke:#666,stroke-width:1px,rx:5px,ry:5px;
classDef varBox fill:#fff3e0,stroke:#ffb74d,stroke-width:1px,color:#000;
classDef textNode fill:none,stroke:none,color:#000,font-weight:bold;
classDef plainText fill:none,stroke:none,color:#000;

%% 顶部数据输入
Data["文本数据<br>Prompt + Output"]

%% 第一阶段:蓝框
subgraph Stage1 [" "]
Title1["1. tokenize_prompt_and_output (数据预处理)"]:::textNode
I["input_ids (喂给模型)"]:::varBox
L["labels (算交叉熵用)"]:::varBox
R["response_mask (只对回答算分)"]:::varBox

%% 蓝框内部指向
Title1 --> I
Title1 --> L
Title1 --> R
end
%% 蓝框背景样式
style Stage1 fill:#e3f2fd,stroke:#90caf9,stroke-width:2px,rx:10px,ry:10px

%% 第二阶段:绿框
subgraph Stage2 [" "]
Title2["2. get_response_log_probs (前向传播与打分)"]:::textNode
E["compute_entropy (辅助函数)"]:::varBox

%% 绿框内部双向虚线
Title2 -.-> E
E -.-> Title2
end
%% 绿框背景样式
style Stage2 fill:#e8f5e9,stroke:#a5d6a7,stroke-width:2px,rx:10px,ry:10px

%% 中间过渡文字
LP["policy_log_probs (每个词的自信度)"]:::plainText

%% 第三阶段:紫框
subgraph Stage3 [" "]
Title3["3. sft_microbatch_train_step (计算最终Loss与反向传播)"]:::textNode
M["masked_normalize (辅助函数)"]:::varBox

%% 紫框内部单向虚线及文字
Title3 -.->|"(传入 sft_loss 和 response_mask)"| M
end
%% 紫框背景样式
style Stage3 fill:#f3e5f5,stroke:#ce93d8,stroke-width:2px,rx:10px,ry:10px

%% 底部反向传播
BW["Loss.backward() 算梯度"]:::plainText

%% 跨层级连线逻辑(组装流水线)
Data --> Title1
I --> Title2
L --> Title2
Title2 --> LP
LP --> Title3
R --> Title3
Title3 --> BW

sft数据准备

使用gsm8k数据集,r1_zero.prompt。

1
2
3
A conversation between User and Assistant. The User asks a question, and the Assistant solves it. The Assistant first thinks about the reasoning process in the mind and then provides the User with the answer. The reasoning process is enclosed within <think> </think> and answer is enclosed within <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>.
User: {question}
Assistant: <think>

注意:提示词要和数据集格式吻合,原始数据集使用#### 分隔问题和答案,我们在提示词中使用了 <think><answer>标签来区分思考过程和最终答案,所以我们需要将原始数据集中的####替换为 </think><answer>,并在答案最后添加 </answer>标签。
原始数据集格式:

1
{"question": "Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?", "answer": "Weng earns 12/60 = $<<12/60=0.2>>0.2 per minute.\nWorking 50 minutes, she earned 0.2 x 50 = $<<0.2*50=10>>10.\n#### 10"}

处理后的数据集格式:

1
{"prompt": "A conversation between User and Assistant. The User asks a question, and the Assistant solves it. The Assistant first thinks about the reasoning process in the mind and then provides the User with the answer. The reasoning process is enclosed within <think> </think> and answer is enclosed within <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>.\n\nUser: Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?\n\nAssistant: <think>", "output": "Weng earns 12/60 = $<<12/60=0.2>>0.2 per minute.\nWorking 50 minutes, she earned 0.2 x 50 = $<<0.2*50=10>>10. </think> <answer> 10 </answer>"}

关于数据集的处理,此处不再赘述,主要是使用python的字符串替换功能来完成的。

将数据集格式处理好之后,我们就可以使用pytorch的Dataset和DataLoader来加载数据了。在dataloader中,我们会使用之前定义的tokenize_prompt_and_output函数来将文本数据转换为模型可以处理的张量格式。这样我们就完成了sft数据的准备工作。

有关dataset和dataloader的使用,可以参考其他相关教程,此处不再展开讲解。

sft训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def train():
os.makedirs(OUTPUT_DIR, exist_ok=True)# exist_ok=True 的作用是:如果目标文件夹已经存在,不会抛出异常。如果为False(默认),当文件夹已存在时会抛出FileExistsError。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%m/%d/%Y %H:%M:%S",
level=logging.INFO,
handlers=[
logging.FileHandler(os.path.join(OUTPUT_DIR, "training.log")),
logging.StreamHandler(sys.stdout)# 实时打印到控制台
]
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# 使用 bfloat16 以获得更好的数值稳定性,防止 loss 出现 NaN
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH, torch_dtype=torch.bfloat16).to(device)
model.gradient_checkpointing_enable()
train_dataset = SFTDataset(DATA_TRAIN_PATH)
data_collator = DataCollator(tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=data_collator)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
model.train()
optimizer.zero_grad()

global_step = 0
# 计算总的更新步数(考虑梯度累积)即梯度更新的实际步数,而不是迭代器的总步数
total_update_steps = max(1, math.ceil(len(train_dataloader) * EPOCHS / GRAD_ACCUM_STEPS))

warmup_steps = int(0.1 * total_update_steps) # 比如前 10% 步数预热

scheduler = get_cosine_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=warmup_steps,
num_training_steps=total_update_steps
)

for epoch in range(EPOCHS):
for step, batch in enumerate(tqdm(train_dataloader)):
# 搬运数据
input_ids = batch['input_ids'].to(device)
labels = batch['labels'].to(device)
mask = batch['response_mask'].to(device)

log_probs = get_response_log_probs(model, input_ids, labels, return_token_entropy=False)["log_probs"]

loss,metrics = sft_microbatch_train_step(
policy_log_probs=log_probs,
response_mask=mask,
gradient_accumulation_steps=GRAD_ACCUM_STEPS,
)

should_step = ((step + 1) % GRAD_ACCUM_STEPS == 0) or ((step + 1) == len(train_dataloader))
if should_step:
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
global_step += 1



if global_step > 0 and global_step % 100 == 0:
logger.info(f"Loss: {metrics['loss']:.4f}")
save_path = os.path.join(OUTPUT_DIR, f"checkpoint-epoch-{epoch + 1}")
model.save_pretrained(save_path)
tokenizer.save_pretrained(save_path)
logger.info(f"Model checkpoint saved after epoch {epoch + 1} to {save_path}")

final_save_path = os.path.join(OUTPUT_DIR, "final")
model.save_pretrained(final_save_path)
tokenizer.save_pretrained(final_save_path)
logger.info(f"Final model saved to {final_save_path}")
return final_save_path

准备好以下超参数,路径和部件:

  • MODEL_PATH:预训练模型的路径
  • DATA_TRAIN_PATH:训练数据集的路径
  • BATCH_SIZE:每个批次的样本数量
  • EPOCHS:训练的轮数
  • GRAD_ACCUM_STEPS:梯度累积的步数
  • lr:学习率
  • OUTPUT_DIR:输出目录,用于保存日志和模型检查点
  • logger:日志记录器,用于记录训练过程中的信息
  1. 初始化好模型、分词器、数据集和优化器。
  2. 计算总的更新步数,并设置学习率调度器(scheduler),这里使用了余弦退火调度器,并且设置了预热步数(warmup_steps)。
  3. 进入训练循环,遍历每个epoch和每个批次:
    • 将输入数据搬运到设备上(GPU或CPU)。
    • 调用get_response_log_probs函数获取每个token的log-probs。
    • 调用sft_microbatch_train_step函数计算损失并进行反向传播。
    • 根据梯度累积的设置,决定何时进行优化器更新和学习率调度器更新。
    • 每100步记录一次当前的损失值。
    • 每个epoch结束后保存一次模型检查点。
  4. 训练完成后,保存最终的模型。

一些小细节:

  • 使用torch.nn.utils.clip_grad_norm_来进行梯度裁剪,防止梯度爆炸。
  • 使用gradient_checkpointing_enable()来启用梯度检查点,以节省显存。
    • 优点:可以节省大约 50% 到 70% 的激活值显存占用。这意味着你可以使用更大的 Batch Size,或者训练更长的序列(Sequence Length),甚至在单卡上微调原本放不下的模型。
    • 缺点:因为在反向传播时需要重新计算一部分前向传播,所以训练速度会变慢大约 20% 到 30%。
  • 模型和数据需要放在同一个设备上(通常是GPU),所以我们使用.to(device)来搬运数据。
  • 学习率调度器的设置:我们计算了总的更新步数,并设置了预热步数为总步数的10%。这样在训练的前10%步数内,学习率会逐渐从0增加到设定的学习率,然后在剩余的90%步数内,学习率会按照余弦退火的方式逐渐降低。
  • 在每个 Python 文件的开头,写上一句 logger = logging.getLogger(__name__)(用当前文件名作为 logger 的名字),然后在代码里统一使用 logger.info()。这样你的日志系统会非常清晰、可控。

sft评估

golden答案提取

评估使用gsm8k数据集的test部分,r1_zero.prompt作为提示词,由于测试集部分我们并没有处理成 <think><answer>的格式,所以我们需要在评估代码中进行一些适配,确保正确提取出“#### ”后面的答案部分进行评估。

1
2
3
4
5
6
ANS_RE = re.compile(r"####\s*([\-0-9\.,]+)")
def extract_reference_answer(answer: str) -> str:
match = ANS_RE.search(answer)
if match:
return match.group(1).strip().replace(",", "")
return "[invalid]"

group() 的用法
match.group(0)(或 match.group()):返回整个正则表达式匹配到的字符串。
例如,如果文本是 “The answer is #### 1,234.56” ,group(0) 会返回 “#### 1,234.56”。
match.group(1):返回第一个括号(即第一个捕获组)匹配到的字符串。
在上面的例子中,group(1) 会返回 “1,234.56”。

vllm推理

在进行推理之前,还是要把prompt和原测试集的question合并成完整的输入文本,然后使用vllm进行推理:

1
2
3
4
5
6
7
8
9
llm = LLM(model=model_path)
sampling_params = SamplingParams(
temperature=0.0,
max_tokens=1024,
stop=["</answer>"],
include_stop_str_in_output=True,
)
outputs = llm.generate(prompts, sampling_params)
responses = [o.outputs[0].text.strip() for o in outputs] # [0]指取出第一个回答,默认只生成一个回答

评估指标

将vllm推理后得到的答案与golden答案进行对比,计算准确率(accuracy),和格式正确率(format accuracy).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for response, question, raw_answer in zip(responses, questions, answers):
gold = extract_reference_answer(raw_answer)
reward_dict = r1_zero_reward_fn(response, gold)

correct += int(reward_dict.get("answer_reward", 0.0) > 0)
format_ok += int(reward_dict.get("format_reward", 0.0) > 0)

eval_details.append({
"question": question,
"response": response,
"gold_answer": raw_answer,
"extracted_gold": gold,
**reward_dict,
})

总结

根据文档要求,我们分别取了128,256,512,1024以及全部训练数据进行训练,并在测试集上进行了评估。显然数据集约大,模型的表现越好,准确率和格式正确率也越高。
此外,作业还要求选取数据集里面答案和格式正确的过滤数据集进行对比,但是gms8k数据集的test部分本身就非常干净了,几乎没有格式错误或者答案错误的情况,所以我们就没有进行额外的过滤。不过结论是,如果数据集本身质量不高,进行过滤确实可以提升模型的表现,因为模型不会被错误的样本误导。

GRPO

核心函数

compute_group_normalized_rewards

组归一化奖励函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def compute_group_normalized_rewards(
reward_fn: Callable,
rollout_responses: list[str],
repeated_ground_truths: list[str],
group_size: int,
advantage_eps: float,
normalize_by_std: bool,
) -> tuple[torch.Tensor,torch.Tensor, dict[str, float]]:
"""
计算每组采样回答(rollout responses)的奖励,并根据组大小进行归一化。

关于 GRPO 的更多信息,请参阅:
DeepSeekMath: https://arxiv.org/abs/2402.03300
DeepSeek-R1: https://arxiv.org/abs/2501.12948

参数 (Args):
reward_fn: Callable[[str, str], dict[str, float]],
将采样回答与标准答案(ground truths)进行比对评分,
生成一个包含 "reward"(总奖励)、"format_reward"(格式奖励)
和 "answer_reward"(答案准确奖励)键的字典。
rollout_responses: list[str], 策略模型生成的采样回答列表。
该列表的长度为 `rollout_batch_size = n_prompts_per_rollout_batch * group_size`。
repeated_ground_truths: list[str], 样本的标准答案列表。
该列表的长度为 `rollout_batch_size`,因为每个样本的标准答案
都被重复了 `group_size` 次。
group_size: int, 每组的采样数量。
advantage_eps: float, 用于组归一化时防止除以零的 epsilon 值。
normalize_by_std: bool, 是否使用奖励的标准差(std)进行归一化。

返回 (Returns):
tuple[torch.Tensor, torch.Tensor, dict[str, float]]:
torch.Tensor, 形状为 (rollout_batch_size,):
每个采样回答的组归一化奖励(优势 Advantage)。
torch.Tensor, 形状为 (rollout_batch_size,):
每个采样回答的原始奖励。
dict[str, float]: 采样批次的奖励元数据。
你可以选择在此记录你希望记录的内容(例如奖励的统计数据等)。
"""
raw_rewards = []
for response, gt in zip(rollout_responses, repeated_ground_truths):
reward_info = reward_fn(response, gt)
raw_rewards.append(reward_info["reward"])
raw_rewards = torch.tensor(raw_rewards, dtype=torch.float32)
group_rewards = raw_rewards.view(-1, group_size)
# 求均值
group_means = group_rewards.mean(dim = -1,keepdim = True)
# 求标准差
group_std = group_rewards.std(dim = -1, keepdim = True)
# 计算优势
advantages = group_rewards - group_means
if normalize_by_std:
advantages = advantages / (group_std + advantage_eps)
advantages = advantages.view(-1)

meta_data = {
"mean_reward": raw_rewards.mean().item(),
"std_reward": raw_rewards.std().item(),
}

return advantages, raw_rewards, meta_data

单个样本优势公式如下:

$$
\hat{A}_i = \frac{r_i - \mu}{\sigma + \epsilon}
$$

normalize_by_std用来控制是否使用标准差进行归一化,如果设置为False,那么优势的计算公式就变成了:

$$
\hat{A}_i = r_i - \mu
$$

这样可以防止样本之间太相似导致标准差过小,从而使得优势值过大或者过小,影响训练的稳定性。

compute_naive_policy_gradient_loss

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def compute_naive_policy_gradient_loss(
raw_rewards_or_advantages: torch.Tensor,
policy_log_probs: torch.Tensor,
) -> torch.Tensor:
"""
使用原始奖励或优势(advantages)计算策略梯度损失。

参数 (Args):
raw_rewards_or_advantages: torch.Tensor, 形状为 (batch_size, 1):
每个采样回答的原始奖励或优势。
policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length):
策略模型的对数概率。

返回 (Returns):
torch.Tensor, 形状为 (batch_size, sequence_length):
每个 token 的策略梯度损失。
"""
per_token_loss = -policy_log_probs * raw_rewards_or_advantages
return per_token_loss

获得token级别的loss。对于每个token,我们都有一个对应的log-probability(模型对这个token的自信度),以及一个对应的reward或者advantage(这个token的奖励)。根据策略梯度的原理,我们希望最大化奖励,所以损失函数应该是奖励的负数乘以log-probability。也就是说,如果某个token的奖励很高,并且模型对它非常自信(log-probability接近0),那么这个token的损失就会很小,反之,如果奖励很低或者模型不自信,那么这个token的损失就会很大。
$-A_t \cdot \log p_\theta(o_t | q, o_{<t})$

compute_grpo_clip_loss

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def compute_grpo_clip_loss(
advantages: torch.Tensor,
policy_log_probs: torch.Tensor,
old_log_probs: torch.Tensor,
cliprange: float,
) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:
"""
计算 GRPO-Clip 损失。

参数 (Args):
advantages: torch.Tensor, 形状为 (batch_size, 1):
每个采样回答的优势。
policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length):
当前策略模型的对数概率。
old_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length):
旧策略模型的对数概率。
cliprange: float, 用于限制比率(ratio)的裁剪范围。

返回 (Returns):
tuple[torch.Tensor, dict[str, torch.Tensor]]:
torch.Tensor, 形状为 (batch_size, sequence_length):
每个 token 的 GRPO-Clip 损失。
dict[str, torch.Tensor]: GRPO-Clip 损失的元数据
(通常用于计算裁剪比例 clip fraction)。
"""
prob_ratio = torch.exp(policy_log_probs - old_log_probs)
uncilp = prob_ratio * advantages
clipped_ratio = torch.clamp(prob_ratio, 1.0 - cliprange, 1.0 + cliprange)
clipped = clipped_ratio * advantages
per_token_loss = -torch.min(uncilp, clipped)
metadata = {
"uncilp_mean": uncilp.mean(),
"clipped_mean": clipped.mean(),
}
return per_token_loss, metadata

ratio的计算公式如下:

$$
r_t(\theta) = \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})}
$$

概率比率 (Probability Ratio):

  • $r_t(\theta)$$q$ 是你的 Prompt(比如输入的 Text)。
  • $o_t$ 是当前生成的第 $t$ 个 Token(比如 SQL 中的 SELECT)。
  • $\pi_\theta$ 是当前正在训练的模型生成这个 Token 的概率。
  • $\pi_{\theta_{old}}$ 是更新前(或采样时)的旧模型生成这个 Token 的概率。
  • 含义: 如果 $r_t(\theta) > 1$,说明新模型比旧模型更倾向于生成这个 Token;如果 $< 1$,说明新模型不那么倾向于生成它了。这在强化学习里叫重要性采样(Importance Sampling)。

$$
-\min \left( \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})} A_t, \text{clip} \left( \frac{\pi_\theta(o_t|q, o_{<t})}{\pi_{\theta_{old}}(o_t|q, o_{<t})}, 1-\epsilon, 1+\epsilon \right) A_t \right)
$$

公式里使用了 $\min(未裁剪值, 裁剪值)$,这是一种 悲观约束(Pessimistic Bound) 。我们分两种情况来看它到底有多精妙:

场景 A:当 $A_t > 0$(这是一个好回答,模型表现不错)

既然是好回答,我们当然希望增加生成它的概率,即让 $r_t(\theta)$ 变大。

  • 如果 $r_t(\theta)$$1.0$ 涨到了 $1.15$:一切正常,没有触发裁剪,$1.15 \times A_t$
  • 如果模型过于兴奋,把概率比 $r_t(\theta)$ 暴涨到了 $1.5$:此时 Clip 机制介入,把它强行按回到 $1+\epsilon$(比如 $1.2$)。由于 $\min(1.5 \times A_t, 1.2 \times A_t) = 1.2 \times A_t$,最终计算 Loss 时 只奖励到 1.2 倍为止
  • 大白话: 既然你做对了,我给你糖吃。但为了防止你骄傲自满偏离轨道(Reward Hacking),即使你表现再好,我一次最多也就只奖励你 20% 的甜头。

场景 B:当 $A_t < 0$(这是一个烂回答,拖了全组后腿)

既然是烂回答,我们希望降低生成它的概率,即让 $r_t(\theta)$ 变小(趋近于 0)。

  • 如果 $r_t(\theta)$ 降低到了 $0.9$:正常惩罚。
  • 如果模型为了逃避惩罚,极度恐慌地把 $r_t(\theta)$ 降到了 $0.5$:Clip 机制介入,变为 $1-\epsilon$(比如 $0.8$)。由于 $A_t$ 是负数,$\min(0.5 \times A_t, 0.8 \times A_t)$ 的结果会选择那个 更小的值(也就是负得更多的值,即 $0.5 \times A_t$
  • 等等,这里好像有个陷阱? 没错!在标准的 PPO 中,$\min$ 函数确保了:如果你想过度偏离好的方向,我不给你额外的收益(用 Clip 截断);但如果你在烂的方向上走得太远,我会保留那个未裁剪的、更严厉的惩罚值(通过梯度让你赶紧滚回来)。

compute_policy_gradient_loss

这只是一个简单的包装函数,根据是否使用clip来选择计算哪种损失。是为了实验比较使用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def compute_policy_gradient_loss(
policy_log_probs: torch.Tensor,
loss_type: str,
raw_rewards: torch.Tensor,
advantages: torch.Tensor,
old_log_probs: torch.Tensor,
cliprange: float,
) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:
"""
一个包装器(wrapper),用于分发调用上述合适的策略梯度损失函数。
"""
# 原始奖励作为优势
if loss_type == "no_baseline":
return compute_naive_policy_gradient_loss(raw_rewards, policy_log_probs), {}
elif loss_type == "reinforce_with_baseline": # 组归一化后的优势
return compute_naive_policy_gradient_loss(advantages, policy_log_probs), {}
elif loss_type == "grpo_clip":# cilp后的优势
return compute_grpo_clip_loss(advantages, policy_log_probs, old_log_probs, cliprange)
else:
raise ValueError(f"Unsupported loss type: {loss_type}")

masked_mean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def masked_mean(tensor: torch.Tensor, mask: torch.Tensor, dim: int | None = None) -> torch.Tensor:
"""
计算张量沿指定维度的均值,仅考虑掩码值为 1 的元素。

参数 (Args):
tensor: torch.Tensor, 要计算均值的张量。
mask: torch.Tensor, 掩码。我们只计算掩码值为 1 的元素的均值。
dim: int | None, 要计算均值的维度。
如果为 None,则对所有非掩码元素求和并除以它们的总数。

返回 (Returns):
torch.Tensor, 张量沿指定维度的均值(仅考虑掩码值为 1 的元素)。
"""
masked_tensor= tensor*mask
if dim is None:
sum_masked = masked_tensor.sum()
count_masked = mask.sum()
else:
sum_masked = masked_tensor.sum(dim=dim)
count_masked = mask.sum(dim=dim)
mean_masked = sum_masked / count_masked
return mean_masked

grpo_microbatch_train_step

微批次训练步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def grpo_microbatch_train_step(
policy_log_probs: torch.Tensor,
response_mask: torch.Tensor,
gradient_accumulation_steps: int,
loss_type: Literal["no_baseline", "reinforce_with_baseline", "grpo_clip"],
raw_rewards: torch.Tensor | None = None,
advantages: torch.Tensor | None = None,
old_log_probs: torch.Tensor | None = None,
cliprange: float | None = None,
) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:
"""
计算微批次(microbatch)的策略梯度损失并反向传播其梯度。

参数 (Args):
policy_log_probs: torch.Tensor, 形状为 (batch_size, sequence_length):
策略模型的对数概率。
response_mask: torch.Tensor, 形状为 (batch_size, sequence_length):
回答部分的掩码。
gradient_accumulation_steps: int, 梯度累积步数。
loss_type: Literal["no_baseline", "reinforce_with_baseline", "grpo_clip"],
要使用的损失函数类型。
raw_rewards: torch.Tensor | None, 每个采样回答的原始奖励。
当 loss_type="no_baseline" 时需要。
advantages: torch.Tensor | None, 每个采样回答的优势。
当 loss_type 为 "reinforce_with_baseline" 或 "grpo_clip" 时需要。
old_log_probs: torch.Tensor | None, 旧策略模型的对数概率。
当 loss_type="grpo_clip" 时需要。
cliprange: float | None, 比率的裁剪范围。
当 loss_type="grpo_clip" 时需要。
# 注意:此处源码参数列表中未显式定义 constant_normalize_factor,但在文档说明中有提及
# constant_normalize_factor: int | None, 如果我们想对序列维度求和
# 并按此常数因子归一化(如在 Dr. GRPO 中那样),则提供此参数。

返回 (Returns):
tuple[torch.Tensor, dict[str, torch.Tensor]]:
策略梯度损失及其元数据。
"""
per_token_loss, metadata = compute_policy_gradient_loss(
policy_log_probs=policy_log_probs,
loss_type=loss_type,
raw_rewards=raw_rewards,
advantages=advantages,
old_log_probs=old_log_probs,
cliprange=cliprange,
)
'''
Q:为什么我们先计算每个样本的平均损失(沿序列维度),然后再对批次求平均,而不是直接对所有 token 的损失求平均呢?
A:如果我们直接对所有 token 的损失求平均,那么每个 token 对最终贡献度是一样的,
这可能会导致训练不稳定,尤其是当序列长度变化较大时。通过先计算每个样本的平均损失,
我们确保每个样本对最终损失的贡献是均等的,无论它们的序列长度如何。这种方法可以提高训练的稳定性和效率。
'''
batch_loss = masked_mean(per_token_loss, response_mask, dim=1)
loss = batch_loss.mean()
backprop_loss = loss / gradient_accumulation_steps
if backprop_loss.requires_grad:
backprop_loss.backward()
return loss, metadata

GRPO训练

整个训练流程大概分为以下几步:

  1. vllm推理:使用vllm对训练数据进行推理,得到模型的回答。
  2. 奖励计算:使用奖励函数或者奖励模型对模型的回答进行打分,得到answer_reward和format_reward。
  3. 旧概率计算:得到旧模型在训练数据上的log-probs,作为后续计算概率比值的基础。
  4. GRPO微批次训练步骤:使用之前计算的奖励和旧概率,按照GRPO算法进行微批次的训练步骤,计算损失并进行反向传播。
  5. vllm权重更新

    作业中说明了不使用KL散度作为约束项,所以实际上实现的是一种特殊的GRPO

超参数详解:

  • rollout_batch_size:每次进行vllm推理的样本数量
  • group_size:每个问题生成几个回答,rollout_batch_size//group_size就是每次推理的实际问题数量
  • train_batch_size:控制一次GRPO更新的有效 batch
  • gradient_accumulation_steps:梯度累积的大小。微批次训练的步数micro_train_batch_size = train_batch_size // gradient_accumulation_steps
  • epoch_per_rollout_batch:每次vllm推理后,进行多少轮GRPO训练,=1时是on-policy,>1时是off-policy
  • cliprange:GRPO算法中的clip范围,控制概率比值的范围,防止过大或者过小导致训练不稳定