本文共 10220 字,大约阅读时间需要 34 分钟。
org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话
它提供以下几类主要方法 :
功能 | 描述 |
create | 在本地目录树中创建一个节点 |
delete | 删除一个节点 |
exists | 测试本地是否存在目标节点 |
get/set data | 从目标节点上读取 / 写数据 |
get/set ACL | 获取 / 设置目标节点访问控制列表信息 |
get children | 检索一个子节点上的列表 |
sync | 等待要被传送的数据 |
表 1 : ZooKeeper API 描述
package cn.com.toto.zk;
import java.io.IOException;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;
public class SimpleDemo { //回话超时时间,设置为与系统默认时间一致 private static final int SESSION_TIMEOUT = 30000; //创建ZooKeeper实例 ZooKeeper zk; //创建Watcher实例 Watcher wh = new Watcher() {
@Override public void process(WatchedEvent event) { System.out.println(event.toString()); } };
//初始化ZooKeeper实例 private void createZKInstance() throws IOException { zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh); }
private void ZKOperations() throws KeeperException, InterruptedException { System.out.println("/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent"); zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("/n2.查看是否创建成功:"); System.out.println(new String(zk.getData("/zoo2", false, null))); System.out.println("/n3.修改节点数据"); zk.setData("/zoo2", "toto".getBytes(), -1); System.out.println("/n4.查看是否修改成功:"); System.out.println(new String(zk.getData("/zoo2", false, null))); System.out.println("/n5.删除节点"); zk.delete("/zoo2", -1); System.out.println("/n6.查看节点是否被删除:"); System.out.println("节点状态:[" + zk.exists("/zoo2", false) + "]"); }
private void ZKClose() throws InterruptedException { zk.close(); }
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { SimpleDemo dm = new SimpleDemo(); dm.createZKInstance(); dm.ZKOperations(); dm.ZKClose(); } } |
运行结果: /n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection 信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session 一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected 信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000 WatchedEvent state:SyncConnected type:None path:null /n2.查看是否创建成功: myData2 /n3.修改节点数据 /n4.查看是否修改成功: toto /n5.删除节点 /n6.查看节点是否被删除: 节点状态:[null] |
Zookeeper的监听器工作机制
监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
监听器的注册是在获取数据的操作中实现:
getData(path,watch?)监听的事件是:节点数据变化事件
getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件
所需jar包:
图1 项目包结构
package cn.com.toto.zk;
import java.util.List; import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test;
public class SimpleZkClient {
private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181"; private static final int sessionTimeout = 2000;
// latch就相当于一个对象锁,当latch.await()方法执行时,方法所在的线程会等待 //当latch的count减为0时,将会唤醒等待的线程 CountDownLatch latch = new CountDownLatch(1); ZooKeeper zkClient = null;
@Before public void init() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//事件监听回调方法 @Override public void process(WatchedEvent event) { if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) { System.out.println("countdown"); latch.countDown(); }
//收到事件通知后的回调函数(应该是我们自己的事件处理逻辑) System.out.println(event.getType() + "---" + event.getPath()); System.out.println(event.getState()); } }); latch.await(); }
//创建数据节点到zk中 @Test public void testCreate() throws KeeperException, InterruptedException { //参数1:要创建的节点的路径 参数2:节点大数据参数3:节点的权限 参数4:节点的类型 String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //上传的数据可以是任何类型,但都要转成byte zkClient.close(); }
//判断znode是否存在 @Test public void testExist() throws KeeperException, InterruptedException { Stat stat = zkClient.exists("/eclipse", false); System.out.println(stat == null ? "not exist" : "exist"); }
//获取znode下的孩子节点 @Test public void getChildren() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/", true); for(String child : children) { System.out.println(child); } Thread.sleep(Long.MAX_VALUE); }
//获取参数 @Test public void getData() throws KeeperException, InterruptedException { byte[] data = zkClient.getData("/eclipse", true, null); System.out.println(new String(data)); Thread.sleep(Long.MAX_VALUE); }
//删除znode @Test public void deleteZnode() throws InterruptedException, KeeperException { //参数2:指定要删除的版本,-1表示删除所有版本 zkClient.delete("/eclipse", -1); }
//设置参数 @Test public void setData() throws Exception { //要注意,这里的/zookeeper 要在zookeeper中的节点中有 zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/zookeeper", false, null); System.out.println(new String(data)); } } |
案例二
package cn.com.toto.zk;
import java.util.List; import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;
import com.sun.org.apache.bcel.internal.generic.NEW;
public class TestZKclient { static ZooKeeper zk = null;
public static void main(String[] args) throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {
@Override public void process(WatchedEvent event) { if (event.getState() == KeeperState.SyncConnected) { countDownLatch.countDown(); } System.out.println(event.getPath()); System.out.println(event.getType()); try { zk.getChildren("/zookeeper", true); } catch (Exception e) { e.printStackTrace(); } } });
countDownLatch.await();
/** zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); **/
/** byte[] data = zk.getData("/myboys", true, null); System.out.println(new String(data,"UTF-8")); Thread.sleep(Long.MAX_VALUE); **/
/** zk.create("/myboys/wangkai", "测试型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.close(); **/
/** List<String> children = zk.getChildren("/myboys", true); for(String child : children) { System.out.println(child); } **/
/**zk.delete("/myboys/wangkai", -1);**/
/**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/ /** byte[] data = zk.getData("/myboys", true, null); System.out.println(new String(data,"UTF-8")); **/
Stat stat = zk.exists("/mywives", true); System.out.println(stat == null ? "确实不存在" : "存在"); zk.close(); } } |
准备工作拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下. 创建连接和回调接口首先需要创建ZooKeeper对象, 后续的一切操作都是基于该对象进行的. 1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException 以下为各个参数的详细说明: · connectString. zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接. · sessionTimeout. 指定连接的超时时间. · watcher. 事件回调接口. 注意, 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回. 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调watcher的process方法. 如果想要同步建立与server的连接, 需要自己进一步封装. 1. public class ZKConnection { 2. /** 3. * server列表, 以逗号分割 4. */ 5. protected String hosts = "localhost:4180,localhost:4181,localhost:4182"; 6. /** 7. * 连接的超时时间, 毫秒 8. */ 9. private static final int SESSION_TIMEOUT = 5000; 10. private CountDownLatch connectedSignal = new CountDownLatch(1); 11. protected ZooKeeper zk; 12. 13. /** 14. * 连接zookeeper server 15. */ 16. public void connect() throws Exception { 17. zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher()); 18. // 等待连接完成 19. connectedSignal.await(); 20. } 21. 22. public class ConnWatcher implements Watcher { 23. public void process(WatchedEvent event) { 24. // 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected 25. if (event.getState() == KeeperState.SyncConnected) { 26. // 放开闸门, wait在connect方法上的线程将被唤醒 27. connectedSignal.countDown(); 28. } 29. } 30. } 31. }
创建znodeZooKeeper对象的create方法用于创建znode. 1. String create(String path, byte[] data, List acl, CreateMode createMode); 以下为各个参数的详细说明: · path. znode的路径. · data. 与znode关联的数据. · acl. 指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE. · 指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可. 关于znode类型的详细说明, 可参考本人的上一篇博文. 1. /** 2. * 创建临时节点 3. */ 4. public void create(String nodePath, byte[] data) throws Exception { 5. zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); 6. }
获取子node列表ZooKeeper对象的getChildren方法用于获取子node列表. 1. List getChildren(String path, boolean watch); watch参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件. 判断znode是否存在ZooKeeper对象的exists方法用于判断指定znode是否存在. 1. Stat exists(String path, boolean watch); watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null. 获取node中关联的数据ZooKeeper对象的getData方法用于获取node关联的数据. 1. byte[] getData(String path, boolean watch, Stat stat); watch参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常. stat参数是个传出参数, getData方法会将path node的状态信息设置到该参数中. 更新node中关联的数据ZooKeeper对象的setData方法用于更新node关联的数据. 1. Stat setData(final String path, byte data[], int version); data为待更新的数据. version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查.返回path node的状态信息. 删除znodeZooKeeper对象的delete方法用于删除znode. 1. void delete(final String path, int version); version参数的作用同setData方法. 其余接口请查看ZooKeeper对象的API文档. 需要注意的几个地方· znode中关联的数据不能超过1M. zookeeper的使命是分布式协作, 而不是数据存储. · getChildren, getData, exists方法可指定是否监听相应的事件. 而create, delete, setData方法则会触发相应的事件的发生. · 以上介绍的几个方法大多存在其异步的重载方法, 具体请查看API说明. |
转载地址:http://gltoo.baihongyu.com/