AMiner Research Labs公测,使用Google NotebookLM交互范式,新增「代码」工具,可一键复现算法论文框架及可供测试使用的伪代码
给出包含“核心贡献识别”、“关键公式”、“函数依赖关系”、“代码结构设计”等部分详尽的算法解析报告。比如说《LLaMA-MoE: Building Mixture-of-Experts from LLaMA with Continual Pre-training》这篇讲基于现有密集型大语言模型构建Mixture-of-Experts (MoE)模型的方法。选择这篇论文无需指令直接使用代码工具抽取即可得到该篇论文的算法报告下载源码即可得到可直接运行足以满足测试需求的伪代码——源码支持 Python, MATLAB, C等运行环境算法复现实验源码包含注释的完整代码段Data Loader, Model, Training Loop等还提供 requirements.txt或依赖库说明。供实验复现的伪代码如下 算法实现LLaMA-MoE 构建与推理 论文LLaMA-MoE: Building Mixture-of-Experts from LLaMA 核心贡献提出首个从现有decoder-only密集大语言模型LLaMA-2构建混合专家模型MoE的框架 通过分割FFN参数并持续预训练在保持模型性能的同时大幅降低训练成本。 实现说明 - 单文件实现所有代码在一个 .py 文件中 - 完整实现论文中的核心公式 (1)-(6) - 每个函数都标注对应的论文公式编号 - 包含详细注释说明公式含义和实现细节 - 使用 NumPy 实现张量运算便于理解和验证逻辑 import numpy as np from dataclasses import dataclass from typing import List, Tuple, Optional # 数据结构定义 dataclass class MoEConfig: MoE 模型超参数配置 对应论文中的参数设置 d_model: int 4096 # 模型维度 d d_hidden: int 11008 # FFN隐藏层维度 d_h (LLaMA通常为 8/3 * d_model) num_experts: int 8 # 专家总数 N top_k: int 2 # 激活的专家数 k batch_size: int 2 # 批次大小 seq_len: int 1024 # 序列长度 dataclass class ExpertWeights: 单个专家的权重集合 W_up: np.ndarray # shape(d, m) W_gate: np.ndarray # shape(d, m) W_down: np.ndarray # shape(m, d) # 核心公式函数 def llama_ffn_forward( x: np.ndarray, W_up: np.ndarray, W_gate: np.ndarray, W_down: np.ndarray ) - np.ndarray: 计算原始 LLaMA 的 FFN 输出 对应论文公式 (1): $$y h W_{\mathrm{down}}, \quad h x W_{\mathrm{up}} \odot \mathrm{Swish}(x W_{\mathrm{gate}})$$ 实现说明 - Swish 激活函数定义为 $x \cdot \text{sigmoid}(x)$ - 使用矩阵乘法 () 进行投影 - 使用逐元素乘法 (*) 进行门控融合 Args: x: 输入向量, shape(batch_size, seq_len, d) W_up: 上投影权重矩阵, shape(d, d_h) W_gate: 门控投影权重矩阵, shape(d, d_h) W_down: 下投影权重矩阵, shape(d_h, d) Returns: y: FFN层输出, shape(batch_size, seq_len, d) # 计算门控分支: x W_gate gate_out x W_gate # 计算 Swish 激活: gate_out * sigmoid(gate_out) swish_act gate_out * (1.0 / (1.0 np.exp(-gate_out))) # 计算上投影分支: x W_up up_out x W_up # 逐元素乘法 (Hadamard product): h up_out \odot swish_act h up_out * swish_act # 下投影: y h W_down y h W_down return y def moe_top_k_aggregate( expert_outputs: np.ndarray, gate_weights: np.ndarray, top_k_indices: np.ndarray ) - np.ndarray: MoE 层的 Top-k 聚合输出 对应论文公式 (2): $$y \sum_{i \in \kappa} G(x)_i \cdot E_i(x)$$ 实现说明 - 利用高级索引从所有专家输出中提取 Top-k 专家的输出 - 利用高级索引提取对应的门控权重 - 进行加权求和得到最终输出 Args: expert_outputs: 所有专家的输出, shape(batch_size, seq_len, num_experts, d) gate_weights: 门控网络的权重, shape(batch_size, seq_len, num_experts) top_k_indices: 被选中的专家索引, shape(batch_size, seq_len, top_k) Returns: y: MoE层聚合后的输出, shape(batch_size, seq_len, d) batch_size, seq_len, top_k top_k_indices.shape d_model expert_outputs.shape[-1] # 构建索引网格用于从 expert_outputs 中提取数据 # batch_indices: (batch_size, 1, 1) - 广播到 (batch_size, seq_len, top_k) batch_indices np.arange(batch_size)[:, None, None] # seq_indices: (1, seq_len, 1) - 广播到 (batch_size, seq_len, top_k) seq_indices np.arange(seq_len)[None, :, None] # 1. 选出 Top-k 专家的输出 # expert_outputs[batch_indices, seq_indices, top_k_indices, :] # 结果形状: (batch_size, seq_len, top_k, d) selected_expert_outputs expert_outputs[batch_indices, seq_indices, top_k_indices, :] # 2. 选出 Top-k 专家对应的权重 # gate_weights[batch_indices, seq_indices, top_k_indices] # 结果形状: (batch_size, seq_len, top_k) selected_gate_weights gate_weights[batch_indices, seq_indices, top_k_indices] # 3. 加权求和 # 将权重维度扩展以便广播: (batch_size, seq_len, top_k, 1) weights_expanded selected_gate_weights[..., None] # 加权: (batch_size, seq_len, top_k, d) weighted_outputs selected_expert_outputs * weights_expanded # 在 top_k 维度上求和: (batch_size, seq_len, d) y np.sum(weighted_outputs, axis2) return y def slice_expert_weights( W_up: np.ndarray, W_gate: np.ndarray, W_down: np.ndarray, neuron_indices: List[int] ) - Tuple[np.ndarray, np.ndarray, np.ndarray]: 专家权重的分割 对应论文公式 (3): $$W_{\mathrm{up}}^{(j)} W_{\mathrm{up}[:, S_j]}, \quad W_{\mathrm{gate}}^{(j)} W_{\mathrm{gate}[:, S_j]}, \quad W_{\mathrm{down}}^{(j)} W_{\mathrm{down}[S_j, :]}$$ 实现说明 - W_up 和 W_gate 对列维度 d_h进行切片 - W_down 对行维度 d_h进行切片 - neuron_indices 是第 j 个专家分配的神经元索引集合 $S_j$ Args: W_up: 原始上投影权重, shape(d, d_h) W_gate: 原始门控权重, shape(d, d_h) W_down: 原始下投影权重, shape(d_h, d) neuron_indices: 第j个专家分配的神经元索引集合 Returns: expert_weights: 包含 (W_up_j, W_gate_j, W_down_j) 的元组 # 将列表转换为 numpy 数组以便索引 indices np.array(neuron_indices) # 切片操作 W_up_j W_up[:, indices] W_gate_j W_gate[:, indices] W_down_j W_down[indices, :] return W_up_j, W_gate_j, W_down_j def expert_forward( x: np.ndarray, W_up_j: np.ndarray, W_gate_j: np.ndarray, W_down_j: np.ndarray ) - np.ndarray: 单个专家的输出计算 对应论文公式 (4): $$E_j(x) h_j W_{\mathrm{down}}^{(j)}, \quad h_j x W_{\mathrm{up}}^{(j)} \odot \mathrm{Swish}(x W_{\mathrm{gate}}^{(j)})$$ 实现说明 - 逻辑与公式1完全一致只是使用了分割后的子权重矩阵 - 直接复用 llama_ffn_forward 的逻辑 Args: x: 输入向量, shape(batch_size, seq_len, d) W_up_j: 第j个专家的上投影权重, shape(d, m) W_gate_j: 第j个专家的门控权重, shape(d, m) W_down_j: 第j个专家的下投影权重, shape(m, d) Returns: output: 第j个专家的输出, shape(batch_size, seq_len, d) return llama_ffn_forward(x, W_up_j, W_gate_j, W_down_j) def compute_neuron_importance( hidden_states: np.ndarray, grad_loss: np.ndarray ) - np.ndarray: 神经元重要性计算用于 Neuron-Sharing 方法 对应论文公式 (5): $$v : v \sum_{(x,y) \in D} \left| h \odot \nabla_h L(x,y) \right|$$ 实现说明 - 计算隐藏状态与梯度的逐元素乘积 - 取绝对值 - 在 batch 和 seq_len 维度上进行求和 Args: hidden_states: 中间隐藏状态 h, shape(batch_size, seq_len, d_h) grad_loss: 损失对隐藏状态的梯度, shape(batch_size, seq_len, d_h) Returns: importance_score: 重要性增量, shape(d_h,) # 逐元素乘积 product hidden_states * grad_loss # 绝对值 abs_product np.abs(product) # 在 batch (dim 0) 和 seq_len (dim 1) 上求和 importance_score np.sum(abs_product, axis(0, 1)) return importance_score def calculate_moe_scale_factor( num_experts: int, top_k: int ) - float: 专家输出重缩放因子计算 对应论文公式 (6): $$\text{scale factor} \frac{N}{k}$$ 实现说明 - 简单的除法运算 - 用于平衡专家网络的输出幅度 Args: num_experts: 专家总数 N top_k: 激活的专家数 k Returns: scale: 缩放因子 return float(num_experts) / float(top_k) # 主算法类 class LLaMAMoE: LLaMA-MoE 实现类 核心思想 - 从密集的 LLaMA FFN 层构建 MoE 层 - 支持通过 IndependentRandom 策略分割神经元 - 实现 Top-k 路由和聚合 与基线对比 - 基线原始密集 LLaMA FFN - 改进将 FFN 参数分割为多个专家通过门控网络激活部分专家 def __init__(self, config: MoEConfig): 初始化 LLaMA-MoE 模型 Args: config: 模型配置 self.config config # 初始化密集权重 (模拟从预训练模型加载) # 实际应用中这些权重来自 LLaMA checkpoint np.random.seed(42) self.W_up np.random.randn(config.d_model, config.d_hidden) * 0.01 self.W_gate np.random.randn(config.d_model, config.d_hidden) * 0.01 self.W_down np.random.randn(config.d_hidden, config.d_model) * 0.01 # 存储专家权重列表 self.experts: List[ExpertWeights] [] # 构建专家 self._build_experts() # 计算缩放因子 self.scale_factor calculate_moe_scale_factor( config.num_experts, config.top_k ) def _build_experts(self): 构建专家 (Expert Construction 阶段) 使用 IndependentRandom 策略随机将 d_h 个神经元索引分成 n 个等长子集 d_h self.config.d_hidden n self.config.num_experts m d_h // n # 每个专家的神经元数 # 生成随机索引并打乱 indices np.arange(d_h) np.random.shuffle(indices) # 分割索引给每个专家 for i in range(n): start_idx i * m end_idx (i 1) * m expert_indices indices[start_idx:end_idx] # 使用公式3分割权重 W_up_j, W_gate_j, W_down_j slice_expert_weights( self.W_up, self.W_gate, self.W_down, expert_indices ) self.experts.append(ExpertWeights(W_up_j, W_gate_j, W_down_j)) print(f成功构建 {n} 个专家每个专家拥有 {m} 个神经元。) def forward( self, x: np.ndarray, gate_scores: Optional[np.ndarray] None ) - Tuple[np.ndarray, dict]: MoE 前向传播 Args: x: 输入张量, shape(batch_size, seq_len, d_model) gate_scores: 可选的门控网络得分如果为 None 则随机生成用于演示 Returns: output: MoE 层输出, shape(batch_size, seq_len, d_model) aux_info: 包含中间信息的字典 (用于调试或分析) batch_size, seq_len, d_model x.shape num_experts self.config.num_experts top_k self.config.top_k # 1. 模拟门控网络 # 实际中这里是一个线性层 Softmax if gate_scores is None: # 随机生成 logits gate_logits np.random.randn(batch_size, seq_len, num_experts) else: gate_logits gate_scores # Softmax 归一化 # 减去最大值以保持数值稳定性 max_logits np.max(gate_logits, axis-1, keepdimsTrue) exp_logits np.exp(gate_logits - max_logits) sum_exp np.sum(exp_logits, axis-1, keepdimsTrue) gate_weights exp_logits / (sum_exp 1e-9) # 2. 选择 Top-k 专家 # 获取分数最高的 k 个专家的索引 # np.argpartition 返回的是未排序的索引这里为了简单直接用 argsort top_k_indices np.argsort(gate_weights, axis-1)[..., -top_k:] # 3. 计算所有专家的输出 # 注意为了符合公式2的输入格式这里计算所有专家的输出 # 在实际高效实现中可以只计算被选中的专家 all_expert_outputs np.zeros( (batch_size, seq_len, num_experts, d_model) ) for i, expert in enumerate(self.experts): # 使用公式4计算单个专家输出 expert_out expert_forward( x, expert.W_up, expert.W_gate, expert.W_down ) all_expert_outputs[:, :, i, :] expert_out # 4. 聚合输出 (使用公式2) y moe_top_k_aggregate(all_expert_outputs, gate_weights, top_k_indices) # 5. 应用重缩放因子 (使用公式6) y y * self.scale_factor aux_info { gate_weights: gate_weights, top_k_indices: top_k_indices, scale_factor: self.scale_factor } return y, aux_info def analyze_neuron_importance( self, x: np.ndarray, grad_loss: np.ndarray ) - np.ndarray: 分析神经元重要性 (用于 Neuron-Sharing 策略) 演示公式5的使用。 注意这里需要获取中间隐藏状态 h。 为了演示我们假设 grad_loss 是对 h 的梯度。 在实际训练中这需要反向传播或 hook 机制。 # 这里为了演示我们重新计算密集 FFN 的 h # h x W_up \odot Swish(x W_gate) gate_out x self.W_gate swish_act gate_out * (1.0 / (1.0 np.exp(-gate_out))) up_out x self.W_up h up_out * swish_act # 使用公式5计算重要性 importance compute_neuron_importance(h, grad_loss) return importance # 使用示例 def main(): 使用示例 print( * 60) print(LLaMA-MoE 算法实现演示) print( * 60) # 1. 创建配置 (使用较小的维度以便快速演示) config MoEConfig( d_model512, # 较小的模型维度 d_hidden2048, # FFN 维度 (4x d_model) num_experts4, # 4个专家 top_k2, # 激活2个专家 batch_size2, seq_len128 ) print(f\n模型配置:) print(f 模型维度 (d): {config.d_model}) print(f 隐藏层维度 (d_h): {config.d_hidden}) print(f 专家数量 (N): {config.num_experts}) print(f Top-k (k): {config.top_k}) # 2. 初始化模型 model LLaMAMoE(config) # 3. 生成随机输入数据 np.random.seed(42) x np.random.randn(config.batch_size, config.seq_len, config.d_model).astype(np.float32) print(f\n输入数据 shape: {x.shape}) # 4. 执行前向传播 print(\n执行 MoE 前向传播...) output, aux_info model.forward(x) print(f输出数据 shape: {output.shape}) print(f使用的缩放因子: {aux_info[scale_factor]:.2f}) # 打印第一个 token 的路由情况 print(f\n第一个样本第一个 Token 的路由权重 (Top-{config.top_k}):) top_k_indices aux_info[top_k_indices][