MonkeyCode与Redis实战:高并发系统的缓存设计完全指南
缓存是性能的杠杆一个QPS 10000的系统如果70%的请求命中缓存数据库只需要承受3000 QPS。缓存设计好系统性能提升10倍不是问题。这篇文章用MonkeyCode生成完整的Redis缓存方案涵盖缓存策略、分布式锁、延迟队列、排行榜、限流等实战场景。给MonkeyCode的统一Prompt用Python Redis实现高并发缓存系统要求 1. 缓存策略Cache-Aside、Write-Through、Write-Behind 2. 缓存失效TTL、LRU淘汰 3. 分布式锁RedLock 4. 延迟队列 5. 排行榜有序集合 6. 限流滑动窗口、令牌桶 7. 消息发布订阅 8. 热点数据探测 9. 缓存异常降级处理 需要完整可运行的代码包含异常处理和监控埋点1. 缓存策略实现# cache_strategies.py - MonkeyCode生成 import redis import json import hashlib import time from functools import wraps from typing import Any, Callable, Optional from dataclasses import dataclass redis_client redis.Redis( hostlocalhost, port6379, db0, decode_responsesTrue ) # # Cache-Aside旁路缓存最常用策略 # class CacheAside: 读多写少的场景首选 def __init__(self, prefix: str cache): self.prefix prefix self.default_ttl 300 # 5分钟 def _make_key(self, key: str) - str: return f{self.prefix}:{key} def get(self, key: str, loader: Callable None, ttl: int None) - Any: 获取缓存未命中时加载并缓存 redis_key self._make_key(key) # 1. 读缓存 cached redis_client.get(redis_key) if cached: return json.loads(cached) # 2. 缓存未命中调用loader加载 if loader: data loader() # 3. 写入缓存 self.set(key, data, ttl or self.default_ttl) return data return None def set(self, key: str, value: Any, ttl: int None): 写入缓存 redis_key self._make_key(key) redis_client.setex( redis_key, ttl or self.default_ttl, json.dumps(value) ) def delete(self, key: str): 删除缓存 redis_client.delete(self._make_key(key)) def invalidate_pattern(self, pattern: str): 批量删除匹配模式 keys redis_client.keys(f{self.prefix}:{pattern}) if keys: redis_client.delete(*keys) # 使用示例 cache CacheAside(user) def get_user(user_id: int) - dict: 获取用户缓存优先 return cache.get( fuser:{user_id}, loaderlambda: load_user_from_db(user_id), ttl600 # 用户信息缓存10分钟 ) def update_user(user_id: int, data: dict): 更新用户时删除缓存 update_user_to_db(user_id, data) cache.delete(fuser:{user_id}) # # Write-Through写穿透数据一致性强保证 # class WriteThrough: 写操作同时更新缓存和数据库 def __init__(self, redis_client): self.redis redis_client def write(self, key: str, value: Any, ttl: int 3600): 同步写缓存和DB # 1. 写DB write_to_db(key, value) # 2. 写缓存 self.redis.setex(key, ttl, json.dumps(value)) def delete(self, key: str): 删除时同时清理缓存 delete_from_db(key) self.redis.delete(key) # # Write-Behind写回写入性能最高 # class WriteBehind: 异步写回批量写入减少IO def __init__(self, redis_client, queue_size: int 1000): self.redis redis_client self.queue_size queue_size self.write_queue [] def write(self, key: str, value: Any): 先写缓存异步批量写DB # 1. 写缓存 self.redis.set(key, json.dumps(value)) # 2. 加入写队列 self.write_queue.append({key: key, value: value}) # 3. 达到阈值批量写DB if len(self.write_queue) self.queue_size: self.flush() def flush(self): 批量写DB if not self.write_queue: return batch_write_to_db(self.write_queue) self.write_queue.clear()2. 缓存装饰器# cache_decorator.py - MonkeyCode生成 import redis import json import hashlib import time import functools redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue) def cached(key_prefix: str, ttl: int 300, key_builder: Callable None): 缓存装饰器 def decorator(func): functools.wraps(func) def wrapper(*args, **kwargs): # 构建缓存key if key_builder: cache_key key_builder(*args, **kwargs) else: key_parts [key_prefix, str(args), str(sorted(kwargs.items()))] key_str |.join(key_parts) cache_key f{key_prefix}:{hashlib.md5(key_str.encode()).hexdigest()} # 尝试获取缓存 cached_value redis_client.get(cache_key) if cached_value: return json.loads(cached_value) # 执行原函数 result func(*args, **kwargs) # 写入缓存 redis_client.setex(cache_key, ttl, json.dumps(result)) return result return wrapper return decorator def cache_invalidate(key_prefix: str, *args): 缓存失效装饰器 def decorator(func): functools.wraps(func) def wrapper(*args, **kwargs): result func(*args, **kwargs) # 清理相关缓存 pattern f{key_prefix}:* keys redis_client.keys(pattern) if keys: redis_client.delete(*keys) return result return wrapper return decorator # 使用 cached(product, ttl600) def get_product(product_id: int) - dict: 获取商品缓存600秒 return query_product_from_db(product_id) cache_invalidate(product) def update_product(product_id: int, data: dict): 更新商品时自动清理缓存 return update_product_in_db(product_id, data)3. 分布式锁# distributed_lock.py - MonkeyCode生成 import redis import uuid import time import threading from contextlib import contextmanager from typing import Optional class DistributedLock: Redis分布式锁RedLock简化版 def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int 30): self.redis redis_client self.lock_name flock:{lock_name} self.timeout timeout self.token str(uuid.uuid4()) # 锁持有者标识 def acquire(self, blocking: bool True, blocking_timeout: int 10) - bool: 获取锁 blocking: 是否阻塞等待 blocking_timeout: 阻塞超时时间秒 end_time time.time() blocking_timeout while True: # SET NX EX原子操作 acquired self.redis.set( self.lock_name, self.token, nxTrue, # 只有key不存在时设置 exself.timeout # 自动过期 ) if acquired: return True if not blocking: return False if time.time() end_time: return False time.sleep(0.01) # 避免CPU空转 def release(self) - bool: 释放锁Lua脚本保证原子性 只有持有者才能释放自己的锁 lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end result self.redis.eval(lua_script, 1, self.lock_name, self.token) return result 1 def extend(self, additional_time: int 30) - bool: 延长锁的过期时间 lua_script if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(expire, KEYS[1], ARGV[2]) else return 0 end new_timeout self.timeout additional_time result self.redis.eval(lua_script, 1, self.lock_name, self.token, new_timeout) return result 1 contextmanager def lock(self, blocking: bool True, blocking_timeout: int 10): 上下文管理器方式使用锁 acquired self.acquire(blocking, blocking_timeout) if not acquired: raise RuntimeError(fCould not acquire lock: {self.lock_name}) try: yield finally: self.release() # 使用示例 lock DistributedLock(redis_client, order:create:123, timeout60) # 方式1手动获取/释放 if lock.acquire(): try: # 临界区操作 create_order(123) finally: lock.release() # 方式2上下文管理器推荐 with lock.lock(): # 临界区操作 create_order(123)4. 延迟队列# delay_queue.py - MonkeyCode生成 import redis import json import time import threading from typing import Callable, Any, Optional class DelayQueue: Redis延迟队列基于Sorted Set def __init__(self, redis_client: redis.Redis, queue_name: str): self.redis redis_client self.queue_name fdelayq:{queue_name} self.consumer_name fconsumer:{queue_name}:{threading.current_thread().name} def push(self, job_data: dict, delay_seconds: int): 添加延迟任务 job_id fjob:{time.time()}:{id(job_data)} # 任务数据存在Hash里 self.redis.hset(job_id, mapping{ data: json.dumps(job_data), status: pending, created_at: time.time() }) # 任务ID和执行时间存入Sorted Set execute_at time.time() delay_seconds self.redis.zadd(self.queue_name, {job_id: execute_at}) return job_id