🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
## 一、Curator框架使用 Curator框架,非常强大,目前已经是Apache的顶级项目,里面提供了更多丰富的操作,session超时重连,主从选举,分布式计数器,分布式锁等适合各种复杂的zookeeper ## 二、依赖的引入 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency> ## 三、Curator框架中使用链式编程的风格,易读性更强,试用工程方法创建连接对象 1.使用CuratorFameworkFactory的两个静态工厂方法来实现 参数一:connectString,连接串 参数二: retryPolicy,重试连接策略, 参数三 sessionTimeoutMs会话超时,默认60ms 参数四 connectionTimeout 连接超时时间,默认为15000ms 2.创建节点create方法,可选链式项 creatingParentsIfNeeded:是否需要父节点 withMode: 需要的模式 forPath: 路径 key value withACL: 需要认证 3.删除节点delete方法,可选择链式项 deletingChildrenIfNeeded:递归的删除 graranted: 安全的操作 withVersion: 删除版本 forPath: 4.读取和修改数据 gatData: 读取数据 setData: 设置数据 6.异步绑定回调方法,比如节点绑定一个回调函数,该回调函数可以输出服务器的状态码,以及服务的时间的类型,还可以加入一个线程池进行优化操作。 7.读取子节点方法getChildren 8.判断节点是否存在方法checkExists ## 四、代码实现 public class CuratorBase { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 开启连接 cf.start(); // System.out.println(States.CONNECTED); // System.out.println(cf.getState()); // 新加、删除 /** //4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes()); //5 删除节点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); */ // 读取、修改 /** //创建节点 // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes()); // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes()); //读取节点 // String ret1 = new String(cf.getData().forPath("/super/c2")); // System.out.println(ret1); //修改节点 // cf.setData().forPath("/super/c2", "修改c2内容".getBytes()); // String ret2 = new String(cf.getData().forPath("/super/c2")); // System.out.println(ret2); */ // 绑定回调函数 /** ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) .forPath("/super/c3","c3内容".getBytes()); Thread.sleep(Integer.MAX_VALUE); */ // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法 /** List<String> list = cf.getChildren().forPath("/super"); for(String p : list){ System.out.println(p); } Stat stat = cf.checkExists().forPath("/super/c3"); System.out.println(stat); Thread.sleep(2000); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); */ //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); } } ### 总结: curator 对节点的简单操作 #### 一、新建 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/super/c1","c1内容".getBytes()); #### 二、删除节点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); ### 三、获取子节点 cf.getData().forPath("/super/c2") ### 四、修改子节点 cf.setData().forPath("/super/c2", "修改c2内容".getBytes()); ### 重点 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) 为什么采用线程池去承载,是因为在高并发的情况下,我们不能为每一个线程开辟一段空间,我们采用 线程池去调度。当线程池里面的线程有空闲的时间,我们会调度线程池里面的线程去执行里面的代码。 ### 五、 Curator 的监听 之一 第一步导入jar包 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> 使用NodeCache的方式去客服端实例中注册一个监听缓存,然后实现对应的监听方法即可,这里我们主要有两种监听方式。 NodeCacheListener: 监听节点的新增、修改操作 PathChildrenCacheListener:监听子节点的新增、修改、删除操作 #### 加缓存,不是重复注册 public class CuratorWatcher1 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立连接 cf.start(); //4 建立一个cache缓存 final NodeCache cache = new NodeCache(cf, "/super", false); cache.start(true); cache.getListenable().addListener(new NodeCacheListener() { /** * <B>方法名称:</B>nodeChanged<BR> * <B>概要说明:</B>触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作。<BR> * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged() */ @Override public void nodeChanged() throws Exception { System.out.println("路径为:" + cache.getCurrentData().getPath()); System.out.println("数据为:" + new String(cache.getCurrentData().getData())); System.out.println("状态为:" + cache.getCurrentData().getStat()); System.out.println("---------------------------------------"); } }); Thread.sleep(1000); cf.create().forPath("/super", "123".getBytes()); Thread.sleep(1000); cf.setData().forPath("/super", "456".getBytes()); Thread.sleep(1000); cf.delete().forPath("/super"); Thread.sleep(Integer.MAX_VALUE); } } ## 六、 Curator 的监听 之二 public class CuratorWatcher { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立连接 cf.start(); //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受 PathChildrenCache cache = new PathChildrenCache(cf, "/super", true); //5 在初始化的时候就进行缓存监听 cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { /** * <B>方法名称:</B>监听子节点变更<BR> * <B>概要说明:</B>新建、修改、删除<BR> * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent) */ @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED :" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED :" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED :" + event.getData().getPath()); break; default: break; } } }); //创建本身节点不发生变化 cf.create().forPath("/super", "init".getBytes()); //添加子节点 Thread.sleep(1000); cf.create().forPath("/super/c1", "c1内容".getBytes()); Thread.sleep(1000); cf.create().forPath("/super/c2", "c2内容".getBytes()); //修改子节点 Thread.sleep(1000); cf.setData().forPath("/super/c1", "c1更新内容".getBytes()); //删除子节点 Thread.sleep(1000); cf.delete().forPath("/super/c2"); //删除本身节点 Thread.sleep(1000); cf.delete().deletingChildrenIfNeeded().forPath("/super"); Thread.sleep(Integer.MAX_VALUE); } } ### 七、分布式锁 ### 在java高并发和多线程中是怎么解决的呢? 在分布式场景中,我们为了保证数据的一致性,经常在程序运行的某一点需要同步操作 (java 可提供synchronized 或者 Reentrantlock实现) public class Lock1 { static ReentrantLock reentrantLock = new ReentrantLock(); static int count = 10; public static void genarNo(){ try { reentrantLock.lock(); count--; //System.out.println(count); } finally { reentrantLock.unlock(); } } public static void main(String[] args) throws Exception{ final CountDownLatch countdown = new CountDownLatch(1); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { countdown.await(); genarNo(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); System.out.println(sdf.format(new Date())); //System.out.println(System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { } } },"t" + i).start(); } Thread.sleep(50); countdown.countDown(); } } ### Zookpeer分布式锁的实现 public class Lock2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms static int count = 10; public static void genarNo(){ try { count--; System.out.println(count); } finally { } } public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 开启连接 cf.start(); //4 分布式锁 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); //final ReentrantLock reentrantLock = new ReentrantLock(); final CountDownLatch countdown = new CountDownLatch(1); for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { countdown.await(); //加锁 lock.acquire(); //reentrantLock.lock(); //-------------业务处理开始 //genarNo(); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); System.out.println(sdf.format(new Date())); //System.out.println(System.currentTimeMillis()); //-------------业务处理结束 } catch (Exception e) { e.printStackTrace(); } finally { try { //释放 lock.release(); //reentrantLock.unlock(); } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start(); } Thread.sleep(100); countdown.countDown(); } } ## 八、分布式计数器 分布式计数器,在高并发中使用AtomicInteger这种经典的方式、如果对于一个jvm 的场景当然没有问题,但是我们现在是分布式、就需要利用Curator框架的Distributed AtomicInteger了 public class CuratorAtomicInteger { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 开启连接 cf.start(); //cf.delete().forPath("/super"); //4 使用DistributedAtomicInteger DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000)); AtomicValue<Integer> value = atomicIntger.add(1); System.out.println(value.succeeded()); System.out.println(value.postValue()); //最新值 System.out.println(value.preValue()); //原始值 } } ## 九、barrier public class CuratorBarrier1 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .retryPolicy(retryPolicy) .build(); cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "已经准备"); barrier.enter(); System.out.println("同时开始运行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "运行完毕"); barrier.leave(); System.out.println("同时退出运行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } } } public class CuratorBarrier2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception { for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/super"); System.out.println(Thread.currentThread().getName() + "设置barrier!"); barrier.setBarrier(); //设置 barrier.waitOnBarrier(); //等待 System.out.println("---------开始执行程序----------"); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } Thread.sleep(5000); barrier.removeBarrier(); //释放 } } ## 十、集群管理 public class CuratorWatcher { /** 父节点path */ static final String PARENT_PATH = "/super"; /** zookeeper服务器地址 */ public static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181"; /** 定义session失效时间 */ public static final int SESSION_TIMEOUT = 30000; public CuratorWatcher() throws Exception{ //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //3 建立连接 cf.start(); //4 创建跟节点 if(cf.checkExists().forPath(PARENT_PATH) == null){ cf.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH,"super init".getBytes()); } //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受 PathChildrenCache cache = new PathChildrenCache(cf, PARENT_PATH, true); //5 在初始化的时候就进行缓存监听 cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { /** * <B>方法名称:</B>监听子节点变更<BR> * <B>概要说明:</B>新建、修改、删除<BR> * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent) */ @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED :" + event.getData().getPath()); System.out.println("CHILD_ADDED :" + new String(event.getData().getData())); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED :" + event.getData().getPath()); System.out.println("CHILD_UPDATED :" + new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED :" + event.getData().getPath()); System.out.println("CHILD_REMOVED :" + new String(event.getData().getData())); break; default: break; } } }); } } public class Client1 { public static void main(String[] args) throws Exception{ CuratorWatcher watcher = new CuratorWatcher(); Thread.sleep(100000000); } } public class Client2 { public static void main(String[] args) throws Exception{ CuratorWatcher watcher = new CuratorWatcher(); Thread.sleep(100000000); } }