[特殊字符] 多店铺淘宝订单统一拉取:TOP API + 分布式任务调度实战(附Python源码)
多店铺淘宝订单统一拉取TOP API 分布式任务调度实战附Python源码多淘宝/天猫店铺订单集中同步到一套ERP核心是每个店铺一个 Seller AccessToken → 统一增量时间窗拉取 → 以 tid 幂等入库 → 用分布式锁/原子标记防多节点重复跑。下面给你可直接用的 Python 实现。一、多店铺同步架构┌──────────────────────────────────────────────────────┐ │ 调度器 (APScheduler / Celery Beat) │ │ 获取 shop_list → 分布式锁(shop_iddate) │ └──────────────────────┬───────────────────────────────┘ ┌─────────────┼─────────────┐ ▼ ▼ ▼ Shop_A Token Shop_B Token Shop_C Token │ │ │ taobao.trades.sold.get (modified 时间窗) │ ▼ taobao.trade.fullinfo.get (逐单) │ ▼ ERP订单表 (UNIQUE(tid, shop_id) 幂等)⚠️每个店铺必须在开放平台分别 OAuth 授权得到对应access_token/refresh_token存shop_auth表。二、完整 Python 多店铺同步模块含分布式文件锁示意# top_multi_shop_sync.py 多店铺淘宝订单统一拉取 - 支持 N 个店铺(Seller AccessToken 不同) - 增量按 modified 时间窗 - 文件锁防同机多进程重复跑分布式子可以用 Redis SET NX EX - 幂等入库用 (tid, shop_id) UNIQUE 依赖: requests apscheduler (pip install requests apscheduler) import hashlib import time import requests import sqlite3 import os import fcntl from datetime import datetime, timedelta from typing import Dict, List # 封装好API供应商demo urlhttps://console.open.onebound.cn/console/?iLex # ───────────── TOP Client (内联) ───────────── class TopClient: GW https://gw.api.taobao.com/router/rest def __init__(self, ak, ask): self.ak, self.ask ak, ask def _sign(self, p: Dict) - str: filt sorted((k, v) for k, v in p.items() if v is not None and str(v).strip() ! and k ! sign) qs .join(f{k}{v} for k, v in filt) return hashlib.md5(f{self.ask}{qs}{self.ask}.encode()).hexdigest().upper() def call(self, method, biz, session): p {method: method, app_key: self.ak, timestamp: str(int(time.time() * 1000)), format: json, v: 2.0, sign_method: md5, session: session} p.update(biz) p[sign] self._sign(p) r requests.post(self.GW, datap, timeout15) r.raise_for_status() d r.json() if error_response in d: err d[error_response] raise Exception(fTOP[{err.get(code)}]:{err.get(msg)} {err.get(sub_msg,)}) return d.get(list(d.keys() - {error_response})[0], {}) # ───────────── 本地DB (shop_auth erp_order) ───────────── def init_db(dbmulti_shop_erp.db): conn sqlite3.connect(db) conn.execute(CREATE TABLE IF NOT EXISTS shop_auth( shop_id TEXT PRIMARY KEY, shop_name TEXT, access_token TEXT, refresh_token TEXT, expires_at TEXT)) conn.execute(CREATE TABLE IF NOT EXISTS erp_order( tid TEXT, shop_id TEXT, status TEXT, payment REAL, buyer_nick TEXT, created TEXT, PRIMARY KEY(tid, shop_id))) conn.commit() return conn def load_shops(conn) - List[Dict]: cur conn.execute(SELECT shop_id,shop_name,access_token FROM shop_auth) return [{shop_id: r[0], shop_name: r[1], token: r[2]} for r in cur.fetchall()] # 封装好API供应商demo urlhttps://console.open.onebound.cn/console/?iLex def upsert_order(conn, shop_id, trade: Dict): conn.execute(INSERT INTO erp_order(tid,shop_id,status,payment,buyer_nick,created) VALUES(?,?,?,?,?,?) ON CONFLICT(tid,shop_id) DO UPDATE SET statusexcluded.status,paymentexcluded.payment, buyer_nickexcluded.buyer_nick,createdexcluded.created, (str(trade[tid]), shop_id, trade.get(status), float(trade.get(payment) or 0), trade.get(buyer_nick,), trade.get(created,))) conn.commit() # ───────────── 分布式文件锁 (单进程/单机示意) ───────────── LOCK_FILE /tmp/top_multishop_sync.lock def acquire_lock(): fd open(LOCK_FILE, w) try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # 非阻塞 return fd except (IOError, BlockingIOError): fd.close() raise RuntimeError(另一实例正在运行中退出) def release_lock(fd): fcntl.flock(fd, fcntl.LOCK_UN) fd.close() try: os.remove(LOCK_FILE) except OSError: pass # ───────────── 核心同步逻辑 ───────────── class MultiShopOrderSync: def __init__(self, ak, ask, conn): self.top TopClient(ak, ask) self.conn conn def sync_one_shop(self, shop: Dict, minutes30): token shop[token] shop_id shop[shop_id] now datetime.now() start (now - timedelta(minutesminutes)).strftime(%Y-%m-%d %H:%M:%S) end now.strftime(%Y-%m-%d %H:%M:%S) # ① 列表 r self.top.call(taobao.trades.sold.get, { fields: tid,status,payment,modified,buyer_nick,created, start_modified: start, end_modified: end, page_no: 1, page_size: 40 }, token) tids [t[tid] for t in (r.get(trades, []) or [])] for tid in tids: # ② 明细 detail self.top.call(taobao.trade.fullinfo.get, { tid: str(tid), fields: tid,status,payment,buyer_nick,created }, token).get(trade, {}) upsert_order(self.conn, shop_id, detail) time.sleep(0.15) # QPS 保护 print(f ✅ 店铺[{shop[shop_name]}] 同步订单: {len(tids)} 笔) def sync_all(self, minutes30): shops load_shops(self.conn) if not shops: print(⚠️ 无店铺配置请在 shop_auth 表录入 shop_id/shop_name/access_token) return for s in shops: try: self.sync_one_shop(s, minutes) except Exception as e: print(f ❌ 店铺[{s[shop_name]}] 失败: {e}) # 定时入口 if __name__ __main__: APP_KEY YOUR_ENTERPRISE_APP_KEY APP_SECRET YOUR_APP_SECRET conn init_db() # ★ 首次运行前手动 INSERT shop_auth 或写一小段初始化 # conn.execute(INSERT OR IGNORE INTO shop_auth VALUES(SHOP001,旗舰店A,SELLER_TOKEN_A,,NULL)) syncer MultiShopOrderSync(APP_KEY, APP_SECRET, conn) try: lock_fd acquire_lock() print(f▶ 多店铺订单同步开始 {datetime.now().strftime(%H:%M:%S)}) syncer.sync_all(minutes30) print(▶ 同步完成) except RuntimeError as e: print(e) finally: try: release_lock(lock_fd) except Exception: pass三、shop_auth 表初始化示例SQLiteINSERT OR IGNORE INTO shop_auth(shop_id,shop_name,access_token) VALUES (SHOP001,天猫旗舰店A,SELLER_ACCESS_TOKEN_A), (SHOP002,淘宝企业店B,SELLER_ACCESS_TOKEN_B);AccessToken 通过各店铺卖家账号 OAuth2 授权换取同之前讲过的oauth.taobao.com/token流程每个店铺 token 不同。四、分布式调度建议部署形态防重手段单机多进程文件锁fcntl.flock已演示多机 / K8sRedisSET key value NX EX 300 获取锁 → 执行 → DEL失败跳过定时触发APSchedulerBlockingScheduler 锁 / Celery Beat断点续跑记录每个 shop 最后成功max_modified→ 下次从此时间拉APScheduler 常驻示例from apscheduler.schedulers.blocking import BlockingScheduler sched BlockingScheduler() sched.add_job(lambda: MultiShopOrderSync(APP_KEY,APP_SECRET,conn).sync_all(30), cron, minute*/5, idtb_multi_shop_sync) sched.start()五、避坑清单坑现象解决用同一 token 查不同店铺只能看授权店铺自己订单每个店铺独立 OAuth → 独立 tokensession 传买家 token403 / 空必须是卖家 AccessToken全量翻页不记断点重启重跑超日额度存last_sync_timeper shop多节点同时跑重复插入/重复API调用Redis NX 锁淘宝客应用无订单权限403创建自用型企业应用申请taobao.trades.sold.get六、面试/方案一句话多店铺淘宝订单统一拉取 各店铺分别 OAuth 授权存 Seller AccessToken → 定时增量按modified时间窗调taobao.trades.sold.gettrade.fullinfo.get→ 以(tid, shop_id)幂等入库 → 分布式 Redis NX 锁防多节点重复触发签名/QPS 同单店铺仅 token 按店铺隔离。需要我补Redis 分布式锁 Python 示例 或Token 自动刷新refresh_token 过期前置换 吗