Concurrency In Python - From Coroutines To Asynchronous Programming - Part 1
In the previous post, we saw how coroutines in Python can be used to achieve single-threaded concurrency.
After reading that post, you might be asking:

- Retrieving the value returned by a coroutine by catching a `StopIteration` exception and accessing the `value` attribute seems a bit hacky. Is there a cleaner way?
- Whilst the example at the end worked, it was:
  - pretty contrived and unrealistic as use cases go
  - quite verbose / complicated for a simple task
In this post, we will introduce `yield from`, which will go some way in addressing the points above.
## `yield from`
First, let’s see how the `yield from` syntax works.
Like `yield`, `yield from` is only valid inside the definition of a function body and turns the function into a generator / coroutine. Unlike `yield`, `yield from` must be followed by an iterable. `yield from` converts the iterable into an iterator and exhausts it, yielding each value along the way, e.g.
```python
def coroutine(iterable):
    yield from iterable

coro = coroutine('hello')
next(coro)  # 'h'
next(coro)  # 'e'
```
which, for plain iteration, is equivalent to
```python
def coroutine(iterable):
    for item in iterable:
        yield item
```
As `yield from` can be followed by any iterable, it can be followed by a generator (recall that generators are both iterables and iterators).
```python
def coroutine(generator):  # `generator` is a generator function
    yield from generator()

def generator():
    yield 'h'
    yield 'e'

coro = coroutine(generator)
next(coro)  # 'h'
next(coro)  # 'e'
```
The above means coroutines can call other generators / coroutines in the same way a function can call other functions, i.e. the above is analogous to
```python
def func1():
    return 1

def func2():
    return func1()

func2()  # 1
```
Without `yield from`, we have to introduce another variable:
```python
def coroutine(generator):
    yield generator()

def generator():
    yield 'h'
    yield 'e'

coro = coroutine(generator)
g = next(coro)
next(g)  # 'h'
next(g)  # 'e'
```
which looks even worse if another “intermediary” generator is introduced
```python
def coroutine(generator):
    yield generator()

def generator1():
    yield generator2()

def generator2():
    yield 'h'
    yield 'e'

coro = coroutine(generator1)
g = next(next(coro))
next(g)  # 'h'
next(g)  # 'e'
```
Each time we add an intermediary generator, we have to make an extra call to `next()`. For $n$ generators, we’d have `next(next(...))` nested $n$ times.
However, with `yield from`:
```python
def coroutine1(coroutine):
    yield from coroutine()

def coroutine2():
    yield from generator()

def generator():
    yield 'h'
    yield 'e'

coro = coroutine1(coroutine2)
next(coro)  # 'h'
next(coro)  # 'e'
```
the same code works regardless of the number of coroutines separating the outermost and innermost objects.
This might seem like a trivial saving, but according to PEP 380, it often amounts to something substantial.
Now that we have seen how `yield from` works, let’s see how it addresses the points raised at the beginning of the post.
## Accessing a coroutine’s return value
`yield from` allows a cleaner way of accessing a coroutine’s return value:
```python
results = []

def coroutine1():
    yield "I'm not done, hitting pause"
    return 1

def coroutine2():
    result = yield from coroutine1()
    results.append(result)

coro = coroutine2()
while 1:
    try:
        next(coro)  # "I'm not done, hitting pause"
    except StopIteration:
        break

print(results)  # [1]
```
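For contrast, here is a sketch of the “hacky” approach from the previous post: without `yield from`, we retrieve the return value by catching `StopIteration` ourselves and reading its `value` attribute.

```python
def coroutine1():
    yield "I'm not done, hitting pause"
    return 1

coro = coroutine1()
next(coro)  # "I'm not done, hitting pause"
try:
    next(coro)
except StopIteration as exc:
    result = exc.value  # the return value rides on the exception

print(result)  # 1
```

`yield from` does exactly this catching and unwrapping for us behind the scenes.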
## More realistic, simpler concurrency example
### Concurrency in the real world
We introduced concurrency in the previous post where it was defined as “making progress on multiple tasks in the same time period”.
We saw it applied to a particular situation, where it reduced the running time of a script.
More generally though, why do we care about concurrency when programming in Python?
In Python (and other languages), functions that perform I/O (moving data from A to B) can take a long time to run, introducing latency.
Data might be moved across a network, e.g. when making an HTTP request, from an external hard drive to local disk, from a local database to the local file system, etc.
In Python, programs are by default run synchronously, i.e. lines of code are executed in order, one after the other. If there are two lines of code, the second can only run once the first has completed.
So if the first line performs I/O, the second line can only run once the I/O is completed.
This is wasteful as each time the program waits for I/O to complete, the CPU is sitting idle (in terms of your program).
It is also time consuming; the running time of the program increases linearly with the number of I/O operations performed, i.e. its time complexity is $O(n)$.
For example, for a program performing $n$ I/O operations, each taking two seconds:

| I/O operations | Running time |
|---|---|
| 1,000 | ~33 mins |
| 10,000 | ~5.5 hours |
| 1,000,000 | ~23 days |
| 1,000,000,000 | ~63 years |
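To see this linear growth on a small scale, here is a minimal sketch that mocks each I/O operation with a short `time.sleep` (the `fake_io` name and the 0.05-second delay are stand-ins of my own, not real network calls):

```python
import time

def fake_io():
    time.sleep(0.05)  # stand-in for a blocking I/O call

n = 10
start = time.time()
for _ in range(n):  # n synchronous I/O operations, one after the other
    fake_io()
elapsed = time.time() - start

# elapsed is roughly n * 0.05 = 0.5s; doubling n doubles the running time
print(f'{elapsed:.2f}s')
```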
To put this into perspective, there are about 50 billion webpages which Google crawls regularly. Given the speed at which Google search results update, one suspects Google is not doing this synchronously!
One solution is to use concurrency expressed via asynchronous programming.
Now, when our program encounters a line of code that performs I/O, it does not block the rest of the program (this is why asynchronous is synonymous with non-blocking). Instead, the next line of the program is run.
The drawback of this approach is that it makes your program more difficult to reason about, e.g. how do I know when the I/O performed by the first line has completed, and how do I process the result? What if there was an error during that I/O?
The advantage is that the program in our previous example now takes two seconds to run, rather than $2n$ seconds. This means our program’s running time is scalable, as it is completely independent of $n$, the number of I/O operations, i.e. it is $O(1)$.
Why two seconds? Because for a program with $n$ asynchronous I/O operations with running times $(r_1,\ldots,r_n)$, the total running time is $\max(r_1,\ldots,r_n)$. And because we assumed each I/O operation takes two seconds,

\[\max(r_1,\ldots,r_n) = \max(2,\ldots,2) = 2\]

In reality, I/O operations are unpredictable, e.g. whilst the majority might take two seconds to complete, a few might take 10 seconds, and some might just hang.
This is why, in practice, you set a timeout on each I/O operation. This ensures the running time of your program never exceeds the timeout.
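A timeout fits naturally into the generator-based coroutine style used later in this post. Here is a hypothetical sketch (the function names and mocked durations are my own invention, not from any library) in which an I/O coroutine gives up once its deadline passes:

```python
import time

def io_coroutine_with_timeout(io_duration, timeout):
    # hypothetical I/O coroutine: `io_duration` mocks how long the
    # underlying I/O actually takes, `timeout` is our deadline
    start = time.time()
    while time.time() - start < io_duration:  # mock "I/O not complete yet"
        if time.time() - start > timeout:
            return None  # deadline passed, give up with no response
        yield  # hand control back to the caller / event loop
    return 200  # mock response

def run(coro):
    # minimal driver: exhaust the coroutine, capture its return value
    while True:
        try:
            next(coro)
        except StopIteration as exc:
            return exc.value

print(run(io_coroutine_with_timeout(0.1, timeout=1)))   # completes: 200
print(run(io_coroutine_with_timeout(60, timeout=0.2)))  # would hang, times out: None
```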
### Concurrency and `yield from`
Implementing concurrency with `yield from` means the I/O code you wish to run concurrently is isolated in self-contained units. This is beneficial as such code is often complicated, involving low-level interaction with the OS. This isolation is achieved by placing the I/O code into coroutines. These coroutines are then called by other coroutines written in the same way as normal, synchronous code.
For example,
```python
def io_coroutine1():
    # difficult, low-level I/O stuff
    return result

def io_coroutine2(x):
    # difficult, low-level I/O stuff
    return result

def easy_coroutine():
    # normal, synchronous Python
    result1 = yield from io_coroutine1()
    result2 = yield from io_coroutine2(result1)
    return result2
```
Ok, `easy_coroutine()` doesn’t quite look like normal, synchronous Python. But if we remove `yield from`, we are left with
```python
def easy_coroutine():
    result1 = io_coroutine1()
    result2 = io_coroutine2(result1)
    return result2
```
which does.
And here comes the punch line:
Although the code in `easy_coroutine()` is basically normal Python (apart from `yield from`), the I/O in `io_coroutine1()` and `io_coroutine2()` is non-blocking.

For example, if we had `easy_coroutine1()` and `easy_coroutine2()`, with I/O going on in both, progress could be made in both coroutines in the same time period.
Further, in reality, the difficult I/O stuff going on in `io_coroutine1()` and `io_coroutine2()` either comes from the `asyncio` standard library module or a third-party library, e.g. `trio` or `aiohttp`.
However, if this all sounds a little too good to be true, it is! There is one missing piece of the jigsaw: coroutines like `easy_coroutine1()` and `easy_coroutine2()` have to be run in an event loop. But this, again, is something you will never write yourself in production code. Rather, it will be provided by an asynchronous programming framework like `asyncio` or `trio`.
For learning purposes though, let’s imagine the rough outline of a homemade implementation:

- Put all I/O code into coroutines, the “I/O coroutines”.
- Implement the rest of the program in “easy coroutines” (some making calls to “I/O coroutines”) which, apart from `yield from`, are written like regular, synchronous Python functions.
- Run the “easy coroutines” in an event loop.
Let’s flesh out some of the details of the above in an example. We have one “I/O coroutine”, `network_io_coroutine()`, and one “easy coroutine”, `coroutine()`, which calls `network_io_coroutine()` to get a network response. Our example makes two fake network I/O operations concurrently, using a basic event loop, and displays the results at the end:
```python
import time

start = time.time()

def start_network_io():
    pass  # make low level call to OS to start network I/O

def is_network_io_complete(start):
    # mock OS polling checking if network I/O is complete
    # mocked so that network I/O completes after 3 seconds
    return time.time() - start > 3

def get_network_io_response():
    return 200

def network_io_coroutine():
    start = time.time()
    start_network_io()
    while 1:
        if is_network_io_complete(start):
            break
        yield  # hand control back to the event loop
    # network I/O complete, response ready to be returned
    return get_network_io_response()

def coroutine():
    # our "easy" coroutine
    # 1st line is blocking BUT DOES NOT BLOCK EVENT LOOP
    # 1st line unblocks when `network_io_coroutine` returns
    # This blocking then unblocking makes the code easy to write
    response = yield from network_io_coroutine()
    network_io_responses.append(response)

network_io_responses = []

def main():
    # "register" coroutines with event loop
    coro1 = coroutine()
    coro2 = coroutine()
    coro_results = {'coro1': None, 'coro2': None}
    current_coro = None
    # start event loop
    while coro_results['coro1'] is None or coro_results['coro2'] is None:
        try:
            if coro_results['coro1'] is None:
                current_coro = 'coro1'
                next(coro1)  # start / resume 1st network I/O
            if coro_results['coro2'] is None:
                current_coro = 'coro2'
                next(coro2)  # start / resume 2nd network I/O
        except StopIteration:
            if current_coro == 'coro1':  # 1st network I/O complete
                coro_results['coro1'] = network_io_responses[-1]
            if current_coro == 'coro2':  # 2nd network I/O complete
                coro_results['coro2'] = network_io_responses[-1]
    print('Network I/O responses', network_io_responses)
    print('Coroutine results', coro_results)
    print(f'Script took {time.time() - start:.2f}s')

main()
```
```
Network I/O responses [200, 200]
Coroutine results {'coro1': 200, 'coro2': 200}
Script took 3.00s
```
We can see that although each network I/O operation takes three seconds, the script’s total running time is also three seconds.
As mentioned earlier, this running time is independent of the number of I/O operations.
To check this holds, let’s run the same script, only this time with three I/O operations instead of two.
The only code changes required are in `main()`:
```python
def main():
    # "register" coroutines with event loop
    coro1 = coroutine()
    coro2 = coroutine()
    coro3 = coroutine()
    coro_results = {'coro1': None, 'coro2': None, 'coro3': None}
    current_coro = None
    # start event loop
    while (coro_results['coro1'] is None or coro_results['coro2'] is None
           or coro_results['coro3'] is None):
        try:
            if coro_results['coro1'] is None:
                current_coro = 'coro1'
                next(coro1)  # start / resume 1st network I/O
            if coro_results['coro2'] is None:
                current_coro = 'coro2'
                next(coro2)  # start / resume 2nd network I/O
            if coro_results['coro3'] is None:
                current_coro = 'coro3'
                next(coro3)  # start / resume 3rd network I/O
        except StopIteration:
            if current_coro == 'coro1':  # 1st network I/O complete
                coro_results['coro1'] = network_io_responses[-1]
            if current_coro == 'coro2':  # 2nd network I/O complete
                coro_results['coro2'] = network_io_responses[-1]
            if current_coro == 'coro3':  # 3rd network I/O complete
                coro_results['coro3'] = network_io_responses[-1]
    print('Network I/O responses', network_io_responses)
    print('Coroutine results', coro_results)
    print(f'Script took {time.time() - start:.2f}s')
```
```
Network I/O responses [200, 200, 200]
Coroutine results {'coro1': 200, 'coro2': 200, 'coro3': 200}
Script took 3.00s
```
which works as expected.
## Conclusion
We can see how `yield from` makes quite a big difference to the way we write and organise our code when programming asynchronously in Python. By programming asynchronously, we achieve concurrency, which brings performance benefits when performing I/O operations.
Asynchronous programming in Python is usually implemented via a framework such as `asyncio`. However, in this post we saw how such an implementation might look without a framework.
In the next post, we will take a more realistic approach and see fully fledged, genuine examples of asynchronous programming in Python with the `asyncio` framework.
Disclaimer: In no way, shape, or form do I claim all the content in this post to be my own work / not copied, paraphrased, or derived in any other way from an external source.
To the best of my knowledge, all sources used are referenced. If you feel strongly about any of the content in this post from a plagiarism, copyright, etc. point of view, please do not hesitate to get in touch to discuss and resolve the situation.