Networking

Actors will default to the most efficient method of inter-process communication available.

But in some cases, you may want to distribute workloads between different machines on a same network over TCP/IP. This can by done by defining the appropriate addresses on your actors.

Keep in mind that the same actors classes must be available, at the same modules, in all the involved parties.

Connection

We can declare our network actor as usual, but customizing the options forwarded to ActorManager with a TCP address (in the example below, 0.0.0.0 means listening to all addresses, while 0 means choosing a random port), we also need to specify an authtoken (the authentication secret), initialize the actor (listening to incoming connections), and print at which port is the actor listening at.

We’ll name this module network_actor.py to be used later.

import os
import time
import uactor

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}

    def getpid(self):
        return os.getpid()

if __name__ == '__main__':
    with NetworkActor() as actor:
        host, port = actor.connection_address
        print(f'Actor process {actor.getpid()} at {host}:{port}')
        # Actor process 140262 at 0.0.0.0:37255

        while True:  # keep the owner proxy alive
            time.sleep(10)

We can now connect, remotely, to the same actor process with the uactor.Actor.connect method with the correct authkey, keep in mind both proxy address hostname and port to reach its actor process can vary at different network locations.

This is a remote connection example (importing the actor class from above):

from network_actor import NetworkActor

address = 'localhost', 37255
with NetworkActor.connect(address, b'SECRET') as actor:
    host, port = actor.connection_address
    print(f'Actor process {actor.getpid()} at {host}:{port}')
    # Actor process 140262 at localhost:37255

Forwarded proxies

One neat feature of uActor is proxy forwarding, that is, being able to pass proxies as arguments or return them, to and from actors.

But when forwarding proxies from actors with different secrets, complexity adds up pretty quickly.

If a proxy returns a foreign proxy from an actor we aren’t connected to, an AuthkeyError will be raised because our process does not know its authkey.

import uactor

class MyActor(uactor.Actor):
    _exposed_ = ('my_other_actor',)

    def __init__(self):
        self.my_other_actor = MyOtherActor()

class MyOtherActor(uactor.Actor):
    _options_ = {'address': ('0.0.0.0', 7000), 'authkey': b'OtherSecret'}

with MyActor() as actor:
    my_other_actor = actor.my_other_actor
    # AuthKeyError

We need to connect to actors before being able to take proxies pointing to them, while at the same time we probably need to translate those proxies addresses to be reachable from our location.

with MyActor() as actor:
    address = 'localhost', 7000
    capture = [('0.0.0.0', 7000)]
    with MyOtherActor.connect(address, b'OtherSecret', capture=capture):
        my_other_actor = actor.my_other_actor

Or, alternatively, we can wait until we get an exception to perform the connection.

with MyActor() as actor:
    try:
        my_other_actor = actor.my_other_actor
    except uactor.AuthKeyError as e:
        address = 'localhost', 7000
        capture = [('0.0.0.0', 7000)]
        with MyOtherActor.connect(address, b'OtherSecret', capture=capture):
            my_other_actor = actor.my_other_actor

Both approaches have their pros and cons, is opt to the developer to choose wisely between them, based on the side-effects on his implementation.

Server mainloop with remote shutdown

Since the actor process have to be kept alive by its parent process, we can implement some simple logic to keep it around until needed, while allowing remote shutdowns.

We’ll name this module network_actor.py to be used later.

import threading
import uactor

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 6000), 'authkey': b'SECRET'}

    def __init__(self):
        self.finished = threading.Event()

    def shutdown(self):
        self.finished.set()

    def wait(self, timeout=-1):
        return self.finished,wait(timeout)

if __name__ == '__main__':
    with NetworkActor() as actor:
        while not actor.wait(timeout=10):  # avoid socket timeouts
            pass

This way, a remote proxy will be able to end the mainloop by calling shutdown and end the owner process mainloop, effectively finishing the process. We’ll import the actor class from the previous example.

from network_actor import NetworkActor

address = 'localhost', 6000
external = NetworkActor.connect(address, b'SECRET')
external.shutdown()

Autodiscovery

To enable dynamic actor discovery, we might keep an central actor listening to an specific port, acting as an central registry for other actors.

This way, we can start as many actors as we like, at any time.

We’ll name this module network_actor_registry.py to be used later.

import itertools
import os
import time
import uactor

class Registry(uactor.Actor):

    _options_ = {'address': ('0.0.0.0', 5000), 'authkey': b'SECRET'}

    def __init__(self):
        self.addresses = frozenset()
        self.iterator = iter(())

    def register(self, *addresses):
        addresses = self.addresses.union(addresses)
        self.iterator, self.addresses = itertools.cycle(addresses), addresses

    def pick(self):
        return next(self.iterator, None)

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}

    def getpid(self):
        return os.getpid()

if __name__ == '__main__':
    with Registry() as registry:
        actors = [NetworkActor() for actor in range(10)]
        addresses = [actor.connection_address for actor in actors]
        registry.register(*addresses)

        print(f'Registry listening at port {registry.connection_address[1]}')
        # Registry serving at port 5000

        print(f'Actors listening at ports {[port for _, port in addresses]}')
        # Actors listening at ports [36061, 35245, ..., 33701, 41653]

        while True:  # keep registry and actors alive
            time.sleep(10)

Using registry also allow us to register new actors dynamically.

import time

from network_actor_registry import Registry, NetworkActor

address = 'localhost', 5000
with Registry.connect(address) as registry:
    actors = [NetworkActor() for actor in range(10)]
    addresses = [actor.connection_address for actor in actors]
    registry.register(*addresses)

    print(f'Actors listening at ports {[port for _, port in addresses]}')
    # Actors listening at ports [36061, 35245, ..., 33701, 41653]

    while True:  # keep actors alive
        time.sleep(10)

And we can access those actors by retrieving their addresses from the registry (taking care of handling local addresses, see forwarded proxies).

from network_actor_registry import Registry, NetworkActor

address = 'localhost', 5000
with Registry.connect(address, b'SECRET') as registry:
    for i in range(10):
        _, port = registry.pick()
        address = 'localhost', port
        with NetworkActor.connect(address, b'SECRET') as actor:
            print(f'Actor at port {port} has pid {actor.getpid()}')