python基于httpx的异步网络请求,带重试机制,支持post、get,自定义header、携带cookie、等功能。
函数
import asyncio
from typing import Optional
import httpx
'''
带重试机制的网络请求函数
post、get
'''
# get请求 重试机制
async def curl_get( client: httpx.AsyncClient, url: str, id:int = 0, cookie: Optional[dict] = None, headers: Optional[dict] = None, retries: int = 3):
# 设置重试次数
for attempt in range(1,retries + 1):
try:
response = await client.get(url, cookies=cookie, headers=headers)
response.raise_for_status() # 如果请求失败,抛出异常
return {"code": 1,"id":id, "response":response,"attempt":attempt} # 返回解析后的 JSON 数据
except httpx.HTTPStatusError as e:
if attempt < retries:
await asyncio.sleep(1) # 重试前暂停1秒
else:
return {"code": 0,"id":id, "error": f"请求失败:{e.response.status_code}","attempt":attempt}
except httpx.RequestError as e:
print("网络错误",e)
# 捕获网络错误(例如请求超时、连接错误等)
if attempt < retries:
await asyncio.sleep(2) # 重试前暂停1秒
else:
return {"code": 0, "id":id,"error": f"网络错误 [已重试{str(attempt)}次]:{str(e)}","attempt":attempt}
except Exception as e:
if attempt < retries:
await asyncio.sleep(1) # 重试前暂停1秒
else:
return {"code": 0, "id": id, "error": f"请求错误 [已重试{str(attempt)}次]:{str(e)}", "attempt": attempt}
return {"code": 0,"id":id, "error": "重试次数已达上限","attempt":attempt}
# post请求 重试机制
async def curl_post( client: httpx.AsyncClient, url: str, data = None, cookie: Optional[dict] = None, headers: Optional[dict] = None, retries: int = 3):
# 设置重试次数
for attempt in range(1,retries + 1):
try:
response = await client.post(url=url, cookies=cookie, headers=headers, data=data)
response.raise_for_status() # 如果请求失败,抛出异常
html = response.text
return {"code": 1, 'response':response, "data":html , "attempt": attempt } # 返回解析后的 JSON 数据
except httpx.HTTPStatusError as e:
# 捕获 HTTP 错误(例如 404 或 500 错误)
if attempt < retries:
await asyncio.sleep(1) # 重试前暂停1秒
else:
return {"code": 0, "error": f"请求失败:{e.response.status_code}", "attempt": attempt}
except httpx.RequestError as e:
# 捕获网络错误(例如请求超时、连接错误等)
if attempt < retries:
await asyncio.sleep(1) # 重试前暂停1秒
else:
return {"code": 0,"error": f"请求错误 [{str(attempt)}] {str(e)}", "attempt": attempt}
except Exception as e:
if attempt < retries:
await asyncio.sleep(1) # 重试前暂停1秒
else:
return {"code": 0, "id": id, "error": f"请求错误 [{str(attempt)}]:{str(e)}", "attempt": attempt}
return {"code": 0, "error": "重试次数已达上限","attempt":attempt}
使用
async with httpx.AsyncClient() as client:
reslut = await curl_get(client, url)
if reslut.get("code") != 1:
return reslut.get("error")
response = reslut.get("response")
json = response.json()
评论 (0)