V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
zckun
V2EX  ›  Python

Python asyncio 下载爬虫没法停止的一个问题

  •  
  •   zckun · 2019-05-09 11:08:43 +08:00 · 2386 次点击
    这是一个创建于 2069 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个礼拜前粗略的学习了下 asyncio + aiohttp 实现异步爬虫,三天前为了联手写了一个 ins 下载爬虫。 爬虫思路草图:a.png用文字描述把: 首先我参考了 aiohttp 官方的爬虫例子,官方爬虫例子在这:crawl.py,我的思路是这样的

    1.获取 user id ; user id 是必须的,所以我把这个写成了一个方法在创建类的时候直接调用

    2.获取页的数据;因为没发说是第几页,这个请求需要三个参数,一个是 user id,一个是一次获取到的数量,第三个是 end_cursor 用来获取下一页

    3.解析数据;获取到的数据是 json 格式的,我需要获取两个东西,第一个是图片链接,第二个是 end_cursor,用来获取下一页

    4.处理 url ;这个方法遍历 urls 调用 download 方法下载

    5.下载;用到了 aiohttp 和 aiofiles,没有异常、下载完后我用 asyncio.Task(self.get_display_urls(end_cursor))回到了获取页数据的方法,以此循环,当然获取页数据的方法有判读 end_cursor 是否为空,直接 loop.stop()

    为了实现抓取所有图片,我没有使用 run_until_complete,因为它只获取了一次就停了,我就是用的 run_forever

    全部代码如下:

    import aiohttp
    import asyncio
    import aiofiles
    import aioredis
    import re
    import json
    import os
    import signal
    import time
    import logging
    
    
    class Instagram(object):
    
        def __init__(self, username, loop, depth=0, maxtasks=200):
            """
            :param username: 用户名
            :param loop:
            :param depth: 下载页面数量
            :param maxtasks: 最大并发限制
            """
            self.down_tasks = set()
            self.down_todo = set()
            self.down_busy = set()
            self.down_done = {}
    
            self.loop = loop
            self.sem = asyncio.Semaphore(maxtasks, loop=loop)
    
            self.username = username
            self.max_page = depth if depth >= 1 else -1
    
            self.ROOT_URL = 'https://www.instagram.com/'
            self.headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
                'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1b2T05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"'
            }
            self.proxy = "http://localhost:8001"
            t = asyncio.ensure_future(self.init(), loop=loop)
            loop.run_until_complete(t)
    
        async def run(self):
            """
            :return:
            """
            await self.init()
            t = asyncio.ensure_future(self.addurls(), loop=self.loop)
            while self.down_busy:
                await asyncio.sleep(1, loop=self.loop)
            await t
            self.loop.close()
    
        async def init(self):
            """
            初始化必要参数:user id
            :return:
            """
            print('[init] 初始化参数...')
            shared_data = await self.get_shared_data()
            if not shared_data:
                print('!!!!!!!')
                exit(0)
            self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0]
    
        async def _http_request(self, url, **kwargs):
            """
            http 请求
            :param url: 请求链接
            :param kwargs: 链接参数
            :return: 网页 response
            """
            params = dict()
            if kwargs:
                for k, v in kwargs.items():
                    params.update(v)
            async with self.sem:
                async with aiohttp.ClientSession() as session:
                    try:
                        async with session.get(url, timeout=10, proxy=self.proxy, headers=self.headers,
                                                    params=params) as response:
                            html = (await response.read()).decode('utf-8', 'replace')
                            return html
                    except Exception as exc:
                        logging.warning("[_http_request] 异常: {}".format(exc))
    
        async def get_shared_data(self):
            """
            获取 shared data
            :return:
            """
            html = await self._http_request(self.ROOT_URL + self.username)
            if html:
                shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0]
                return shared_data
    
        def get_ends_cursor(self, html):
            """
    
            :param html:
            :return:
            """
            if html:
                edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
                edges = edge_media['edges']
                if edges:
                    end_cursor = edge_media['page_info']['end_cursor']
                    has_next_page = edge_media['page_info']['has_next_page']
                    if has_next_page:
                        return end_cursor
                    return ''
    
        async def get_display_url(self, max=50, end_cursor=""):
            """
            解析 display url
            :param max: 单次获取图片总量
            :param end_cursor: end_cursor 是获取下一页的参数
            :return: 包含{max}数量的图片链接列表
            """
            pic_params = {
                'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',
                'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor),
            }
            pic_url = self.ROOT_URL + 'graphql/query/'
            html = await self._http_request(pic_url, parms=pic_params)
            if html:
                edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
                edges = edge_media['edges']
                if edges:
                    display_urls = []
                    for edge in edges:
                        display_urls.append(edge['node']['display_url'])
                    return display_urls, self.get_ends_cursor(html)
    
        async def download(self, url):
            """
            下载到本地
            :param url:
            :return:
            """
            print('processing:', url)
            # try:
                # async with self.sem: //如果使用 Semaphore 会卡住。。。虽然不会报错
            self.down_todo.remove(url)
            self.down_busy.add(url)
            path = './instagram/' + self.username
            if not os.path.exists(path):
                os.makedirs(path)
    
            filename = path + '/' + url.split('?')[0].split('/')[-1]
            print('start download:', url)
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.get(url, headers=self.headers, proxy=self.proxy) as resp:
                        if resp.status == 200:
                            f = await aiofiles.open(filename, 'wb')
                            await f.write(await resp.read())
                            await f.close()
                            await asyncio.Task(self.addurls(self.end_cursor))
                        resp.close()
                        self.down_done[url] = True
                except Exception as exc:
                    logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))
                    self.down_done[url] = False
    
            self.down_busy.remove(url)
            print(len(self.down_done), 'completed tasks,', len(self.down_tasks),
                  'still pending, todo', len(self.down_todo))
            # 这个判断根本没有任何用,不会调用,直接卡住
            if self.end_cursor is False:
                print('下载完 la')
                self.loop.close()
    
            # except Exception as exc:
            #     logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))
    
        async def add_down_urls(self, urls):
            print('[add_down_urls] 开始下载,数量:', len(urls))
            async with asyncio.Semaphore()
            for url in urls:
                self.down_todo.add(url)
                await self.sem.acquire()
                task = asyncio.ensure_future(self.download(url), loop=self.loop)
                task.add_done_callback(lambda t: self.sem.release())
                task.add_done_callback(self.down_tasks.remove)
                self.down_tasks.add(task)
    
        async def addurls(self, end_cursor=""):
            """
            :param end_cursor: 当前页面的标示 base64 加密,用于加载下一页,如果没有下一页改参数为 Fasle
            :return:
            """
            print("\n\n 开始获取下一页,end_cursor:", end_cursor)
    
            display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor)
            await self.add_down_urls(display_urls)
            if not self.end_cursor:
                return
    
    
    '''
    流程:
    run() --> addurls() --> add_own_urls() --> download()
                 ^                                |
                 |                                |
                 <-------<-----<--------<-------<--
    '''
    
    
    if __name__ == '__main__':
        start = time.time()
        loop = asyncio.get_event_loop()
        ins = Instagram('taeri__taeri', loop)
        future = asyncio.ensure_future(ins.addurls(), loop=loop)
        try:
            loop.add_signal_handler(signal.SIGINT, loop.stop)
        except RuntimeError:
            pass
        loop.run_forever()
        # loop.run_until_complete(future)
        # for i in future.result():
        #     print(">>>>", i)
        # ins.main()
        end = time.time()
        print('耗时:', end - start)
    
    

    我遇到的问题是不使用 Semaphore 的情况下一开始是疯狂下载,也的确是下载成功了,然后就直接卡住,也不停,就一直卡住(原谅我使用卡住这个词),希望能帮忙看一下错在哪,谢谢了

    10 条回复    2019-05-09 16:05:44 +08:00
    zckun
        1
    zckun  
    OP
       2019-05-09 11:09:23 +08:00
    草图在这。。。为了找画图工具用了半个多小时 https://github.com/ZCKun/d/blob/master/a.png
    zckun
        2
    zckun  
    OP
       2019-05-09 11:11:55 +08:00
    ins 是新号,所以 cookie 没去掉就算了
    zckun
        3
    zckun  
    OP
       2019-05-09 11:15:11 +08:00
    不能重新编辑么。。代码有些错误忘了删除,希望别介意
    zckun
        4
    zckun  
    OP
       2019-05-09 14:08:25 +08:00
    emmmm
    CSM
        5
    CSM  
       2019-05-09 14:15:46 +08:00   ❤️ 1
    你好,研究了下你的代码,发现一个小问题

    # 这个判断根本没有任何用,不会调用,直接卡住
    if self.end_cursor is False:

    这个是因为之前没有下一页的时候 end_cursor 是 '' 空字符串,而不是 False。


    另外就是我觉得你的架构上有问题,这个问题是经典的生产者-消费者模型,请求并解析出图片链接作为生产者,然后启动多个消费者来下载这些链接就行了。我重构了一下你的代码,具体可见 https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4
    CSM
        6
    CSM  
       2019-05-09 14:25:25 +08:00
    还有就是可以看到在 _http_request 方法里为每一个请求都生成了一个 ClientSession,这样太浪费了,建议只用一个 session。doc:

    Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.

    https://docs.aiohttp.org/en/stable/client_reference.html
    zckun
        7
    zckun  
    OP
       2019-05-09 14:49:42 +08:00
    @GSM 之前 get_end_cursor 方法获取不到的话是返回 False 的,,忘了改了
    zckun
        8
    zckun  
    OP
       2019-05-09 14:50:01 +08:00
    @CSM 谢谢你,我看一下
    shawndev
        9
    shawndev  
       2019-05-09 15:06:32 +08:00
    GvR 有一个 500lines 项目你可以参考一下。另外楼上说的很对,这是典型的生产者消费者模式,另外似乎没有考虑去重、重定向和超时重试?
    zckun
        10
    zckun  
    OP
       2019-05-09 16:05:44 +08:00
    @shawndev 知道了,以前学 java 的时候学过,但是已经忘得差不多了,我去复习,谢谢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5044 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 32ms · UTC 09:57 · PVG 17:57 · LAX 01:57 · JFK 04:57
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.