Jeremy Trips

Where I share about my journey in tech and life.

This article is written manually and uses AI to check the spelling.

Asynchronous Programming

Coroutines are a great way to handle blocking calls like I/O in a non-blocking manner. Let's explore how they are implemented.

Why?

During a research project I worked on, I had to use Bluetooth Low Energy to communicate with a hardware device. I decided to use Bleak to test the hardware. Bleak is a useful library implemented asynchronously. At the time, I had never used asynchronous programming in Python. Given time constraints, I decided to go with it and learn from the examples.

Now, two years later, it's time to fully understand what I did.


Asynchronous Programming

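Asynchronous programming is a form of concurrent programming that allows blocking calls to be handled in a non-blocking way: while one task waits, others can make progress. It is not necessarily parallel, since everything can run on a single thread. One approach to implementing asynchronous programming is through coroutines.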
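
To make the idea concrete before looking under the hood, here is a small sketch using the standard asyncio library. The names wait_and_return and main are made up for this illustration. Two waits of 0.2 seconds overlap, so the total should be close to 0.2 seconds rather than 0.4.

```python
import asyncio
import time

async def wait_and_return(label, delay):
    await asyncio.sleep(delay)   # hands control back to the event loop while waiting
    return label

async def main():
    # gather() runs both coroutines concurrently and returns results in input order
    return await asyncio.gather(
        wait_and_return("first", 0.2),
        wait_and_return("second", 0.2),
    )

start = time.monotonic()
results = asyncio.run(main())
elapsed = time.monotonic() - start

print(results)            # ['first', 'second']
print(f"{elapsed:.2f}s")  # timing varies by machine
```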

Coroutines

A coroutine is the object returned by calling a function defined with async def.

def function():
    pass

async def async_function():
    pass

print(type(function))           # <class 'function'>
print(type(async_function))     # <class 'function'>
print(type(async_function()))   # <class 'coroutine'> + RuntimeWarning

Note that async_function itself is a function, but a coroutine is created when it is called.
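
A coroutine object is a suspended computation, and it can be driven by hand much like a generator. The following is a minimal sketch (the function name answer is invented for the example):

```python
async def answer():
    # No await inside, so the coroutine runs to completion on the first send()
    return 42

coro = answer()          # nothing runs yet; this only builds the coroutine object
try:
    coro.send(None)      # start the coroutine; it finishes immediately
except StopIteration as stop:
    result = stop.value  # the return value travels in StopIteration.value

print(result)            # 42
```

Because the coroutine was actually run, no "never awaited" warning is produced here.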

Coroutines are one of the building blocks of asynchronous programming. Another important concept is generators.


Runtime Warning

Calling async_function() without awaiting it emits a runtime warning (in the output below, the function was named coroutine):

/Users/username/dev/AsyncAwait/test3.py:11: RuntimeWarning: coroutine 'coroutine' was never awaited
  print(type(coroutine()))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

This warning is self-explanatory, but keep it in mind. We'll revisit it later.
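
As a rough sketch, there are two common ways to avoid this warning: let an event loop run the coroutine, or close the coroutine object explicitly when it was only created for inspection. The example below uses asyncio.run from the standard library for the first option.

```python
import asyncio

async def async_function():
    return "done"

# Option 1: hand the coroutine to an event loop, which runs it to completion
value = asyncio.run(async_function())
print(value)  # done

# Option 2: the coroutine is only inspected, so close it explicitly
# to signal that it was discarded on purpose
coro = async_function()
kind = type(coro).__name__
coro.close()
print(kind)   # coroutine
```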


Generators

A generator function is a special kind of function whose body contains the yield keyword.

For completeness, similar objects can be written as classes with the __iter__ and __next__ magic methods, but here we will focus on the yield method.

def function():
    return None

def generator():
    yield None

print(type(function))       # <class 'function'>
print(type(generator))      # <class 'function'>
print(type(generator()))    # <class 'generator'>

Note that the generator function is still a function, but when called, it returns a generator object.

Nothing runs when the generator object is created. Calling next() executes the body synchronously until it reaches a yield statement. The generator then retains its state and hands control back to the caller. When next() (or send()) is called again, execution resumes from where it left off. When the generator reaches a return statement, it raises a StopIteration exception carrying the return value.

Here’s an example of a simple generator:

def useless_generator():
    def coroutine():
        value = yield  # Suspends execution and waits for a value to be sent
        return value   # Ends the generator; the value travels in StopIteration

    c = coroutine()         # Creates a generator object
    next(c)                 # Advances the generator to the first yield statement

    try:
        c.send(1)           # Sends a value to the generator, which then returns
    except StopIteration as e:
        print(e.value)      # Prints 1

useless_generator()
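
Because a generator keeps its state between calls, a caller can alternate between several of them. The sketch below shows that idea in isolation, with a plain queue and no async syntax. The names worker, log, ready and finished are illustrative only.

```python
from collections import deque

log = []

def worker(name, steps):
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield                      # give control back to the caller
    return name

ready = deque([worker("a", 2), worker("b", 3)])
finished = []
while ready:
    gen = ready.popleft()
    try:
        next(gen)                  # run until the next yield
    except StopIteration as stop:
        finished.append(stop.value)
    else:
        ready.append(gen)          # not done yet, queue it again

print(log)       # ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
print(finished)  # ['a', 'b']
```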

Task

A Task is a class that wraps a generator or coroutine so that it can be awaited, implementing the necessary asynchronous management through magic methods.

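from typing import Any, Callable, Optional

class Task:
    def __init__(self, coro):
        self.name = coro.__name__
        self.coro = coro              # generator or coroutine object to drive
        self.finished = False
        self.res = None               # return value once finished
        self.callback: Optional[Callable[[Any], None]] = None      # called with the result
        self.catch_method: Optional[Callable[[Any], None]] = None  # called with an exception

    @classmethod
    def new(cls, coro):
        # Assumption: Task.new() is used further down but never shown.
        # This sketch wraps the generator, schedules it, and returns the task.
        task = cls(coro)
        event_loop.put(task)          # event_loop is defined in the Event Loop section
        return task

    def __await__(self):
        while not self.finished:
            yield self                # suspend the awaiting coroutine
        return self.res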

To make the task awaitable, we implement the __await__ magic method. This method checks whether the task is finished. If not, execution reaches yield and the awaiting coroutine is suspended. The event loop later resumes it by calling send(None) on that coroutine, which re-enters the loop and checks again.
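
To see what __await__ does in isolation, here is a minimal sketch with a hypothetical awaitable called Ready that suspends exactly once. The coroutine is driven by hand with send(), playing the role of the event loop.

```python
class Ready:
    """Hypothetical awaitable that suspends exactly once."""
    def __await__(self):
        received = yield "paused"   # surfaces as the return value of the driver's send()
        return received             # becomes the result of the await expression

async def main():
    value = await Ready()
    return value * 2

coro = main()
signal = coro.send(None)            # runs until Ready.__await__ yields
try:
    coro.send(21)                   # resume; 21 becomes the result of the yield
except StopIteration as stop:
    result = stop.value

print(signal, result)               # paused 42
```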


Event Loop

The event loop orchestrates task execution, allowing each task to continue running from one yield statement to another. It can be implemented as a class or a function.

import traceback
from queue import Queue
from typing import Coroutine

event_loop = Queue()  # FIFO queue to manage task rotation

def run(main: Coroutine) -> None:
    # Wrap the main coroutine in a Task and add it to the event loop
    event_loop.put(Task(main))
    
    while not event_loop.empty():
        task = event_loop.get()
        try:
            task.coro.send(None)  # Resume execution until the next yield
        except StopIteration as e:
            task.finished = True
            task.res = e.value
            if task.callback:
                task.callback(task.res)
        except Exception as e:
            if not task.catch_method:
                print(f"Warning: Unhandled exception from {task.name}")
                traceback.print_exc()
            else:
                task.catch_method(e)
            task.finished = True
        else:
            event_loop.put(task)  # Re-add the task if it's not finished

This function continuously runs tasks in the event loop until all are completed.
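
The pieces can be seen working together in a condensed, self-contained sketch. It is not the article's code: MiniTask, spawn, ticker and trace are names invented for the demo, a deque stands in for the event_loop queue, and this run returns the main result (the version above returns None).

```python
from collections import deque

queue = deque()   # stand-in for the article's event_loop queue
trace = []

class MiniTask:
    def __init__(self, coro):
        self.coro = coro
        self.finished = False
        self.res = None

    def __await__(self):
        while not self.finished:
            yield self            # suspend whoever awaits this task
        return self.res

def spawn(coro):
    # Assumed helper: wrap, schedule and return the task
    task = MiniTask(coro)
    queue.append(task)
    return task

def ticker(name, n):
    for i in range(n):
        trace.append(f"{name}{i}")
        yield                     # let the loop run another task
    return n

async def main():
    a = spawn(ticker("a", 2))
    b = spawn(ticker("b", 1))
    return await a + await b

def run(coro):
    root = spawn(coro)
    while queue:
        task = queue.popleft()
        try:
            task.coro.send(None)  # resume until the next yield
        except StopIteration as stop:
            task.finished = True
            task.res = stop.value
        else:
            queue.append(task)    # still running, rotate to the back
    return root.res

total = run(main())
print(total)  # 3
print(trace)  # ['a0', 'b0', 'a1']
```

The trace shows the two tickers taking turns, while main is only resumed to re-check whether the task it awaits has finished.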


Coroutine Implementation

Now that we have a working Task class and an event loop, we need non-blocking coroutines.

Sleep Function

A non-blocking sleep function, similar to "sleepless sleep" in Arduino:

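import time

def _sleep(seconds: float):
    # time.monotonic() is not affected by system clock adjustments
    start_time = time.monotonic()
    while (time.monotonic() - start_time) < seconds:
        yield  # not done yet, let other tasks run
    return time.monotonic() - start_time  # actual time slept

def sleep(seconds: float) -> Task:
    return Task.new(_sleep(seconds))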
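
A quick way to check that this kind of sleep is really non-blocking is to poll two of them in turn and measure the total time. The sketch below redefines _sleep so it is self-contained and drives the generators directly, without Task or the event loop. The names pending, polls and elapsed are invented for the demo.

```python
import time

def _sleep(seconds):
    start = time.monotonic()
    while time.monotonic() - start < seconds:
        yield
    return time.monotonic() - start

pending = [_sleep(0.2), _sleep(0.2)]
began = time.monotonic()
polls = 0
while pending:
    for gen in list(pending):       # iterate over a copy, since we remove items
        try:
            next(gen)
            polls += 1
        except StopIteration:
            pending.remove(gen)
elapsed = time.monotonic() - began

print(f"{elapsed:.2f}s for two 0.2s sleeps, {polls} polls")
```

The two waits overlap, so the total stays near 0.2 seconds. The poll count also shows the cost of this design: the loop spins continuously instead of idling, which is one reason production event loops rely on timers and OS-level readiness notifications.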

Loading a File Asynchronously

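Instead of reading a file all at once, we read it in chunks until EOF and yield after each chunk so that other tasks can run in between. Each individual read() call is still blocking, just shorter.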

def _load_file(file_path: str):
    full = []
    with open(file_path, "r") as file:
        while True:
            data = file.read(1024)
            if not data:
                break
            full.append(data)
            yield
    return "".join(full)

def load_file(file_path: str):
    return Task.new(_load_file(file_path))
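
To see how often the loader gives up control, the generator can be driven by hand on a temporary file. This is a sketch for illustration: the chunk_size parameter and the names pauses and text are additions, not part of the listing above.

```python
import os
import tempfile

def _load_file(file_path, chunk_size=1024):   # chunk_size is added for the demo
    full = []
    with open(file_path, "r") as file:
        while True:
            data = file.read(chunk_size)
            if not data:
                break
            full.append(data)
            yield                              # one pause per chunk read
    return "".join(full)

# Write 2500 characters so that three chunks are needed
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("x" * 2500)
    path = tmp.name

gen = _load_file(path)
pauses = 0
while True:
    try:
        next(gen)
        pauses += 1
    except StopIteration as stop:
        text = stop.value
        break
os.remove(path)

print(pauses, len(text))   # 3 2500
```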

Asynchronous HTTP Request

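Since Python does not provide a built-in non-blocking GET method, we implement a basic version on top of a raw socket. Note that connect() and recv() are still blocking calls here; the generator only yields between reads.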

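import socket

def _fetch_data_async(host, path="/", port=80):
    BUFFER_SIZE = 4096
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))  # blocking connect
        request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\n\r\n"
        s.sendall(request.encode())  # sendall() keeps sending until everything is out

        # Read until the blank line that ends the headers
        content = b""
        while b"\r\n\r\n" not in content:
            chunk = s.recv(BUFFER_SIZE)
            if not chunk:
                raise ConnectionError("Connection closed before headers were received")
            content += chunk
            yield

        header, body = content.split(b"\r\n\r\n", 1)
        response = body

        header_text = header.decode()
        if "Content-Length" not in header_text:
            raise NotImplementedError("Protocol not implemented")

        total_size = int(header_text.split("Content-Length: ")[1].split("\r\n")[0])
        while len(response) < total_size:
            chunk = s.recv(BUFFER_SIZE)
            if not chunk:
                break  # server closed the connection early
            response += chunk
            yield
        return response.decode('utf-8')
    finally:
        s.close()  # close the socket on every exit path

def fetch_data_async(host: str, path: str):
    # The first argument is a host name, not a full URL
    return Task.new(_fetch_data_async(host, path))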

I am aware that this HTTP client implementation might be one of the worst you have ever seen, but let's say that it does the job.
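
One fragile spot is the Content-Length lookup, which depends on the exact capitalization the server uses. A slightly more tolerant sketch is shown below. The helper name content_length is hypothetical and not part of the client above.

```python
def content_length(header: bytes):
    """Return the Content-Length value from raw response headers, or None."""
    for line in header.split(b"\r\n")[1:]:        # skip the status line
        name, sep, value = line.partition(b":")
        if sep and name.strip().lower() == b"content-length":
            return int(value.strip().decode("ascii"))
    return None

sample = b"HTTP/1.1 200 OK\r\ncontent-length: 12\r\nContent-Type: text/plain"
found = content_length(sample)
missing = content_length(b"HTTP/1.1 204 No Content")

print(found)    # 12
print(missing)  # None
```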


Wrapping Up

This deep dive into coroutine management was meant to reinforce my understanding of asynchronous programming in Python.

One of my next goals is to implement an HTTP server capable of handling multiple clients asynchronously.
