V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
uti6770werty
V2EX  ›  Python

Python 的多进程,考虑到会发生死进程,如何收敛结束,安全又方便?

  •  
  •   uti6770werty · 2020-07-06 20:44:42 +08:00 · 1999 次点击
    这是一个创建于 1642 天前的主题,其中的信息可能已经有所发展或是发生改变。
    最近把以前不会弄多进程时候,写的单进程的 python 程序,修改成用 concurrent 模块,使用多进程去运行。。。
    有个这样的场景没有办法解决:
    把封装好的函数(假如函数叫 executFunc())交给 futures.ProcessPoolExecutor()去执行,事实上发生了严重超时,超过了 1000 秒,想去结束它,发现没有办法在主程序流程上,在外部把这个进程所产生的系统进程结束( concurrent 没有这样的方法,只能默认一直等?),futures.Future()的.running(),一直返回 Ture
    假如 executFunc()创建了 4 个 chrome.exe ,如果把系统中的 chrome.exe 全 kill 了,又会影响到其它的进程。。。。
    请问各位高手,遇到这样的情况,应该如何处理? 或者有什么其它的模块更好面对这样的问题?
    executFunc() 是可以小修改一下 time.clock - startTime > 1000,然后就做些事情的。。。。
    4 条回复    2020-07-08 00:11:34 +08:00
    nightwitch
        1
    nightwitch  
       2020-07-06 20:50:25 +08:00
    ClericPy
        2
    ClericPy  
       2020-07-06 20:57:27 +08:00
    https://github.com/ClericPy/ichrome/blob/master/ichrome/daemon.py#L465

    https://github.com/ClericPy/ichrome/blob/master/ichrome/base.py#L115

    可以看下这两种杀 chrome 的方式

    有 debugging-port 的跟着端口号杀比较准, ProcessPoolExecutor 有挺多小坑, 没什么必要的情况下还是别用它
    joApioVVx4M4X6Rf
        3
    joApioVVx4M4X6Rf  
       2020-07-07 11:41:51 +08:00   ❤️ 1
    @ClericPy 这个开源我爱了,太酷了
    uti6770werty
        4
    uti6770werty  
    OP
       2020-07-08 00:11:34 +08:00
    @nightwitch 感谢链接,应该就是我要找的帮助,谢谢!
    @ClericPy 感谢告知可以根据 debugging-port 的方式 kill chrome

    试着写了个模拟的程序,进程是被停下来了,但是这个进程池好像再也用不料了。。。

    ```
    # coding=utf-8
    import codecs
    import difflib
    import os.path
    import re
    import time
    import string
    import chardet
    import shutil
    import copy
    from concurrent.futures import ProcessPoolExecutor

    import datetime

    def PrintCount(PName):
    if PName == '甲':
    DelayTime = 0.9
    if PName == '乙':
    DelayTime = 1.2
    if PName == '丙':
    DelayTime = 1.7
    if PName == '丁':
    DelayTime = 2
    if PName == '戊':
    DelayTime = 2.5

    countt = 0
    while True:
    countt += 1
    time.sleep(DelayTime)
    print(f'{PName}池:->',f'{countt}')


    if __name__ == '__main__':

    StartTime = time.clock()
    FutureDict = {}
    FutureRetDict = {}
    FutureTimeRecoderDict = {}

    PoolNameList = ["甲", "乙", "丙", "丁", "戊"]

    # 初始化进程池
    for i in range(len(PoolNameList)):
    # 进程
    FutureDict.update({PoolNameList[i]: ProcessPoolExecutor(max_workers=1)})
    # 进程 Ret
    FutureRetDict.update({PoolNameList[i]: futures.Future()})
    # 进程启动时间
    FutureTimeRecoderDict.update({PoolNameList[i]: None})

    # 模拟测试清空进程池的信号
    closeflag = True

    # 开始工作
    while True:
    ProcessNum = 0

    # 增加任务
    for ProcessName,FutureRet in FutureRetDict.items():

    # 模拟 40 秒后终结 [乙] 进程池
    if closeflag == True:
    if time.clock() - StartTime >= 40: # 在启动 40 秒后触发
    print(f"开始强制结束 [乙] 进程池")
    for pid, process in FutureDict['乙']._processes.items():
    process.terminate()
    FutureDict['乙'].shutdown()
    closeflag = False
    time.sleep(15)

    # 如果进程池在运行
    if FutureRet.running() == True:
    pass
    else:
    # 增加任务
    FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
    print(f'{ProcessName} 进程池提交了开始.')
    time.sleep(2)
    break
    time.sleep(3)

    ```



    40 秒之后,乙进程池的确被停了,但是再向乙进程池提交任务的时候,会提示:

    ```
    甲池:-> 57
    甲池:-> 58
    丙池:-> 25
    戊池:-> 13
    甲池:-> 59
    丁池:-> 19
    甲池:-> 60
    丙池:-> 26
    Traceback (most recent call last):
    File "D:/TestForMu.py", line 80, in <module>
    FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
    File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
    raise BrokenProcessPool('A child process terminated '
    concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
    甲池:-> 61
    戊池:-> 14
    丁池:-> 20
    丙池:-> 27
    ```

    应该要做些什么事情,再次启用?
    原谅我这样设计流程,也许不是科学的,只是尽早希望能跑起多进程,把问题解决了就好。。。^_^
    谢谢大家解答!
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1332 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 17:47 · PVG 01:47 · LAX 09:47 · JFK 12:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.