有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:
async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
ql = len(context_qs)
i = 0
# 每一个 Queue 放 step 个数据就切换下一个
step = 2
with open(file_path, encoding="utf8") as f:
for text in f:
if i // step == ql:
i = 0
context_q = context_qs[i // step]
context = {}
text = re.findall(r"\d+", text)
if text:
context = {"解析然后组装成 dict"}
await context_q.put(context)
# 这里如果不 join ,会一直在这个 for 循环里不出去
await context_q.join()
i = i + 1
else:
await context_q.put("结束标记")
return
async def write_db(context_q: asyncio.Queue, model: ModelBase):
async with AsyncSession() as session:
while 1:
context = await context_q.get()
if context["结束标记"] == "end":
return
info, obj = None, None
try:
if context["info"]:
info = await session.execute(
select(InfoModel).filter(
InfoModel.attr == context["info"]
)
)
info = info.scalars().one_or_none()
if not info:
info = InfoModel(attr=context["info"])
session.add(info)
if context["header"]:
obj = await session.execute(
select(model).filter(
model.header == context["header"]
).options(selectinload(getattr(model, "info")))
)
obj = obj.scalars().one_or_none()
if not obj:
obj = model(header=context["header"])
session.add(obj)
if obj or info:
if info not in obj.info:
obj.info.append(info)
session.add(obj)
await session.commit()
except Exception as e:
await session.rollback()
raise e
else:
context_q.task_done()
async def main():
# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
c_q_count = 3
a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
tasks = [
asyncio.create_task(
parse_text(Path("a.txt"), a_context_qs)
),
asyncio.create_task(
parse_text(Path("b.txt"), b_context_qs)
),
]
for i in range(c_q_count):
tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main(), debug=settings.DEBUG)
不使用协程的:
def sync_read_file():
af = Path("a.txt").open(encoding="utf8")
bf = Path("b.txt").open(encoding="utf8")
with Session() as session:
while 1:
if af:
try:
text = af.readline()
context = parse_text(text)
sync_write_db(session, context, AModel)
except IOError:
af.close()
af = None
if bf:
try:
text = bf.readline()
context = parse_text(text)
sync_write_db(session, context, BModel)
except IOError:
bf.close()
bf = None
if not af and not bf:
return
def sync_write_db(session, context, model):
info, obj = None, None
try:
if context["info"]:
info = session.execute(
select(Info).filter(
Info.attr == context["info"]
)
)
info = info.scalars().one_or_none()
if not info:
info = Info(attr=context["info"])
session.add(info)
if context["header"]:
obj = session.execute(
select(model).filter(model.info == context["info"]))
obj = obj.scalars().one_or_none()
if not obj:
obj = model(info=context["info"])
session.add(obj)
if obj or info:
if info not in obj.info:
obj.info.append(info)
session.add(obj)
session.commit()
except Exception as e:
session.rollback()
raise e
if __name__ == '__main__':
sync_read_file()
这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?
1
jenlors 2021-11-20 23:11:37 +08:00
试试 aiofiles 之类的,你的文件 IO 还是同步的
|
2
Trim21 2021-11-20 23:15:05 +08:00
with open(file_path, encoding="utf8") as f:
for text in f: 这两行都是阻塞的 |
6
Nitroethane 2021-11-20 23:48:52 +08:00 via iPhone
@firejoke 如果读取文件速度比较慢,而且文件比较大的话影响应该比较明显
|
7
firejoke OP @Nitroethane #6 每行数据小于 1kb ,而且是用的 for ,这里相当于一个生成器
|
8
Trim21 2021-11-20 23:50:53 +08:00
@firejoke #5 不是,你的代码中仅仅会阻塞在 open 和 for text in f 这两行。在等待这两行底层的同步 io 完成的时间里是不会运行其他 task 的。
|
9
firejoke OP @Trim21 #8 我改成了 asyncfiles ,然后把队列的 join 去掉了,这次成功跳到了其他 await 的位置,确实如你所说,感谢!
但测试发现,虽然没了 io 的阻塞,但写入速度还是没太大变化,他每读一行,切到其他 task ,和我之前没读一行,join 住,就执行流程来说,是不是没差? |
10
Trim21 2021-11-21 00:51:20 +08:00 via Android
我没仔细看完整的代码,只是看到一开始就有同步阻塞的问题就回复了。
|
11
locoz 2021-11-21 02:50:56 +08:00
目测是正则导致的阻塞...有一说一你这种情况不太适合用 asyncio ,或者说不太适合没有包上隐式多进程的 asyncio ,毕竟不是纯粹的 IO 操作。然后文件操作方面 aiofiles 实际背后也是靠线程池跑的,这一点需要注意一下,有时候可能会导致踩坑。
|
12
documentzhangx66 2021-11-21 03:29:02 +08:00
先监视一下设备性能极限。
iostat -x -m -d 1 |
13
LeeReamond 2021-11-21 04:07:18 +08:00
大概看了一眼楼上说的应该没问题,并非所有类型的任务都能通过异步加速,你要做好心理准备。另外 aiofiles 的实现其实很丑陋。。楼上说是线程池跑的,我有点忘记具体情况了,只记得以前读源码的印象是很丑陋。。
|
14
Contextualist 2021-11-21 08:17:31 +08:00
看上去没有明显的问题,不过对于任何为了改进性能的重写建议还是先 profile 一下,看看瓶颈到底出在哪个调用上。
然后异步文件 IO 不是为了提升性能(降低平均延迟)的,而是为了降低尾延迟的,参见: https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you |
15
2i2Re2PLMaDnghL 2021-11-21 08:47:44 +08:00
(我会尝试先把所有信息读进内存然后 timeit 数据库部分,看瓶颈是不是文件
|
16
lesismal 2021-11-21 11:37:53 +08:00
不是说给函数加上异步就是一切都异步了:
1. 异步的函数 A 2. A 内部调用 B C D ,B C D 有任意同步阻塞的行为,A 也一样跟着阻塞 py 的性能痛点远不只是 asyncio 就能解决的了的,how about trying golang -_- |
17
firejoke OP @locoz #11 我也感觉似乎没发挥出 asyncio 的优势,每一条数据都不超过 1kb ,所以可能除了数据库操作稍微耗时长一点,其他地方等待的很少,所以和单线程的性能差不多?另外请教一下,“没有包上隐式多进程” 具体是指什么呢?
|
18
firejoke OP @documentzhangx66 #12 设备性能应该没问题,12 核 24 线程,64G 内存,磁盘读取速度也没有跑满,IO 读写也不是特别高。
|
19
firejoke OP @LeeReamond #13 嗯,我昨天也想了一下,如果每一步阻塞住的操作实际上都很快,那 asyncio 其实发挥不出切换等待的优势。
|
20
locoz 2021-11-21 12:37:05 +08:00
@firejoke #17 建议用调试工具或者排除法看看具体是哪里拖慢了,单看代码和前面的讨论我感觉是正则部分导致的。
前面没讲清楚,“包上隐式多进程的 asyncio”指的是把多进程和协程结合,开一堆子进程然后每个子进程一个 eventloop ,因为之前有看到过一个专门的库把这部分操作给隐式处理了,使用起来两三行搞定,不需要自己写进程管理部分。然后一些框架其实也会隐式地做这种结合处理来提高效率。 |
21
firejoke OP @Contextualist #14 看文档的意思,是说用异步文件 IO ,在从内存读取时反倒会变慢,在从磁盘读取的时候会加快,在不同环境下其结果是不可预测的。那我如果单独用一个进程读取文件到内存,然后另一个进程从内存读取然后再操作,应该可以绕开这个问题。
|
22
firejoke OP @locoz #20 我昨天最后也是改成用多进程了,一个进程专门读文件,然后放进队列,其他子进程从队列读,然后操作数据库,那看来我思路没跑偏。还有其他的解法吗?多进程和协程的结合,一般都是以多进程为主吗?
|
23
Contextualist 2021-11-21 14:01:03 +08:00
又看了一下你贴出来文件的部分,你是不是就两个大文件(就是说不是大量小文件),那文件 IO 就基本不可能是你的瓶颈,你看到磁盘读取没跑满很有可能是你下游的处理速度没跟上。
多进程和协程,感觉你自己也总结出来了。协程得用在有长时间等待系统调用 (syscall) 的地方(比如网络、子进程、定时任务)。CPU 密集的操作得用多线程或多进程,但在 Python 里有 GIL ,就只能用多进程。 |
24
firejoke OP @Contextualist #23 是的,就是两个大文件,所以我也觉得文件 IO 不是我这里的瓶颈,协程在这个场景中没体现出他的优势,我已经改成了多进程了。
|
25
Contextualist 2021-11-21 14:26:48 +08:00
我对数据库不熟,不过我猜对于很多数据库并发写是不会有性能提升的,用单线程就可以了,但你可能需要 batch / bulk 操作,用来一次性插入数十条、数百条数据,而不是一次插入一条。
|
26
O5oz6z3 2021-11-21 15:05:20 +08:00
虽然不懂,看完楼上感觉原因之一在于 asyncio 的上限就是单线程,而单线程吞吐量不如多线程?
|
27
firejoke OP @Contextualist #25 对欸!资源是消耗在每一条查询和写入的操作上,如果批量写,就可以降低写入频率,至于查询,我已经在查询字段上加了索引,我改一下试试。感谢~
然后我看到你之前提到的 trio ,看他的文档像是涉及到异步操作的都有涉及,感觉非常不错啊。 |
29
yufpga 2021-11-21 20:01:53 +08:00
大概看了下, 这瓶颈显然不是在 parse_text 中的文件读,就算再怎么阻塞,读写本地文件也不至于到每秒才 400 行的程度. 而在 write_db 中, 出现好几处 await 的地方, 这些地方可都是要同步等待结果返回的呀. 一个很好容易验证的方法就是把 write_db 中的 await 用 await asyncio.sleep 替换掉, 尝试不同的 sleep 时间. 实际上上面的问题在于每一次 while 1 的循环循环是同步的, 你必须要先处理完队列中的前一条数据, 才能继续处理下一条数据. 所以处理也很简单, 把每一次的循环异步化掉.
|
30
hustlibraco 2021-11-22 00:36:59 +08:00
用```async for```替代```for```可以吗?
|
31
firejoke OP @hustlibraco #30 换成异步文件读,就可以换成 async for 了。
|
32
firejoke OP @yufpga #29 我看日志里,我同时开了好多个 task ,这个 task 的循环里 await query 或 add 或 commit ,就会跳到另一个 task 的循环里的 query 或 add 或 commit 。
|
33
yufpga 2021-11-22 10:17:06 +08:00
@firejoke 是我看差了, 我以为只有一个 queue, 而你的代码里是两个 context, 各自 3 个 queue, 也就是总共 6 个 queue, 对应 6 个 write_db 的 task. 当遇到 await 的时候, 确实是会跳转到别的 task 里面执行. 确实比较奇怪,但我仍然觉得瓶颈不大可能在 parse_text, 你可以试着记录一下队列写入数据的速率, 如果这个速率也在 400/s 左右, 那说明确实有可能是 parse_text 慢了
|
36
mlbjay 2023-07-21 15:01:08 +08:00
python 的 asyncio 是纯用户态线程,同步 io 会阻塞整个线程及其中的所有协程。
Golang 的 MPG 模型就解决的这个问题。 |