Networking

Actors will default to the most efficient method of inter-process communication available, which often relies on local sockets or pipes.

Alternatively, actors be set to listen to TCP/IP interfaces and distributed across different machines over networks, rendering uActor as a great tool for distributed computing.

Please note when following this approach, due object serialization, actor classes (and any other serialized type) are required to be available at the same import locations, in all the involved parties.

Connection

When declaring an actor, we can define we want it to listen to a TCP/IP interface by specifying that on his _options_ attribute, along with an explicit authkey secret that clients will need to authenticate.

We would need the value of uactor.Actor.connection_address to know which address an actor is available at.

Example (network_actor.py, server and library):

import os
import time
import uactor

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over all TCP on a random port
    _options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}

    def getpid(self):
        return os.getpid()

if __name__ == '__main__':
    with NetworkActor() as actor:
        host, port = actor.connection_address
        print(f'Actor process {actor.getpid()} at {host}:{port}')
        # Actor process 140262 at 0.0.0.0:37255

        while True:  # keep the owner proxy alive
            time.sleep(10)

We can use uactor.Actor.connect classmethod in conjunction with the address available at uactor.Actor.connection_address to connect to a remote actor.

Example (client):

from network_actor import NetworkActor

address = 'localhost', 37255
with NetworkActor.connect(address, b'SECRET') as actor:
    host, port = actor.connection_address
    print(f'Actor process {actor.getpid()} at {host}:{port}')
    # Actor process 140262 at localhost:37255

Forwarded proxies

When networking, because of connections are made manually via uactor.Actor.connect (and as such, actors being considered remote), when receiving a foreign proxy to an actor we aren’t connected to, an AuthkeyError could be raised either because unknown authkey (see proxy forwarding) or because of an invalid address.

Example (network_proxy_forwarding.py, library):

import uactor

class MyActor(uactor.Actor):
    _exposed_ = ('my_other_actor',)

    def __init__(self):
        self.my_other_actor = MyOtherActor()

class MyOtherActor(uactor.Actor):
    _options_ = {'address': ('0.0.0.0', 7000), 'authkey': b'OtherSecret'}

Example (raising AuthkeyError with a remote actor):

from network_proxy_forwarding import MyActor

with MyActor() as actor:
    my_other_actor = actor.my_other_actor
    # AuthKeyError

We need to connect to actors before being able to handle their proxies, as its authkey must be set beforehand, while enabling remote address translation when necessary (via uactor.Actor.connect capture parameter).

Example:

from network_proxy_forwarding import MyActor

with MyActor() as actor:
    address = 'localhost', 7000
    capture = [('0.0.0.0', 7000)]
    with MyOtherActor.connect(address, b'OtherSecret', capture=capture):
        my_other_actor = actor.my_other_actor

For further information head to the proxy forwarding section.

Remote shutdown

By design, actor processes are kept alive as long of their parent processes are running. We can enable remote clients to shutdown an actor process via additional logic on the parent process (mainloop).

Example (network_actor_shutdown.py, server and library):

import threading
import uactor

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 6000), 'authkey': b'SECRET'}

    def __init__(self):
        self.finished = threading.Event()

    def shutdown(self):
        self.finished.set()

    def wait(self, timeout=-1):
        return self.finished.wait(timeout)

if __name__ == '__main__':
    with NetworkActor() as actor:
        while not actor.wait(timeout=10):  # timeout avoids socket timeouts
            pass

The code above will enable remote proxies to break the mainloop by calling shutdown, exiting the actor context and effectively finishing both parent and actor processes.

Example:

from network_actor_shutdown import NetworkActor

address = 'localhost', 6000
external = NetworkActor.connect(address, b'SECRET')
external.shutdown()

Registry

In order to help distributed actor visibility while enabling more advance patterns, a centralized actor registry can be implemented.

Example (network_actor_registry.py, server and library):

import itertools
import os
import time
import uactor

class Registry(uactor.Actor):

    _options_ = {'address': ('0.0.0.0', 5000), 'authkey': b'SECRET'}

    def __init__(self):
        self.addresses = frozenset()
        self.iterator = iter(())

    def register(self, *addresses):
        addresses = self.addresses.union(addresses)
        self.iterator, self.addresses = itertools.cycle(addresses), addresses

    def pick(self):
        return next(self.iterator, None)

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}

    def getpid(self):
        return os.getpid()

if __name__ == '__main__':
    with Registry() as registry:
        actors = [NetworkActor() for actor in range(10)]
        addresses = [actor.connection_address for actor in actors]
        registry.register(*addresses)

        print(f'Registry listening at port {registry.connection_address[1]}')
        # Registry serving at port 5000

        print(f'Actors listening at ports {[port for _, port in addresses]}')
        # Actors listening at ports [36061, 35245, ..., 33701, 41653]

        while True:  # keep registry and actors alive
            time.sleep(10)

Using a registry also allow us to register new actors dynamically.

Example (remote actor registration):

import time

from network_actor_registry import Registry, NetworkActor

address = 'localhost', 5000
with Registry.connect(address) as registry:
    actors = [NetworkActor() for actor in range(10)]
    addresses = [actor.connection_address for actor in actors]
    registry.register(*addresses)

    print(f'Actors listening at ports {[port for _, port in addresses]}')
    # Actors listening at ports [36061, 35245, ..., 33701, 41653]

    while True:  # keep actors alive
        time.sleep(10)

And we can access those actors by retrieving their addresses from the registry (keep in mind you would still need to translate local addresses, see forwarded proxies).

Exemple (actor registry usage):

from network_actor_registry import Registry, NetworkActor

address = 'localhost', 5000
with Registry.connect(address, b'SECRET') as registry:
    for i in range(10):
        _, port = registry.pick()
        address = 'localhost', port
        with NetworkActor.connect(address, b'SECRET') as actor:
            print(f'Actor at port {port} has pid {actor.getpid()}')

Autodiscovery

By using zeroconf to provide Multicast DNS Service Discovery, we can easily publish uactor.Actor processes across the network, without the need of any centralized registry.

Example (network_actor_zeroconf.py, server and library):

import os
import socket
import time
import uactor
import zeroconf

class NetworkActor(uactor.Actor):

    # Actor manager options to listen over TCP on a random port
    _options_ = {'address': ('0.0.0.0', 0), 'authkey': b'SECRET'}

    def getpid(self):
        return os.getpid()

if __name__ == '__main__':
    with NetworkActor() as actor:
        host, port = actor.connection_address
        zc = zeroconf.Zeroconf()
        try:
            zc.register_service(
                zeroconf.ServiceInfo(
                    '_uactor._tcp.local.',
                    'NetworkActor._uactor._tcp.local.',
                    addresses=[socket.inet_aton(host)],
                    port=port,
                    server=f'{socket.gethostname()}.local.',
                    )
                )
            while True:  # keep service alive
                time.sleep(10)
        finally:
            zc.close()

And this would be a service relying on zeroconf to fetch the actor address.

Example:

import socket
import threading
import zeroconf

from network_actor_zeroconf import NetworkActor

class MyListener:
    def __init__(self):
        self.discovery = threading.Event()

    def remove_service(self, zeroconf, type, name):
        print(f'Service {name} removed')

    def add_service(self, zeroconf, type, name):
        print(f'Service {name} discovered')
        # Service NetworkActor._uactor._tcp.local discovered

        info = zeroconf.get_service_info(type, name)
        address = socket.inet_ntoa(info.addresses[0]), info.port

        with NetworkActor.connect(address, b'SECRET') as actor:
            print(f'NetworkActor.getpid returned {actor.getpid()}')
            # NetworkActor.getpid returned 906151

        self.discovery.set()

try:
    zc = zeroconf.Zeroconf()
    listener = MyListener()
    zeroconf.ServiceBrowser(zc, '_uactor._tcp.local.', listener)
    listener.discovery.wait(10)
finally:
    zc.close()