V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firejoke
V2EX  ›  Python

一些多进程多线程对文件操作的一些总结

  •  
  •   firejoke · 2018-05-12 22:16:10 +08:00 · 3533 次点击
    这是一个创建于 2147 天前的主题,其中的信息可能已经有所发展或是发生改变。
    读写文件,多进程和多线程

    # coding:utf-8
    """
    把大文件分块
    big_file.txt 是一个 500M 的 5 位数的乱序文档
    多线程并没有提升速度
    """

    import time
    txtfile = ''
    import threading
    def txtmap(txtup):
    with open('big_file.txt','r') as f:
    i = 0
    while i < 100000:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    # txtmap(txtfile)
    start = time.time()
    for i in range(100):
    txti = threading.Thread(target = txtmap(txtfile))
    txti.start()
    txti.join()
    print(time.time() - start)
    def txtmap2(txtup):
    with open('big_file.txt','r') as f:
    i = 0
    while i < 1000000:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    start = time.time()
    for i in range(10):
    txtmap2(txtfile)
    print(time.time() - start)


    多进程和队列初试
    import time
    from multiprocessing import Process
    from multiprocessing import Queue
    num = 0
    qnum = Queue()
    qnum.put(num)
    def testnum(num):
    num += 1
    qnum.put(num)
    for i in range(10):
    p = Process(target = testnum,args = (qnum.get(),))
    p.start()
    p.join()
    # testnum(num)
    print(qnum.get(),qnum.empty())
    在这里,qnum 属于实例化对象,不需要用 global 标记



    多次测试,发现多进程加队列,必须把文件指针位置也放进去,不然下一个读取位置就会乱跳
    with open('big_file.txt','r') as f:
    q.put ((txtfile3,f.tell()))
    def txtmap(qget):
    txtup = qget[0]
    i = 0
    f.seek(qget[1])
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    q.put((txtup,f.tell()))
    start = time.time()
    for i in range(10):
    txtp2i = Process(target = txtmap,args =(q.get(),))
    txtp2i.start()
    txtp2i.join()
    print('多进程加队列',time.time() - start,'\n',q.get())

    以及这个 args 的赋值真是烦人,明明从 q.get()出来的就是元组,它非要你=后面必须是一个元组的形式才行


    # coding:utf-8
    """
    把大文件分块
    big_file.txt 是一个 500M 的 5 位数的乱序文档
    多进加队列速度 < 多进程全局变量 < 多线程加队列 < 多线程加全局变量 < 普通全局变量
    多进程加队列不稳定 多进程因为进程间通讯必须借助队列 Queue 或者管道 pipe 否则改变不了全局变量
    """

    import os
    import time
    from multiprocessing import Process
    from multiprocessing import Queue
    import threading
    txtfile = ''
    txtfile2 = ''
    txtfile3 = ''
    txtfile4 = ''
    q = Queue()

    #因为本子是 4 核的,所以我只创建了 4 个进程,python 的 GIL 的限制决定 4 核只能同时跑 4 个 python 进程,多了就要等待
    with open('big_file.txt','r') as f:
    def txtmap():
    i = 0
    global txtfile
    while i < 25:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile += txt
    print(txt)
    print (os.getpid ())
    print(txtfile)
    start = time.time()
    for i in range(4):
    txtpi = Process(target = txtmap)
    txtpi.start()
    txtpi.join()
    print('多进程全局变量',time.time() - start,'\n',txtfile)
    if txtfile:
    print(True)
    else:
    print(False)

    with open('big_file.txt','r') as f:
    def txtmap():
    i = 0
    global txtfile2
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile2 += txt
    start = time.time()
    for i in range(10):
    txtti = threading.Thread(target = txtmap)
    txtti.start()
    txtti.join()
    print('多线程全局变量',time.time() - start,'\n',txtfile2)


    with open('big_file.txt','r') as f:
    q.put ((txtfile3,f.tell()))
    def txtmap(qget):
    txtup = qget[0]
    i = 0
    f.seek(qget[1])
    while i < 25:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    print (os.getpid ())
    q.put((txtup,f.tell()))
    start = time.time()
    for i in range(4):
    txtp2i = Process(target = txtmap,args =(q.get(),))
    txtp2i.start()
    txtp2i.join()
    print('多进程加队列',time.time() - start,'\n',q.get()[0])

    #因为队列 q 内的消息已被取完,所以再放进去一次,不然会一直处于阻塞状态等待插入消息
    q.put(txtfile3)
    with open('big_file.txt','r') as f:
    def txtmap(txtup):
    i = 0
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtup += txt
    q.put(txtup)
    start = time.time()
    for i in range(10):
    txtt2i = threading.Thread(target = txtmap,args = (q.get(),))
    txtt2i.start()
    txtt2i.join()
    print('多线程加队列',time.time() - start,'\n',q.get())

    with open('big_file.txt','r') as f:
    def txtmap2():
    i = 0
    global txtfile4
    while i < 10:
    txt = f.read(1)
    i += 1 if txt == ',' else 0
    txtfile4+= txt
    start = time.time()
    for i in range(10):
    txtmap2()
    print('普通全局变量',time.time() - start,'\n',txtfile4)

    #os.path.geisize 返回的文件大小就是 seek 指针的最后一个位置
    print(os.path.getsize('big_file.txt'))

    with open('big_file.txt','r') as f:
    print(f.seek(0,2))



    #终于看到了多进程同时进行,哦呼!甚是欢喜!而且和文件的指针并不会冲突
    from multiprocessing import Process
    import time
    with open('test.txt', 'r') as f:
    def readseek(num):
    f.seek(num)
    print(time.time(),f.read(10))
    time.sleep(10)
    p1 = Process(target = readseek, args = (20,))
    p2 = Process(target = readseek, args = (60,))
    p3 = Process(target = readseek, args = (120,))
    p4 = Process(target = readseek, args = (160,))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    print(time.time())
    第 1 条附言  ·  2018-05-14 08:20:23 +08:00
    放到了 GitHub 上,看着会舒服很多
    第 2 条附言  ·  2018-05-17 14:12:15 +08:00
    下面的链接放错了
    是这个 https://github.com/firejoke/python_test.git
    16 条回复    2018-05-18 08:47:57 +08:00
    firejoke
        1
    firejoke  
    OP
       2018-05-12 22:37:45 +08:00
    第一次发帖,改了大概 3 次后发现不能更改了......
    想再多进程全局变量那里加一个:多进程并不能改变全局变量,只能通过队列 Queue,或者管道 Pipe
    testsec
        2
    testsec  
       2018-05-12 22:39:51 +08:00 via iPhone
    Python 多线程只适用 io 密集场景
    firejoke
        3
    firejoke  
    OP
       2018-05-12 22:59:48 +08:00
    @testsec 是的,但是同一时刻也是只能运行一个线程,都是在等待,所以我觉得,要是文件真的超过 1G 的那种,仍然要用和核心数一样多的进程数,这样就可以同时运行和核心数一样多的线程
    neoblackcap
        4
    neoblackcap  
       2018-05-12 23:11:56 +08:00
    @firejoke 我建议你贴到外面的网站上,比如 github gist。你贴出来的代码实在影响大家的交流,缩进丢失,非常难以理解。
    我觉得你说的多线程没有用大概是实现的一些地方不是那么好,有其他因素抵消了多线程的优势。IO 本身是会释放 GIL,跟是不是多进程没关系
    lukefan
        5
    lukefan  
       2018-05-12 23:12:23 +08:00   ❤️ 1
    Python 的 IO 操作以及部分在底层释放 GIL 锁的类库并不受 GIL 限制.

    一般起 CPU 核数的进程针对的是 CPU 密集型操作, 并不是 IO 密集型操作.

    本地的文件 IO 一般情况下搞个单独的消费者线 /进程就够了, 多了性能反而差.

    一般爆协程、线程处理的 IO 密集型操作的 IO 主要是 socket、PIPE 这类, 并不是文件 IO.
    laqow
        6
    laqow  
       2018-05-12 23:14:02 +08:00 via Android
    感觉 python 的文本处理性能就是屎,特别是 utf8 这种变长编码的,既然不用管内容的话按二进制文件处理会快很多
    Applenice
        7
    Applenice  
       2018-05-12 23:58:09 +08:00
    像楼上们说的 Python 线程还是在 I/O 密集型应用上用的多,另外没有缩进看起来真的难受....0.0
    wspsxing
        8
    wspsxing  
       2018-05-13 04:50:31 +08:00 via Android
    依赖缩进的代码不要乱贴,如果不支持代码块就放到 github 之类,gist 在国内被 q 了。

    另外,不要用随机读写来加速磁盘访问。
    firejoke
        9
    firejoke  
    OP
       2018-05-13 12:51:35 +08:00
    @neoblackcap 第一次发帖,改了几次发现还是改不了,我贴到 GitHub 看下
    firejoke
        10
    firejoke  
    OP
       2018-05-13 12:53:13 +08:00
    @lukefan 我设想的是把大文件分四个块分别给四个进程处理,这样不就可以缩短时间吗?
    firejoke
        11
    firejoke  
    OP
       2018-05-13 12:53:47 +08:00
    @laqow 这个我没想过,我试试用 pickle 弄一下
    firejoke
        12
    firejoke  
    OP
       2018-05-13 12:54:39 +08:00
    @Applenice 第一贴 第一贴 第一贴,谅解一下下~我来发到 GitHub
    firejoke
        13
    firejoke  
    OP
       2018-05-13 12:57:52 +08:00
    @wspsxing 随机读写?读倒不是随机,就是命名的时候用的随机
    RicardoScofileld
        14
    RicardoScofileld  
       2018-05-14 13:24:29 +08:00
    可以试试 Dpark
    firejoke
        15
    firejoke  
    OP
       2018-05-17 14:02:33 +08:00
    firejoke
        16
    firejoke  
    OP
       2018-05-18 08:47:57 +08:00
    @RicardoScofileld 第一次知道这个框架,好像很好用,我仔细看看
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3205 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 14:26 · PVG 22:26 · LAX 07:26 · JFK 10:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.