TradingView-Screener 深度解析Python 量化交易数据获取技术实现【免费下载链接】TradingView-ScreenerA package that lets you create TradingView screeners in Python项目地址: https://gitcode.com/gh_mirrors/tr/TradingView-ScreenerTradingView-Screener 是一个专业的 Python 量化交易数据获取库通过封装 TradingView 官方 API 实现了高效、灵活的股票筛选器构建功能。该库支持全球 70 多个国家的股票市场、加密货币、外汇、期货、债券等多种金融资产提供超过 3000 个数据字段的实时获取能力是量化交易者和数据分析师进行市场数据获取的重要工具。技术架构概览与核心设计理念模块化架构设计TradingView-Screener 采用高度模块化的设计架构将不同功能分离到独立的模块中确保了代码的可维护性和可扩展性。核心模块包括查询构建模块 (query.py)- 负责构建和发送 API 请求字段操作模块 (column.py)- 提供类型安全的字段操作接口数据模型模块 (models.py)- 定义类型化的数据结构筛选器工厂模块 (screeners.py)- 提供特定市场的预定义筛选器类型安全的数据模型项目采用 Python 的类型提示系统通过 TypedDict 定义了完整的 API 响应数据结构。这种设计不仅提高了代码的可读性还能够在开发阶段捕获潜在的类型错误from typing import TypedDict, Literal class FilterOperationDict(TypedDict): left: str operation: Literal[ greater, egreater, less, eless, equal, nequal, in_range, not_in_range, empty, nempty, crosses, crosses_above, crosses_below, match, nmatch, smatch, has, has_none_of, above%, below%, in_range%, not_in_range%, in_day_range, in_week_range, in_month_range ] right: Any异步数据获取机制虽然当前版本主要使用同步请求但架构设计为异步扩展预留了空间。API 请求层抽象了网络通信细节使得未来可以轻松实现异步数据获取class Query: def __init__(self, market: str america) - None: self._market market self._columns: list[str] [] self._filter: list[FilterOperationDict] [] self._sort: SortByDict | None None self._range: list[int] DEFAULT_RANGE.copy()核心模块解析与实现细节查询构建器模式Query 类实现了流畅的构建器模式允许链式调用构建复杂的筛选条件。这种设计模式极大地提高了代码的可读性和易用性from tradingview_screener import Query, col # 构建复杂的筛选查询 query (Query() .select(name, close, volume, market_cap_basic) .where( col(market_cap_basic).between(1_000_000_000, 100_000_000_000), col(relative_volume_10d_calc) 1.5, col(price_earnings_ttm) 25 ) .order_by(volume, ascendingFalse) .limit(100))字段操作系统的技术实现Column 类封装了所有可用的字段操作提供了类型安全的比较和逻辑运算接口。每个操作都映射到 TradingView API 的特定操作符class Column: def __init__(self, name: str) - None: self.name name def __gt__(self, other) - FilterOperationDict: return {left: self.name, operation: greater, right: other} def between(self, left, right) - FilterOperationDict: return {left: self.name, operation: in_range, right: [left, right]} def has(self, values: str | list[str]) - FilterOperationDict: return {left: self.name, operation: has, right: values}多市场筛选器工厂screeners.py 模块提供了针对不同市场的预配置筛选器简化了常见用例的使用def stocks(market: str america) - Query: 获取指定市场的股票筛选器 query Query(market) query._query[markets] [market] return query def crypto() - Query: 获取中心化交易所的加密货币筛选器 query Query(crypto) query._query[markets] [crypto] return query def options(underlying: str) - Query: 获取特定标的资产的期权链筛选器 query Query(america) query._query[markets] [america] query._query[symbols] {query: {types: [option]}, tickers: []} query._query[options][underlying] underlying return query数据流设计与网络通信机制API 请求构建流程当调用get_scanner_data()方法时系统会执行以下数据流查询参数构建- 将用户指定的筛选条件转换为 TradingView API 要求的 JSON 格式HTTP 请求发送- 使用 requests 库发送 POST 请求到 TradingView 的扫描器端点响应解析- 解析 JSON 响应并转换为 pandas DataFrame数据格式化- 对特殊字段进行格式化处理如技术评级请求载荷结构分析TradingView-Screener 生成的请求载荷遵循 TradingView 官方 API 的结构规范{ markets: [america], symbols: {query: {types: []}, tickers: []}, options: {lang: en}, columns: [name, close, volume, market_cap_basic], sort: {sortBy: volume, sortOrder: desc}, range: [0, 50], filter: [ { left: market_cap_basic, operation: in_range, right: [1000000000, 100000000000] } ] }错误处理与重试机制系统实现了完善的错误处理机制包括网络超时、API 限制、数据格式错误等情况的处理def get_scanner_data(self, **kwargs) - tuple[int, pd.DataFrame]: 获取扫描器数据包含错误处理和重试逻辑 try: response requests.post( URL.format(marketself._market), headersHEADERS, data{filter: json.dumps(self._build_query())}, cookieskwargs.get(cookies), timeoutkwargs.get(timeout, 30) ) response.raise_for_status() data response.json() # 解析响应数据 total_count data.get(totalCount, 0) df pd.DataFrame(data[data]) return total_count, df except requests.exceptions.Timeout: raise TimeoutError(请求超时请检查网络连接) except requests.exceptions.HTTPError as e: if e.response.status_code 429: raise RateLimitError(API 请求频率限制请稍后重试) raise性能优化策略与最佳实践批量请求优化对于需要获取大量数据的场景建议使用分页机制和适当的请求间隔from time import sleep from tradingview_screener import Query def fetch_large_dataset(total_limit: int 1000, batch_size: int 100): 分批获取大量数据避免触发 API 限制 all_data [] for offset in range(0, total_limit, batch_size): query Query().select(name, close, volume).limit(batch_size).offset(offset) total_count, batch_data query.get_scanner_data() all_data.append(batch_data) # 避免请求过于频繁 if offset batch_size total_limit: sleep(1) # 1秒间隔 return pd.concat(all_data, ignore_indexTrue)缓存策略实现对于不频繁变化的数据可以实现本地缓存机制import pickle import hashlib from pathlib import Path class CachedQuery: def __init__(self, cache_dir: str ./cache): self.cache_dir Path(cache_dir) self.cache_dir.mkdir(exist_okTrue) def get_data(self, query: Query, cache_ttl: int 300) - tuple[int, pd.DataFrame]: 获取数据支持缓存机制 query_hash hashlib.md5(str(query).encode()).hexdigest() cache_file self.cache_dir / f{query_hash}.pkl # 检查缓存是否有效 if cache_file.exists(): cache_time cache_file.stat().st_mtime if time.time() - cache_time cache_ttl: with open(cache_file, rb) as f: return pickle.load(f) # 获取新数据并缓存 result query.get_scanner_data() with open(cache_file, wb) as f: pickle.dump(result, f) return result并发处理优化虽然 TradingView API 可能有并发限制但对于独立的市场查询可以使用并发处理import concurrent.futures from tradingview_screener import stocks def fetch_multiple_markets(markets: list[str]): 并发获取多个市场的数据 results {} with concurrent.futures.ThreadPoolExecutor(max_workers3) as executor: future_to_market { executor.submit(stocks(market).limit(50).get_scanner_data): market for market in markets } for future in concurrent.futures.as_completed(future_to_market): market future_to_market[future] try: results[market] future.result() except Exception as e: print(f获取 {market} 数据失败: {e}) return results扩展集成方案与生态系统与 Pandas 深度集成TradingView-Screener 天然支持 pandas DataFrame可以无缝集成到现有的数据分析工作流中import pandas as pd import numpy as np from tradingview_screener import Query # 获取数据并直接进行数据分析 total_count, df Query().select(name, close, volume, market_cap_basic).get_scanner_data() # 数据清洗和转换 df[volume_millions] df[volume] / 1_000_000 df[market_cap_category] pd.cut( df[market_cap_basic], bins[0, 2e9, 10e9, 200e9, np.inf], labels[小型股, 中型股, 大型股, 超大型股] ) # 统计分析 summary_stats df.groupby(market_cap_category).agg({ close: [mean, std, min, max], volume: sum })机器学习集成示例结合 scikit-learn 进行机器学习分析from sklearn.preprocessing import StandardScaler from sklearn.cluster import KMeans from tradingview_screener import Query # 获取技术指标数据 _, df (Query() .select(RSI, MACD.macd, MACD.signal, BB.upper, BB.lower, close) .limit(500) .get_scanner_data()) # 数据预处理 features df[[RSI, MACD.macd, MACD.signal]].fillna(0) scaler StandardScaler() scaled_features scaler.fit_transform(features) # 聚类分析 kmeans KMeans(n_clusters3, random_state42) df[cluster] kmeans.fit_predict(scaled_features) # 分析每个簇的特征 cluster_stats df.groupby(cluster).agg({ close: mean, RSI: [mean, std], volume: sum })实时监控系统集成构建实时市场监控系统import schedule import time from datetime import datetime from tradingview_screener import Query, col class MarketMonitor: def __init__(self, alert_conditions: dict): self.alert_conditions alert_conditions self.previous_data None def monitor_market(self): 监控市场并触发警报 current_time datetime.now() print(f[{current_time}] 开始市场监控...) # 获取当前市场数据 _, current_data (Query() .select(name, close, change, volume, RSI) .where(col(volume) 1000000) .limit(100) .get_scanner_data()) # 检测异常波动 if self.previous_data is not None: merged current_data.merge( self.previous_data[[name, close]], onname, suffixes(_current, _previous) ) merged[price_change_pct] ( (merged[close_current] - merged[close_previous]) / merged[close_previous] * 100 ) # 触发警报条件 alerts merged[merged[price_change_pct].abs() 5] if not alerts.empty: self.send_alerts(alerts) self.previous_data current_data def send_alerts(self, alert_data): 发送警报通知 for _, row in alert_data.iterrows(): message ( f警报: {row[name]} 价格波动 {row[price_change_pct]:.2f}%\n f当前价格: {row[close_current]}, RSI: {row[RSI]} ) print(message) # 这里可以集成邮件、短信、Webhook等通知方式 # 定时执行监控 monitor MarketMonitor({}) schedule.every(5).minutes.do(monitor.monitor_market) while True: schedule.run_pending() time.sleep(1)实战应用案例量化策略回测系统技术指标筛选策略以下是一个完整的技术指标筛选策略实现示例from typing import List, Tuple from dataclasses import dataclass from tradingview_screener import Query, col dataclass class TechnicalSignal: symbol: str signal_type: str # bullish, bearish, neutral strength: float indicators: dict class TechnicalScreener: def __init__(self, config: dict): self.config config def scan_bullish_candidates(self) - List[TechnicalSignal]: 扫描看涨技术信号候选股 query (Query() .select( name, close, volume, RSI, MACD.macd, MACD.signal, BB.upper, BB.lower, SMA20, SMA50 ) .where( col(RSI).between(30, 70), # RSI 在合理区间 col(MACD.macd) col(MACD.signal), # MACD 金叉 col(close) col(SMA20), # 价格在20日均线上方 col(volume) 1000000 # 成交量充足 ) .order_by(volume, ascendingFalse) .limit(50)) total_count, df query.get_scanner_data() signals [] for _, row in df.iterrows(): signal_strength self.calculate_signal_strength(row) if signal_strength 0.7: # 信号强度阈值 signals.append(TechnicalSignal( symbolrow[name], signal_typebullish, strengthsignal_strength, indicators{ rsi: row[RSI], macd_diff: row[MACD.macd] - row[MACD.signal], above_sma20: row[close] row[SMA20] } )) return signals def calculate_signal_strength(self, row) - float: 计算技术信号强度 strength 0.0 # RSI 权重 if 30 row[RSI] 50: strength 0.3 elif 50 row[RSI] 70: strength 0.2 # MACD 金叉强度 macd_diff row[MACD.macd] - row[MACD.signal] if macd_diff 0: strength min(0.3, macd_diff * 10) # 均线位置 if row[close] row[SMA20]: strength 0.2 if row[close] row[SMA50]: strength 0.2 return min(strength, 1.0) # 归一化到0-1基本面与技术面结合筛选class FundamentalTechnicalScreener: def __init__(self): self.fundamental_filters { pe_ratio_max: 25, market_cap_min: 1_000_000_000, dividend_yield_min: 0.02 } self.technical_filters { rsi_range: (30, 70), volume_min: 500000 } def find_quality_stocks(self) - pd.DataFrame: 寻找基本面良好且技术面健康的股票 query (Query() .select( name, close, volume, market_cap_basic, price_earnings_ttm, dividends_yield_current, RSI, MACD.macd, MACD.signal, volume ) .where( # 基本面筛选 col(market_cap_basic) self.fundamental_filters[market_cap_min], col(price_earnings_ttm) self.fundamental_filters[pe_ratio_max], col(dividends_yield_current) self.fundamental_filters[dividend_yield_min], # 技术面筛选 col(RSI).between(*self.technical_filters[rsi_range]), col(volume) self.technical_filters[volume_min], # 综合条件 col(MACD.macd) col(MACD.signal) # MACD 向上 ) .order_by(market_cap_basic, ascendingFalse) .limit(100)) total_count, df query.get_scanner_data() # 计算综合得分 df[fundamental_score] self.calculate_fundamental_score(df) df[technical_score] self.calculate_technical_score(df) df[composite_score] ( df[fundamental_score] * 0.6 df[technical_score] * 0.4 ) return df.sort_values(composite_score, ascendingFalse)未来演进方向与技术展望异步支持与性能优化未来的版本计划加入完整的异步支持利用 asyncio 和 aiohttp 提升并发性能# 计划中的异步接口 async def get_scanner_data_async(self, session: aiohttp.ClientSession): 异步获取扫描器数据 async with session.post( URL.format(marketself._market), headersHEADERS, jsonself._build_query() ) as response: data await response.json() return self._parse_response(data)数据流处理管道计划实现数据流处理管道支持实时数据流处理和复杂事件处理class DataPipeline: def __init__(self): self.processors [] self.sinks [] def add_processor(self, processor): self.processors.append(processor) def add_sink(self, sink): self.sinks.append(sink) async def process_stream(self, query: Query, interval: int 60): 处理数据流 while True: data await query.get_scanner_data_async() # 应用处理器链 for processor in self.processors: data processor.process(data) # 输出到接收器 for sink in self.sinks: await sink.consume(data) await asyncio.sleep(interval)机器学习集成增强计划提供更深入的机器学习集成包括特征工程、模型训练和预测class MLEnhancedScreener: def __init__(self, model_path: str None): self.model self.load_model(model_path) if model_path else None self.feature_engineer FeatureEngineer() def extract_features(self, raw_data: pd.DataFrame) - pd.DataFrame: 从原始数据中提取机器学习特征 features self.feature_engineer.transform(raw_data) return features def predict_signals(self, features: pd.DataFrame) - pd.DataFrame: 使用机器学习模型预测交易信号 if self.model is None: self.train_model(features) predictions self.model.predict(features) probabilities self.model.predict_proba(features) result features.copy() result[predicted_signal] predictions result[signal_probability] probabilities.max(axis1) return resultTradingView-Screener 作为一个专业级的金融数据获取库通过其精心设计的架构和丰富的功能集为量化交易者和数据分析师提供了强大的工具。其模块化设计、类型安全的接口和灵活的查询构建系统使得复杂的市场数据分析变得简单高效。随着金融科技领域的不断发展该库将继续演进为开发者提供更加强大和易用的数据获取和分析能力。【免费下载链接】TradingView-ScreenerA package that lets you create TradingView screeners in Python项目地址: https://gitcode.com/gh_mirrors/tr/TradingView-Screener创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考