The chistributed Architecture
The chistributed architecture is capable of running a number of processes concurrently, and simulates a distributed system by allowing those processes to communicate through a central message broker. So, instead of communicating directly with each other via sockets, processes communicate by sending messages to the message broker, which is responsible for getting the message to its destination. This provides a central point for testing all communications between nodes, as well as simulating failure conditions in the distributed system.
When implementing a specific distributed system, all you need to do is implement a standalone program representing a node. Your program must accept a number of command-line parameters (specified in Implementing a Node) and must be able to process messages in a simple JSON protocol (specified below).
To simulate a system with N nodes, the broker will simply launch N copies of your program. Of course, your system may require that some nodes in the system take on specific roles (e.g., Proposers vs. Acceptors in a Paxos implementation). Our architecture requires that you implement all those roles in the same program; you can then use command-line parameters (which you can configure the broker to use when launching your program) to make your program run in a specific role.
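Here is a minimal Python sketch of one program that can run in several roles. The --role flag and the proposer/acceptor role names are hypothetical; the parameters the broker actually passes are specified in Implementing a Node. The sketch uses parse_known_args so that any other parameters on the command line are left alone rather than rejected.

```python
import argparse
import sys


def parse_role(argv):
    """Extract a hypothetical --role flag, ignoring all other parameters."""
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--role", choices=["proposer", "acceptor"], default="acceptor")
    # parse_known_args returns unrecognized parameters (e.g. the ones the
    # broker requires) in a separate list instead of raising an error.
    args, _unknown = parser.parse_known_args(argv)
    return args.role


def run_proposer():
    return "running as proposer"


def run_acceptor():
    return "running as acceptor"


def main(argv):
    # One program, several roles: dispatch on the role chosen at launch time.
    handlers = {"proposer": run_proposer, "acceptor": run_acceptor}
    return handlers[parse_role(argv)]()


if __name__ == "__main__":
    print(main(sys.argv[1:]))
```

Keeping every role in a single executable means the broker only needs to know how to launch one program; the role is just another launch parameter.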
The message broker is implemented using ZeroMQ (ØMQ), a lightweight message queue with support for many topologies and bindings for many programming languages. Don’t worry if you’ve never used ZeroMQ (or even message queues) before: the language bindings for ZeroMQ provide a simple and very high-level interface (well above the socket layer), and there are convenient functions to “send message X to node named A” and to “wait for a message”.
ZeroMQ topology
The ZeroMQ topology is depicted below.
Every node in your system connects to the broker with two separate ZeroMQ sockets:
- A ZMQ_REQ socket connected to the broker’s router socket. This socket is used to send messages to the message broker. Note that, for ZMQ_REQ sockets, the ZeroMQ API guarantees that a sent message is actually received, unsegmented, by the peer. However, it requires that a response be received before another message can be sent on that socket. As such, every message you send on the ZMQ_REQ socket will receive a response from the broker acknowledging the message (or, in some cases, indicating that an error happened).
- A ZMQ_SUB socket connected to a ZMQ_PUB socket on the broker. This socket is used to receive messages from the message broker. In particular, all communication from other nodes, as well as special messages from the broker, will be received on this socket.
Every node is identified by a string (not by an IP address). So, in this topology, if a node Alice wishes to send a message to a node Bob, then Alice will send the message to the broker through the ZMQ_REQ socket, specifying Bob as the destination. Bob will then receive the message through its ZMQ_SUB socket.
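The Alice-to-Bob flow can be pictured with a toy, in-process broker. ToyBroker, its methods, and its acknowledgement dict are hypothetical; each per-node deque plays the role of the messages a node would read from its ZMQ_SUB socket, and no ZeroMQ is involved.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-process model of the relay pattern (not the chistributed broker)."""

    def __init__(self):
        # One inbox per node name, created on first use.
        self.inboxes = defaultdict(deque)

    def submit(self, message):
        # Models the REQ side: accept the message and acknowledge it.
        self.inboxes[message["destination"]].append(message)
        return {"accepted": True}  # placeholder acknowledgement, not the real format

    def next_message(self, node_name):
        # Models the SUB side: the node takes the next message addressed to it.
        inbox = self.inboxes[node_name]
        return inbox.popleft() if inbox else None
```

For example, after broker.submit({"type": "ping", "source": "Alice", "destination": "Bob"}), calling broker.next_message("Bob") returns that message, while broker.next_message("Alice") returns None. Nodes never address each other directly; only names travel in the messages.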
The exact format of the messages is described below. The specific details on how to configure the ZeroMQ sockets in your implementation are described in Implementing a Node.
Virtual Client
Conceptually, there are also “clients” that send requests to the distributed system. In chistributed, your distributed system will implement a key-value data store, so the virtual client can only send two types of requests: a request to set a value in the data store, or a request to get a value from the data store. These requests are routed through the message broker, and must be directed to a specific node in your system. However, there are no actual client processes; instead, you will be able to directly instruct the broker to send get and set requests to the nodes in your distributed system.
While we define a specific format for these get and set requests, the semantics of these operations are up to you. For example, suppose your system can only provide a stale value in response to a get request. Depending on how you designed your system, this could result in a successful get response (because you prefer returning a stale value to returning no value at all), or an error response (because you designed your system to return only the latest value, or none at all).
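To make the two policies concrete, here is a minimal sketch of a node answering a get request. The is_fresh predicate, the allow_stale switch, and the stale-value error text are hypothetical; only the getResponse field layout follows the message formats described in this document.

```python
def answer_get(request, store, is_fresh, allow_stale):
    """Build a getResponse under one of two staleness policies.

    store: dict of key -> locally held value.
    is_fresh(key): hypothetical predicate, True if the local copy is current.
    allow_stale: True to prefer a stale value over an error.
    """
    key = request["key"]
    base = {"type": "getResponse", "id": request["id"]}
    if key not in store:
        return {**base, "error": f"No such key: {key}"}
    if is_fresh(key) or allow_stale:
        return {**base, "key": key, "value": store[key]}
    # Strict policy: refuse to answer with a possibly outdated value.
    return {**base, "error": f"Value for {key} may be stale"}
```

Both outcomes are valid getResponse messages; which one your node sends is a design decision, not something the message format dictates.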
Messages
Every message in chistributed is encoded as a JSON object. For example:
{"type": "set",
"destination": "proposer2",
"id": 987568098927,
"key": "x",
"value": "42"}
In this section we describe only the format of the chistributed messages. In Implementing a Node we will see that ZeroMQ requires these messages to be further encapsulated in a ZeroMQ-specific data structure.
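Because every message is a plain JSON object, Python’s standard json module is enough to encode and decode one. This sketch deliberately leaves out the ZeroMQ-specific encapsulation; the names encode and decode are hypothetical, and decode additionally checks that the payload is an object carrying a type field, which every chistributed message must have.

```python
import json


def encode(message):
    """Serialize a message dict to UTF-8 JSON bytes (ZeroMQ framing not included)."""
    return json.dumps(message).encode("utf-8")


def decode(raw):
    """Parse UTF-8 JSON bytes back into a message dict, with a basic sanity check."""
    message = json.loads(raw.decode("utf-8"))
    if not isinstance(message, dict) or "type" not in message:
        raise ValueError("a chistributed message must be a JSON object with a type field")
    return message
```

A round trip such as decode(encode(msg)) returns a dict equal to msg, including large integer ids like 987568098927.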
All messages have at least a type field to specify the type of message, with additional fields depending on the type of message. When a message is directed to a specific node (as opposed to a message directed to the broker itself), the message will also have a destination field specifying the node that the message is directed to.
We define three message types that always originate in the broker and are directed to a specific node (specified with the destination field):
- hello: The very first message the broker sends to a node.
- get: Represents a request by a client to get the current value of a given key.
- set: Represents a request by a client to set the value for a given key.
We also define three message types that represent messages from a node to the broker itself (i.e., the broker itself is the final recipient of the message; they are not messages that have to be relayed to another node):
- helloResponse: Sent in response to hello. The broker will not consider a node to be running until it receives a helloResponse from the node.
- getResponse: Sent in response to a get.
- setResponse: Sent in response to a set.
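On the node side, each broker request maps to exactly one response type. The mapping below follows the lists above; response_skeleton is a hypothetical helper that fills in only the fields every response of that kind carries (source for helloResponse, id for getResponse and setResponse), as detailed in the sections that follow.

```python
# Which response type answers which broker-originated request.
RESPONSE_TYPE = {"hello": "helloResponse", "get": "getResponse", "set": "setResponse"}


def response_skeleton(request, node_name):
    """Return the common fields of the response to a broker request.

    Hypothetical helper: get/set handlers would add key/value or error.
    """
    response_type = RESPONSE_TYPE[request["type"]]
    if response_type == "helloResponse":
        return {"type": response_type, "source": node_name}
    return {"type": response_type, "id": request["id"]}
```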
Your implementation can use additional message types. When the broker encounters a message with a type that is not listed above, it will simply deliver it to the node specified in the destination field.
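The routing rule above fits in a few lines. This is a sketch of the rule as stated here, not the broker’s actual code; route is a hypothetical name, and a relayed message without a destination field raises KeyError.

```python
# Message types whose final recipient is the broker itself.
BROKER_BOUND = {"helloResponse", "getResponse", "setResponse"}


def route(message):
    """Return ("broker", None) for broker-bound messages, otherwise
    ("relay", destination) for every other type, including custom ones."""
    if message["type"] in BROKER_BOUND:
        return ("broker", None)
    # Raises KeyError if a relayed message has no destination field.
    return ("relay", message["destination"])
```

Because unknown types fall through to the relay branch, custom message types need no registration with the broker.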
The set and setResponse messages
The set message has the following fields:
- type, equal to "set".
- destination, set to the node that will process the set request. For example, in a Paxos implementation it only makes sense to send set messages to a Proposer node.
- id, set to a unique integer identifier selected by the broker.
- key and value, used to specify the key and value that will be set by this request.
For example:
{"type": "set",
"destination": "proposer2",
"id": 3215786,
"key": "x",
"value": "42"}
When a set request has been processed, the node which received the set message must send back a setResponse message. If the request was processed correctly, the message will have the following fields:
- type, set to "setResponse".
- id, key, and value, with the same values as in the set request.
For example:
{"type": "setResponse",
"id": 3215786,
"key": "x",
"value": "42"}
Note that sending a setResponse does not imply that the set operation has been fully completed; it just implies that the request to set a value has been processed correctly (e.g., in an eventually consistent data store, you could return a setResponse once the replication of the value has begun, but without waiting for it to complete).
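A node’s handling of a set request might look like the following minimal Python sketch, covering both the success form above and the error form described next. The accept hook is hypothetical: it stands in for whatever your protocol does to commit the value (a Paxos round, starting replication, and so on) and returns None on success or an error string on failure. Writing to a local store dict is likewise an illustrative assumption.

```python
def handle_set(request, store, accept):
    """Process a set request and build the matching setResponse.

    accept(key, value): hypothetical protocol hook; returns None on success
    or a descriptive error string on failure.
    """
    error = accept(request["key"], request["value"])
    if error is not None:
        # Failure: only type, id and a descriptive error message.
        return {"type": "setResponse", "id": request["id"], "error": error}
    store[request["key"]] = request["value"]
    # Success: echo id, key and value from the request.
    return {"type": "setResponse", "id": request["id"],
            "key": request["key"], "value": request["value"]}
```

In an eventually consistent design, accept could return None as soon as replication has started, which matches the note above about when a setResponse may be sent.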
If the request was not processed correctly, the message will have the following fields:
- type, set to "setResponse".
- id, with the same identifier as in the set request.
- error, with a descriptive error message.
For example:
{"type": "setResponse",
"id": 3215786,
"error": "Could not set x to 42 -- rejected by acceptors"}
The get and getResponse messages
The get message has the following fields:
- type, equal to "get".
- destination, set to the node that will process the get request.
- id, set to a unique integer identifier selected by the broker. Note: the broker will use unique identifiers across both sets and gets (i.e., a set operation will never have the same identifier as a get operation).
- key, used to specify the key whose value we want to get.
For example:
{"type": "get",
"destination": "master",
"id": 3215787,
"key": "x"}
When a get request has been processed, the node which received the get message must send back a getResponse message. If the request was processed correctly, the message will have the following fields:
- type, set to "getResponse".
- id and key, with the same values as in the get request.
- value, with the value associated with the requested key.
For example:
{"type": "getResponse",
"id": 3215787,
"key": "x",
"value": "42"}
If the request was not processed correctly, the message will have the following fields:
- type, set to "getResponse".
- id, with the same identifier as in the get request.
- error, with a descriptive error message.
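Since get and set responses share the same structure (echo the id, then carry either the key and value or an error field), one small checker can validate both, which may be handy when testing a node. is_valid_response is a hypothetical helper written against the field lists in this section and the previous one; it assumes the request is a get or set message.

```python
def is_valid_response(request, response):
    """Check a getResponse/setResponse against the documented field lists."""
    # Both kinds must have the matching type and echo the request id.
    if response.get("type") != request["type"] + "Response":
        return False
    if response.get("id") != request["id"]:
        return False
    if "error" in response:
        # Failure form: a descriptive error message instead of key/value.
        return isinstance(response["error"], str)
    # Success form: the key is echoed and a value is present.
    if response.get("key") != request["key"] or "value" not in response:
        return False
    # A successful setResponse must echo the value that was requested.
    return request["type"] != "set" or response["value"] == request["value"]
```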
For example:
{"type": "getResponse",
"id": 3215787,
"error": "No such key: x"}
The hello and helloResponse messages
When a new node is started, the broker will first send that node a hello message with the following fields:
- type, equal to "hello".
- destination, set to the node’s name.
The node must reply with a helloResponse message with the following fields:
- type, equal to "helloResponse".
- source, set to the node’s name.
Sending a helloResponse to the broker indicates that the node is ready to receive more messages. The broker will not allow messages to be sent to a node until it receives a helloResponse message from that node.
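The handshake involves two small pieces of logic: the node echoes its own name back, and the broker tracks which nodes have answered. Both are sketched below as hypothetical helpers (answer_hello and ReadinessTracker are illustrative names). This document does not say whether the broker queues or rejects messages for a node that is not yet running, so the tracker only answers whether delivery is allowed.

```python
def answer_hello(hello):
    """Node side: the hello's destination is this node's own name."""
    return {"type": "helloResponse", "source": hello["destination"]}


class ReadinessTracker:
    """Broker side (toy model): remember which nodes have sent helloResponse."""

    def __init__(self):
        self.running = set()

    def observe(self, message):
        # Only a helloResponse marks its sender as running.
        if message["type"] == "helloResponse":
            self.running.add(message["source"])

    def may_deliver(self, message):
        return message["destination"] in self.running
```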
Custom messages¶
You can define as many additional types of messages as you want. For example:
{"type": "prepare",
"source": "proposer1",
"destination": "acceptor1",
"proposal_id": 3}
The broker will simply deliver these messages to the node specified in the destination field.
You should, however, take the following into account:
- Whenever you send a message from one node to another to communicate a value, you must use a field called value. When simulating Byzantine failures, this is the field that the broker will tamper with in your messages.
- ZeroMQ will not include information about the node that originally sent a given message, so you will probably want to include a source field in all your messages.
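Helpers along these lines keep the source field from being forgotten. make_message and reply_to are hypothetical names, and "promise" is a hypothetical custom message type; reply_to relies on the incoming message carrying a source field, which is exactly why that field is worth including everywhere.

```python
def make_message(msg_type, source, destination, **fields):
    """Build a node-to-node message that always records its sender."""
    # ZeroMQ does not tell the receiver who sent a message, so include it.
    return {"type": msg_type, "source": source, "destination": destination, **fields}


def reply_to(message, msg_type, my_name, **fields):
    """Answer a received message by addressing the reply to its source."""
    return make_message(msg_type, my_name, message["source"], **fields)


# Values exchanged between nodes go in a field named "value", the field the
# broker tampers with when simulating Byzantine failures.
prepare = make_message("prepare", "proposer1", "acceptor1", proposal_id=3)
promise = reply_to(prepare, "promise", "acceptor1", proposal_id=3, value="42")
```

Here promise is addressed back to proposer1 without the acceptor having to track who sent the prepare message.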