python 异步网络请求封装函数 带重试机制,支持get、post

EN
EN
2025-03-14 / 0 评论 / 19 阅读 / 正在检测是否收录...

python基于httpx的异步网络请求,带重试机制,支持post、get,自定义header、携带cookie、等功能。

函数

import asyncio
from typing import Optional

import httpx

'''
带重试机制的网络请求函数
post、get
'''

# get请求  重试机制
async def curl_get( client: httpx.AsyncClient, url: str, id:int = 0, cookie: Optional[dict] = None, headers: Optional[dict] = None, retries: int = 3):
    # 设置重试次数
    for attempt in range(1,retries + 1):
        try:
            response = await client.get(url, cookies=cookie, headers=headers)
            response.raise_for_status()  # 如果请求失败,抛出异常
            return {"code": 1,"id":id, "response":response,"attempt":attempt}  # 返回解析后的 JSON 数据
        except httpx.HTTPStatusError as e:
            if attempt < retries:
                await asyncio.sleep(1)  # 重试前暂停1秒
            else:
                return {"code": 0,"id":id, "error": f"请求失败:{e.response.status_code}","attempt":attempt}
        except httpx.RequestError as e:
            print("网络错误",e)
            # 捕获网络错误(例如请求超时、连接错误等)
            if attempt < retries:
                await asyncio.sleep(2)  # 重试前暂停1秒
            else:
                return {"code": 0, "id":id,"error": f"网络错误 [已重试{str(attempt)}次]:{str(e)}","attempt":attempt}
        except Exception as e:
            if attempt < retries:
                await asyncio.sleep(1)  # 重试前暂停1秒
            else:
                return {"code": 0, "id": id, "error": f"请求错误 [已重试{str(attempt)}次]:{str(e)}", "attempt": attempt}
    return {"code": 0,"id":id, "error": "重试次数已达上限","attempt":attempt}

# post请求  重试机制
async def curl_post( client: httpx.AsyncClient, url: str, data = None, cookie: Optional[dict] = None, headers: Optional[dict] = None, retries: int = 3):
    # 设置重试次数
    for attempt in range(1,retries + 1):
        try:
            response = await client.post(url=url, cookies=cookie, headers=headers, data=data)
            response.raise_for_status()  # 如果请求失败,抛出异常
            html = response.text
            return {"code": 1, 'response':response, "data":html , "attempt": attempt }  # 返回解析后的 JSON 数据
        except httpx.HTTPStatusError as e:
            # 捕获 HTTP 错误(例如 404 或 500 错误)
            if attempt < retries:
                await asyncio.sleep(1)  # 重试前暂停1秒
            else:
                return {"code": 0, "error": f"请求失败:{e.response.status_code}", "attempt": attempt}
        except httpx.RequestError as e:
            # 捕获网络错误(例如请求超时、连接错误等)
            if attempt < retries:
                await asyncio.sleep(1)  # 重试前暂停1秒
            else:
                return {"code": 0,"error": f"请求错误 [{str(attempt)}] {str(e)}", "attempt": attempt}
        except Exception as e:
            if attempt < retries:
                await asyncio.sleep(1)  # 重试前暂停1秒
            else:
                return {"code": 0, "id": id, "error": f"请求错误 [{str(attempt)}]:{str(e)}", "attempt": attempt}
    return {"code": 0, "error": "重试次数已达上限","attempt":attempt}

使用

async with httpx.AsyncClient() as client:
    reslut = await curl_get(client, url)
    if reslut.get("code") != 1:
        return reslut.get("error")
response = reslut.get("response")
json = response.json()
0

评论 (0)

取消