V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
tomtao00001
V2EX  ›  Python

请教 aiohttp clientSession 认证过期带来的重定向问题

  •  1
     
  •   tomtao00001 · 2022-01-12 10:51:06 +08:00 · 1698 次点击
    这是一个创建于 857 天前的主题,其中的信息可能已经有所发展或是发生改变。

    使用 aiohttp 的背景:

    flask 项目需要访问阿里云上部署的 kubeflow api 服务, 这个 api 参数需要使用 query 传递, 因 query 构造过长报了 413,后考虑拆分 query 并发访问服务.

    尝试 1 aiphttp + asyncio

    大致代码抽象如下

    import asyncio
    import json
    import re
    import functools
    from typing import Dict, List
    
    import aiohttp
    from loguru import logger
    
    
    def kubeflow_auth_with_async(func):
        """做 kubeflow 的 auth"""
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
    
            async with aiohttp.ClientSession() as session:
                # login
                payload = {
                    "username": conf.USERNAME,
                    "password": conf.PASSWORD,
                }
                await session.post(conf.AIX_AUTH_URL, data=payload)
                # get req
                # async with session.get(conf.PIPELINE_URL) as response:
                #     text = await response.text()
                response = await session.get(conf.PIPELINE_URL)
                text = await response.text()
                pattern = r"/dex/auth/aix\?req="
                index_beg = re.search(pattern, text).span()
                index_end = text.find('"', index_beg[1])
                req = text[index_beg[1] : index_end]
                params = {"req": req}
                # login kubeflow with aix
                await session.get(conf.DEX_AUTH_URL, params=params)
    
                return await func(session, *args, **kwargs)
    
        return wrapper
    
    
    async def async_get_runs(
        session: aiohttp.ClientSession,
        page_size=1,
        page_token=None,
        experiment_id=None,
        filters=None,
        sort_by="created_at desc",
    ):
    
        if not filters:
            filters = {}
    
        query = {
            "page_size": page_size,
            "page_token": page_token,
            "sort_by": sort_by,
            "resource_reference_key.type": "EXPERIMENT",
            "resource_reference_key.id": experiment_id,
            "filter": json.dumps(filters),
        }
        response = await session.get(
            url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
        )  
    
        # async with session.get(
        #     url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
        # ) as response:
        # response.raise_for_status()
        # return await response.json()
    
        response.raise_for_status()
        return await response.json()
    
    
    @kubeflow_auth_with_async
    async def gather_fetch_runs(
        session: aiohttp.ClientSession,
        runs_lst:List[str],
    ):
        exp_id = "xxxxx"
        tasks = []
        for sub_runs in runs_lst:
            filters: dict = {
                "predicates": [
                    {
                        "key": "id",
                        "op": FILTER_OPERATIONS.IN.value,
                        "string_values": {"values": sub_runs}, 
                    },
                ]
            }
            tasks.append(
                async_get_runs(
                    session,
                    page_size=len(sub_runs),
                    experiment_id=exp_id,
                    filters=filters,
                )
            )
    
        return await asyncio.gather(*tasks)
    
    
    res = []
    for r in asyncio.run(gather_fetch_runs(["xxx","xxx","xx"])):
        res.extend(r.get("runs", []))
    

    请教 V 站大佬

    出现的报错,不知道怎么贴图,手动概括一下异常

    服务稳定运行一段时间后,会出现突然 500 刷新又可以坚挺一段时间
    aiohttp.client_exceptions.ContentypTypeError
    从报错信息上看像是 session 失效导致的,被 auth 重定向到了登录页. contentType 变成了 text/html
    而请求的 url 重定向到了登录的 url,已经不是我传入的那个
    
    因为没试过 async 函数的装饰器,不知道是不是这个问题,
    还有就是 clientSession 的连接池是不是保存了 session 的状态. 装饰器每次都重新请求了.这个 session 按道理不应该还是一样的.
    
    

    尝试 2 gevent + request

    也试了,没有这个问题,不过访问次数多起来之后容易莫名 GG,大概率是被自己家部署的 kubeflow 反爬了?

    4 条回复    2022-01-19 17:15:03 +08:00
    amlee
        1
    amlee  
       2022-01-15 06:48:07 +08:00
    你可能是想在装饰函数里面获取 token ,然后 session 发出的每次请求的请求头都要带上这个 token ?
    如果我理解没错的话,装饰器里面的 wrapper 只执行一次的,你可以看看服务端的 token 过期时间。
    tomtao00001
        2
    tomtao00001  
    OP
       2022-01-17 15:17:08 +08:00
    @amlee emm 服务器每次获取请求后, 内部实现都会用这个装饰器, 您指的 wrapper 执行一次, 我不是很理解, 表达的是 wrapper 函数外的作用域只执行一次么? 如果是这个意思 , 对于目前这个方式来讲 它应该是不影响的对么? 不知道我理解的对不对.
    amlee
        3
    amlee  
       2022-01-18 19:40:41 +08:00
    @tomtao00001 之前看你代码不仔细,我上面那个回答是错误的,不要理会上面那个回答了。

    我重新读了一遍你的代码,你说的“每次获取请求,内部函数都会用这个装饰器”跟你的代码行为有误差。

    你通过 gather_fetch_runs 构建一组了协程对象并发执行,但这一组协程对象通过 gather_fetch_runs 的 @kubeflow_auth_with_async 装饰器赋予了同一个 session 对象。当这一组协程并发运行足够长的时间,登录会超时。而你的登录状态是保存在同一个 session 中的,这一组协程共同使用这个 session ,所以登录超时以后这个 session 失效,你这一组并发的协程也会请求失败。

    而你说的刷新又好了的情况,或许是重新运行了一遍 gather_fetch_runs ,这会导致重新构建一组协程,重新构建一个已登录的 session
    tomtao00001
        4
    tomtao00001  
    OP
       2022-01-19 17:15:03 +08:00
    @amlee 好的 非常感谢回复 最后还是换了一种方式 这个就直接放弃了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1101 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 18:43 · PVG 02:43 · LAX 11:43 · JFK 14:43
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.