python爬虫-async


python爬虫-async

协程与任务

协程

1
2
3
4
5
6
7
8
9
10
11
协程 通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式
>>> import asyncio

>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')

>>> asyncio.run(main())
hello
world
1
2
3
注意:简单地调用一个协程并不会使其被调度执行
>>> main()
<coroutine object main at 0x1053bb7c8>

三种主要机制

asyncio.run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
要真正运行一个协程,asyncio 提供了三种主要机制
asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数 (参见上面的示例。)

import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
1
2
3
4
started at 17:13:52
hello
world
finished at 17:13:55
asyncio.create_task()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程

让我们修改以上示例,并发 运行两个 say_after 协程

async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))

task2 = asyncio.create_task(
say_after(2, 'world'))

print(f"started at {time.strftime('%X')}")

# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2

print(f"finished at {time.strftime('%X')}")
1
2
3
4
5
6
注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

started at 17:14:32
hello
world
finished at 17:14:34

可被等待对象

1
2
3
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务 和 Future

协程

1
2
3
4
5
6
Python 协程属于 可等待 对象,因此可以在其他协程中被等待

重要 在本文档中 "协程" 可用来表示两个紧密关联的概念:
协程函数: 定义形式为 async def 的函数;

协程对象: 调用 协程函数 所返回的对象

任务

1
2
3
任务 被用来“并行的”调度协程

当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def nested():
return 42

async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())

# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task

asyncio.run(main())

Futures

1
2
3
4
5
Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象
1
2
3
4
5
6
7
8
async def main():
await function_that_returns_a_future_object()

# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
1
一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()

运行asyncio程序

1
2
3
4
5
6
asyncio.run(coro, *, debug=False)
执行 coroutine coro 并返回结果。
此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
如果 debug 为 True,事件循环将以调试模式运行。
此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次

创建任务

1
2
3
4
5
6
7
8
asyncio.create_task(coro, *, name=None)
将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。

name 不为 None,它将使用 Task.set_name() 来设为任务的名称。

该任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError。

此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数
1
2
3
4
5
6
7
8
9
10
async def coro():
...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

休眠

1
2
3
4
5
coroutine asyncio.sleep(delay, result=None)
阻塞 delay 指定的秒数。
如果指定了 result,则当协程完成时将其返回给调用者。
sleep() 总是会挂起当前任务,以允许其他任务运行。
将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
这个函数从 3.7 开始就会隐式地获取当前正在运行的事件循环

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)

asyncio.run(display_date())

并发运行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
awaitable asyncio.gather(*aws, return_exceptions=False)
并发运行aws序列中的可等待对象。

如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一Task或Future 对象 被取消,它将被当作引发了 CancelledError一样处理--在此情况下 gather()调用不会被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({number}), currently i={i}...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
return f

async def main():
# Schedule three calls *concurrently*:
L = await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
print(L)

asyncio.run(main())

# Expected output:
#
# Task A: Compute factorial(2), currently i=2...
# Task B: Compute factorial(3), currently i=2...
# Task C: Compute factorial(4), currently i=2...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3), currently i=3...
# Task C: Compute factorial(4), currently i=3...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4), currently i=4...
# Task C: factorial(4) = 24
# [2, 6, 24]
1
注解 如果 return_exceptions 为 False,则在 gather() 被标记为已完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用 gather.cancel() 将不会取消任何其他可等待对象

屏蔽取消操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
awaitable asyncio.shield(aw)
保护一个 可等待对象 防止其被 取消。

如果 aw 是一个协程,它将自动被作为任务调度。

以下语句:

res = await shield(something())
相当于:

res = await something()
不同之处 在于如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度看来,取消操作并没有发生。然而其调用者已被取消,因此 "await" 表达式仍然会引发 CancelledError。
如果通过其他方式取消 something() (例如在其内部操作) 则 shield() 也会取消。
如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段,如下所示:

try:
res = await shield(something())
except CancelledError:
res = None

超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
coroutine asyncio.wait_for(aw, timeout)
等待 aw 可等待对象 完成,指定 timeout 秒数后超时。

如果 aw 是一个协程,它将自动被作为任务调度。

timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果 timeout 为 None,则等待直到完成。

如果发生超时,任务将取消并引发 asyncio.TimeoutError.

要避免任务 取消,可以加上 shield()。

此函数将等待直到 Future 确实被取消,所以总等待时间可能超过 timeout。 如果在取消期间发生了异常,异常将会被传播。

如果等待被取消,则 aw 指定的对象也会被取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

# Expected output:
#
# timeout!

简单等待

1
2
3
4
coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
并发地运行 aws 可迭代对象中的 可等待对象 并进入阻塞状态直到满足 return_when 所指定的条件。
aws 可迭代对象必须不为空。
返回两个 Task/Future 集合: (done, pending)
1
2
3
4
5
6
7
8
9
10
11
12
13
done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 类型) 则它将被用于控制返回之前等待的最长秒数。

请注意此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 或 Task 将在指定秒数后被返回。

return_when 指定此函数应在何时返回。它必须为以下常数之一:
FIRST_COMPLETED
函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION
函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于 ALL_COMPLETED。
ALL_COMPLETED
函数将在所有可等待对象结束或取消时返回
1
与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象

你给世界什么姿态,世界将还你什么样的人生。

本文标题:python爬虫-async

文章作者:TTYONG

发布时间:2020年04月04日 - 15:04

最后更新:2022年03月15日 - 15:03

原始链接:http://tianyong.fun/python%E7%88%AC%E8%99%AB-async.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%