Tuesday, January 3, 2012

Apache ZooKeeper - A maven example

Let me begin with: Happy New Year! I am starting this year's blog entries with some playing around I did over the holidays with Apache ZooKeeper. So what is Apache ZooKeeper? Broadly put, ZooKeeper provides a service for maintaining distributed naming, configuration and group membership. It is a centralized coordination service that is itself distributed, consistent and highly available. In particular, it specializes in coordination tasks such as leader election, status propagation and rendezvous. Quoting the ZooKeeper documentation, "The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch."

ZooKeeper facilitates the coordination of distributed systems via a hierarchical namespace whose nodes are called ZNodes. A ZNode can have child ZNodes as well as data associated with it (figure linked from official ZooKeeper documentation):

ZooKeeper distinguishes between regular (persistent) nodes and ephemeral nodes. The former survive the death of the client that created them while the latter do not. Nodes can also be SEQUENTIAL, in which case ZooKeeper appends a sequence number to the node name. Sequences lend themselves to queue-like or ordered behavior when required. A ZNode cannot be deleted as long as it has children under it.
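
The node rules above can be modeled in a few lines of plain Java. What follows is a toy in-memory sketch and not the ZooKeeper API: the class `ToyZnodeTree`, its methods and the session ids are all made up for illustration. It mimics only three behaviors: ephemeral nodes vanish with the session that created them, sequential names get a zero-padded 10-digit counter appended, and a node with children cannot be deleted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy in-memory model of the znode rules above; this is NOT the ZooKeeper API. */
final class ToyZnodeTree {
  private final TreeSet<String> nodes = new TreeSet<>();
  // Ephemeral path -> id of the (hypothetical) client session that created it
  private final Map<String, String> ephemeralOwner = new HashMap<>();
  // Parent path -> how many sequential children have been created under it
  private final Map<String, Integer> sequenceCount = new HashMap<>();

  /** Creates a node and returns its actual path; a non-null sessionId makes it ephemeral. */
  String create(String path, String sessionId, boolean sequential) {
    String actual = path;
    if (sequential) {
      String parent = path.substring(0, path.lastIndexOf('/'));
      int sequence = sequenceCount.merge(parent, 1, Integer::sum) - 1;
      // Sequential names get a zero-padded 10-digit counter appended
      actual = path + String.format("%010d", sequence);
    }
    if (!nodes.add(actual)) {
      throw new IllegalStateException("Node already exists: " + actual);
    }
    if (sessionId != null) {
      ephemeralOwner.put(actual, sessionId);
    }
    return actual;
  }

  /** Deletes a node, refusing while it still has children. */
  void delete(String path) {
    for (String node : nodes) {
      if (node.startsWith(path + "/")) {
        throw new IllegalStateException("Node has children: " + path);
      }
    }
    nodes.remove(path);
    ephemeralOwner.remove(path);
  }

  /** Simulates the death of a client: its ephemeral nodes vanish, persistent ones stay. */
  void closeSession(String sessionId) {
    List<String> doomed = new ArrayList<>();
    for (Map.Entry<String, String> entry : ephemeralOwner.entrySet()) {
      if (entry.getValue().equals(sessionId)) {
        doomed.add(entry.getKey());
      }
    }
    nodes.removeAll(doomed);
    ephemeralOwner.keySet().removeAll(doomed);
  }

  boolean exists(String path) {
    return nodes.contains(path);
  }
}
```

For instance, creating "/echo/n_" twice with the sequential flag set yields "/echo/n_0000000000" and "/echo/n_0000000001", and closing the session that owns the first one removes it while "/echo" itself stays.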

The ZooKeeper API itself is a very simple programming interface that contains methods for CRUD operations on a ZNode and its data, along with a way to listen for changes to the node and/or its data.

One can read quite a bit about ZooKeeper in its documentation, so I will not delve into details. My goal in this post is to provide some simple examples of how ZooKeeper can be used for:

1. Leader Election
2. Rendezvous or Barrier

1. Leader Election Example:

As mentioned previously, ZooKeeper specializes in leader election, where a particular server of a given type is elected the leader and, upon its demise, another stands up to take its place. Ephemeral nodes are pretty useful in such a scenario: a service registers itself on start up as a candidate in the pool of resources to be used. For the sake of discussion, consider an Echo Service with the fictional constraint that only one Echo Service may be servicing requests at any given time, but should that robust service die, another one is ready to take over the responsibility of servicing clients. The EchoService on start up registers an ephemeral node at "/echo/n_000000000X". It further registers a watcher on its predecessor Echo Service, denoted by the ZNode "/echo/n_000000000X-1", with the intent that should the predecessor die, the node in question can assume leadership if it is the logical successor, as shown in the picture below:
Leader Election of Echo Server
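
To make the "watch your predecessor" rule concrete, here is a minimal sketch of how a contender could pick the node to watch from the children of "/echo". The class and method names (`PredecessorPicker`, `sequenceOf`, `nodeToWatch`) are my own for illustration and are not part of ZooKeeper or ZkClient; the sketch assumes child names end in an underscore followed by the sequence number, as in the paths above.

```java
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Sketch of choosing which sibling to watch; all names here are illustrative. */
final class PredecessorPicker {
  /** Extracts the numeric suffix from a name such as "n_0000000007". */
  static int sequenceOf(String nodeName) {
    return Integer.parseInt(nodeName.substring(nodeName.lastIndexOf('_') + 1));
  }

  /**
   * Returns the node that myName should watch: the one with the next lower
   * sequence number, or null when myName holds the lowest sequence (the leader).
   */
  static String nodeToWatch(Iterable<String> children, String myName) {
    NavigableSet<String> sorted =
        new TreeSet<>(Comparator.comparingInt(PredecessorPicker::sequenceOf));
    for (String child : children) {
      sorted.add(child);
    }
    // lower() gives the greatest element strictly below myName, or null if none
    return sorted.lower(myName);
  }
}
```

Because the lookup works on whatever children currently exist, gaps are handled naturally: if the node with sequence 2 is already gone, the node with sequence 3 ends up watching the node with sequence 1.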



The reason for registering a watch on the predecessor rather than on the root "/echo" is to prevent herding, as described in the ZooKeeper recipes. In other words, when a leader is decommissioned, we wouldn't want all the Echo Server leadership contenders to stress the system by asking ZooKeeper for the children of the "/echo" node to determine the next leader. With the chosen strategy, if the predecessor dies, only its follower executes the call to get the children of "/echo".

The example provided herewith spawns two Echo Servers and a client that calls the server. One of the two servers is considered the leader and assumes the responsibility of servicing the client's requests. If the leader dies, the successor Echo Server assumes leadership and starts servicing client requests. The client connects to the next leader automatically upon the death of the current leader. The Echo Server code is shown below:
public class EchoServer extends Thread implements IZkDataListener, IZkStateListener {
  private ZkClient zkClient;
  // My ZNode
  private SequentialZkNode zkNode;

  // My leader ZNode
  private SequentialZkNode zkLeaderNode;
  private ServerSocket serverSocket;
  
  // Port to start server on
  private final Integer port;
  private final List<SlaveJob> slaves;
  private final Object mutex = new Object();
  private final AtomicBoolean start = new AtomicBoolean(false);
  private final Random random = new Random();

  public EchoServer(int port, int zkPort) {
    this.port = port;
    this.zkClient = new ZkClient("localhost:" + zkPort);
    this.slaves = new ArrayList<SlaveJob>();
  }

  @Override
  public void run() {
    try {
      // Sleep for a random interval so that leader selection varies between runs
      Thread.sleep(random.nextInt(1000));

      // Create ephemeral node
      zkNode = ZkUtils.createEphermalNode("/echo", port, zkClient);

      // Find all children
      NavigableSet<SequentialZkNode> nodes = ZkUtils.getNodes("/echo", zkClient);

      // Find my leader
      zkLeaderNode = ZkUtils.findLeaderOfNode(nodes, zkNode);

      // If I am leader, enable me to start
      if (zkLeaderNode.getPath().equals(zkNode.getPath())) {
        start.set(true);
      } else {
        // Set a watch on next leader path
        zkClient.subscribeDataChanges(zkLeaderNode.getPath(), this);
        zkClient.subscribeStateChanges(this);
      }

      synchronized (mutex) {
        while (!start.get()) {
          LOG.info("Server on Port:" + port + " waiting to spawn socket....");
          mutex.wait();
        }
      }
      // Start accepting connections and servicing requests 
      this.serverSocket = new ServerSocket(port);

      Socket clientSocket = null;
      
      while (!Thread.currentThread().isInterrupted() && !shutdown) {
        clientSocket = serverSocket.accept();
        // ... start slave job
      }
    }
    catch (Exception e) {
     ....
    }
  }

  private void electNewLeader() {
    final NavigableSet<SequentialZkNode> nodes = ZkUtils.getNodes("/echo", zkClient);

    if (!zkClient.exists(zkLeaderNode.getPath())) {
      // My Leader does not exist, find the next leader above
      zkLeaderNode = ZkUtils.findLeaderOfNode(nodes, zkNode);
      zkClient.subscribeDataChanges(zkLeaderNode.getPath(), this);
      zkClient.subscribeStateChanges(this);
    }

    // If I am the leader then start
    if (zkNode.getSequence().equals(nodes.first().getSequence())) {
      LOG.info("Server on port:" + port + "  will now be notified to assume leadership");
      synchronized (mutex) {
        start.set(true);
        mutex.notify();
      }
    }
  }
  .....

  @Override
  public void handleDataDeleted(String dataPath) throws Exception {
    if (dataPath.equals(zkLeaderNode.getPath())) {
      // Leader gone away
      LOG.info("Received a notification that Leader on path:" + dataPath
        + " has gone away..electing new leader");
      electNewLeader();
    }
  }
}
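
The effect of the predecessor watch can be checked with a small in-memory simulation. This is only a sketch under my own assumptions: plain integers stand in for the sequential ZNodes, contenders join in increasing sequence order, and `FailoverSimulation` with its methods is a hypothetical helper that talks to no ZooKeeper server. It counts how many contenders are notified per death and shows that a notified contender either becomes leader or simply re-points its watch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** In-memory simulation of predecessor watching; sequence numbers stand in for znodes. */
final class FailoverSimulation {
  private final TreeSet<Integer> alive = new TreeSet<>();
  // Contender sequence -> sequence of the predecessor it currently watches
  private final Map<Integer, Integer> watching = new HashMap<>();
  private int notifications = 0;

  /** Registers a contender (in increasing sequence order) and sets its watch. */
  void join(int sequence) {
    alive.add(sequence);
    watchPredecessor(sequence);
  }

  /** Removes a contender and notifies only those that were watching it. */
  void die(int sequence) {
    alive.remove(sequence);
    watching.remove(sequence);
    List<Integer> watchers = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : watching.entrySet()) {
      if (entry.getValue() == sequence) {
        watchers.add(entry.getKey());
      }
    }
    for (int watcher : watchers) {
      notifications++;
      // Re-evaluate: either become leader or watch the new predecessor
      watchPredecessor(watcher);
    }
  }

  private void watchPredecessor(int sequence) {
    Integer predecessor = alive.lower(sequence);
    if (predecessor == null) {
      watching.remove(sequence); // lowest sequence: this contender is the leader
    } else {
      watching.put(sequence, predecessor);
    }
  }

  /** The leader is the live contender with the lowest sequence, or null if none. */
  Integer leader() {
    return alive.isEmpty() ? null : alive.first();
  }

  int notifications() {
    return notifications;
  }
}
```

With five contenders 0 to 4, killing contender 2 notifies only contender 3 and leaves 0 as leader; killing 0 afterwards notifies only contender 1, which then becomes the leader. Had everyone watched "/echo" instead, every surviving contender would have been notified on each death.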

The Echo Client is shown below which connects to the leader of the Echo Server ensemble:
public class EchoClient implements IZkDataListener, IZkStateListener, IZkChildListener {
  private Connection connection;
  private Object mutex = new Object();
  private final ZkClient zkClient;
  private SequentialZkNode connectedToNode;

  public EchoClient(int zkPort) {
    this.zkClient = new ZkClient("localhost:" + zkPort);
  }

  private Connection getConnection() throws UnknownHostException, IOException, InterruptedException {
    synchronized (mutex) {
      if (connection != null) {
        return connection;
      }

      NavigableSet<SequentialZkNode> nodes = null;
      
      // Check whether an Echo server exists; if not, listen on /echo for
      // a server to become available
      while ((nodes = ZkUtils.getNodes("/echo", zkClient)).size() == 0) {
        LOG.info("No echo service nodes ...waiting...");
        zkClient.subscribeChildChanges("/echo", this);
        mutex.wait();
      }

      // Get the leader and connect
      connectedToNode = nodes.first();
      Integer port = zkClient.readData(connectedToNode.getPath());
      connection = new Connection(new Socket("localhost", port));
      
      // Subscribe to Server changes
      zkClient.subscribeDataChanges(connectedToNode.getPath(), this);
      zkClient.subscribeStateChanges(this);
    }

    return connection;
  }

  public String echo(String command) throws InterruptedException {
    while (true) {
      try {
        return getConnection().send(command);
      }
      catch (InterruptedException e) {
        throw e;
      }
      catch (Exception e) {
        synchronized (mutex) {
          if (connection != null) {
            connection.close();
            connection = null;
          }
        }
      }
    }
  }

  private static class Connection { 
    // Code that connects and sends data to Echo Server
    public String send(String message) {
      ....
    }
  }

  @Override
  public void handleDataDeleted(String dataPath) throws Exception {
    synchronized (mutex) {
      // connectedToNode is cleared below, so guard against a null node here
      if (connectedToNode != null && dataPath.equals(connectedToNode.getPath())) {
        LOG.info("Client Received notification that Server has died..notifying to re-connect");
        if (connection != null) {
          connection.close();
          // Drop the closed connection so getConnection() looks up the new leader
          connection = null;
        }
        connectedToNode = null;
      }
      mutex.notify();
    }
  }

  @Override
  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
    synchronized (mutex) {
      LOG.info("Got a notification about a Service coming up...notifying client");
      mutex.notify();
    }
  }
}

A simple test of the example starts an EchoClient and two servers. One of the servers assumes leadership and starts servicing requests while the other server sits waiting to assume leadership. The client in turn sends messages, which are serviced by the leader. When the leader terminates, messages are serviced by the successor that assumes leadership.
  @Test
  public void simpleLeadership() throws InterruptedException {

    EchoClient client = new EchoClient(ZK_PORT);
    EchoServer serverA = new EchoServer(5555, ZK_PORT);
    EchoServer serverB = new EchoServer(5556, ZK_PORT);

    serverA.start();
    serverB.start();
 
    // Send messages
    for (int i = 0; i < 10; i++) {
      System.out.println("Client Sending:Hello-" + i);
      System.out.println(client.echo("Hello-" + i));
    }
    
    if (serverA.isLeader()) {
      serverA.shutdown();
    } else {
      serverB.shutdown();
    }
 
    for (int i = 0; i < 10; i++) {
      System.out.println("Client Sending:Hello-" + i);
      System.out.println(client.echo("Hello-" + i));
    }

    serverA.shutdown();
    serverB.shutdown();
  }
On running the test, you can see output similar to:
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(85) | Server on Port:5556 is now the Leader. Starting to accept connections...
Client Sending:Hello-0
Echo:Hello-0
Client Sending:Hello-1
Echo:Hello-1
.....
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(122) | Server on Port:5556 has been shutdown
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
Server has died..notifying to re-connect
20:18:49 INFO - com.welflex.zookeeper.EchoClient.getConnection(44) | No echo service nodes ...waiting...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
20:18:49 INFO - com.welflex.zookeeper.EchoServer.run(85) | Server on Port:5555 is now the Leader. Starting to accept connections...
20:18:49 INFO - com.welflex.zookeeper.EchoClient.handleChildChange(171) | Got a notification about a Service coming up...notifying client
....
Echo:Hello-7
Client Sending:Hello-8
Echo:Hello-8
Client Sending:Hello-9
Echo:Hello-9
....

2. Rendezvous/Barrier Example: 

Distributed systems can use a barrier to block the processing of a certain task until the other members of the ensemble are available, after which all the participating systems can proceed with their tasks. Having a little fun here :-) with a fictional case where Double O agents of MI-6 meet to present their reports, and reports are not edited here if you know what I mean ;-). Agents join the meeting but cannot start presenting until all the expected agents have joined, after which they are free to present in parallel (agents are designed to grasp from multiple presenters; after all, the 007 variety are trained at such things). Agents can present but cannot leave the briefing until all presenters have completed their presentations.

On start up, each agent creates a ZNode under the meeting identifier, say, "/M-debrief", and then waits for all other agents to join before beginning its presentation, i.e., barrier entry. After presenting, the agent removes its ZNode and then waits for the other agents to do the same before leaving the briefing, i.e., barrier exit. With that said, the Agent code looks like:
public class Agent extends Thread implements IZkChildListener {
  // Number of agents total
  private final int agentCount;

  private final String agentNumber;

  private final Object mutex = new Object();

  private final Random random = new Random();

  private SequentialZkNode agentRegistration;
  
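  private final String meetingId;

  // ZkClient used to register this agent and to listen for child changes
  private final ZkClient zkClient;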

  public Agent(String agentNumber, String meetingId, int agentCount, int zkPort) {
    this.agentNumber = agentNumber;
    this.meetingId = meetingId;
    this.agentCount = agentCount;
    this.zkClient = new ZkClient("localhost:" + zkPort);
    zkClient.subscribeChildChanges(meetingId, this);
  }

  private void joinBriefing() throws InterruptedException {
    Thread.sleep(random.nextInt(1000));
    agentRegistration = ZkUtils.createEpheremalNode(meetingId, agentNumber, zkClient);
    LOG.info("Agent:" + agentNumber + " joined Briefing:" + meetingId);
    
    // Wait for all other agents to join the meeting
    while (true) {
      synchronized (mutex) {
        List<String> list = zkClient.getChildren(meetingId);

        if (list.size() < agentCount) {
          LOG.info("Agent:" + agentNumber + " waiting for other agents to join before presenting report...");
          mutex.wait();
        }
        else {
          break;
        }
      }
    }
  }

  private void presentReport() throws InterruptedException {
    LOG.info("Agent:" + agentNumber + " presenting report...");
    Thread.sleep(random.nextInt(1000));
    LOG.info("Agent:" + agentNumber + " completed report...");

    // Completion of a report is identified by deleting their ZNode
    zkClient.delete(agentRegistration.getPath());
  }

  private void leaveBriefing() throws InterruptedException {

    // Wait for all agents to complete
    while (true) {
      synchronized (mutex) {
        List<String> list = zkClient.getChildren(meetingId);
        if (list.size() > 0) {
          LOG.info("Agent:" + agentNumber
            + " waiting for other agents to complete their briefings...");
          mutex.wait();
        }
        else {
          break;
        }
      }
    }
    LOG.info("Agent:" + agentNumber + " left briefing");
  }

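  @Override
  public void run() {
    try {
      joinBriefing();
      presentReport();
      leaveBriefing();
    }
    catch (InterruptedException e) {
      // run() cannot throw checked exceptions, so restore the interrupt flag
      Thread.currentThread().interrupt();
    }
  }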

  @Override
  public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
    synchronized (mutex) {
      mutex.notifyAll();
    }
  }
}
Running the test, one would see something like the following, where agents join the meeting, wait for the others before presenting, present at the meeting and then leave after everyone completes. Note that the output is edited for brevity:
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:003 joined Briefing:/M-debrief
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:003 waiting for other agents to join before presenting report...
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:005 joined Briefing:/M-debrief
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:005 waiting for other agents to join before presenting report...
.....
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:005 waiting for other agents to join before presenting report...
......
21:02:17 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:006 waiting for other agents to join before presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.joinBriefing(38) | Agent:007 joined Briefing:/M-debrief
21:02:18 INFO - com.welflex.zookeeper.Agent.joinBriefing(45) | Agent:007 waiting for other agents to join before presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:002 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:005 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:007 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:003 presenting report...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(57) | Agent:001 presenting report...
....
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(59) | Agent:000 completed report...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:000 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:002 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:000 waiting for other agents to complete their briefings...
21:02:18 INFO - com.welflex.zookeeper.Agent.presentReport(59) | Agent:007 completed report...
21:02:18 INFO - com.welflex.zookeeper.Agent.leaveBriefing(69) | Agent:007 waiting for other agents to complete their briefings...
....
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:007 left briefing
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:003 left briefing
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:002 left briefing
...
21:02:19 INFO - com.welflex.zookeeper.Agent.leaveBriefing(78) | Agent:000 left briefing
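
The entry and exit conditions of the briefing can also be tried out without a ZooKeeper server. The following is an in-process sketch under my own assumptions: a plain Set stands in for the children of the meeting ZNode, Java monitors replace the child listener, and `ToyDoubleBarrier` is a hypothetical, single-use class rather than anything from the example project.

```java
import java.util.HashSet;
import java.util.Set;

/** In-process analogue of the double barrier; a Set stands in for the meeting's child znodes. */
final class ToyDoubleBarrier {
  private final Set<String> members = new HashSet<>();
  private final int expected;
  // Latched once everyone has joined, so late wakers still get through
  private boolean open = false;

  ToyDoubleBarrier(int expected) {
    this.expected = expected;
  }

  /** Barrier entry: register, then block until all expected members have joined. */
  synchronized void enter(String name) throws InterruptedException {
    members.add(name);
    if (members.size() == expected) {
      open = true;
      notifyAll();
    }
    while (!open) {
      wait();
    }
  }

  /** Barrier exit: deregister, then block until every member has deregistered. */
  synchronized void leave(String name) throws InterruptedException {
    members.remove(name);
    if (members.isEmpty()) {
      notifyAll();
    }
    while (!members.isEmpty()) {
      wait();
    }
  }
}
```

Each agent thread would call `enter(agentNumber)`, present its report and then call `leave(agentNumber)`. The `open` flag latches the entry condition, so an agent that wakes up late still passes even if a faster agent has already deregistered.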

Running the code:

Download a Maven example of the above code from HERE. Execute "mvn test" to see the example in action or import it into Eclipse to dissect the code. The example runs its tests against a single ZooKeeper server. There is no reason why the same cannot be tried on a larger ZooKeeper ensemble.
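
When trying this against an ensemble, the main change on the client side is the connect string, which ZooKeeper accepts as a comma-separated list of host:port pairs. A tiny helper along these lines could build it; `ConnectStrings` and the host names used with it are hypothetical and not part of the example project.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper for building a ZooKeeper connect string for several servers. */
final class ConnectStrings {
  /** Builds "host1:port,host2:port,..." for hosts that share one client port. */
  static String forEnsemble(List<String> hosts, int clientPort) {
    StringJoiner joiner = new StringJoiner(",");
    for (String host : hosts) {
      joiner.add(host + ":" + clientPort);
    }
    return joiner.toString();
  }
}
```

The result would then take the place of "localhost:" + zkPort in the ZkClient constructor calls shown earlier, assuming servers named, say, zk1, zk2 and zk3 are reachable on that port.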

Conclusion and Credits:

The examples shown are highly influenced by the examples provided at the ZooKeeper site. The code does not use the ZooKeeper API directly but instead uses a wrapper around it, ZkClient, which provides a much easier experience than the default ZooKeeper API. The examples also rely heavily on the excellent article and supporting code by Erez Mazor, who utilizes the Spring Framework very nicely to demonstrate leader election and control of the same. I simply loved his code and images and have used parts of them in the above demonstration. I am sure I have only scratched the surface of ZooKeeper and am hoping I can learn more about it. If my understanding of any of the concepts mentioned above is flawed, I would love to hear about it. Again, a very Happy 2012! Keep the faith, the world won't end on December 21st.

1 comment:

Unknown said...

I read an article from internet "I am planning to write an application which will have Worker processes
distributed across multiple machines. One of them will be Leader which
will assign tasks to other processes.".

From your example, How can an Echo server (the leader) assign task for another Echo Server?