"""uActor module."""
"""
uActor: Process Actor Model
===========================
See `README.md`_ provided in source distributions or available
at the `project repository`_.
.. _README.md: https://gitlab.com/ergoithz/uactor/-/blob/master/README.md
.. _project repository: https://gitlab.com/ergoithz/uactor
License
-------
Copyright (c) 2020, Felipe A Hernandez.
MIT License (see `LICENSE`_).
.. _LICENSE: https://gitlab.com/ergoithz/uactor/-/blob/master/LICENSE
"""
__author__ = 'Felipe A Hernandez'
__email__ = 'ergoithz@gmail.com'
__license__ = 'MIT'
__version__ = '0.1.1'
__all__ = (
'DEFAULT_SERIALIZER',
'UActorException',
'ProxyError',
'AuthkeyError',
'BaseProxy',
'ActorManager',
'ActorProxy',
'Actor',
'proxy',
'typeid',
)
import collections
import functools
import multiprocessing
import multiprocessing.managers
import types
import typing
import weakref
DEFAULT_SERIALIZER = 'pickle'
"""
Default :mod:`multiprocessing.managers` serializer name for :mod:`uactor`.
.. versionadded:: 0.1.1
"""
Token = multiprocessing.managers.Token
BaseManager = multiprocessing.managers.BaseManager
TActor = typing.TypeVar('TActor', bound='Actor')
TActorProxy = typing.TypeVar('TActorProxy', bound='ActorProxy')
TActorManager = typing.TypeVar('TActorManager', bound='ActorManager')
AddressType = typing.Union[typing.Tuple[str, int], str, bytes, int]
autoproxy_classes = weakref.WeakValueDictionary()
process_managers = weakref.WeakValueDictionary()
connection_managers = weakref.WeakValueDictionary()
def bootstrap(manager: 'ActorManager',
cls: typing.Type['Actor'],
args: typing.Iterable[typing.Any],
kwargs: typing.Dict[str, typing.Any],
) -> None:
"""
Actor process initialization function.
.. versionadded:: 0.1.0
"""
process = multiprocessing.current_process()
process._uactor_manager = manager
process._uactor_class = cls
process._uactor_owner = cls(*args, **kwargs)
def current_actor(actor: typing.Optional[TActor] = None) -> TActor:
"""Get current process actor object, used as proxy constructor."""
current = multiprocessing.current_process()._uactor_owner
if actor not in (None, current):
# we must not use an invalid proxy on a foreign actor
raise ProxyError(
'Proxy \'actor\' cannot be used with foreign actor references',
)
return current
def proxy_method(actor_class: typing.Type['Actor'],
name: str) -> typing.Union[callable, property]:
"""Create proxy method/property from class and attribute name."""
wrapped = getattr(actor_class, name, None)
if callable(wrapped):
def wrapper(proxy, *args, **kwargs):
return proxy._callmethod(name, args, kwargs)
return functools.update_wrapper(wrapper, wrapped), ()
def fget(proxy):
return proxy._callmethod('__getattribute__', (name,))
def fset(proxy, value):
return proxy._callmethod('__setattr__', (name, value))
return property(fget, fset), ('__getattribute__', '__setattr__')
[docs]def proxy(value: typing.Any,
typeid: str = 'auto',
serializer: str = DEFAULT_SERIALIZER,
) -> 'BaseProxy':
"""
Create serialized proxy from given value and typeid (defaults to `auto`).
This function can be only used from inside the actor process.
.. versionadded:: 0.1.0
"""
process = multiprocessing.current_process()
try:
server = process._manager_server
manager = process._uactor_manager
proxytype = server.registry[typeid][-1]
except AttributeError:
raise ProxyError(
'Proxies can only be created under actor processes',
) from None
except KeyError:
raise ProxyError(
f'No proxy is registered with typeid {typeid!r}',
) from None
# FIXME: if someday the connection became required, we would need to
# subclass multiprocessing.managers.Server to expose it as a
# thread local
ident, exposed = server.create(None, typeid, value)
token = multiprocessing.managers.Token(typeid, server.address, ident)
options = {'manager': manager, 'exposed': exposed, 'incref': False}
return proxytype(token, serializer, **options)
[docs]def typeid(proxy: 'BaseProxy') -> str:
"""
Get typeid from given proxy object.
.. versionadded:: 0.1.0
"""
try:
return proxy._token.typeid
except AttributeError:
exc_type, exc_message = (
(ProxyError, 'Cannot get typeid, proxy token not available')
if isinstance(proxy, BaseProxy) else
(ValueError, f'Cannot get typeid, {proxy!r} is not a proxy')
)
raise exc_type(exc_message) from None
def rebuild_manager(manager_class: typing.Type[TActorManager],
address: AddressType,
authkey: typing.Optional[bytes] = None,
serializer: str = DEFAULT_SERIALIZER,
) -> TActorManager:
"""
Rebuild manager from basic params and cache.
.. versionadded:: 0.1.1
"""
key = manager_class, address
manager = connection_managers.get(key)
matching = (
manager
and authkey in (None, manager._authkey)
and serializer == manager._serializer
)
if not matching:
manager = manager_class(
address,
authkey,
serializer,
parent=process_managers.get(key),
)
manager.connect()
return manager
def rebuild_proxy(manager_class: typing.Type['ActorManager'],
builder: typing.Callable[..., 'BaseProxy'],
proxytype: typing.Type['BaseProxy'],
token: Token,
serializer: str,
*args,
) -> 'BaseProxy':
"""
Rebuild proxy from serialization data.
.. versionadded:: 0.1.1
"""
manager = rebuild_manager(manager_class, token.address, None, serializer)
params = {'manager': manager}
token.address = manager.address # rewrite token address with managed one
if not isinstance(proxytype, type):
params['proxytype'] = proxytype
proxytype = rebuild_autoproxy
# FIXME: private-implementation-specific code
args[-1].update(params)
return builder(proxytype, token, serializer, *args)
def rebuild_autoproxy(token: 'Token',
serializer: str,
*args,
proxytype: typing.Optional[type] = None,
**kwd,
) -> 'BaseProxy':
"""
Create and convert autoproxy to uactor proxy.
The wrapping mechanism is intended as a workaround for multiprocessing
bugs causing the loss of authkey and manager on nested proxies.
.. versionadded:: 0.1.1
"""
proxy = proxytype(token, serializer, *args, **kwd)
proxytype = type(proxy)
if proxytype not in autoproxy_classes:
autoproxy_classes[proxytype] = type(
proxytype.__name__,
(BaseProxy, proxytype),
{'_isauto': getattr(proxy, '_isauto', False)},
)
return autoproxy_classes[proxytype](token, serializer, *args, **kwd)
def extract_proxies(base: typing.Type[BaseManager],
) -> typing.Dict[str, callable]:
"""
Create an updated copy from the registry of given manager.
.. versionadded:: 0.1.1
"""
proxies = {}
context = globals()
manager = type('Workbench', (base,), {})
manager.register('auto', create_method=False)
for typeid, params in manager._registry.items():
# FIXME: private-implementation-specific code
proxytype = params[3]
if isinstance(proxytype, type):
# BaseProxy subclass
proxytype = type(f'Proxy[{typeid}]', (BaseProxy, proxytype), {})
context[proxytype.__name__] = proxytype
else:
# AutoProxy factory
proxytype = functools.update_wrapper(
functools.partial(rebuild_autoproxy, proxytype=proxytype),
proxytype,
)
proxies[typeid] = proxytype
return proxies
[docs]class UActorException(Exception):
"""
Base exception for uactor module.
.. versionadded:: 0.1.0
"""
[docs]class ProxyError(UActorException):
"""
Exception for errors on proxy logic.
.. versionadded:: 0.1.0
"""
[docs]class AuthkeyError(ProxyError):
"""
Exception raised when connecting to proxy with invalid authkey.
.. versionadded:: 0.1.1
"""
class CollisionDict(collections.UserDict):
"""
Dictionary raising an exception when overwriting existing keys.
.. versionadded:: 0.1.1
"""
def __init__(self,
data: typing.MutableMapping,
exc_type: typing.Type['UActorException'],
exc_args: typing.Sequence[typing.Any]):
"""Initialize."""
self.exc_type = exc_type
self.exc_args = exc_args
super().__init__(data)
def __setitem__(self, key: typing.Hashable, value: typing.Any):
"""Set proxy data for an id."""
if self.get(key) not in (None, value):
raise self.exc_type(*self.exc_args)
super().__setitem__(key, value)
[docs]class BaseProxy(multiprocessing.managers.BaseProxy):
"""
Base Proxy class.
This class implements a few workarounds around bugs found in
:class:`multiprocessing.managers.BaseProxy` by preventing
:attr:`BaseProxy._manager` from getting lost on both unserialization
and process forking by recreating it on demand.
.. versionadded:: 0.1.0
"""
@property
def _manager(self) -> 'ActorManager':
"""Get set manager."""
return self._manager_instance
@_manager.setter
def _manager(self, manager: typing.Optional['ActorManager']):
"""Set manager or create a new one when trying to unset."""
self._manager_instance = manager or rebuild_manager(
type(self._manager_instance),
self._token.address,
self._authkey,
self._serializer,
)
def _callmethod(self, *args, **kwargs):
"""Call method of proxied object."""
result = super()._callmethod(*args, **kwargs)
# FIXME: find a more elegant way of doing this
if isinstance(result, BaseProxy) and result._manager.process:
result._manager = None
return result
def __reduce__(self) -> tuple:
"""Implement the pickle reduce protocol."""
# FIXME: private-implementation-specific code (python bug workaround)
manager_class = type(self._manager)
builder, args, *state = super().__reduce__()
return (rebuild_proxy, (manager_class, builder, *args), *state)
[docs]class ActorManager(BaseManager):
"""
Multiprocessing manager for uactor.
.. versionadded:: 0.1.0
"""
@classmethod
def _Server(cls, *args, **kwargs):
"""Return server object patched with proxy collision checks."""
# FIXME: private-implementation-specific code (python bug workaround)
server = super()._Server(*args, **kwargs)
server.id_to_obj = CollisionDict(
server.id_to_obj,
ProxyError,
('Two different proxies cannot point to the same object',),
)
return server
[docs] @classmethod
def typeids(cls) -> typing.Tuple[str, ...]:
"""Get tuple of typeid of all registered proxies."""
return tuple(cls._registry)
@property
def process(self) -> typing.Optional[multiprocessing.Process]:
"""Get remote actor process if owned by this manager."""
return getattr(self, '_process', None)
def __init__(self,
address: typing.Optional['AddressType'] = None,
authkey: typing.Optional[bytes] = None,
serializer: str = DEFAULT_SERIALIZER,
*args,
parent: typing.Optional['ActorManager'] = None,
**kwargs):
"""Initialize manager."""
self.parent = parent
authkey = authkey or (parent._authkey if parent else None)
super().__init__(address, authkey, serializer, *args, **kwargs)
[docs] def start(self, *args, **kwargs):
"""Start manager process."""
super().start(*args, **kwargs)
process_managers[type(self), self.address] = self
[docs] def connect(self):
"""
Connect to manager process.
:raises AuthkeyError: on actor process authkey rejection.
"""
try:
super().connect()
connection_managers[type(self), self.address] = self
except multiprocessing.context.AuthenticationError as e:
raise AuthkeyError(
'Actor process rejected provided authkey',
) from e
[docs]class ActorProxy(BaseProxy):
"""
Actor proxy base class.
Actors will inherit from this class to create custom implementations based
on their declared configuration and interface.
.. versionadded:: 0.1.0
"""
@property
def connection_address(self) -> AddressType:
"""
Get connection address to :class:`Actor` process.
.. versionadded:: 0.1.1
"""
return self._token.address
[docs] def __enter__(self, *args, **kwargs):
"""Call :meth:`Actor.__enter__` method."""
return self._callmethod('__enter__', args, kwargs)
[docs] def __exit__(self, *args, **kwargs):
"""
Call :meth:`Actor.__exit__` method.
When this proxy is the direct result from instancing
the :class:`Actor` class, calling this method will
also result on :meth:`Actor.shutdown` being called,
finishing the actor process (like when calling
:meth:`ActorProxy.shutdown`).
"""
try:
return self._callmethod('__exit__', args, kwargs)
finally:
if self._manager.process:
self._callmethod('shutdown')
self._manager.shutdown()
[docs] def shutdown(self):
"""
Call :meth:`Actor.shutdown` method.
When the current process is responsible of initializing
the actor, calling this method will also finish the actor
process.
"""
try:
return self._callmethod('shutdown')
finally:
manager = self._manager.parent or self._manager
if manager.process:
manager.shutdown()
[docs]class Actor:
"""
Actor base class for actor implementations to inherit from.
An actor represents a processing unit. During instantiation,
a new actor process is be started, and the corresponding
proxy is returned.
Actors also implement the context manager interface, and you can override
both :meth:`Actor.__enter__` and :meth:`Actor.__exit__` to implement your
own logic, but keep in mind they're both specially handled and calling
:meth:`ActorProxy.__exit__` will also terminate the process (just
like calling :meth:`ActorProxy.shutdown`).
.. versionadded:: 0.1.0
"""
manager_class = ActorManager
"""
:class:`ActorManager` subclass used to initialize the actor process.
Whatever is defined here, will be subclassed during actor class
initialization to apply the declared actor configuration.
"""
proxy_class = ActorProxy
"""
:class:`ActorProxy` subclass used to initialize the actor proxy.
Whatever is defined here, will be subclassed during actor class
initialization to apply the declared actor configuration.
"""
_options_: typing.Mapping[str, typing.Any] = {}
"""
Option :class:`dict` will be passed to :class:`Actor.manager_class`.
This options mapping is passed to :class:`Actor.manager_class` during
:class:`Actor` instantiation.
"""
_exposed_: typing.Optional[typing.Tuple[str]] = (
'__enter__',
'__exit__',
'shutdown',
)
"""
:class:`tuple` containing then list of method/properties will be exposed.
Class inheritance will be honored when using this attribute.
"""
_proxies_: typing.Mapping[str, typing.Type[BaseProxy]] = extract_proxies(
multiprocessing.managers.SyncManager,
)
"""
Proxy :class:`dict` of typeid keys and :class:`BaseProxy` values.
Proxies defined here will be registered at :class:`Actor.manager_class`
and will be made available from within the actor process.
"""
_method_to_typeid_: typing.Mapping[str, str] = {
'__enter__': 'actor',
}
"""
Configuration :class:`dict` of method name keys and typeid values.
Including a method with an typeid here will result on the corresponding
proxy to be returned when called from an :class:`ActorProxy` instance.
"""
def __init_subclass__(cls):
"""
Initialize actor class.
This method is what takes our declarative actor-based interface and
dynamically setup the components required by our usage of
:mod:`multiprocessing.manager` functionality.
.. versionchanged:: 0.1.1
Replaces previous metaclass implementation.
"""
super().__init_subclass__()
base_vars = tuple(map(vars, reversed(cls.mro())))
proxies = {
typeid: proxy
for dct in base_vars
for typeid, proxy in (dct.get('_proxies_') or {}).items()
}
exposed = [
name
for dct in base_vars
for exposed in (
(
(
key
for key, value in dct.items()
if not key.startswith('_') and callable(value)
)
if dct.get('_exposed_') is None else
dct['_exposed_']
),
(
dct.get('_method_to_typeid_') or ()
),
)
for name in exposed
]
method_to_typeid = {
method: typeid
for dct in base_vars
if dct.get('_method_to_typeid_')
for method, typeid in dct['_method_to_typeid_'].items()
}
properties = [
(name, *proxy_method(cls, name))
for name in exposed
]
exposed.extend(
requirement
for name, func, requirements in properties
for requirement in requirements
)
invalid = set(proxies) & {'actor'}
if invalid:
raise TypeError(f'typeid {next(iter(invalid))!r} is reserved')
undefined = set(method_to_typeid.values()) - {None, 'actor', *proxies}
if undefined:
raise TypeError(f'typeid {next(iter(undefined))!r} is not defined')
cls.manager_class = type(
f'{cls.__qualname__}.manager_class',
(cls.manager_class,),
{'__module__': cls.__module__},
)
cls.proxy_class = type(
f'{cls.__qualname__}.proxy_class',
(cls.proxy_class,),
{
'__module__': cls.__module__,
'_exposed_': tuple(exposed),
'_method_to_typeid_': {
method: typeid
for method, typeid in method_to_typeid.items()
if typeid is not None
},
**{
name: func
for name, func, requirement in properties
if not hasattr(cls.proxy_class, name)
},
},
)
cls.manager_class.register(
typeid='actor',
callable=current_actor,
proxytype=cls.proxy_class,
create_method=True,
)
for typeid, proxy in proxies.items():
cls.manager_class.register(
typeid=typeid,
proxytype=proxy,
create_method=False,
)
def __new__(cls, *args, **kwargs):
"""Start actor process, initialize actor and return its proxy."""
process = multiprocessing.current_process()
initializing = (
getattr(process, '_uactor_class', None) is cls
and getattr(process, '_uactor_owner', None) is None
)
if initializing:
self = super().__new__(cls)
process._uactor_owner = self # prevent recursion on __init__
return self
manager = cls.manager_class(**cls._options_)
manager.start(bootstrap, (manager, cls, args, kwargs))
return manager.actor()
def __init__(self):
"""
Initialize actor.
This method will be called once when the actor process starts.
"""
super().__init__()
[docs] def __enter__(self: TActor) -> TActor:
"""Enter context, return actor proxy."""
return self
[docs] def __exit__(self, exc_type: typing.Type[Exception], exc_val: Exception,
exc_tb: types.TracebackType) -> typing.Optional[bool]:
"""
Leave context.
Method :meth:`uactor.Actor.shutdown` will be called after
this one when the context manager interface is used on the owner
process.
"""
[docs] def shutdown(self):
"""
Perform shutdown work before the process dies (stub).
This method will be called by explicit
:meth:`ActorProxy.shutdown` calls, even if no real
process shutdown is involved (ie. with remote proxy connections),
enabling implementing remote shutdown logic here
(ie. breaking a mainloop).
This method will be also called after :meth:`Actor.__exit__`
when the owner process uses the :class:`ActorProxy` context
manager interface.
"""
[docs] @classmethod
def connect(cls: typing.Type[TActorProxy],
address: AddressType,
authkey: bytes,
serializer: str = DEFAULT_SERIALIZER,
capture: typing.Sequence[AddressType] = (),
) -> TActorProxy:
"""
Get actor proxy instance connected to address.
:param address: actor process connection address
:param authkey: authentication secret key
:param serializer: serializer name
:param capture: iterable of additional addresses will be handled
with this connection.
.. versionadded:: 0.1.1
"""
manager_class = cls.manager_class
manager = rebuild_manager(manager_class, address, authkey, serializer)
# TODO: do this in a better place
connection_managers.update(
((manager_class, address), manager)
for address in capture
)
return manager.actor()