http://catern.com/caternetes.html
No more DSLs: Implement and deploy a distributed system with a single
program
Table of Contents
* 1. Introduction
* 2. Writing a single-program system
+ 2.1. orderd: an order entry daemon
+ 2.2. async def start_orderd(
+ 2.3. nursery: trio.Nursery,
+ 2.4. thread: rsyscall.Thread,
+ 2.5. positiond: Positiond,
+ 2.6. database: orderd.Database,
+ 2.7. listening_sock: FileDescriptor,
+ 2.8. ) -> Orderd:
+ 2.9. Full example
* 3. Implementation of component libraries
* 4. Conclusion
1 Introduction
If you want to write a distributed system, then instead of writing a
thousand different programs and configuration files and DSLs, you can
use an approach I call "single-program systems", and write just one
program. In this article, I describe how we do this at $DAYJOB, and I
show some example programs. These examples are in Python, but this
approach works in any language.
Besides single-program systems, distributed languages also try to
allow the user to implement a distributed system as a single program.
But these languages assume the presence of lots of distributed
systems infrastructure which is maintained outside the language; a
program written in such a language is then only one component in a
larger distributed system. The single-program system approach,
instead, allows you to write a program which manages every aspect of
the distributed system above the hardware level, such that running
the program deploys the distributed system, and the program and the
distributed system can really be said to be the same thing.
At $DAYJOB, each team maintains Python libraries for their
components; these libraries are used to write the single-program
system. Some components are in Python, but others are written in
other languages (mostly C/C++/Java) and run in a separate process. In
this way, performance-sensitive components of a distributed system
are written in faster languages than Python, in the same way that
Fortran libraries are used to speed up math in Python programs. All
the component libraries are maintained in a single repository to
reduce compatibility issues.
The component libraries use normal programming language features,
including Python 3's static type system, to handle distributed
systems issues. For example, regular function arguments are used to
express service discovery and dependencies between services; for each
service, there's a service-starting function which takes that
service's dependencies as arguments, uses those arguments to run the
service, and returns a value that can be passed as an argument (a
dependency) to other service-starting functions. In general, services
are "configured" by passing arguments, in normal code, and multiple
instances can be created just by calling functions multiple times
with different arguments.
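As a minimal sketch of that idea (not the real component libraries): the start functions below are synchronous stand-ins that launch nothing, and the names, fields and socket paths are made up for illustration. The point is only that a dependency is an ordinary typed argument, and that a second instance is just a second call.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Positiond:
    """Everything a client needs to reach a (hypothetical) positiond instance."""
    address: str


@dataclass(frozen=True)
class Orderd:
    """Everything a client needs to reach a (hypothetical) orderd instance."""
    address: str
    positiond: Positiond


def start_positiond(name: str) -> Positiond:
    # A real start function would launch the service here.
    return Positiond(address=f"/run/{name}/positiond.sock")


def start_orderd(name: str, positiond: Positiond) -> Orderd:
    # The dependency arrives as an ordinary, type-checked argument,
    # so there is no separate service-discovery step.
    return Orderd(address=f"/run/{name}/orderd.sock", positiond=positiond)


# Two orderd instances sharing one positiond: just two calls.
shared = start_positiond("shared")
orderd_a = start_orderd("a", shared)
orderd_b = start_orderd("b", shared)
```

Forgetting to pass a Positiond, or passing the wrong kind of object, is then a type error caught before anything is started.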
When a service-starting function starts a C/C++/Java process, it
starts the process directly as a subprocess which the program will
continue to monitor directly, rather than delegating to a process
supervisor service. Functionality for distributed execution and
monitoring is shared through libraries rather than by delegating to
external orchestration systems, making a single-program system
completely self-contained. This allows a single-program system to be
used for deployment in any environment, whether that's bare-metal,
virtual machines, containers, or a developer machine; though
different environments and use cases usually imply different
programs with different configurations.
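To make "start it directly and keep monitoring it" concrete, here is a rough local-only sketch using asyncio from the standard library instead of rsyscall and trio. The helper name start_child and the child program are assumptions for illustration; the real libraries also handle remote hosts, which this does not.

```python
import asyncio
import sys


async def start_child(code: str) -> asyncio.Task:
    """Start a child Python process directly and monitor it in the background."""
    process = await asyncio.create_subprocess_exec(sys.executable, "-c", code)

    async def check() -> None:
        # Raise if the child exits uncleanly, so the failure is not silent.
        returncode = await process.wait()
        if returncode != 0:
            raise RuntimeError(f"child exited with status {returncode}")

    # The returned task is the "monitor"; whoever holds it sees the failure.
    return asyncio.create_task(check())


async def main() -> str:
    # This child fails on purpose so that the monitor has something to report.
    monitor = await start_child("import sys; sys.exit(3)")
    try:
        await monitor
    except RuntimeError as exc:
        return str(exc)
    return "child exited cleanly"
```

There is no supervisor daemon in between: the program that called start_child is the parent of the process and is the one that learns about its exit.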
You can run the resulting program to run the full system for
production, or for testing the system against its users. This program
is not a DSL; it doesn't get processed by some tool and turned into a
list of actions to perform. Rather, it's run by a normal Python
interpreter, and when the program calls start_foo, the program starts
up the foo process, then and there, directly. When everything is
started, the program continues running, monitoring each process and
providing interfaces (the Python REPL, UIs, etc) to interact with the
running system.
Upgrading the distributed system is a matter of upgrading the single
program. Doing this doesn't require downtime. One simple approach is
to restart the single program without killing its child processes,
and then the new version takes on the responsibility of gradually
upgrading the services. Alternatively, more advanced techniques such
as dynamic software updating could be used to upgrade the single
program without restarting it.
This approach works for almost all distributed systems. Most
distributed systems, such as those operated by a single company, have
centralized authority; by this I mean that, ultimately, the owners
and employees of the company have the authority to do anything they
want with the distributed system (though doing anything actually
useful might be hard). A single-program system extends this to
logically centralized control; that centralized authority is
delegated to a single program which controls the system. Systems with
centralized authority are easier to operate in many ways, but
centralization isn't always possible or desirable. Decentralized
systems such as the Internet couldn't be run as a single-program
system because they contain many interacting "ultimate authorities"
running their own subsystems; each of those subsystems, however,
could be run as a single-program system.
2 Writing a single-program system
We'll see this in action by looking at a working example program.
2.1 orderd: an order entry daemon
* Accepts or rejects orders sent over a TCP listening socket
* Updates the positiond service with the positions
* Stores order data in a SQLite database
orderd is a real daemon ("d" is short for "daemon"), with a few
details removed. We're looking at orderd specifically because it has
only the three dependencies we've already mentioned. Note that orderd
itself may or may not be written in Python; this is abstracted away
from us (although in practice, it is in fact a separate subprocess).
For our example, we'll write a test of our distributed system. We'll
start up orderd and its dependencies (just positiond) for the test,
using functions from the component libraries to run each service.
First, some testing boilerplate:
import unittest
from orderd import start_orderd

class TestOrderd(unittest.TestCase):
    def setUp(self) -> None:
        # TODO start up orderd and its dependencies
        self.orderd = start_orderd(...)

    def test(self) -> None:
        self.assertTrue("Do test stuff")
To write setUp, we'll proceed by looking at the signature of the
start_orderd function, provided by the orderd component library. Note
the type annotations for static type checking, introduced in Python
3.
# in the "orderd" module
async def start_orderd(
    nursery: trio.Nursery,
    thread: rsyscall.Thread,
    positiond: Positiond,
    database: orderd.Database,
    listening_sock: rsyscall.FileDescriptor,
) -> Orderd:
We'll look at the start_orderd signature line by line, creating each
argument individually, and at the end we'll call start_orderd and
have a running instance of orderd.
The first three lines of the function signature (up to and including
thread: rsyscall.Thread,) are essentially common to all
service-starting functions. The last four lines (starting with positiond:
Positiond,) are specific to orderd.
2.2 async def start_orderd(
async def start_orderd(
start_orderd is an async function. In Python, this simply means that
it can run concurrently with other functions, which allows us to
start services up in parallel, using Python-specific techniques which
we won't show in this example. Other than that, it's a completely
normal function, which is called with await start_orderd(...) from
any other async function, and which blocks execution until it
returns.
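For the curious, here is roughly what starting services in parallel can look like. This sketch uses asyncio.gather from the standard library rather than trio, and start_service is a made-up stand-in whose sleep simulates startup work.

```python
import asyncio
from typing import List

# Records the order in which things happen, to make the interleaving visible.
log: List[str] = []


async def start_service(name: str) -> str:
    # Stand-in for a real start function; the sleep simulates startup work.
    log.append(f"starting {name}")
    await asyncio.sleep(0.05)
    log.append(f"started {name}")
    return name


async def start_all() -> List[str]:
    # Both start functions run concurrently; gather returns their results
    # in argument order once both have finished.
    return await asyncio.gather(
        start_service("positiond"),
        start_service("exampled"),
    )


started = asyncio.run(start_all())
```

After this runs, the first two entries of log are the two "starting" lines: the second service begins starting before the first has finished.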
Since start_orderd is async, we need to run it from an async runner.
We'll use the open source library trio for that, which means we'll
need to tweak our boilerplate slightly to use TrioTestCase.
from trio_unittest import TrioTestCase

class TestOrderd(TrioTestCase):
    async def asyncSetUp(self) -> None:
        self.orderd = await start_orderd(...)
Other than this change in boilerplate, Python async functions work
like any others; you can safely ignore the "async" and "await"
annotations. We won't use any async features in this TestCase
example; the only use of async features will be later, with
start_exampled, when we look at how a component library is
implemented.
2.3 nursery: trio.Nursery,
nursery: trio.Nursery,
trio.Nursery is defined by the open source trio library, and it
provides the ability to start up functions in the background. We pass
it in to start_orderd so that start_orderd can start a function in
the background to monitor the running orderd process. If the orderd
process exits, the background function monitoring that process will
throw, and the resulting exception will be propagated to the
trio.Nursery, which will deal with it in some way specific to how the
trio.Nursery was produced. Upon seeing an exception in a background
function, the logic for a trio.Nursery might call start_orderd again
immediately, it might kill the other background functions and start
them all up again with start_ functions, or it might ultimately
prompt for operator intervention through various means. An operator
might then work at a UI or a REPL to fix the issue, by calling
start_orderd with different arguments.
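One of those policies, "start it again, and escalate if that keeps failing", could be sketched like this. This is not trio's API: supervise and flaky_service are hypothetical names, and the sketch uses asyncio so that it runs with only the standard library.

```python
import asyncio
from typing import Awaitable, Callable


async def supervise(
    run_service: Callable[[], Awaitable[None]],
    max_restarts: int,
) -> int:
    """Rerun run_service whenever it raises, up to max_restarts restarts.

    Returns the number of restarts that were needed; re-raises the last
    exception once the restart budget is spent.
    """
    restarts = 0
    while True:
        try:
            await run_service()
        except Exception:
            if restarts >= max_restarts:
                raise  # escalate, for example to an operator
            restarts += 1
        else:
            return restarts


attempts = 0


async def flaky_service() -> None:
    # Stand-in for "start the service and monitor it": fails the first two times.
    global attempts
    attempts += 1
    if attempts <= 2:
        raise RuntimeError("service exited uncleanly")


restarts = asyncio.run(supervise(flaky_service, max_restarts=3))
```

The policy lives in the caller, not in the start function, which is why the same start function can be reused under a test harness, a restart loop, or a human at a REPL.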
In this case, we'll use self.nursery as provided by TrioTestCase,
which turns any failure in a background task into a failure of the
whole test.
async def asyncSetUp(self) -> None:
    # self.nursery provided by TrioTestCase
    self.orderd = await start_orderd(
        self.nursery,
        ...,
    )
2.4 thread: rsyscall.Thread,
thread: rsyscall.Thread,
rsyscall.Thread is defined by the open source rsyscall library, and
it provides the ability to run system calls, including running
subprocesses. We pass it in to start_orderd so that start_orderd can
start the orderd subprocess, as well as perform other operations to
prepare the environment for orderd. An rsyscall.Thread may operate on
a local or remote host, or inside a container or VM, or on other
kinds of nodes, depending on how the rsyscall.Thread was produced,
but it provides a completely common interface regardless of where it
runs.
Component library code itself never runs distributed across multiple
nodes; there's a single Python interpreter on a single host. All
distributed operations are performed by method calls on
rsyscall.Thread objects.
In this case, we'll use local_thread imported from rsyscall and
assigned to self.thread. local_thread runs on the same thread as the
Python interpreter - that is, on localhost.
from rsyscall import local_thread

async def asyncSetUp(self) -> None:
    self.thread = local_thread
    self.orderd = await start_orderd(
        ..., self.thread, ...,
    )
2.5 positiond: Positiond,
positiond: Positiond,
This is the first orderd-specific argument.
positiond is a service which orderd updates with information about
its position. All the information required to connect to and use
positiond is contained in the Positiond class.
Since positiond is its own service, we need to use start_positiond to
start it.
async def start_positiond(
nursery: trio.Nursery,
thread: rsyscall.Thread,
workdir: rsyscall.Path,
) -> Positiond: ...
The first two arguments are shared with orderd. The third argument,
workdir, is unique to positiond. workdir is a path in the filesystem
that positiond will use; in this case, positiond will use it to store
shared memory communication mechanisms and persistent data.
We'll pass a path in a temporary directory in this example.
# Make a temporary directory
self.tmpdir = await self.thread.mkdtemp()
self.orderd = await start_orderd(
    ...,
    await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
    ...,
)
2.6 database: orderd.Database,
database: orderd.Database,
This is a completely conventional SQLite database, initialized with
the orderd schema.
Here, for a test, we're calling orderd.Database.make to make a fresh
database, every time. If we wanted to persist state between runs of
orderd, we'd pass in an orderd.Database instance from a previous run,
recovered from some known path in the filesystem with
orderd.Database.recover(path).
self.orderd = await start_orderd(
    ...,
    await orderd.Database.make(self.thread, self.tmpdir/"db"),
    ...,
)
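To illustrate the make/recover split, here is a local-only sketch built on the standard sqlite3 module. The schema is invented (the real orderd schema isn't shown here), and unlike the real orderd.Database, this class takes a plain local path instead of an rsyscall.Thread.

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path

# Hypothetical schema; the real orderd schema is not shown in this article.
SCHEMA = """
CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    symbol TEXT NOT NULL,
    quantity INTEGER NOT NULL
)
"""


class Database:
    def __init__(self, path: Path) -> None:
        self.path = path

    @classmethod
    def make(cls, path: Path) -> "Database":
        """Create a fresh database at path and initialize the schema."""
        if path.exists():
            raise FileExistsError(path)
        with closing(sqlite3.connect(str(path))) as conn:
            conn.execute(SCHEMA)
            conn.commit()
        return cls(path)

    @classmethod
    def recover(cls, path: Path) -> "Database":
        """Reuse a database left behind by a previous run."""
        if not path.is_file():
            raise FileNotFoundError(path)
        return cls(path)

    def connect(self) -> sqlite3.Connection:
        return sqlite3.connect(str(self.path))


def demo() -> int:
    """Write one order to a fresh database, then count rows after recovering it."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "db"
        fresh = Database.make(path)
        with closing(fresh.connect()) as conn:
            conn.execute(
                "INSERT INTO orders (symbol, quantity) VALUES (?, ?)", ("XYZ", 100)
            )
            conn.commit()
        recovered = Database.recover(path)
        with closing(recovered.connect()) as conn:
            (count,) = conn.execute("SELECT COUNT(*) FROM orders").fetchone()
        return count
```

Either way the service just receives a Database; whether it is fresh or recovered is decided by the caller.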
2.7 listening_sock: FileDescriptor,
listening_sock: FileDescriptor,
This is a listening socket, passed down to the orderd subprocess
through file descriptor inheritance, and used to listen for TCP
connections.
This is standard Unix socket programming, so we won't go into this in
depth; although note that we create this with self.thread, so that
it's on the same host as orderd.
async def asyncSetUp(self) -> None:
    # Make a TCP socket...
    sock = await self.thread.socket(AF.INET, SOCK.STREAM)
    # ...bind to a random port on localhost...
    await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
    # ...and start listening.
    await sock.listen(1024)
    self.orderd = await start_orderd(
        ..., sock, ...,
    )
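If file descriptor inheritance is unfamiliar, the following sketch shows the same pattern with only the standard library, on a single Unix host and without rsyscall. The child program is a made-up stand-in for orderd: it adopts the inherited descriptor, accepts one connection and greets it.

```python
import socket
import subprocess
import sys

# Code run by the child: adopt the inherited descriptor (its number is passed
# on the command line), accept one connection, greet it.
CHILD_CODE = """
import socket, sys
listener = socket.socket(fileno=int(sys.argv[1]))
conn, _ = listener.accept()
conn.sendall(b"hello from child")
conn.close()
"""


def demo() -> bytes:
    # Parent: create the socket, bind to a kernel-chosen port on localhost,
    # and start listening before the child even exists.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(16)
    port = listener.getsockname()[1]
    fd = listener.fileno()

    # pass_fds keeps this descriptor open, at the same number, in the child.
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE, str(fd)],
        pass_fds=[fd],
    )
    listener.close()  # the child holds its own copy now

    # Connect as a client; the connection queues in the kernel until the
    # child calls accept().
    with socket.create_connection(("127.0.0.1", port), timeout=5) as client:
        data = client.recv(1024)
    child.wait(timeout=5)
    return data
```

Because the socket is already listening when the child is started, a client can never observe a window in which the port is closed.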
2.8 ) -> Orderd:
) -> Orderd:
Like the start function of any good component library, start_orderd
returns an Orderd instance which contains all the information required
to connect to orderd, such as an address and port, a shared memory
segment, or a path in the filesystem.
start_orderd, again like any good start function, will only
return when the orderd communication mechanisms have been fully
created, and therefore the Orderd instance can be immediately used to
connect to orderd.
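How a start function knows that the service is ready depends on the service. As one possible sketch (an assumed protocol, not how orderd does it), the child below prints a "ready" line once its setup is done, and the hypothetical start_child only returns after reading that line. It uses asyncio from the standard library.

```python
import asyncio
import sys

# Stand-in daemon: does some setup, announces readiness, then runs until
# its stdin is closed.
CHILD_CODE = """
import sys, time
time.sleep(0.1)             # simulated setup work
print("ready", flush=True)  # readiness announcement (assumed protocol)
sys.stdin.read()            # keep running until the parent closes stdin
"""


async def start_child() -> asyncio.subprocess.Process:
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # Don't return until the child says its communication mechanisms exist.
    line = await process.stdout.readline()
    if line.strip() != b"ready":
        raise RuntimeError("child exited before becoming ready")
    return process


async def main() -> int:
    process = await start_child()
    # The child is known to be ready here; shut it down by closing stdin.
    process.stdin.close()
    return await process.wait()
```

When the communication mechanism is a listening socket created by the parent, as with orderd above, no such handshake is needed for the socket itself, since it exists before the child starts.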
2.9 Full example
Here's the full, working example:
class TestOrderd(TrioTestCase):
    async def asyncSetUp(self) -> None:
        # self.nursery provided by TrioTestCase
        self.thread = local_thread
        self.tmpdir = await self.thread.mkdtemp()
        sock = await self.thread.socket(AF.INET, SOCK.STREAM)
        await sock.bind(await self.thread.ptr(SockaddrIn(0, "127.0.0.1")))
        await sock.listen(1024)
        self.orderd = await start_orderd(
            self.nursery, self.thread,
            await start_positiond(self.nursery, self.thread, self.tmpdir/"positiond"),
            await Database.make(self.thread, self.tmpdir/"db"),
            sock,
        )
Then we can proceed to test by running user code.
3 Implementation of component libraries
Now we'll step through a working example of how a component library
is implemented. This one shells out to a separate process, exampled.
This daemon is packaged and deployed with Nix; at $DAYJOB we use a
proprietary package manager with similar APIs.
Below is the full code for the exampled component library, with
comments inline to explain it.
import nix_rsyscall
import rsyscall
import trio
# a Nix-specific generated module, containing the information required
# to deploy the exampled package; generated by setup.py.
import exampled._nixdep

class Exampled:
    def __init__(self, workdir: rsyscall.Path) -> None:
        self.workdir = workdir

async def start_exampled(
    nursery: trio.Nursery,
    thread: rsyscall.Thread,
    workdir: rsyscall.Path,
) -> Exampled:
    # deploy the exampled package and its dependencies; this doesn't deploy the
    # package for this Python library, but rather the exampled daemon
    package = await nix_rsyscall.deploy(thread, exampled._nixdep.closure)
    # build the command to actually run
    command = package.bin('exampled').args("--verbose", "--do-stuff-fast")
    # make the thread that we'll run that exampled command in;
    # this child_thread is a process under our control, see http://rsyscall.org
    child_thread = await thread.clone()
    # change the CWD of the child thread; CWD is inherited over exec,
    # so it will be used by exampled
    await child_thread.mkdir(workdir)
    await child_thread.chdir(workdir)
    # exec the command in the child thread; this exec helper method
    # returns a monitorable child process object
    child_process = await child_thread.exec(command)
    # monitor the child process in the background; see https://trio.readthedocs.io/
    # we'll get an exception if it exits uncleanly; this is our one use of async features.
    nursery.start_soon(child_process.check)
    # return a class containing exampled's communication mechanisms;
    # it communicates with the world only by creating files under `workdir'
    return Exampled(workdir)
4 Conclusion
The single-program approach allows one to write a single type-safe
program which runs the entire distributed system, delegating to
libraries to share functionality and subprocesses to improve
performance. The alternative is writing ten programs and a thousand
config files for configuration management, service discovery,
orchestration, etc. I believe the advantages of the single-program
approach are self-explanatory.
The single-program system techniques are explained in greater detail
in the posts linked in the introduction. With those techniques, and
with the open source libraries rsyscall and trio, anyone can write
their own component libraries, and combine them with other libraries
to write a distributed system in one program.
Created: 2021-07-09 Fri 18:48