首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Python 高并发抢票技术拆解:异步请求、Cookie 持久化实战

Python 高并发抢票技术拆解:异步请求、Cookie 持久化实战

原创
作者头像
小白学大数据
发布2026-07-02 16:54:49
发布2026-07-02 16:54:49
50
举报

票务抢票场景本质为毫秒级高并发资源竞争。放票瞬时海量请求涌入服务端,仅低延迟、高稳定、可抗风控的客户端可抢占资源。该场景核心依赖三大技术支柱:异步并发请求、会话持久化、IP风控对抗。本文基于实战场景,精简拆解从会话维护、余票监听、并发下单到风控规避的全链路技术实现。一、抢票全链路技术架构1.1 核心执行链路用户登录 → Cookie持久化存储 → 实时余票监听 → 可控并发下单 → 结果归集与容错重试技术组件对应:aiohttp Session、CookieJar、异步轮询+WebSocket、asyncio.gather、重试退避机制1.2 核心技术选型与痛点适配

技术环节

技术选型

核心解决痛点

会话保持

Cookie序列化持久化

程序重启、进程中断导致登录态丢失,引发下单失败

并发请求

asyncio + aiohttp + 信号量

规避Python GIL全局锁瓶颈,单线程支撑高并发,防止请求溢出

风控对抗

动态代理IP池轮换

解决单IP高频请求触发平台限流、封禁问题

异常容错

分级重试+指数退避机制

解决网络抖动、瞬时接口异常导致的偶发下单失败

二、运行环境依赖基于异步网络架构,核心依赖异步请求与协程库,安装指令如下:

代码语言:txt
复制
pip install aiohttp asyncio redis ntplib

核心导入模块:协程调度、异步请求、序列化、时间校准、类型注解

代码语言:txt
复制
import asyncio
import aiohttp
import json
import time
import random
import pickle
import ntplib
from typing import Optional, Dict, Any

三、Cookie持久化会话管理登录态稳定是抢票成功的前置核心条件。基于aiohttp.CookieJar实现Cookie序列化存储、加载与有效性校验,实现跨进程会话复用。3.1 Cookie持久化工具类

代码语言:txt
复制
class CookieManager:
    def __init__(self, cookie_file='cookies.pkl'):
        self.cookie_file = cookie_file
        self.cookie_jar = aiohttp.CookieJar(unsafe=True)  # 支持跨域Cookie适配

    def save_cookies(self):
        # 序列化Cookie至本地文件,持久化登录态
        cookies = [{"name": c.key, "value": c.value, "domain": c["domain"], "path": c["path"]} for c in self.cookie_jar]
        with open(self.cookie_file, 'wb') as f:
            pickle.dump(cookies, f)

    def load_cookies(self):
        # 加载本地Cookie,恢复历史会话
        try:
            with open(self.cookie_file, 'rb') as f:
                return pickle.load(f)
        except FileNotFoundError:
            return []

    def is_expired(self, session: aiohttp.ClientSession) -> bool:
        # 业务接口校验登录态有效性
        return False

3.2 全局会话封装与自动登录封装异步会话实例,统一请求头、超时策略,实现Cookie自动加载、登录续期,规避网络卡死、会话失效问题。

代码语言:txt
复制
class TicketSession:
    BASE_URL = "https://ticket.example.com"
    def __init__(self, cookie_file='cookies.pkl'):
        self.cookie_manager = CookieManager(cookie_file)
        self.session: Optional[aiohttp.ClientSession] = None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0 Safari/537.36',
            'Referer': f'{self.BASE_URL}/index'
        }

    async def init_session(self):
        # 初始化会话并加载持久化Cookie
        self.session = aiohttp.ClientSession(
            cookie_jar=self.cookie_manager.cookie_jar,
            headers=self.headers,
            timeout=aiohttp.ClientTimeout(total=10, connect=3)
        )
        cookies = self.cookie_manager.load_cookies()
        [self.session.cookie_jar.update_cookies({c["name"]: c["value"]}) for c in cookies]
        return self.session

    async def login(self, username: str, password: str) -> bool:
        # 账号登录并更新Cookie
        try:
            async with self.session.post(f'{self.BASE_URL}/login', data={"username": username, "password": password}) as resp:
                result = await resp.json()
                if result.get('code') == 200:
                    self.cookie_manager.save_cookies()
                    return True
                return False
        except Exception:
            return False

    async def close(self):
        # 优雅关闭会话,释放连接
        if self.session:
            await self.session.close()

核心优化点:自定义连接超时策略,杜绝单请求阻塞整体任务;开启跨域Cookie适配,适配多域名票务系统。四、基于协程的高并发抢票引擎采用asyncio + aiohttp异步模型,通过信号量限制并发阈值,配合分级重试机制,在高并发与风控安全间取得平衡。4.1 可控并发核心逻辑

代码语言:txt
复制
class TicketGrabber:
    def __init__(self, session_manager: TicketSession, max_concurrent=20):
        self.session_mgr = session_manager
        self.semaphore = asyncio.Semaphore(max_concurrent)  # 并发限流
        self.results = []

    async def check_ticket(self, train_no: str, date: str) -> Dict:
        # 异步余票查询监听
        url = f'{self.session_mgr.BASE_URL}/query'
        params = {"train_no": train_no, "date": date}
        async with self.semaphore:
            try:
                async with self.session_mgr.session.get(url, params=params) as resp:
                    return await resp.json()
            except Exception as e:
                return {"tickets": 0, "error": str(e)}

    async def submit_order(self, train_no: str, seat_type: str, passenger: str) -> Dict:
        # 核心下单逻辑,3次重试+退避机制
        url = f'{self.session_mgr.BASE_URL}/order'
        payload = {
            "train_no": train_no, "seat_type": seat_type, "passenger": passenger,
            "timestamp": int(time.time() * 1000)
        }
        async with self.semaphore:
            for attempt in range(3):
                try:
                    async with self.session_mgr.session.post(url, json=payload) as resp:
                        result = await resp.json()
                        if result.get('code') == 200:
                            return {"success": True, **result}
                        if result.get('code') == 403:
                            return {"success": False, "reason": "blocked"}
                        await asyncio.sleep(0.1 * (attempt + 1))
                except aiohttp.ClientError:
                    await asyncio.sleep(0.1 * (attempt + 1))
        return {"success": False, "reason": "max_retries"}

    async def batch_order(self, orders: list) -> list:
        # 批量并发下单,异常不中断整体任务
        tasks = [self.submit_order(**order) for order in orders]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def monitor_and_grab(self, train_no: str, date: str, interval: float = 0.5):
        # 循环监听余票,发现余量立即触发下单
        while True:
            ticket_info = await self.check_ticket(train_no, date)
            if ticket_info.get('tickets', 0) > 0:
                return await self.submit_order(train_no, 'second', '张三')
            await asyncio.sleep(interval)

五、动态代理IP风控对抗方案票务系统具备严格的IP限流策略,单IP高频轮询、下单会快速被封禁。本文基于亿牛云代理API,实现动态IP轮换,同时解决会话与IP不匹配的核心风控问题。5.1 代理池封装

代码语言:txt
复制
class ProxyPool:
    API_URL = "http://ip.16yun.cn:817/myip/pl/<ORDER_ID>/?s=<ORDER_SIGN>&u=<USER>&format=json&count=10"
    def __init__(self):
        self.proxies = []
        self.last_fetch = 0

    async def refresh(self, session: aiohttp.ClientSession):
        # 批量刷新代理IP,缓存复用
        try:
            async with session.get(self.API_URL, timeout=10) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    self.proxies = [f"http://{item['ip']}:{item['port']}" for item in data]
                    self.last_fetch = time.time()
        except Exception:
            pass

    def get_proxy(self) -> Optional[str]:
        # 随机获取可用代理
        return random.choice(self.proxies) if self.proxies else None

5.2 代理适配抢票引擎

代码语言:txt
复制
class ProxiedTicketGrabber(TicketGrabber):
    def __init__(self, session_manager, proxy_pool: ProxyPool, max_concurrent=20):
        super().__init__(session_manager, max_concurrent)
        self.proxy_pool = proxy_pool

    async def submit_order(self, train_no: str, seat_type: str, passenger: str) -> Dict:
        url = f'{self.session_mgr.BASE_URL}/order'
        payload = {
            "train_no": train_no, "seat_type": seat_type, "passenger": passenger,
            "timestamp": int(time.time() * 1000)
        }
        async with self.semaphore:
            for attempt in range(3):
                proxy = self.proxy_pool.get_proxy()
                try:
                    async with self.session_mgr.session.post(url, json=payload, proxy=proxy) as resp:
                        result = await resp.json()
                        if result.get('code') == 200:
                            return {"success": True, **result}
                        if result.get('code') in (403, 429):
                            continue
                except Exception:
                    continue
        return {"success": False, "reason": "all_proxies_failed"}

5.3 代理使用核心规范1. 频率控制:代理IP提取间隔≥1s,避免429限流;2. 会话绑定:登录与下单阶段必须保持同一出口IP,防止会话劫持拦截;3. 批量缓存:提前批量拉取10-20个代理备用,降低接口请求开销。六、全流程整合主程序

七、高阶性能优化方案7.1 连接池预热提前建立TCP连接,规避放票瞬时连接创建延迟,提升首请求响应速度。

Python

代码语言:txt
复制
async def warmup(session: aiohttp.ClientSession):
 connector = aiohttp.TCPConnector(limit=50, limit_per_host=20, enable_cleanup_closed=True)
 warmup_tasks = [session.get(f'{TicketSession.BASE_URL}/ping') for _ in range(10)]
 await asyncio.gather(*warmup_tasks, return_exceptions=True)

7.2 NTP服务器时间校准解决本地时间与服务端时间偏差,避免请求时序错位导致抢票失败。

Python

代码语言:txt
复制

def sync_server_time() -> float:
 ntp_client = ntplib.NTPClient()
 try:
 resp = ntp_client.request('ntp.aliyun.com', version=3)
 return (datetime.fromtimestamp(resp.tx_time) - datetime.now()).total_seconds()
 except Exception:
 return 0.0

八、核心踩坑与解决方案汇总

问题场景

核心成因

解决方案

程序重启需重复登录

Cookie未持久化

pickle序列化CookieJar,启动自动加载

换代理后下单被拦截

登录、下单IP不一致

绑定代理隧道,固定会话出口IP

高频请求触发403/429

并发无限制、单IP高频请求

信号量限流+动态IP轮换

代理提取429限流

接口请求频率过高

批量缓存代理,降低提取频次

请求时序错位

本地时间与服务端偏差

NTP全网时间同步校准

九、技术总结Python高并发抢票系统的核心竞争力源于全链路低延迟+风控自适应:其一,通过Cookie持久化实现会话稳态,规避登录态丢失风险;其二,基于asyncio异步模型突破单线程性能瓶颈,配合信号量精准控制并发规模;其三,依托动态代理IP池解决高频风控问题,通过IP与会话绑定机制规避会话劫持拦截。整套方案兼顾高并发性能与系统稳定性,是票务等高竞争场景的轻量化高效技术实现。合规声明:本文所有技术内容仅用于Python异步编程、网络爬虫技术学习与技术研究,禁止用于违规抢票、破坏平台交易规则等非法场景,一切操作请遵守网络安全法规与平台用户协议。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档