Where I share about my journey in tech and life.
This article was written manually; AI was used only to check the spelling.
Coroutines are a great way to handle blocking calls like I/O in a non-blocking manner. Let's explore how they are implemented.
During a research project I worked on, I had to use Bluetooth Low Energy to communicate with a hardware device. I decided to use Bleak to test the hardware. Bleak is a useful library implemented asynchronously. At the time, I had never used asynchronous programming in Python. Given time constraints, I decided to go with it and learn from the examples.
Now, two years later, it's time to fully understand what I did.
Asynchronous programming is a method of parallel programming that allows blocking calls to be handled in a non-blocking way. One approach to implementing asynchronous programming is through coroutines.
Coroutines are objects returned by a function defined using the async keyword.
def function():
    pass

async def async_function():
    pass

print(type(function))          # <class 'function'>
print(type(async_function))    # <class 'function'>
print(type(async_function()))  # <class 'coroutine'> + RuntimeWarning
Note that async_function itself is a function, but a coroutine is created when it is called.
Coroutines are one of the building blocks of asynchronous programming. Another important concept is generators.
Calling async_function() without awaiting it raises a runtime warning:

/Users/username/dev/AsyncAwait/test3.py:11: RuntimeWarning: coroutine 'async_function' was never awaited
  print(type(async_function()))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
This warning is self-explanatory, but keep it in mind—we'll revisit it later.
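The warning disappears as soon as the coroutine is actually awaited. For illustration, here is a small sketch using the standard library's asyncio (the built-in way to drive a coroutine; later we will build our own loop instead):

```python
import asyncio

async def async_function():
    return 42

async def main():
    # Awaiting the coroutine actually runs it, so no warning is emitted
    value = await async_function()
    print(value)  # 42

asyncio.run(main())
```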
A generator is a special kind of function that includes the yield keyword. For completeness, there are other ways to create generators using magic methods, but here we will focus on the yield keyword.
def function():
    return None

def generator():
    yield None

print(type(function))     # <class 'function'>
print(type(generator))    # <class 'function'>
print(type(generator()))  # <class 'generator'>
Note that the generator function is still a function, but when called, it returns a generator object.

A generator executes synchronously until it reaches the yield statement. The function then retains its state and hands control back to the caller. When next() is called on the generator, execution resumes from where it left off. When the generator reaches a return statement, it raises a StopIteration exception containing the return value.
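A minimal illustration of this lifecycle (the names here are just for the example):

```python
def counter():
    yield 1
    yield 2
    return "done"  # raises StopIteration("done") when reached

g = counter()
print(next(g))  # 1 — runs until the first yield
print(next(g))  # 2 — resumes where it left off
try:
    next(g)     # no more yields: the return statement is reached
except StopIteration as e:
    print(e.value)  # done
```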
Here’s an example of a simple generator used as a coroutine:

def coroutine():
    value = yield  # Suspends execution and waits for a value to be sent
    return value

c = coroutine()  # Creates a generator object
next(c)          # Advances the generator to the first yield statement
try:
    c.send(1)    # Sends a value to the generator
except StopIteration as e:
    print(e.value)  # int containing 1
A Task is a class that wraps a generator or coroutine, implementing the necessary asynchronous management through magic methods. The helper functions later in the article call Task.new; since the original excerpt does not show it, a minimal reconstruction is included here.

from typing import Any, Callable, Optional

class Task:
    def __init__(self, coro):
        self.name = coro.__name__
        self.coro = coro
        self.finished = False
        self.res = None
        self.callback: Optional[Callable[[Any], None]] = None
        self.catch_method: Optional[Callable[[Any], None]] = None

    @classmethod
    def new(cls, coro):
        # Wrap the coroutine and enqueue it on the event loop (defined below)
        task = cls(coro)
        event_loop.put(task)
        return task

    def __await__(self):
        while not self.finished:
            yield self
        return self.res
To make the task awaitable, we implement the __await__ magic method. This method checks whether the coroutine is finished. If not, execution reaches yield and stops. The event loop later resumes execution by calling send(None) on the task's wrapped coroutine.
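To make the mechanics concrete, here is a hypothetical standalone experiment with a stripped-down Task (reduced to just the awaitable part): a coroutine awaiting it can be stepped by hand with send(None), exactly as the event loop will do.

```python
class Task:
    def __init__(self):
        self.finished = False
        self.res = None

    def __await__(self):
        while not self.finished:
            yield self  # suspend until someone marks us finished
        return self.res

async def waiter(task):
    return await task

t = Task()
coro = waiter(t)
print(coro.send(None) is t)  # True: the pending task yields itself
print(coro.send(None) is t)  # True: still pending, yields again
t.finished = True
t.res = "hello"
try:
    coro.send(None)  # the while loop exits and __await__ returns res
except StopIteration as e:
    print(e.value)  # hello
```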
The event loop orchestrates task execution, allowing each task to continue running from one yield statement to another. It can be implemented as a class or a function.
import traceback
from queue import Queue
from typing import Coroutine

event_loop = Queue()  # FIFO queue to manage task rotation

def run(main: Coroutine) -> None:
    # Wrap the main coroutine in a Task and add it to the event loop
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.coro.send(None)  # Resume execution until the next yield
        except StopIteration as e:
            task.finished = True
            task.res = e.value
            if task.callback:
                task.callback(task.res)
        except Exception as e:
            if not task.catch_method:
                print(f"Warning: Unhandled exception from {task.name}")
                traceback.print_exc()
            else:
                task.catch_method(e)
            task.finished = True
        else:
            event_loop.put(task)  # Re-add the task if it's not finished
This function continuously runs tasks in the event loop until all are completed.
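As a sanity check, the machinery so far can be exercised end to end. The sketch below is self-contained: it condenses the Task and run pieces, and uses a spawn helper (a hypothetical stand-in for the scheduling helper the full code relies on) to show two tasks interleaving, with the shorter one finishing first.

```python
from queue import Queue

class Task:
    def __init__(self, coro):
        self.coro = coro
        self.finished = False
        self.res = None

    def __await__(self):
        while not self.finished:
            yield self
        return self.res

event_loop = Queue()

def spawn(coro):
    # Stand-in scheduling helper: wrap the coroutine and enqueue it
    task = Task(coro)
    event_loop.put(task)
    return task

def run(main):
    event_loop.put(Task(main))
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.coro.send(None)  # resume until the next yield
        except StopIteration as e:
            task.finished = True
            task.res = e.value
        else:
            event_loop.put(task)  # still running: back of the queue

def _tick(label, steps):
    # A generator task that yields control `steps` times before finishing
    for _ in range(steps):
        yield
    return label

order = []

async def worker(label, steps):
    order.append(await spawn(_tick(label, steps)))

async def main():
    a = spawn(worker("short", 1))
    b = spawn(worker("long", 5))
    await a
    await b

run(main())
print(order)  # ['short', 'long'] — the shorter task finishes first
```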
Now that we have a working Task class and an event loop, we need non-blocking coroutines.

First, a non-blocking sleep function, similar to the "sleepless sleep" pattern on Arduino:
import time

def _sleep(seconds: float):
    start_time = time.time()
    while (time.time() - start_time) < seconds:
        yield
    return time.time() - start_time

def sleep(seconds: float) -> Task:
    return Task.new(_sleep(seconds))
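The generator can also be driven by hand to see what the event loop would do: each next() burns one scheduling turn until the requested time has elapsed. The snippet reproduces _sleep so it runs on its own.

```python
import time

def _sleep(seconds: float):
    start_time = time.time()
    while (time.time() - start_time) < seconds:
        yield  # hand control back instead of blocking
    return time.time() - start_time

g = _sleep(0.01)
turns = 0
while True:
    try:
        next(g)
        turns += 1
    except StopIteration as e:
        elapsed = e.value  # actual time slept, returned by the generator
        break

print(turns > 0, elapsed >= 0.01)  # True True
```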
Instead of reading a file all at once, we read in chunks until EOF.
def _load_file(file_path: str):
    full = []
    with open(file_path, "r") as file:
        while True:
            data = file.read(1024)
            if not data:
                break
            full.append(data)
            yield
    return "".join(full)

def load_file(file_path: str):
    return Task.new(_load_file(file_path))
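As with sleep, the chunked reader can be driven by hand. This self-contained check (reproducing _load_file so it runs on its own) writes a 3000-character temporary file; with 1024-byte reads, that is exactly three chunks, i.e. three yields before the generator returns.

```python
import os
import tempfile

def _load_file(file_path: str):
    full = []
    with open(file_path, "r") as file:
        while True:
            data = file.read(1024)
            if not data:
                break
            full.append(data)
            yield  # one chunk per scheduling turn
    return "".join(full)

# Write a 3000-character temporary file, then drive the generator by hand
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("x" * 3000)
    path = tmp.name

g = _load_file(path)
chunks = 0
while True:
    try:
        next(g)
        chunks += 1
    except StopIteration as e:
        content = e.value
        break
os.unlink(path)

print(chunks, len(content))  # 3 3000
```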
Since Python does not provide a built-in non-blocking GET method, we implement a basic version.
import socket

def _fetch_data_async(host, path="/", port=80):
    BUFFER_SIZE = 4096
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))
    request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\n\r\n"
    s.sendall(request.encode())
    content = b""
    while b"\r\n\r\n" not in content:
        content += s.recv(BUFFER_SIZE)
        yield
    header, body = content.split(b"\r\n\r\n", 1)
    response = body
    if "Content-Length" in header.decode():
        total_size = int(header.decode().split("Content-Length: ")[1].split("\r\n")[0])
        while len(response) < total_size:
            response += s.recv(BUFFER_SIZE)
            yield
        s.close()
        return response.decode("utf-8")
    else:
        raise NotImplementedError("Protocol not implemented")

def fetch_data_async(host: str, path: str):
    return Task.new(_fetch_data_async(host, path))
I am aware that this HTTP client implementation might be one of the worst you have ever seen, but let's say it does the job.
This deep dive into coroutine management was meant to reinforce my understanding of asynchronous programming in Python.
One of my next goals is to implement an HTTP server capable of handling multiple clients asynchronously.