Implementation

Here, we describe the overall design of our distributed service, followed by implementation details.

Our system is a Multi-Paxos implementation with leader election. The leader is responsible for handling client requests, initiating Paxos voting to determine the command sequence, and disseminating chosen commands. It acts as the distinguished proposer and distinguished learner described in the “Paxos Made Simple” paper [1]. The followers may be viewed as replicas of the leader: they hold the same command sequence, and consequently the same state, as the leader, but they do not actively handle requests. Clients may contact any node in the server group, though a follower will redirect the client to the leader.

In summary, we implemented the following basic functionalities:

  • Multi-instance Paxos and replicated state machine (RSM)
  • A simple lock service on top of the Paxos RSM
  • Fault tolerance to node failures and message loss

We also implemented these bonus functionalities:

  • Distinguished proposer (leader) optimization and its election mechanism
  • Leader failure tolerance
  • Wait, resend and catch-up mechanisms against network failures

Determining Command Sequence

We first describe how we use Paxos to determine the command sequence. In the next subsection, we discuss the leader election mechanism.

Paxos under Normal Circumstances

When the leader receives a request from a client asking to execute command C, it starts a new Paxos instance with instance number i, trying to make C the i-th command. It first sends a prepare request with the next available proposal number to all followers. If this proposal number is higher than the highest proposal number the follower has promised so far, the follower remembers it. The follower always replies with its (possibly updated) promised proposal number and the highest-numbered proposal it has already accepted, if any.

# Pseudo code for the prepare phase

def Proposer(self):
  message = { 'proposal_n': self.next_proposal_n() }
  send_prepare(message)

def Acceptor(self):
  # Promise: never accept a proposal numbered lower than the highest
  # prepare request seen so far.
  if req.proposal_n > self.promised_n:
    self.promised_n = req.proposal_n
  # Reply with the promise and the highest-numbered proposal this
  # acceptor has already accepted (None if nothing accepted yet).
  message = {
    'promised_n': self.promised_n,
    'accepted_n': self.accepted_n,
    'accepted_v': self.accepted_v
  }
  reply(message)

When the leader gathers a majority of prepare responses (including its own), it proceeds to the accept phase. It sends accept requests to all followers, using the same proposal number it prepared with. (The code snippet below includes the logic for handling the prepare responses of the previous phase.) The value in the accept request is that of the highest-numbered proposal already accepted by some follower, or the intended command C if no follower reported an accepted proposal. A follower accepts this request unless it has promised not to, and informs the leader either way.

# Pseudo code for the accept phase

def Proposer(self):
  # Use the value of the highest-numbered proposal already accepted by a
  # follower, if any; otherwise propose our own command. The proposal
  # number is always the one used in the prepare phase.
  accepted = [r for r in prepare_responses if r['accepted_n'] is not None]
  if accepted:
    v = max(accepted, key=lambda r: r['accepted_n'])['accepted_v']
  else:
    v = my_command
  message = {
    'proposal_n': my_proposal_n,
    'value': v
  }
  send_accept(message)

def Acceptor(self):
  # Accept unless we have promised a higher-numbered proposal.
  if req.proposal_n >= self.promised_n:
    self.promised_n = req.proposal_n
    self.accepted_n = req.proposal_n
    self.accepted_v = req.value
    reply('success')
  else:
    reply('failure')

After the leader receives a majority of accept responses (including itself) that all have the same proposal number, the value becomes chosen. In other words, the value (which is a command) is now the i-th command. The leader then disseminates this information to all followers until everyone learns this result.

When a node learns about a chosen command, it adds the command to its command sequence and determines whether the command can be executed. A command can be executed if every preceding command has already been chosen (i.e. there are no “holes” before it in the command sequence). The server only replies to the client after it executes the command. In our implementation, we attach the client address to the command, so other servers may respond in case the leader fails.
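
As a rough illustration, the learning step might look like the following sketch. The names chosen, executed_upto and state_machine are illustrative, not the exact ones used in paxos_server.py.

# Pseudo code sketch: learn a chosen command and execute it once there
# are no holes before it in the command sequence (illustrative names).
def on_learn(self, instance, command):
  self.chosen[instance] = command
  # Execute commands in order for as long as there is no hole.
  while self.executed_upto + 1 in self.chosen:
    self.executed_upto += 1
    cmd = self.chosen[self.executed_upto]
    result = self.state_machine.apply(cmd)
    if cmd.client_addr is not None:
      # The client address travels with the command, so any server
      # that executes it can reply.
      self.reply_to_client(cmd.client_addr, result)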

Proposal Number

Our goal is to ensure that each proposal has a distinct proposal number, to prevent duplicate or stale proposals. To support this, each node draws from its own disjoint set of numbers. A node increments a base proposal number j each time it issues a new proposal, and then computes the proposal number as follows:

# total_nodes is the total number of nodes in the Paxos group
# node_id is the ID of the current node
proposal_n = j * total_nodes + node_id

Note that this assumes the number of nodes in our system never exceeds the static total_nodes. This is not always the case in reality, but we made this assumption for simplicity.
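
For concreteness, the next_proposal_n helper referenced in the prepare pseudocode might be implemented along these lines (an illustrative sketch, not necessarily the exact code in paxos_server.py):

# Sketch of per-node proposal numbering (illustrative).
class ProposalNumbers:
  def __init__(self, node_id, total_nodes):
    self.node_id = node_id
    self.total_nodes = total_nodes
    self.j = 0  # base proposal number, incremented on every new proposal

  def next_proposal_n(self):
    self.j += 1
    # With 3 nodes, node 1 generates 4, 7, 10, ... and node 2 generates
    # 5, 8, 11, ..., so no two nodes ever share a proposal number.
    return self.j * self.total_nodes + self.node_id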

Handling Failures

To ensure progress under potential message loss (e.g. a network outage), if the leader does not receive a majority of prepare or accept responses before a timeout, it resends the messages.
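
A sketch of this resend loop, assuming a hypothetical collect_responses helper that gathers replies for a given proposal number (the real code is callback-driven and may differ):

# Sketch of resend-until-majority (illustrative).
def send_until_majority(self, message, send_fn, timeout=1.0):
  majority = self.total_nodes // 2 + 1
  while True:
    send_fn(message)  # (re)send to all followers
    responses = self.collect_responses(message['proposal_n'], timeout)
    if len(responses) + 1 >= majority:  # +1 counts the leader itself
      return responses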

Also, while our system does not yet handle node recovery, our consensus protocol is not impacted as long as only a minority of nodes fail (including the leader). This is automatically guaranteed by a correct implementation of the voting mechanism.

Leader Election

We use a special Paxos instance to elect the leader. The voting process is exactly the same as described above, except that the value is a leader node ID instead of a command. Such a vote is triggered when a follower believes that the leader is dead.

The leader periodically sends heartbeat messages (on a separate channel) to all of its followers. When a follower does not receive a heartbeat for a certain period, it assumes the leader has failed and initiates a leader-election Paxos instance, proposing itself as the leader. To partly avoid multiple nodes proposing at the same time, we add a randomized timeout to each follower.
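
A rough sketch of the follower-side heartbeat check (illustrative names; the Discussions section describes how the randomized timeout is actually computed):

import time

# Sketch of the follower-side heartbeat monitor (illustrative).
def monitor_leader(self):
  while True:
    timeout = self.election_timeout()  # randomized per node
    time.sleep(timeout)
    if time.time() - self.last_heartbeat > timeout:
      # No heartbeat for a full timeout period: assume the leader is
      # dead and propose ourselves in the leader-election instance.
      self.start_election(proposed_leader=self.node_id)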

Question: what should happen while a leader election is in progress? Who acts as the distinguished proposer and learner?

Answer: If the client tries to contact the original leader and finds that it is dead, the system may either still be in the middle of an election or have already finished one. The client then sends its request to the next node it knows of, in a round-robin fashion. If a new leader has not yet been chosen, that node returns a redirection message pointing to the old (probably failed) leader, and the client sleeps for a while before the next attempt. If the election has finished, the node returns the new leader instead; the client can tell the difference and send its request to the new leader.

Lock Service

We implemented a simple lock service on top of the Paxos RSM, along with a client implementation for testing purposes. The client can send multiple messages to the server, resend messages on timeouts or failures, and remember the Paxos leader identity. We additionally wrote several test scripts, each demonstrating a test scenario.
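
The lock semantics themselves are a small state machine applied to chosen commands; the following is a minimal sketch, assuming commands of the form lock_N / unlock_N as in the wire protocol below (names are illustrative, not the exact ones in paxos_server.py):

# Sketch of the lock state machine applied to chosen commands (illustrative).
def apply_command(locks, command, client_id):
  op, lock_id = command.split('_')
  if op == 'lock':
    if lock_id in locks:
      return {'status': 'locked', 'by': locks[lock_id]}
    locks[lock_id] = client_id
    return {'status': 'ok'}
  else:  # 'unlock'
    locks.pop(lock_id, None)
    return {'status': 'ok'}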

Test Scenarios

In our test scripts, we demonstrate the following scenarios:

  1. Under normal lock service operation (i.e. no node failures), a client sends several lock and unlock command sequences to the server group.
  2. Under normal operation, a client requests a lock repeatedly.
  3. Under normal operation, multiple clients send requests concurrently.
  4. Under normal operation, a client sends multiple requests, while rotating the server it contacts. (We implemented the client to remember the Paxos leader identity, but here we force it to contact a different server each time.)
  5. A client sends multiple requests, but a follower node fails in between.
  6. A client sends multiple requests, but the leader node fails in between.

In all the scenarios above, we hardcoded a message drop rate of 5%; that is, 5% of the messages sent between the Paxos servers are dropped at random.
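
One simple way to implement such a drop rate looks like the sketch below; the DROP_RATE constant and maybe_send helper are illustrative names, and the test scripts may do this differently.

import random

# Sketch of the message-drop simulation used in tests (illustrative).
DROP_RATE = 0.05

def maybe_send(send_fn, message):
  if random.random() >= DROP_RATE:
    send_fn(message)
  # otherwise the message is silently dropped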

Server API and Wire Protocol

Our server accepts request messages from clients like the following:

{
  "source": "client",
  "client": 1,         // client id
  "command": "lock_1"
}

Our server also accepts messages from peers (other servers) like the following:

{
  "source": "server",
  "server_id": 1,
  "instance": 2,      // paxos instance number
  "type": "prepare",  // type of request
  "proposal_n": 10,   // proposal number
  "command": "lock_1" // value to be agreed on
}

For lack of space, please refer to our implementation for details of this protocol.
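
As a rough illustration of how these messages are consumed, a server might dispatch on the source and type fields as follows (handler names are illustrative, not the exact ones in paxos_server.py):

import json

# Sketch of message dispatch by 'source' and 'type' (illustrative).
def handle_message(self, raw):
  msg = json.loads(raw)
  if msg['source'] == 'client':
    self.handle_client_request(msg['client'], msg['command'])
  elif msg['type'] == 'prepare':
    self.handle_prepare(msg['instance'], msg['proposal_n'])
  elif msg['type'] == 'accept':
    self.handle_accept(msg['instance'], msg['proposal_n'], msg['command'])
  # ... further types: prepare/accept responses, learn, heartbeat, etc.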

Assumptions

We made the following assumptions:

  • The Paxos server identities are hardcoded in the clients.
  • Clients are well-behaved.
  • Client-to-server communication is reliable.
  • The total number of nodes in the Paxos group is fixed and known in advance.
  • The client is persistent: it may contact node A to request a lock but receive the response from node B.

Usage

Source Code Structure

Our source code consists of three parts:

Server implementation: paxos_server.py.

Client implementation for testing: paxos_client.py.

Test scripts:

  • test a normal lock & unlock run: test_run.py
  • test that locking an already-held lock fails: test_lock_semantics.py
  • test 10 clients sending requests concurrently: test_concurrency.py
  • test contacting different nodes: test_random_server.py
  • test the node failure handling logic: test_node_failure.py
  • test the leader failure handling logic: test_leader_failure.py

To run a test script:

# we use python 3
python test_XXXXX.py

The tests will start the servers and clients for you. You may also start a Paxos server manually with:

python paxos_server.py [server_id] [total_nodes] [-v]

Lock Service Client API

Our client module is a Python class with automatic rotation/retry logic for both waits (server busy) and refused connections (server dead). With that, our lock service API is very simple and self-explanatory:

cli = LockClient(client_id, total_num_nodes)
assert cli.lock(1) == { 'status': 'ok' }
assert cli.lock(1) == { 'status': 'locked', 'by': 1 }
assert cli.unlock(1) == { 'status': 'ok' }

There are plenty of executable examples in our test_*.py scripts.
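
Internally, the rotate/retry behaviour mentioned above might look roughly like the following sketch. This is illustrative only: the real LockClient in paxos_client.py may differ, and the shape of the 'redirect' reply is an assumption.

import socket

# Sketch of client-side rotate/retry logic (illustrative).
def request_with_retry(self, msg, max_tries=10):
  for attempt in range(max_tries):
    server = self.leader if self.leader is not None else self.servers[attempt % len(self.servers)]
    try:
      reply = self.send_and_wait(server, msg)
    except (ConnectionRefusedError, socket.timeout):
      self.leader = None            # forget a dead or unreachable leader
      continue
    if reply.get('status') == 'redirect':
      self.leader = reply['leader']  # remember the leader and retry there
      continue
    return reply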

Metrics, Experiments and Measurements

Metrics Design

We focus on the following aspects of our system, each of which can be defined as a precise metric. We discuss them separately below.

  1. Test regression rate given the message drop rate
  2. Test regression rate given the number of total nodes
  3. Latency of the request given the traffic level and the number of total nodes
  4. Number of messages exchanged between nodes given the message drop rate and the number of total nodes

Note that we did this quantitative study with a very limited time budget, so we do not have enough samples to draw scientific conclusions. Potential engineering-level bugs might also skew the statistics. However, we made our best effort in the following empirical analysis, which we do not claim to be 100% accurate.

Test regression rate given the message drop rate

Data: measurements/test_run_regression_message_loss.csv.

This metric reflects the robustness of our system. The key fact is that our system is not designed to tolerate arbitrarily high message drop rates. However, it is useful to know how system availability degrades as the loss rate increases.

Here is a bar plot showing, for each message drop rate (X axis), the test success rate (neither stuck nor hitting a runtime error) in percentage (Y axis).

[Bar plot: test success rate vs. message drop rate]

From the data, we also observe that although the failure rate increases with the message drop rate, the time spent completing a request does not change much.

Test regression rate given the number of total nodes

Data: measurements/test_run_regression_total_nodes.csv

This metric shows how, under a fixed message drop rate (25%), the total number of nodes in the server group impacts the test failure rate. We chose 25% based on the data in 4.2.

The basic observation is that 3 nodes and 5 nodes perform basically the same.

Latency of the request given the traffic level and the number of total nodes

Data: measurements/test_latency_concurrency.csv

This metric shows how traffic (more concretely, the number of concurrent clients) and the size of the server group impact the latency observed from the client side.

We can see that the larger the server group, the higher the latency; also, our system scales from one client to ten clients without much impact on latency.

Number of messages exchanged between nodes given the message drop rate and the number of total nodes

Data: measurements/test_num_msgs.csv

This metric shows the number of messages exchanged between nodes in the server group under different message drop rates and different total numbers of nodes.

Our observation is that the higher the message drop rate, the fewer messages are exchanged (while functionality is still preserved). Also, the larger the server group, the more messages are exchanged.

Discussions

Potential Issues

  • The client might receive multiple reply messages for the same request it sent. (This is not really an issue, as the client can simply ignore the redundant messages.)
  • For simplicity of testing, the client sends a request and asynchronously waits for the reply. This is not what most clients do in reality.
  • The leader does not ensure that learn messages are received by all followers. This is a known issue.
  • Our implementation puts node X into its failed-nodes set once it receives a ConnectionRefusedError from it. This practice assumes that nodes do not recover.
  • We use a handler_lock mutex to avoid concurrency bugs, since state is shared between the heartbeat threads and the main server thread. This might impact performance or lead to deadlock (though we did not observe any).
  • We use a trick to compute the network timeout for contacting the leader: add a random number between 0 and 1 to the server ID (an integer larger than 1), and multiply the sum by three; the result is in seconds (see the sketch after this list). This helps us avoid over-competition in elections, which might lead to poor performance and deadlock. While this trick still preserves progress, it is rather artificial and might lead to performance degradation.
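
The timeout trick from the last bullet, expressed as a sketch (the helper name is illustrative):

import random

# Sketch of the staggered election timeout described above (illustrative).
def election_timeout_seconds(server_id):
  # Each node waits a different, randomized amount of time before
  # contacting the leader or starting an election, which reduces
  # competing proposals.
  return (server_id + random.random()) * 3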

Interesting behaviours

  1. Our system emits emojis in the functionality tests.

Bibliography

  1. Leslie Lamport. “Paxos Made Simple.”
  2. David Mazières. “Paxos Made Practical.”
  3. Tushar Chandra et al. (Google Inc.). “Paxos Made Live: An Engineering Perspective.”

Misc

Implementation (MIT License): https://github.com/izgzhen/paxos-example

Authors:

  • Zhen Zhang @ uw
  • Yang Liu @ uw