4S体系择时模块代码实现,根据指数估值分位,判定当前整体仓位档位。
4S 体系择时模块基于指数估值分位的仓位管理一、实际应用场景描述在量化投资中选股Stock Selection 和 择时Market Timing 是两件事。4S 体系不仅关注买什么同样关注什么时候买、买多少。核心问题即使选到了好股票如果在市场整体估值过高时满仓入场依然可能面临大幅回撤。典型场景场景 问题2015 年创业板 PE 突破 100 倍 好公司 贵市场 依然亏钱2018 年全市场估值底部 悲观情绪下不敢建仓错过底部牛市中段 不知道该加仓还是减仓震荡市 频繁调整仓位交易成本吞噬利润二、引入痛点痛点 表现 估值主观判断 贵不贵全凭感觉缺乏量化标准 仓位拍脑袋 满仓 / 半仓 / 空仓没有系统化依据 滞后性 看到大跌才减仓已经来不及 单一指标不可靠 只看 PE 或只看 PB容易误判 不同指数差异大 沪深 300、中证 500、创业板不能用同一把尺 均值回归周期不确定 估值均值回归可能需要 3~5 年三、核心逻辑讲解3.1 什么是指数估值分位估值分位Percentile 回答的问题是当前估值在历史所有交易日中排在什么位置估值分位 历史上估值低于当前值的天数 / 总天数 × 100%示例当前 PE 15 → 历史上 20% 的时间比这更低→ 估值分位 20% → 当前处于历史低位 → 可以重仓当前 PE 40 → 历史上 85% 的时间比这更低→ 估值分位 85% → 当前处于历史高位 → 应该轻仓3.2 为什么用分位而不是绝对值对比 绝对值如 PE 15 分位值如 PE 分位 30%跨指数适用 ❌ 沪深 300 和创业板 PE 中枢完全不同 ✅ 都映射到 0~100%可直接对比跨时段适用 ❌ 10 年前 PE 15 和今天 PE 15 含义不同 ✅ 分位自动适应市场结构变化极端值敏感 ❌ 一个极端值就能拉偏 ✅ 分位天然抗极端值3.3 4S 择时仓位映射┌─────────────────────────────────────────────────────┐│ 4S 择时仓位决策矩阵 │├─────────────────────────────────────────────────────┤│ ││ 估值分位区间 │ 市场状态 │ 建议仓位 ││ ───────────────────────────────────────────── ││ [0%, 20%) │ 极度低估 │ 90%~100% ││ [20%, 40%) │ 低估 │ 70%~90% ││ [40%, 60%) │ 合理 │ 50%~70% ││ [60%, 80%) │ 偏高 │ 30%~50% ││ [80%, 100%] │ 高估 │ 0%~30% ││ ││ ★ 核心原则估值越低仓位越重估值越高仓位越轻 ││ │└─────────────────────────────────────────────────────┘3.4 多指数加权实际使用中单一指数不够全面。我们采用多指数加权方式综合仓位 w1 × 沪深300分位 w2 × 中证500分位 w3 × 创业板指分位默认权重沪深30040%大盘蓝筹代表中证50030%中盘成长代表创业板指30%小盘成长代表3.5 信号平滑与滞后处理直接用分位切换仓位会导致频繁调仓。引入两个机制1. 分位区间缓冲带在边界 ±5% 范围内保持当前仓位不变2. 持仓再平衡频率限制最短 10 个交易日才允许调整一次四、项目结构timing_module/├── README.md├── requirements.txt├── config.yaml├── data/│ ├── index_pe_history.csv # 指数 PE 历史数据│ └── index_pb_history.csv # 指数 PB 历史数据├── src/│ ├── data_loader.py # 数据加载│ ├── valuation_calculator.py # ★ 估值分位计算│ ├── position_mapper.py # ★ 分位 → 仓位映射│ ├── signal_smoother.py # 信号平滑与频率控制│ ├── timing_engine.py # ★ 择时主引擎│ ├── backtester.py # 回测框架│ └── visualizer.py # 可视化├── main.py└── output/└── timing_signals.csv # 输出的择时信号五、完整代码requirements.txtpandas1.5numpy1.21matplotlib3.5seaborn0.12scipy1.9pyyaml6.0config.yaml# 4S 择时模块配置# 跟踪的指数indices:- name: 沪深300code: 000300weight: 0.4pe_col: pe_300pb_col: pb_300- name: 中证500code: 000905weight: 0.3pe_col: pe_500pb_col: pb_500- name: 创业板指code: 399006weight: 0.3pe_col: pe_cybpb_col: pb_cyb# ★ 仓位映射分位 → 仓位position_mapping:method: linear # linear / step线性插值 / 阶梯式min_position: 0.10 # 最低仓位 10%max_position: 0.95 # 最高仓位 95%# 阶梯式映射step 模式step_rules:- max_percentile: 20position: 0.90- max_percentile: 40position: 0.75- max_percentile: 60position: 0.60- max_percentile: 80position: 0.35- max_percentile: 100position: 0.15# 信号平滑smoothing:enabled: truebuffer_zone: 0.05 # 分位边界 ±5% 缓冲带min_rebalance_days: 10 # 最短 10 天调仓一次use_pb_fallback: true # PE 缺失时用 PB 替代# 回测backtest:start_date: 2015-01-01end_date: 2024-12-31initial_capital: 1000000output:path: output/timing_signals.csvsrc/data_loader.pydata_loader.py指数估值数据加载import pandas as pdimport numpy as npfrom pathlib import Pathdef load_index_valuation(filepath: str) - pd.DataFrame:加载指数历史估值数据预期格式:date,index_code,pe_ttm,pb,close2015-01-05,000300,13.5,1.85,3523.45...df pd.read_csv(filepath, parse_dates[date])df df.sort_values([index_code, date]).reset_index(dropTrue)return dfdef load_all_indices(directory: str) - pd.DataFrame:从目录加载所有指数文件并合并import globall_files glob.glob(f{directory}/*.csv)dfs []for f in all_files:df pd.read_csv(f, parse_dates[date])dfs.append(df)return pd.concat(dfs, ignore_indexTrue)def generate_mock_valuation_data(start: str 2015-01-01,end: str 2024-12-31,seed: int 42) - pd.DataFrame:生成模拟指数估值数据模拟逻辑- PE 围绕中枢波动叠加长期趋势和短期噪声- 2015 年中高估、2018 年底低估、2021 年初高估np.random.seed(seed)dates pd.date_range(start, end, freqB)indices [{code: 000300, name: 沪深300, pe_mean: 14.0, pe_std: 3.0},{code: 000905, name: 中证500, pe_mean: 22.0, pe_std: 5.0},{code: 399006, name: 创业板指, pe_mean: 35.0, pe_std: 10.0},]records []for idx_info in indices:code idx_info[code]pe_mean idx_info[pe_mean]pe_std idx_info[pe_std]n len(dates)# 长期趋势2015 高估 → 2018 低估 → 2021 高估 → 现在中性t np.arange(n) / ntrend 0.3 * np.sin(2 * np.pi * t * 3) 0.2 * np.sin(2 * np.pi * t * 1.5)noise np.random.normal(0, 0.15, n)pe_ratio pe_mean * (1 trend noise)pe_ratio np.clip(pe_ratio, pe_mean * 0.4, pe_mean * 2.5)pb pe_ratio / 12.0 * np.random.uniform(0.8, 1.2, n)close 3000 * np.cumprod(1 np.random.normal(0.0003, 0.015, n))for i, d in enumerate(dates):records.append({date: d,index_code: code,pe_ttm: round(pe_ratio[i], 2),pb: round(pb[i], 2),close: round(close[i], 2)})return pd.DataFrame(records)src/valuation_calculator.py★ 核心模块valuation_calculator.py★ 估值分位计算器核心功能1. 计算 PE / PB 的历史分位2. 支持滚动窗口如过去 5 年3. 缺失值处理向前填充 / PB 替代4. 多指数聚合import pandas as pdimport numpy as npfrom typing import Dict, List, Optional, Tupleimport logginglogging.basicConfig(levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s)logger logging.getLogger(__name__)class ValuationCalculator:★ 指数估值分位计算器核心方法- compute_percentile(pe, history) → 当前 PE 在历史上的分位- compute_rolling_percentile(data, window) → 滚动分位- composite_signal(pe_pct, pb_pct) → 综合估值信号def __init__(self,lookback_years: int 5,use_pb_fallback: bool True,min_history_days: int 252):参数:lookback_years: 回溯年数默认 5 年 ≈ 1260 个交易日use_pb_fallback: PE 缺失时是否用 PB 替代min_history_days: 最少需要多少天历史数据才计算分位self.lookback lookback_years * 252 # 年 → 交易日self.use_pb use_pb_fallbackself.min_days min_history_dayslogger.info(f估值计算器初始化: 回溯 {lookback_years} 年, fPB 兜底: {开 if use_pb_fallback else 关})def compute_percentile(self,current_value: float,history: pd.Series) - float:★ 核心方法计算当前值在历史上的分位参数:current_value: 当前 PE 或 PBhistory: 历史估值序列返回:分位值 (0~100)值越大表示当前越贵if pd.isna(current_value) or current_value 0:return np.nanhistory history.dropna()if len(history) self.min_days:logger.debug(f历史数据不足: {len(history)} {self.min_days})return np.nan# 分位 历史上低于当前值的天数 / 总天数pct (history current_value).sum() / len(history) * 100return round(pct, 2)def compute_rolling_percentile(self,valuation_data: pd.DataFrame,index_code: str,metric: str pe_ttm) - pd.Series:计算滚动分位序列参数:valuation_data: 估值数据含 date, index_code, pe_ttm, pb 等index_code: 指数代码metric: 用哪个指标pe_ttm 或 pb返回:Series: indexdate, value分位 (0~100)mask valuation_data[index_code] index_codeidx_data valuation_data[mask].sort_values(date).reset_index(dropTrue)if len(idx_data) self.min_days:logger.warning(f指数 {index_code} 数据不足 {self.min_days} 天)return pd.Series(dtypefloat)percentiles pd.Series(dtypefloat, indexidx_data[date])for i in range(len(idx_data)):current idx_data.iloc[i]current_date current[date]current_val current[metric]# 回溯窗口window_start i - self.lookbackif window_start 0:window_start 0hist idx_data.iloc[window_start:i 1][metric].dropna()pct self.compute_percentile(current_val, hist)percentiles[current_date] pctreturn percentilesdef composite_signal(self,pe_percentile: float,pb_percentile: float) - Tuple[float, str]:★ 综合 PE PB 分位输出最终估值信号参数:pe_percentile: PE 分位 (0~100)pb_percentile: PB 分位 (0~100)返回:(composite_pct, label)composite_pct: 综合分位 (0~100)label: 估值标签极度低估 / 低估 / 合理 / 偏高 / 高估# PE 优先PB 作为补充if pd.notna(pe_percentile):primary pe_percentilesource PEelif self.use_pb and pd.notna(pb_percentile):primary pb_percentilesource PBelse:return 50.0, 未知数据不足# 两者都有时取加权平均PE 权重更高if pd.notna(pe_percentile) and pd.notna(pb_percentile):composite pe_percentile * 0.7 pb_percentile * 0.3source PEPBelse:composite primary# 打标签label self._classify_valuation(composite)logger.debug(f估值信号: PE{pe_percentile}, PB{pb_percentile} f→ 综合{composite:.1f} ({label}, 来源{source}))return round(composite, 2), labeldef _classify_valuation(self, pct: float) - str:将分位映射为文字标签if pct 20:return 极度低估elif pct 40:return 低估elif pct 60:return 合理elif pct 80:return 偏高else:return 高估def batch_compute(self,valuation_data: pd.DataFrame,index_configs: List[Dict]) - pd.DataFrame:批量计算多指数的估值分位返回:DataFrame:date | index_code | pe_pct | pb_pct | composite_pct | labelall_results []for config in index_configs:code config[code]name config.get(name, code)logger.info(f 计算 {name}{code}估值分位...)pe_pcts self.compute_rolling_percentile(valuation_data, code, pe_ttm)pb_pcts self.compute_rolling_percentile(valuation_data, code, pb)for date in pe_pcts.index:pe pe_pcts[date]pb pb_pcts[date] if date in pb_pcts.index else np.nancomp, label self.composite_signal(pe, pb)all_results.append({date: date,index_code: code,index_name: name,pe_pct: pe,pb_pct: pb,composite_pct: comp,valuation_label: label})return pd.DataFrame(all_results)src/position_mapper.py★ 核心模块position_mapper.py★ 估值分位 → 仓位映射将 0~100 的分位值映射为建议仓位0%~100%import pandas as pdimport numpy as npfrom typing import Dict, List, Optionalimport logginglogger logging.getLogger(__name__)class PositionMapper:★ 分位 → 仓位映射器两种模式1. linear线性插值分位从 0→100 时仓位从 max→min 线性变化2. step阶梯式预设分位区间对应固定仓位def __init__(self,method: str step,min_position: float 0.10,max_position: float 0.95,step_rules: Optional[List[Dict]] None):参数:method: linear 或 stepmin_position: 最高估值分位时的最低仓位max_position: 最低估值分位时的最高仓位step_rules: 阶梯规则step 模式如:[{max_percentile: 20, position: 0.90},{max_percentile: 40, position: 0.75},...]self.method methodself.min_pos min_positionself.max_pos max_positionself.step_rules step_rules or [{max_percentile: 20, position: 0.90},{max_percentile: 40, position: 0.75},{max_percentile: 60, position: 0.60},{max_percentile: 80, position: 0.35},{max_percentile: 100, position: 0.15},]# 按 max_percentile 排序self.step_rules sorted(self.step_rules, keylambda x: x[max_percentile])logger.info(f仓位映射器: 模式{method}, 仓位范围 [{min_position*100:.0f}%, {max_position*100:.0f}%])if method step:logger.info( 阶梯规则:)for rule in self.step_rules:logger.info(f 分位 ≤ {rule[max_percentile]}% → 仓位 {rule[position]*100:.0f}%)def map_to_position(self, composite_pct: float) - float:★ 核心方法将综合分位映射为建议仓位参数:composite_pct: 综合估值分位 (0~100)返回:建议仓位 (0.0 ~ 1.0)if pd.isna(composite_pct):logger.warning(分位值为空返回中性仓位 50%)return 0.50if self.method linear:return self._linear_map(composite_pct)else:return self._step_map(composite_pct)def _linear_map(self, pct: float) - float:线性插值分位越高 → 仓位越低# 分位 0 → max_pos, 分位 100 → min_pospos self.max_pos - (pct / 100.0) * (self.max_pos - self.min_pos)return round(max(self.min_pos, min(self.max_pos, pos)), 4)def _step_map(self, pct: float) - float:阶梯映射找到第一个匹配的区间for rule in self.step_rules:if pct rule[max_percentile]:return rule[position]# 兜底return self.min_posdef map_dataframe(self,df: pd.DataFrame,pct_col: str composite_pct) - pd.DataFrame:对 DataFrame 批量映射新增列: suggested_positiondf df.copy()df[suggested_position] df[pct_col].apply(self.map_to_position)return dfsrc/signal_smoother.pysignal_smoother.py信号平滑缓冲带 调仓频率控制import pandas as pdimport numpy as npfrom typing import Optionalimport logginglogger logging.getLogger(__name__)class SignalSmoother:择时信号平滑器解决两个问题1. 分位在边界附近反复横跳 → 缓冲带2. 信号变化过于频繁 → 最小调仓间隔def __init__(self,buffer_zone: float 0.05,min_rebalance_days: int 10):参数:buffer_zone: 缓冲带宽度如 0.05 ±5% 分位min_rebalance_days: 最短调仓间隔交易日self.buffer buffer_zoneself.min_days min_rebalance_daysself.last_rebalance_date Noneself.last_position Nonelogger.info(f信号平滑器: 缓冲带±{buffer_zone*100:.0f}%, f最小调仓间隔 {min_rebalance_days} 天)def smooth(self,date: pd.Timestamp,current_pct: float,raw_position: float,index_code: str ) - Tuple[float, bool, str]:★ 核心方法平滑处理参数:date: 当前日期current_pct: 当前估值分位raw_position: 原始映射仓位index_code: 指数代码用于日志返回:(final_position, changed, reason)if self.last_position is None:# 首次调用直接采用self.last_position raw_positionself.last_rebalance_date datereturn raw_position, True, 首次建仓# 检查 1缓冲带 # 分位变化是否在缓冲带内pct_change abs(current_pct - getattr(self, _last_pct, current_pct))if pct_change self.buffer * 100:# 在缓冲带内保持上次仓位logger.debug(f[{date.strftime(%Y-%m-%d)}] {index_code} f分位变化 {pct_change:.1f}% 缓冲带 {self.buffer*100:.0f}%保持仓位)self._last_pct current_pctreturn self.last_position, False, f缓冲带内变化{pct_change:.1f}%# 检查 2最小调仓间隔 if self.last_rebalance_date is not None:days_since (date - self.last_rebalance_date).daysif days_since self.min_days:logger.debug(f[{date.strftime(%Y-%m-%d)}] {index_code} f距上次调仓仅 {days_since} 天 {self.min_days} 天保持仓位)self._last_pct current_pctreturn self.last_position, False, f调仓间隔不足{days_since}天# 通过所有检查执行调仓 old_pos self.last_positionself.last_position raw_positionself.last_rebalance_date dateself._last_pct current_pctchange_pct (raw_position - old_pos) * 100direction 加仓 if change_pct 0 else 减仓logger.info(f[{date.strftime(%Y-%m-%d)}] {index_code} f{direction}: {old_pos*100:.0f}% → {raw_position*100:.0f}% f分位 {current_pct:.1f}%)return raw_position, True, f{direction}分位变化{pct_change:.1f}%def smooth_dataframe(self,df: pd.DataFrame,pct_col: str composite_pct,pos_col: str suggested_position) - pd.DataFrame:对 DataFrame 批量平滑按日期升序处理df df.sort_values(date).reset_index(dropTrue)# 重置状态self.last_rebalance_date Noneself.last_position Noneresults []for _, row in df.iterrows():date row[date]pct row[pct_col]raw_pos row[pos_col]code row.get(index_code, )final_pos, changed, reason self.smooth(date, pct, raw_pos, code)r row.to_dict()r[final_position] final_posr[position_changed] changedr[change_reason] reasonresults.append(r)return pd.DataFrame(results)src/timing_engine.py★ 主引擎timing_engine.py★ 4S 择时主引擎串联估值分位计算 → 仓位映射 → 信号平滑 → 输出import pandas as pdimport numpy as npfrom src.valuation_calculator import ValuationCalculatorfrom src.position_mapper import PositionMapperfrom src.signal_smoother import SignalSmootherfrom typing import Dict, List, Optionalimport logginglogging.basicConfig(levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s)logger logging.getLogger(__name__)class TimingEngine:★ 4S 择时引擎完整流程1. 加载指数估值数据2. 计算各指数 PE / PB 历史分位3. 加权合成综合分位4. 映射到建议仓位5. 信号平滑缓冲带 调仓频率控制6. 输出最终择时信号de本文代码仅供学习与技术交流不构成任何投资建议股市有风险入市需谨慎利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛