Result proxies

UActor provide two different ways to return proxies to objects living inside the actor process: declarative and wrapping, supporting different use-cases:

  • Defining the method proxy via uactor.Actor._method_to_typeid_ results in the specified proxy to be returned only when called from actor proxy, so calls from within the actor itself will still receive the actual result.

  • Using uactor.proxy helper explicitly specifies a proxy from the method, so you can to dynamically choose between different proxies and return values. These proxies will only be functional when received by the main process or other actors.

Example:

import uactor

class Actor(uactor.Actor):
    _method_to_typeid_ = {'get_declarative_proxy_to_data': 'list'}

    def __init__(self):
        self.data = [1, 2, 3]

    def get_declarative_proxy_to_data(self):
        return self.data

    def get_serialized_proxy_to_data(self):
        return uactor.proxy(self.data, 'list')

with Actor() as actor:

    proxy = actor.get_declarative_proxy_to_data()
    print(type(proxy), uactor.typeid(proxy), list(proxy))
    # <class 'multiprocessing.managers.ListProxy'> list [1, 2, 3]

    proxy = actor.get_serialized_proxy_to_data()
    print(type(proxy), uactor.typeid(proxy), list(proxy))
    # <class 'multiprocessing.managers.ListProxy'> list [1, 2, 3]

Serialized proxies

The serialized proxy pattern is useful when you need to conditionally return different proxies or values.

When uactor.proxy is called, a new proxy is created for the given value and typeid, which can be transferred safely to other processes.

Example:

import uactor

class Actor(uactor.Actor):

    def __init__(self):
        self.data = [1, 2, 3]

    def get_data(self, as_proxy=False):
        return uactor.proxy(self.data, 'list') if as_proxy else self.data

with Actor() as actor:

    value = actor.get_data()
    print(type(value), value)
    # <class 'list'> [1, 2, 3]

    proxy = actor.get_data(as_proxy=True)
    print(type(proxy), list(proxy))
    # <class 'multiprocessing.managers.ListProxy'> [1, 2, 3]

Synchronization proxies

uActor enables easily sharing synchronization primitives between processes, by including specific proxies for this such as Event, Lock, RLock, Semaphore, BoundedSemaphore, Condition and Barrier, which can be used with primitives from threading, or even multiprocessing (albeit using proxies to multiprocessing should be avoided).

Example:

import threading
import uactor

class Actor(uactor.Actor):
    _exposed_ = ('event',)

    @property
    def event(self):
        return uactor.proxy(self._event, 'Event')

    def __init__(self):
        self._event = threading.Event()

with Actor() as actor:
    print('Ready' if actor.event.wait(1) else 'Not ready')
    # Not ready

    actor.event.set()

    print('Ready' if actor.event.wait(1) else 'Not ready')
    # Ready

Asynchronous proxies

uActor includes those extremely useful Pool and AsyncResult (for (for multiprocessing.pool.Pool) and Queue (for queue.Queue) proxies.

This allow to parallelize work across multiple actors way easier than using raw primitives, just by sharing asynchronous result objects or queues.

Example:

import time
import multiprocessing.pool
import uactor

class Actor(uactor.Actor):
    _exposed_ = ('pool',)

    @property
    def pool(self):
        return uactor.proxy(self._pool, 'Pool')

    def __init__(self):
        self._pool = multiprocessing.pool.ThreadPool()

with Actor() as actor:
    start = time.time()
    async_result = actor.pool.apply_async(time.sleep, (2,))
    print(f'{round(time.time() - start, 4)}s')
    # 0.0014s

    async_result.get()
    print(f'{round(time.time() - start, 4)}s')
    # 2.0032s

Proxy forwarding

Another neat feature from uActor is proxy forwarding, that is, being able to pass proxies as arguments or return them, to and from different actors.

When explicitly setting an authkey on an actor (via its _options_) or when manually connecting to a remote proxy (via uactor.Actor.connect), their owned proxies will raise an AuthkeyError when forwarded if the caller process isn’t already connected to that specific actor.

Example (proxy_forwarding.py, library):

import uactor

class MyActor(uactor.Actor):
    _exposed_ = ('my_other_actor', 'my_other_actor_address')

    def __init__(self):
        self.my_other_actor = MyOtherActor()

    @property
    def my_other_actor_address(self):
      return self.my_other_actor.connection_address

class MyOtherActor(uactor.Actor):
    _options_ = {'authkey': b'OtherSecret'}

Example (raising AuthkeyError with a remote actor):

from proxy_forwarding import MyActor

with MyActor() as actor:
    my_other_actor = actor.my_other_actor
    # AuthKeyError

In this case, we need to connect to actors before being able to handle their proxies, as its authkey must be defined beforehand.

Example:

from proxy_forwarding import MyActor

with MyActor() as actor:
    address = actor.my_other_actor_address
    with MyOtherActor.connect(address, b'OtherSecret'):
        my_other_actor = actor.my_other_actor

Alternatively, we can opt to perform this connection only as a fallback via exception handling.

Example:

with MyActor() as actor:
    try:
        my_other_actor = actor.my_other_actor
    except uactor.AuthKeyError as e:
        address = actor.my_other_actor_address
        with MyOtherActor.connect(address, b'OtherSecret'):
            my_other_actor = actor.my_other_actor