我编写了一个脚本,按计划解析API (星期二至星期六),下载前一天的所有内容。
import requests
import pandas as pd
from datetime import date, timedelta
# # This is what I'd normally use, but since there would be no data today,
# # I assign specific date myself
# DATE = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d")
DATE = "2020-10-23"
URL = "https://spending.gov.ua/portal-api/v2/api/transactions/page/"
def fetch(session, params):
next_page, last_page = 0, 0
while next_page <= last_page:
params["page"] = next_page
data = session.get(URL, params=params).json()
yield pd.json_normalize(data.get("transactions"))\
.assign(page=params.get("page"))
next_page, last_page = next_page+1, data["count"] // data["pageSize"]
def fetch_all():
with requests.Session() as session:
params = {"page": 0, "pageSize": 100, "startdate": DATE, "enddate": DATE}
yield from fetch(session, params)
if __name__ == "__main__":
data = fetch_all()
pd.concat(data).to_csv(f"data/{DATE}.csv", index=False)我想知道几件事。
首先,如果我正确地使用requests.Session。
我在文件中看到:
Session对象允许您在请求之间持久化某些参数。..。因此,如果您向同一台主机发出多个请求,则将重用底层TCP连接,这可能会显著提高性能。
我不确定这里是否是这样的,因为我没有注意到表演中的任何变化。
其次,如果将代码分成两个函数而不是一个函数是个好主意。
在这里,我认为维护起来会更容易--基础函数fetch不会改变,而fetch_all可能会改变。例如,我可以提供一系列日期,而不是单一日期,将fetch_all更改为:
def fetch_all(date_range):
with requests.Session() as session:
for date in date_range:
params = {"page": 0, "pageSize": 100, "startdate": date, "enddate": date}
yield from fetch(session, params)另外,yield和yield from --可以使用.append并返回一个列表。不确定哪种方法更好。
发布于 2020-10-27 01:45:55
我想知道几件事。首先,如果我正确地使用
requests.Session。
是的,你是。在我的其他评论之一中,以同样的方式使用requests.Session来迭代分页的API几乎可以减少总执行时间的一半。
我做了一些快速测试,下载了最后7页(1625-1631页)的"2020-10-23“,它比用requests.get发出请求要好得多:
requests.get:23.2秒requests.Session:17.7秒其次,如果将代码分成两个函数而不是一个函数是个好主意。
我认为把它分成两种功能是好的。尽管如此,我确实有一些关于fetch的职责和接口的评论,以及如何更好地利用您在下面使用的yield和yield from。
总的来说,代码看起来很干净,而且易于阅读。以下是我认为可以改进的方法:
fetch的调用方中抽象出来。也就是说,fetch的S函数签名应该是这样的: def fetch(会话: requests.Session,start_date: date,end_date: date,starting_page: int = 0,page_size: int = 100,page_size:-> Iterator: pass ),所以现在创建一个适当的params应该是fetch's的责任,而不是fetch_all的任务。请注意start_date和end_date是datetime.date类型的,而不是str类型的。类似地,fetch_all不应该关心API所接受的日期字符串序列化格式;这是fetch的S责任。fetch中,与其对每个请求维护变量next_page和last_page,我认为最好只计算第一个请求(页k)的总页数(n),然后对页面k+1.n-1使用for循环(json_data: Dict,page: int) -> pd.DataFrame:返回D35 def fetch(会话: requests.Session,start_date: date,end_date: date,starting_page: int = 0,page_size: int = 100,) -> Iterator: params ={ "startdate":start_date.isoformat(),"enddate":end_date.isoformat(),"page":starting_page,"pageSize":page_size,} data =session.get( params=params).json() page_count =math.ceil(数据/数据) last_page = page_count -1如果starting_page > last_page:返回打印(f“{starting_page}/ {last_page}")使页面在范围内(starting_page+ 1,page_count)产生to_dataframe( data,starting_page):params = page data= session.get(URL ),params=params).json() .json().json() print(f"{ page } / {last_page}")产生to_dataframe(数据,页面)这里的折衷之处是代码的复制很小,因为第一个请求的处理方式有点不同,但是现在我们已经将页码迭代的责任委托给for循环。session对象中,以便它总是在响应对象上调用raise_for_status()。这确保了如果服务器给我们4xx或5xx响应,使用会话发出的所有请求都会引发requests.HTTPError,并阻止我们将错误响应的.json()数据转换为数据raise : session.hooks.append( lambda r,*args,**kwargs: r.raise_for_status() )fetch_all作为Iterator[pd.DataFrame]的优势,我认为最好立即将每个数据写入CSV,这样我们就不需要在内存中保存它了: output_path = Path(f" data /{DATE}.csv") output_path.unlink(missing_ok=True) data= fetch_all() for i,dataframe in枚举(Data):write_header = True如果我== 0==0 False dataframe.to_csv( output_path,header=write_header,index=False,mode="a“)重构版本:
#!/usr/bin/env python3
import math
from datetime import date, timedelta
from pathlib import Path
from typing import Any, Dict, Iterator
import pandas as pd # type: ignore
import requests
# # This is what I'd normally use, but since there would be no data today,
# # I assign specific date myself
# DATE = date.today() - timedelta(days=1)
DATE = date.fromisoformat("2020-10-23")
URL = "https://spending.gov.ua/portal-api/v2/api/transactions/page/"
def to_dataframe(json_data: Dict[str, Any], page: int) -> pd.DataFrame:
return pd.json_normalize(json_data["transactions"]).assign(page=page)
def fetch(
session: requests.Session,
start_date: date,
end_date: date,
starting_page: int = 0,
page_size: int = 100,
) -> Iterator[pd.DataFrame]:
params = {
"startdate": start_date.isoformat(),
"enddate": end_date.isoformat(),
"page": starting_page,
"pageSize": page_size,
}
data = session.get(URL, params=params).json()
page_count = math.ceil(data["count"] / data["pageSize"])
last_page = page_count - 1
if starting_page > last_page:
return
print(f"{starting_page} / {last_page}")
yield to_dataframe(data, starting_page)
for page in range(starting_page + 1, page_count):
params["page"] = page
data = session.get(URL, params=params).json()
print(f"{page} / {last_page}")
yield to_dataframe(data, page)
def fetch_all() -> Iterator[pd.DataFrame]:
with requests.Session() as session:
session.hooks["response"].append(
lambda r, *args, **kwargs: r.raise_for_status()
)
yield from fetch(session, start_date=DATE, end_date=DATE)
if __name__ == "__main__":
output_path = Path(f"data/{DATE}.csv")
output_path.unlink(missing_ok=True)
data = fetch_all()
for i, dataframe in enumerate(data):
write_header = True if i == 0 else False
dataframe.to_csv(
output_path, header=write_header, index=False, mode="a"
)https://codereview.stackexchange.com/questions/251140
复制相似问题