Actor pool example

As with any multiprocessing framework, the need to keep track of many workers (in our case, actors) also applies to uActor.

This is where actor pools come in handy: they let you keep track of many actors at once while enabling parallelization and load balancing.

Client-side parallelization

By design, calling actor methods is a fully synchronous operation, so we can simply use a ThreadPool to offload waiting for results to threads very cheaply, exposing AsyncResult objects to the caller.
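
As a minimal sketch of the idea with a single actor (the full pool example follows below), a direct method call blocks until the actor process replies, while wrapping the same call with ThreadPool.apply_async yields an AsyncResult instead:

import os
import multiprocessing.pool
import uactor

class SyncActor(uactor.Actor):
    def getpid(self):
        return os.getpid()

with SyncActor() as actor:
    # A direct call blocks until the actor process returns its pid.
    pid = actor.getpid()
    # The same call submitted to a ThreadPool returns an AsyncResult.
    with multiprocessing.pool.ThreadPool(1) as threadpool:
        result = threadpool.apply_async(actor.getpid)
        assert result.get() == pid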

Example:

import os
import itertools
import multiprocessing.pool
import uactor

class SyncActor(uactor.Actor):
    def getpid(self):
        return os.getpid()

class AsyncActorPool:
    def __init__(self, size, cls, *args, **kwargs):
        # One worker thread per actor, used to wait on their synchronous calls.
        self.threadpool = multiprocessing.pool.ThreadPool(size)
        self.pool = [cls(*args, **kwargs) for _ in range(size)]
        # Round-robin iterator over the actors for load balancing.
        self.actors = itertools.cycle(self.pool)

    def call(self, method, *args, **kwargs):
        # Dispatch the call to the next actor, returning an AsyncResult.
        func = getattr(next(self.actors), method)
        return self.threadpool.apply_async(func, args, kwargs)

    def broadcast(self, method, *args, **kwargs):
        # Call the same method on every actor in the pool.
        return self.threadpool.map_async(
            lambda actor: getattr(actor, method)(*args, **kwargs),
            self.pool,
            )

    def __enter__(self):
        self.threadpool.__enter__()
        self.broadcast('__enter__').wait()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Exit every actor first, then the thread pool.
        return any([
            *self.broadcast('__exit__', exc_type, exc_val, exc_tb).get(),
            self.threadpool.__exit__(exc_type, exc_val, exc_tb),
            ])

with AsyncActorPool(4, SyncActor) as pool:
    results = [pool.call('getpid') for _ in range(5)]
    print([result.get() for result in results])
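
The broadcast method can also be used directly; for instance, a small sketch (reusing the SyncActor and AsyncActorPool classes defined above) collecting the pid of every actor at once:

with AsyncActorPool(4, SyncActor) as pool:
    # broadcast runs the same method on every actor and returns a single
    # AsyncResult covering all of them.
    print(sorted(pool.broadcast('getpid').get()))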

Actor asynchronous results

uActor (thanks to multiprocessing's SyncManager) supports proxying AsyncResult objects (see result proxies), so we can put a ThreadPool inside the actor process and return AsyncResult proxies from actor methods, greatly simplifying client code.
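
For illustration, here is a minimal single-actor sketch of that mechanism (mirroring the AsyncActor class from the full example below, without the pool wrapper):

import os
import multiprocessing.pool
import uactor

class AsyncActor(uactor.Actor):
    # Expose getpid's return value as an AsyncResult proxy.
    _method_to_typeid_ = {'getpid': 'AsyncResult'}

    def __init__(self):
        self.threadpool = multiprocessing.pool.ThreadPool(1)

    def getpid(self):
        return self.threadpool.apply_async(os.getpid)

with AsyncActor() as actor:
    result = actor.getpid()  # returns an AsyncResult proxy immediately
    print(result.get())      # blocks until the actor-side thread finishes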

Important: while useful, this approach is much more expensive than the client-side parallelization implementation above (around 24 times in my testing), as proxying AsyncResult objects is relatively costly.

Example:

import os
import itertools
import multiprocessing.pool
import uactor

class AsyncActor(uactor.Actor):
    # Return values of getpid are exposed as AsyncResult proxies.
    _method_to_typeid_ = {'getpid': 'AsyncResult'}

    def __init__(self):
        self.threadpool = multiprocessing.pool.ThreadPool(4)

    def getpid(self):
        # apply_async returns an AsyncResult, which uActor proxies back.
        return self.threadpool.apply_async(os.getpid)

class SyncActorPool:
    def __init__(self, size, cls, *args, **kwargs):
        self.pool = [cls(*args, **kwargs) for _ in range(size)]
        # Round-robin iterator over the actors for load balancing.
        self.actors = itertools.cycle(self.pool)

    def call(self, method, *args, **kwargs):
        # Plain synchronous dispatch; the actor itself returns AsyncResults.
        return getattr(next(self.actors), method)(*args, **kwargs)

    def __enter__(self):
        for actor in self.pool:
            actor.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return any([
            actor.__exit__(exc_type, exc_val, exc_tb)
            for actor in self.pool
            ])

with SyncActorPool(4, AsyncActor) as pool:
    results = [pool.call('getpid') for _ in range(5)]
    print([result.get() for result in results])