
Leader Election In Distributed Systems - Demonstrated With Apache ZooKeeper

"The only creature on earth whose natural habitat is a zoo is the zookeeper." Robert Breault

Introduction

Distributed systems are becoming increasingly common in modern computing environments due to their high scalability and fault tolerance. These systems consist of multiple nodes that work together to achieve a common goal. Broadly, any system in which more than one service communicates with other services over a network can be considered a distributed system. In this blog post, we will build a small distributed system and demonstrate leader election using Apache ZooKeeper.

What is Apache ZooKeeper?

Apache ZooKeeper is a centralized, open-source software project that provides a highly reliable and scalable coordination service for distributed applications. It is used to manage configurations, naming services, synchronization, and group services in large-scale distributed systems.

ZooKeeper's architecture is designed to provide high availability, fault tolerance, and consistency. It achieves these goals by maintaining a replicated, in-memory database of configuration data, which is distributed across the cluster of servers that form the ZooKeeper ensemble. Clients connect to a server in the ensemble to read or write data. ZooKeeper commits each update once a majority of the servers have acknowledged it and then propagates it to the rest.
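
The fault tolerance of an ensemble follows from majority voting: an ensemble keeps working as long as more than half of its servers are up. A small sketch of that arithmetic (the class and method names are illustrative, not part of ZooKeeper):

```java
// Majority-quorum arithmetic for an ensemble of n servers (a sketch, not ZooKeeper code).
final class QuorumMath {
    // Smallest number of servers that forms a majority.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // Number of servers that can fail while a majority is still available.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: quorum " + quorumSize(n)
                    + ", tolerates " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```

Note that going from 3 to 4 servers does not increase the number of tolerated failures, which is why ensembles usually have an odd number of servers.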

ZooKeeper is widely used in distributed systems, such as Apache Hadoop, Apache Kafka, and Apache Storm.

Why should I choose Apache ZooKeeper for my distributed system?

Apache ZooKeeper is a popular choice for managing distributed systems due to its many benefits, including:

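  1. Consistency: ZooKeeper applies updates in order and gives each client a single, consistent view of the system regardless of the server it connects to, although a read may briefly lag behind the latest write.
  2. Reliability: ZooKeeper is designed to provide high availability and fault tolerance. It keeps working through node failures or network partitions as long as a majority of the ensemble's servers can communicate with each other.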
  3. Scalability: ZooKeeper can handle large numbers of clients and nodes, making it suitable for even the largest distributed systems.
  4. Easy to use: ZooKeeper provides a simple and intuitive API for managing distributed systems, making it easy to integrate into your application.
  5. Open-source: ZooKeeper is an open-source project, meaning that you can use it for free and customize it to your needs.

ZooKeeper Data Model

ZooKeeper's data model is a hierarchical, tree-like structure, similar to a file system. It is organized into nodes, each of which behaves like a hybrid between a file and a directory: it can hold data, have child nodes, or both.

ZooKeeper stores data as byte arrays, which can represent any kind of data, including text, JSON, or binary data. The data associated with a node is called the node's payload.
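
Since a payload is just bytes, the application decides how to encode it. A minimal sketch, assuming UTF-8 text (the class ZnodePayload and its methods are hypothetical helpers, not part of the ZooKeeper API):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: converts between application strings and the
// byte[] payload that a znode stores. Not part of the ZooKeeper API.
final class ZnodePayload {
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("{\"role\":\"worker\"}");
        System.out.println(payload.length + " bytes: " + decode(payload));
    }
}
```

Naming the charset explicitly keeps readers and writers on different machines in agreement about how the bytes should be interpreted.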

Each node is called a "znode", and ZooKeeper provides several types of znodes, each with its own characteristics:

  1. Persistent Nodes: These nodes are created once and persist even if the client that created them disconnects. They can be updated or deleted by other clients.
  2. Ephemeral Nodes: These nodes are tied to a specific client session and are automatically deleted when that session ends, either because the client closes it or because the session expires.
  3. Sequential Nodes: These nodes are created with a unique, monotonically increasing sequence number appended to their path. This ensures that each node has a unique name and helps with ordering and synchronization. The sequential flag is combined with one of the two types above, giving persistent sequential and ephemeral sequential nodes.
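
The sequence number ZooKeeper appends is a 10-digit, zero-padded counter, so sorting the names as plain strings also sorts them by creation order. A local sketch of that naming scheme (the class SequentialNames and the "candidate_" prefix are illustrative, and nothing here talks to a ZooKeeper server):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local model of sequential znode naming; it does not talk to a ZooKeeper server.
final class SequentialNames {
    // Mimics the suffix ZooKeeper appends: a 10-digit, zero-padded counter.
    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add(sequentialName("candidate_", 10));
        names.add(sequentialName("candidate_", 2));
        names.add(sequentialName("candidate_", 0));
        // Zero padding makes lexicographic order match numeric order.
        Collections.sort(names);
        System.out.println(names);
    }
}
```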

ZooKeeper threading model

When creating a ZooKeeper object in Java, two new threads are created.

  1. Event Thread: processes the events delivered to the client, such as changes to znodes, session expiration, or connection loss, and invokes the client's Watcher objects when they occur.
  2. I/O Thread: also known as the "SendThread"; handles all incoming and outgoing network traffic between the client and the ZooKeeper server, including the heartbeats that keep the session alive.
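
Because watcher callbacks are dispatched one at a time on a single thread, they run in the order their events arrive, and a slow callback delays the ones queued behind it. The following is only a model of that behaviour, using a single-threaded executor as a stand-in for the event thread; it is not ZooKeeper's actual implementation, and the class name is hypothetical:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Local model only: a single-threaded executor stands in for the client's event thread.
final class EventThreadModel {
    // Handles each event on one dedicated thread and returns
    // "threadName:event" entries in the order they were handled.
    static List<String> deliver(List<String> events) {
        List<String> handled = new CopyOnWriteArrayList<>();
        ExecutorService eventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-thread"));
        for (String event : events) {
            eventThread.execute(() ->
                    handled.add(Thread.currentThread().getName() + ":" + event));
        }
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(deliver(List.of("SyncConnected", "NodeDeleted", "Expired")));
    }
}
```

The practical consequence is that long-running work inside a watcher should be handed off to another thread.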

Why do we need a leader in a distributed system?

A leader is a node or process that is responsible for coordinating the activities of other nodes or processes. The need for a leader arises from the fact that in a distributed system, multiple nodes or processes may need to perform the same task concurrently. Without a leader, there would be no way to ensure that all nodes are working together in a consistent and coordinated way.

ZooKeeper's official documentation provides pseudocode for a leader election algorithm:

Let ELECTION be a path of choice of the application. To volunteer to be a leader:
1. Create znode z with path "ELECTION/guid-n_" with both SEQUENCE and EPHEMERAL flags;
2. Let C be the children of "ELECTION", and i be the sequence number of z;
3. Watch for changes on "ELECTION/guid-n_j", where j is the largest
   sequence number such that j < i and n_j is a znode in C;

For example:

The first znode, the one with the lowest sequence number, is elected to be the leader.
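
The volunteering steps boil down to sorting the children and looking one position back. A minimal local sketch, operating on plain lists rather than a live ensemble (the class ElectionRule and the method nodeToWatch are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Local sketch of the election rule; it works on plain lists, not on a live ensemble.
final class ElectionRule {
    // Returns the znode that `self` should watch, or empty if `self` is the
    // leader (it has the smallest name among the children).
    static Optional<String> nodeToWatch(List<String> children, String self) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException(self + " is not a child of the election znode");
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of(
                "candidate_0000000002", "candidate_0000000000", "candidate_0000000001");
        for (String child : children) {
            System.out.println(child + " -> " + nodeToWatch(children, child).orElse("leader"));
        }
    }
}
```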

Downloading and Installing ZooKeeper Locally

First, we need to download ZooKeeper from the Apache ZooKeeper page.
You can download any version you like; I chose the latest one, which at the time of writing is 3.8.1.

Unzip the file, open the created folder, and create a folder named logs inside the apache-zookeeper-X.X.X-bin folder (we will use it in a moment).

Open the conf folder and rename the sample configuration file (zoo_sample.cfg) to zoo.cfg. This is the config file name ZooKeeper will look for.

Open zoo.cfg and change the dataDir property to the path of the logs folder we created earlier.
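
After this change, the relevant part of zoo.cfg might look roughly like the fragment below. The tickTime and clientPort values are the ones shipped in the sample file; the dataDir path is a placeholder that you should replace with the real location of your logs folder.

```
# Basic time unit in milliseconds, used for heartbeats and session timeouts
tickTime=2000
# Placeholder path: point this at the logs folder created earlier
dataDir=/path/to/apache-zookeeper-X.X.X-bin/logs
# Port that clients (such as zkCli.sh and our Java app) connect to
clientPort=2181
```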

Open a terminal window, go to the bin directory inside your ZooKeeper folder, and run ./zkServer.sh start
This should start the ZooKeeper server.

  • If your server fails to start, go to the logs folder and read the log output to find the reason.

Finally, we can use ./zkCli.sh to create our election node.
Type create /election, then use ls / to make sure it was created properly.

Implementing Leader Election With Java

With our ZooKeeper server up and running, let's start by creating a new Maven project and adding the org.apache.zookeeper:zookeeper dependency to its pom.xml.
Create a new class called LeaderElection:

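import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class LeaderElection implements Watcher {
	private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
	private static final int SESSION_TIMEOUT = 2000;
	private static final String ELECTION_NAMESPACE = "/election";
	private ZooKeeper zooKeeper;
	private String currentZnodeName;

	// Opens a session; events are delivered to process() below.
	public void connectToZookeeper() throws IOException {
		this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
	}

	// Keeps the main thread alive until process() signals that the connection was lost.
	public void run() throws InterruptedException {
		synchronized (zooKeeper) {
			zooKeeper.wait();
		}
	}

	public void close() throws InterruptedException {
		zooKeeper.close();
	}

	// Called on the event thread. Passing `this` to the ZooKeeper constructor requires it.
	@Override
	public void process(WatchedEvent event) {
		// Only connection state changes are handled here.
		if (event.getType() == Event.EventType.None
				&& event.getState() != Event.KeeperState.SyncConnected) {
			// Disconnected or expired: wake up run() so the application can exit.
			synchronized (zooKeeper) {
				zooKeeper.notifyAll();
			}
		}
	}
}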

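We can now add a simple function to create an ephemeral sequential znode (it needs imports for CreateMode, KeeperException, and ZooDefs from the org.apache.zookeeper package).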

	public void createNewSequentialZnode() throws KeeperException, InterruptedException {
		String znodePrefix = ELECTION_NAMESPACE + "/candidate_";
		String znodeFullPath = zooKeeper.create(znodePrefix, new byte[]{}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

		System.out.println("Name: " + znodeFullPath);
		this.currentZnodeName = znodeFullPath.replace(ELECTION_NAMESPACE + "/", "");
	}

And the actual leader election function (it uses java.util.List and java.util.Collections).

    public void electLeader() throws KeeperException, InterruptedException {
        List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);

        Collections.sort(children);
        String smallestChild = children.get(0);

        if (smallestChild.equals(currentZnodeName)) {
            System.out.println("I am the leader 👑");
            return;
        }

        System.out.println("I am not the leader, " + smallestChild + " is the leader");
    }

In the main class, let's create a LeaderElection instance and call its methods:

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        LeaderElection leaderElection = new LeaderElection();

        leaderElection.connectToZookeeper();
        // Volunteer for leadership by creating our ephemeral sequential znode.
        leaderElection.createNewSequentialZnode();
        leaderElection.electLeader();
        leaderElection.run();
        leaderElection.close();
        System.out.println("Session Closed, exiting application");
    }

We can now package the app and test it.
In your project directory, run mvn clean package to create an executable jar for our app (the jar-with-dependencies name below assumes the Maven Assembly Plugin is configured in your pom.xml). Then, open a terminal window and run java -jar target/leader.election-1.0-SNAPSHOT-jar-with-dependencies.jar

Congratulations! Your first distributed system is up and running.

Re-electing a leader

But what happens if our current leader crashes and stops functioning?
We need to elect a new leader to replace it.
ZooKeeper's official documentation provides pseudocode for this scenario as well.

Upon receiving a notification of znode deletion:
1. Let C be the new set of children of ELECTION;
2. If z is the smallest node in C, then execute leader procedure;
3. Otherwise, watch for changes on "ELECTION/guid-n_j", where j is the
   largest sequence number such that j < i and n_j is a znode in C;

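That means that once the current leader fails, the znode with the smallest remaining sequence number becomes the new leader. Because each node watches only its immediate predecessor, a failure notifies a single node rather than the whole group.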
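
To see the re-election rule in isolation, here is a small local simulation. A TreeSet stands in for the children of the election znode, and removing an entry plays the role of an ephemeral znode disappearing; the class ReElectionSimulation and its methods are hypothetical and do not use the ZooKeeper client.

```java
import java.util.List;
import java.util.TreeSet;

// Local simulation of re-election; a TreeSet stands in for the children of the election znode.
final class ReElectionSimulation {
    private final TreeSet<String> liveCandidates = new TreeSet<>();

    ReElectionSimulation(List<String> candidates) {
        liveCandidates.addAll(candidates);
    }

    // The leader is the live candidate with the smallest name.
    String leader() {
        return liveCandidates.first();
    }

    // The predecessor a candidate watches, or null if it is the leader.
    String watchedBy(String candidate) {
        return liveCandidates.lower(candidate);
    }

    // Simulates a crash: the candidate's ephemeral znode disappears.
    void crash(String candidate) {
        liveCandidates.remove(candidate);
    }

    public static void main(String[] args) {
        ReElectionSimulation sim = new ReElectionSimulation(List.of(
                "candidate_0000000000", "candidate_0000000001", "candidate_0000000002"));
        System.out.println("Leader: " + sim.leader());
        sim.crash("candidate_0000000000");
        System.out.println("Leader after crash: " + sim.leader());
    }
}
```

If a candidate in the middle of the chain crashes instead, the leader stays the same and only the crashed node's successor has to pick a new predecessor to watch.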

Let's improve the electLeader function to support the re-election scenario (Stat comes from org.apache.zookeeper.data.Stat):

	// Call this again from process(WatchedEvent) whenever event.getType() is
	// Event.EventType.NodeDeleted, so that losing the watched predecessor triggers a new election.
	public void electLeader() throws KeeperException, InterruptedException {
		Stat predecessorStat = null;
		String predecessorZnodeName = "";
		// Retry if the predecessor disappears between getChildren() and exists().
		while (predecessorStat == null) {
			List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);

			Collections.sort(children);
			String smallestChild = children.get(0);

			if (smallestChild.equals(currentZnodeName)) {
				System.out.println("I am the leader 👑");
				return;
			} else {
				System.out.println("I am not the leader");
				// The predecessor is the znode just before ours in sorted order.
				int predecessorIndex = Collections.binarySearch(children, currentZnodeName) - 1;
				predecessorZnodeName = children.get(predecessorIndex);
				// exists() registers this object as a one-time watcher on the predecessor.
				predecessorStat = zooKeeper.exists(ELECTION_NAMESPACE + "/" + predecessorZnodeName, this);
			}
		}

		System.out.println("Watching znode " + predecessorZnodeName);
		System.out.println();
	}

Running the application in two terminals side by side lets us test the re-election mechanism:

  1. znode_0 is selected as the leader.
  2. znode_1 is watching znode_0.
  3. znode_0 crashes.
  4. znode_1 is the new leader.

Summary

In this blog post, we looked at Apache ZooKeeper and its use for managing distributed systems. We got familiar with ZooKeeper's architecture, benefits, data model, threading model, and the need for a leader in a distributed system. We learned how to install and configure ZooKeeper locally and how to implement the leader election algorithm in Java.