
票务抢票场景本质为毫秒级高并发资源竞争。放票瞬时海量请求涌入服务端,仅低延迟、高稳定、可抗风控的客户端可抢占资源。该场景核心依赖三大技术支柱:异步并发请求、会话持久化、IP风控对抗。本文基于实战场景,精简拆解从会话维护、余票监听、并发下单到风控规避的全链路技术实现。一、抢票全链路技术架构1.1 核心执行链路用户登录 → Cookie持久化存储 → 实时余票监听 → 可控并发下单 → 结果归集与容错重试技术组件对应:aiohttp Session、CookieJar、异步轮询+WebSocket、asyncio.gather、重试退避机制1.2 核心技术选型与痛点适配
技术环节 | 技术选型 | 核心解决痛点 |
|---|---|---|
会话保持 | Cookie序列化持久化 | 程序重启、进程中断导致登录态丢失,引发下单失败 |
并发请求 | asyncio + aiohttp + 信号量 | 规避Python GIL全局锁瓶颈,单线程支撑高并发,防止请求溢出 |
风控对抗 | 动态代理IP池轮换 | 解决单IP高频请求触发平台限流、封禁问题 |
异常容错 | 分级重试+指数退避机制 | 解决网络抖动、瞬时接口异常导致的偶发下单失败 |
二、运行环境依赖基于异步网络架构,核心依赖异步请求与协程库,安装指令如下:
pip install aiohttp asyncio redis ntplib核心导入模块:协程调度、异步请求、序列化、时间校准、类型注解
import asyncio
import aiohttp
import json
import time
import random
import pickle
import ntplib
from typing import Optional, Dict, Any三、Cookie持久化会话管理登录态稳定是抢票成功的前置核心条件。基于aiohttp.CookieJar实现Cookie序列化存储、加载与有效性校验,实现跨进程会话复用。3.1 Cookie持久化工具类
class CookieManager:
def __init__(self, cookie_file='cookies.pkl'):
self.cookie_file = cookie_file
self.cookie_jar = aiohttp.CookieJar(unsafe=True) # 支持跨域Cookie适配
def save_cookies(self):
# 序列化Cookie至本地文件,持久化登录态
cookies = [{"name": c.key, "value": c.value, "domain": c["domain"], "path": c["path"]} for c in self.cookie_jar]
with open(self.cookie_file, 'wb') as f:
pickle.dump(cookies, f)
def load_cookies(self):
# 加载本地Cookie,恢复历史会话
try:
with open(self.cookie_file, 'rb') as f:
return pickle.load(f)
except FileNotFoundError:
return []
def is_expired(self, session: aiohttp.ClientSession) -> bool:
# 业务接口校验登录态有效性
return False3.2 全局会话封装与自动登录封装异步会话实例,统一请求头、超时策略,实现Cookie自动加载、登录续期,规避网络卡死、会话失效问题。
class TicketSession:
BASE_URL = "https://ticket.example.com"
def __init__(self, cookie_file='cookies.pkl'):
self.cookie_manager = CookieManager(cookie_file)
self.session: Optional[aiohttp.ClientSession] = None
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36',
'Referer': f'{self.BASE_URL}/index'
}
async def init_session(self):
# 初始化会话并加载持久化Cookie
self.session = aiohttp.ClientSession(
cookie_jar=self.cookie_manager.cookie_jar,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=10, connect=3)
)
cookies = self.cookie_manager.load_cookies()
[self.session.cookie_jar.update_cookies({c["name"]: c["value"]}) for c in cookies]
return self.session
async def login(self, username: str, password: str) -> bool:
# 账号登录并更新Cookie
try:
async with self.session.post(f'{self.BASE_URL}/login', data={"username": username, "password": password}) as resp:
result = await resp.json()
if result.get('code') == 200:
self.cookie_manager.save_cookies()
return True
return False
except Exception:
return False
async def close(self):
# 优雅关闭会话,释放连接
if self.session:
await self.session.close()核心优化点:自定义连接超时策略,杜绝单请求阻塞整体任务;开启跨域Cookie适配,适配多域名票务系统。四、基于协程的高并发抢票引擎采用asyncio + aiohttp异步模型,通过信号量限制并发阈值,配合分级重试机制,在高并发与风控安全间取得平衡。4.1 可控并发核心逻辑
class TicketGrabber:
def __init__(self, session_manager: TicketSession, max_concurrent=20):
self.session_mgr = session_manager
self.semaphore = asyncio.Semaphore(max_concurrent) # 并发限流
self.results = []
async def check_ticket(self, train_no: str, date: str) -> Dict:
# 异步余票查询监听
url = f'{self.session_mgr.BASE_URL}/query'
params = {"train_no": train_no, "date": date}
async with self.semaphore:
try:
async with self.session_mgr.session.get(url, params=params) as resp:
return await resp.json()
except Exception as e:
return {"tickets": 0, "error": str(e)}
async def submit_order(self, train_no: str, seat_type: str, passenger: str) -> Dict:
# 核心下单逻辑,3次重试+退避机制
url = f'{self.session_mgr.BASE_URL}/order'
payload = {
"train_no": train_no, "seat_type": seat_type, "passenger": passenger,
"timestamp": int(time.time() * 1000)
}
async with self.semaphore:
for attempt in range(3):
try:
async with self.session_mgr.session.post(url, json=payload) as resp:
result = await resp.json()
if result.get('code') == 200:
return {"success": True, **result}
if result.get('code') == 403:
return {"success": False, "reason": "blocked"}
await asyncio.sleep(0.1 * (attempt + 1))
except aiohttp.ClientError:
await asyncio.sleep(0.1 * (attempt + 1))
return {"success": False, "reason": "max_retries"}
async def batch_order(self, orders: list) -> list:
# 批量并发下单,异常不中断整体任务
tasks = [self.submit_order(**order) for order in orders]
return await asyncio.gather(*tasks, return_exceptions=True)
async def monitor_and_grab(self, train_no: str, date: str, interval: float = 0.5):
# 循环监听余票,发现余量立即触发下单
while True:
ticket_info = await self.check_ticket(train_no, date)
if ticket_info.get('tickets', 0) > 0:
return await self.submit_order(train_no, 'second', '张三')
await asyncio.sleep(interval)五、动态代理IP风控对抗方案票务系统具备严格的IP限流策略,单IP高频轮询、下单会快速被封禁。本文基于亿牛云代理API,实现动态IP轮换,同时解决会话与IP不匹配的核心风控问题。5.1 代理池封装
class ProxyPool:
API_URL = "http://ip.16yun.cn:817/myip/pl/<ORDER_ID>/?s=<ORDER_SIGN>&u=<USER>&format=json&count=10"
def __init__(self):
self.proxies = []
self.last_fetch = 0
async def refresh(self, session: aiohttp.ClientSession):
# 批量刷新代理IP,缓存复用
try:
async with session.get(self.API_URL, timeout=10) as resp:
if resp.status == 200:
data = await resp.json()
self.proxies = [f"http://{item['ip']}:{item['port']}" for item in data]
self.last_fetch = time.time()
except Exception:
pass
def get_proxy(self) -> Optional[str]:
# 随机获取可用代理
return random.choice(self.proxies) if self.proxies else None5.2 代理适配抢票引擎
class ProxiedTicketGrabber(TicketGrabber):
def __init__(self, session_manager, proxy_pool: ProxyPool, max_concurrent=20):
super().__init__(session_manager, max_concurrent)
self.proxy_pool = proxy_pool
async def submit_order(self, train_no: str, seat_type: str, passenger: str) -> Dict:
url = f'{self.session_mgr.BASE_URL}/order'
payload = {
"train_no": train_no, "seat_type": seat_type, "passenger": passenger,
"timestamp": int(time.time() * 1000)
}
async with self.semaphore:
for attempt in range(3):
proxy = self.proxy_pool.get_proxy()
try:
async with self.session_mgr.session.post(url, json=payload, proxy=proxy) as resp:
result = await resp.json()
if result.get('code') == 200:
return {"success": True, **result}
if result.get('code') in (403, 429):
continue
except Exception:
continue
return {"success": False, "reason": "all_proxies_failed"}5.3 代理使用核心规范1. 频率控制:代理IP提取间隔≥1s,避免429限流;2. 会话绑定:登录与下单阶段必须保持同一出口IP,防止会话劫持拦截;3. 批量缓存:提前批量拉取10-20个代理备用,降低接口请求开销。六、全流程整合主程序
七、高阶性能优化方案7.1 连接池预热提前建立TCP连接,规避放票瞬时连接创建延迟,提升首请求响应速度。
Python
async def warmup(session: aiohttp.ClientSession):
connector = aiohttp.TCPConnector(limit=50, limit_per_host=20, enable_cleanup_closed=True)
warmup_tasks = [session.get(f'{TicketSession.BASE_URL}/ping') for _ in range(10)]
await asyncio.gather(*warmup_tasks, return_exceptions=True)
7.2 NTP服务器时间校准解决本地时间与服务端时间偏差,避免请求时序错位导致抢票失败。
Python
def sync_server_time() -> float:
ntp_client = ntplib.NTPClient()
try:
resp = ntp_client.request('ntp.aliyun.com', version=3)
return (datetime.fromtimestamp(resp.tx_time) - datetime.now()).total_seconds()
except Exception:
return 0.0八、核心踩坑与解决方案汇总
问题场景 | 核心成因 | 解决方案 |
|---|---|---|
程序重启需重复登录 | Cookie未持久化 | pickle序列化CookieJar,启动自动加载 |
换代理后下单被拦截 | 登录、下单IP不一致 | 绑定代理隧道,固定会话出口IP |
高频请求触发403/429 | 并发无限制、单IP高频请求 | 信号量限流+动态IP轮换 |
代理提取429限流 | 接口请求频率过高 | 批量缓存代理,降低提取频次 |
请求时序错位 | 本地时间与服务端偏差 | NTP全网时间同步校准 |
九、技术总结Python高并发抢票系统的核心竞争力源于全链路低延迟+风控自适应:其一,通过Cookie持久化实现会话稳态,规避登录态丢失风险;其二,基于asyncio异步模型突破单线程性能瓶颈,配合信号量精准控制并发规模;其三,依托动态代理IP池解决高频风控问题,通过IP与会话绑定机制规避会话劫持拦截。整套方案兼顾高并发性能与系统稳定性,是票务等高竞争场景的轻量化高效技术实现。合规声明:本文所有技术内容仅用于Python异步编程、网络爬虫技术学习与技术研究,禁止用于违规抢票、破坏平台交易规则等非法场景,一切操作请遵守网络安全法规与平台用户协议。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。