Result proxies¶
UActor provide two different ways to return proxies to objects living inside the actor process: declarative and wrapping, supporting different use-cases:
Defining the method proxy via
uactor.Actor._method_to_typeid_
results in the specified proxy to be returned only when called from actor proxy, so calls from within the actor itself will still receive the actual result.Using
uactor.proxy
helper explicitly specifies a proxy from the method, so you can to dynamically choose between different proxies and return values. These proxies will only be functional when received by the main process or other actors.
Example:
import uactor
class Actor(uactor.Actor):
_method_to_typeid_ = {'get_declarative_proxy_to_data': 'list'}
def __init__(self):
self.data = [1, 2, 3]
def get_declarative_proxy_to_data(self):
return self.data
def get_serialized_proxy_to_data(self):
return uactor.proxy(self.data, 'list')
with Actor() as actor:
proxy = actor.get_declarative_proxy_to_data()
print(type(proxy), uactor.typeid(proxy), list(proxy))
# <class 'multiprocessing.managers.ListProxy'> list [1, 2, 3]
proxy = actor.get_serialized_proxy_to_data()
print(type(proxy), uactor.typeid(proxy), list(proxy))
# <class 'multiprocessing.managers.ListProxy'> list [1, 2, 3]
Serialized proxies¶
The serialized proxy pattern is useful when you need to conditionally return different proxies or values.
When uactor.proxy
is called, a new proxy is created for the given value
and typeid, which can be transferred safely to other processes.
Example:
import uactor
class Actor(uactor.Actor):
def __init__(self):
self.data = [1, 2, 3]
def get_data(self, as_proxy=False):
return uactor.proxy(self.data, 'list') if as_proxy else self.data
with Actor() as actor:
value = actor.get_data()
print(type(value), value)
# <class 'list'> [1, 2, 3]
proxy = actor.get_data(as_proxy=True)
print(type(proxy), list(proxy))
# <class 'multiprocessing.managers.ListProxy'> [1, 2, 3]
Synchronization proxies¶
uActor enables easily sharing synchronization primitives between processes,
by including specific proxies for this such as Event
, Lock
, RLock
,
Semaphore
, BoundedSemaphore
, Condition
and Barrier
, which can be
used with primitives from threading
, or even multiprocessing
(albeit
using proxies to multiprocessing
should be avoided).
Example:
import threading
import uactor
class Actor(uactor.Actor):
_exposed_ = ('event',)
@property
def event(self):
return uactor.proxy(self._event, 'Event')
def __init__(self):
self._event = threading.Event()
with Actor() as actor:
print('Ready' if actor.event.wait(1) else 'Not ready')
# Not ready
actor.event.set()
print('Ready' if actor.event.wait(1) else 'Not ready')
# Ready
Asynchronous proxies¶
uActor includes those extremely useful Pool
and AsyncResult
(for
(for multiprocessing.pool.Pool
) and Queue
(for queue.Queue
) proxies.
This allow to parallelize work across multiple actors way easier than using raw primitives, just by sharing asynchronous result objects or queues.
Example:
import time
import multiprocessing.pool
import uactor
class Actor(uactor.Actor):
_exposed_ = ('pool',)
@property
def pool(self):
return uactor.proxy(self._pool, 'Pool')
def __init__(self):
self._pool = multiprocessing.pool.ThreadPool()
with Actor() as actor:
start = time.time()
async_result = actor.pool.apply_async(time.sleep, (2,))
print(f'{round(time.time() - start, 4)}s')
# 0.0014s
async_result.get()
print(f'{round(time.time() - start, 4)}s')
# 2.0032s
Proxy forwarding¶
Another neat feature from uActor is proxy forwarding, that is, being able to pass proxies as arguments or return them, to and from different actors.
When explicitly setting an authkey
on an actor (via its _options_
) or
when manually connecting to a remote proxy (via uactor.Actor.connect
),
their owned proxies will raise an AuthkeyError
when forwarded if the
caller process isn’t already connected to that specific actor.
Example (proxy_forwarding.py
, library):
import uactor
class MyActor(uactor.Actor):
_exposed_ = ('my_other_actor', 'my_other_actor_address')
def __init__(self):
self.my_other_actor = MyOtherActor()
@property
def my_other_actor_address(self):
return self.my_other_actor.connection_address
class MyOtherActor(uactor.Actor):
_options_ = {'authkey': b'OtherSecret'}
Example (raising AuthkeyError
with a remote actor):
from proxy_forwarding import MyActor
with MyActor() as actor:
my_other_actor = actor.my_other_actor
# AuthKeyError
In this case, we need to connect to actors before being able to handle their
proxies, as its authkey
must be defined beforehand.
Example:
from proxy_forwarding import MyActor
with MyActor() as actor:
address = actor.my_other_actor_address
with MyOtherActor.connect(address, b'OtherSecret'):
my_other_actor = actor.my_other_actor
Alternatively, we can opt to perform this connection only as a fallback via exception handling.
Example:
with MyActor() as actor:
try:
my_other_actor = actor.my_other_actor
except uactor.AuthKeyError as e:
address = actor.my_other_actor_address
with MyOtherActor.connect(address, b'OtherSecret'):
my_other_actor = actor.my_other_actor