Networking¶
Actors will default to the most efficient method of inter-process communication available, which often relies on local sockets or pipes.
Alternatively, actors be set to listen to TCP/IP interfaces and distributed across different machines over networks, rendering uActor as a great tool for distributed computing.
Please note when following this approach, due object serialization, actor classes (and any other serialized type) are required to be available at the same import locations, in all the involved parties.
Connection¶
When declaring an actor, we can define we want it to listen to a TCP/IP
interface by specifying that on his _options_
attribute, along with an
explicit authkey
secret that clients will need to authenticate.
We would need the value of uactor.Actor.connection_address
to know which
address an actor is available at.
Example (network_actor.py
, server and library):
import os
import time
import uactor
class NetworkActor(uactor.Actor):
# Actor manager options to listen over all TCP on a random port
_options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}
def getpid(self):
return os.getpid()
if __name__ == '__main__':
with NetworkActor() as actor:
host, port = actor.connection_address
print(f'Actor process {actor.getpid()} at {host}:{port}')
# Actor process 140262 at 0.0.0.0:37255
while True: # keep the owner proxy alive
time.sleep(10)
We can use uactor.Actor.connect
classmethod in conjunction with the address
available at uactor.Actor.connection_address
to connect to a remote actor.
Example (client):
from network_actor import NetworkActor
address = 'localhost', 37255
with NetworkActor.connect(address, b'SECRET') as actor:
host, port = actor.connection_address
print(f'Actor process {actor.getpid()} at {host}:{port}')
# Actor process 140262 at localhost:37255
Forwarded proxies¶
When networking, because of connections are made manually via
uactor.Actor.connect
(and as such, actors being considered remote),
when receiving a foreign proxy to an actor we aren’t connected to, an
AuthkeyError
could be raised either because unknown authkey
(see proxy forwarding) or because of an invalid address.
Example (network_proxy_forwarding.py
, library):
import uactor
class MyActor(uactor.Actor):
_exposed_ = ('my_other_actor',)
def __init__(self):
self.my_other_actor = MyOtherActor()
class MyOtherActor(uactor.Actor):
_options_ = {'address': ('0.0.0.0', 7000), 'authkey': b'OtherSecret'}
Example (raising AuthkeyError
with a remote actor):
from network_proxy_forwarding import MyActor
with MyActor() as actor:
my_other_actor = actor.my_other_actor
# AuthKeyError
We need to connect to actors before being able to handle their proxies,
as its authkey
must be set beforehand, while enabling remote address
translation when necessary (via uactor.Actor.connect
capture
parameter).
Example:
from network_proxy_forwarding import MyActor
with MyActor() as actor:
address = 'localhost', 7000
capture = [('0.0.0.0', 7000)]
with MyOtherActor.connect(address, b'OtherSecret', capture=capture):
my_other_actor = actor.my_other_actor
For further information head to the proxy forwarding section.
Remote shutdown¶
By design, actor processes are kept alive as long of their parent processes are running. We can enable remote clients to shutdown an actor process via additional logic on the parent process (mainloop).
Example (network_actor_shutdown.py
, server and library):
import threading
import uactor
class NetworkActor(uactor.Actor):
# Actor manager options to listen over TCP on a random port
_options_ = {'address': ('0.0.0.0', 6000), 'authkey': b'SECRET'}
def __init__(self):
self.finished = threading.Event()
def shutdown(self):
self.finished.set()
def wait(self, timeout=-1):
return self.finished.wait(timeout)
if __name__ == '__main__':
with NetworkActor() as actor:
while not actor.wait(timeout=10): # timeout avoids socket timeouts
pass
The code above will enable remote proxies to break the mainloop by calling shutdown, exiting the actor context and effectively finishing both parent and actor processes.
Example:
from network_actor_shutdown import NetworkActor
address = 'localhost', 6000
external = NetworkActor.connect(address, b'SECRET')
external.shutdown()
Registry¶
In order to help distributed actor visibility while enabling more advance patterns, a centralized actor registry can be implemented.
Example (network_actor_registry.py
, server and library):
import itertools
import os
import time
import uactor
class Registry(uactor.Actor):
_options_ = {'address': ('0.0.0.0', 5000), 'authkey': b'SECRET'}
def __init__(self):
self.addresses = frozenset()
self.iterator = iter(())
def register(self, *addresses):
addresses = self.addresses.union(addresses)
self.iterator, self.addresses = itertools.cycle(addresses), addresses
def pick(self):
return next(self.iterator, None)
class NetworkActor(uactor.Actor):
# Actor manager options to listen over TCP on a random port
_options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}
def getpid(self):
return os.getpid()
if __name__ == '__main__':
with Registry() as registry:
actors = [NetworkActor() for actor in range(10)]
addresses = [actor.connection_address for actor in actors]
registry.register(*addresses)
print(f'Registry listening at port {registry.connection_address[1]}')
# Registry serving at port 5000
print(f'Actors listening at ports {[port for _, port in addresses]}')
# Actors listening at ports [36061, 35245, ..., 33701, 41653]
while True: # keep registry and actors alive
time.sleep(10)
Using a registry also allow us to register new actors dynamically.
Example (remote actor registration):
import time
from network_actor_registry import Registry, NetworkActor
address = 'localhost', 5000
with Registry.connect(address) as registry:
actors = [NetworkActor() for actor in range(10)]
addresses = [actor.connection_address for actor in actors]
registry.register(*addresses)
print(f'Actors listening at ports {[port for _, port in addresses]}')
# Actors listening at ports [36061, 35245, ..., 33701, 41653]
while True: # keep actors alive
time.sleep(10)
And we can access those actors by retrieving their addresses from the registry (keep in mind you would still need to translate local addresses, see forwarded proxies).
Exemple (actor registry usage):
from network_actor_registry import Registry, NetworkActor
address = 'localhost', 5000
with Registry.connect(address, b'SECRET') as registry:
for i in range(10):
_, port = registry.pick()
address = 'localhost', port
with NetworkActor.connect(address, b'SECRET') as actor:
print(f'Actor at port {port} has pid {actor.getpid()}')
Autodiscovery¶
By using zeroconf to provide Multicast DNS Service Discovery,
we can easily publish uactor.Actor
processes across the network, without
the need of any centralized registry.
Example (network_actor_zeroconf.py
, server and library):
import os
import socket
import time
import uactor
import zeroconf
class NetworkActor(uactor.Actor):
# Actor manager options to listen over TCP on a random port
_options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}
def getpid(self):
return os.getpid()
if __name__ == '__main__':
with NetworkActor() as actor:
host, port = actor.connection_address
zc = zeroconf.Zeroconf()
try:
zc.register_service(
zeroconf.ServiceInfo(
'_uactor._tcp.local.',
'NetworkActor._uactor._tcp.local.',
addresses=[socket.inet_aton(host)],
port=port,
server=f'{socket.gethostname()}.local.',
)
)
while True: # keep service alive
time.sleep(10)
finally:
zc.close()
And this would be a service relying on zeroconf to fetch the actor address.
Example:
import socket
import threading
import zeroconf
from network_actor_zeroconf import NetworkActor
class MyListener:
def __init__(self):
self.discovery = threading.Event()
def remove_service(self, zeroconf, type, name):
print(f'Service {name} removed')
def add_service(self, zeroconf, type, name):
print(f'Service {name} discovered')
# Service NetworkActor._uactor._tcp.local discovered
info = zeroconf.get_service_info(type, name)
address = socket.inet_ntoa(info.addresses[0]), info.port
with NetworkActor.connect(address, b'SECRET') as actor:
print(f'NetworkActor.getpid returned {actor.getpid()}')
# NetworkActor.getpid returned 906151
self.discovery.set()
try:
zc = zeroconf.Zeroconf()
listener = MyListener()
zeroconf.ServiceBrowser(zc, '_uactor._tcp.local.', listener)
listener.discovery.wait(10)
finally:
zc.close()