背景说明: 有一套Web服务程序, 为了保证HA, 需要在多台服务器上部署, 该服务程序有一些定时任务要执行, 现在要保证的是, 同一定时任务不会在多台机器上被同时执行.
方案1 --- 任务级的主备方案:
每个定时任务启动后, 都发起任务级的主节点的竞争, 胜出者执行具体任务.方案2 --- 服务器级的主备方案, 需要两个组件:
组件一: 一个后台线程用来竞争Leader 具体细节为: Web服务程序中开一个后台线程来竞争作为服务级别的Leader, 优胜者将自己的 ${服务器名}+${端口} 记录到zk 中.组件二:每个定时作业, 在开始的时候, 先对比本机是否是服务主节点, 如果是主节点即执行具体任务, 否则跳过. 方案1的说明:1. 每个定时任务都需要竞争Leader, 任务的执行效率较差. 2. 如果两个服务器的时间不同步, 定时任务耗时又很短, 在这种情况下容易double run, 需要故意延长任务执行时间以避免double run. 3. 有的定时任务在其中一台上执行, 另一些在另一台上执行, 查日志不是很方便. 4. 因为是在每个定时任务启动的时候竞争leader, 不必关心任务执行过程中, 由于zk客户端长连接断开需要进行leader切换的问题. 5. 本方案采用了Curator 的 LeaderLatch 选举机制. 方案2的说明:1. 该方案能很好地从几台服务器中选出一个Master机器, 不仅仅可以用于定时任务场景, 还可以用在其他场景下. 2. 该方案能实现Master节点的自动 failover, 经我测试 failover 过程稍长, 接近1分钟. 5. 本方案采用了Curator 的 LeaderSelector 选举机制. ==============================LeaderLatch 和 LeaderSelector 两种选举实现==============================LeaderLatch 的方式:是以一种抢占的方式来决定选主. 比较简单粗暴, 逻辑相对简单. 类似非公平锁的抢占, 所以, 多节点是一个随机产生主节点的过程, 谁抢到就算谁的.LeaderSelector 方式:
内部通过一个分布式锁来实现选主, 并且选主结果是公平的, zk会按照各节点请求的次序成为主节点.LeaderLatch 和 LeaderSelector 本身也提供 Master 节点的自动failover, 经我测试 failover 过程都稍长, 有时会接近1分钟.
下文先讲解 LeaderLatch 相关知识, 以及用 LeaderLatch 实现方案1 的过程.
==============================Curator 中 LeaderLatch 相关函数==============================最简单的构造子public LeaderLatch(CuratorFramework client, String latchPath)leaderLatch.start()
start()让zk 客户端立即参与选举, zk server最终会确定某个客户端成为leader.leaderLatch.hasLeadership()
检查是否是Leader, 返回值为boolean, 该函数调用会立即返回.leaderLatch.await()
阻塞调用, 直到本客户端成为Leader才返回.leaderLatch.await(long timeout, TimeUnit)
阻塞调用, 并设定一个等待时间.leaderLatch.close()
对于参与者是Leader, 只有调用该方法, 当前参与者才能失去Leader资格, 其他参与者才能获取Leader资格. 对于其他参与者, 调用该方法将主动退出选举.leaderLatch.addListener()
增加一个Listener监听器, 当参与者成为Leader或失去Leader资格后, 自动触发该监听器.client.getConnectionStateListenable().addListener()
为zk 客户端增加一个监听器, 用来监听连接状态, 共有三种状态: RECONNECT/LOST/SUSPEND 当连接状态为 ConnectionState.LOST 时, 写代码强制客户端重连, 以便该客户端能继续参与Leader选举. 当连接状态为 ConnectionState.SUSPEND 时, 我们一般不用处理, 输出log即可.=============================
环境准备=============================在 VM (192.168.1.11) 上启动一个 zookeeper 容器docker run -d --name myzookeeper --net host zookeeper:latest在Windows开发机上, 使用 zkCli.cmd 应该能连上虚机中的 zk server.
zkCli.cmd -server 192.168.1.11:2181
=============================
SpringBoot 服务程序=============================增加下面三个依赖项, 引入 actuator 仅仅是为了不写任何代码就能展现一个web UI.org.apache.curator curator-recipes 2.12.0 org.slf4j slf4j-simple 1.7.7 org.springframework.boot spring-boot-starter-actuator
完整Java 代码
package com.example.demo;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.imps.CuratorFrameworkState;import org.apache.curator.framework.recipes.leader.LeaderLatch;import org.apache.curator.framework.recipes.leader.LeaderLatch.State;import org.apache.curator.framework.recipes.leader.LeaderLatchListener;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.framework.state.ConnectionStateListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.utils.CloseableUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;/* * 主程序类 * */@EnableScheduling@SpringBootApplicationpublic class ZkServiceApplication { public static void main(String[] args) throws Exception { SpringApplication.run(ZkServiceApplication.class, args); }}/* * 常量工具类 */class ZkTaskConst { public static final String SERVICE_NAME = "ServiceA"; public static final String SERVICE_SERVER = "Server1:8080"; public static final String ZK_URL = "localhost:2181"; // 为了确保能选出一个leader, 需要等待一会儿, public static final int WAIT_SECONDS_ENSURE_BE_LEADER = 20; // Task运行完毕后, 再Hold leader一会儿, 以防止多个服务器时间不准导致作业double run public static final int SLEEP_SECONDS_AFTER_TASK = 30; public static String getZkLatchPath(String taskName) { return String.format("/%s/%s", SERVICE_NAME, taskName); }}/* * Leader 选举Listener */class ZkTaskLeaderLatchListener implements LeaderLatchListener { private static final Logger log = LoggerFactory.getLogger(ZkTaskLeaderLatchListener.class); @Override public void isLeader() { log.info(String.format("The server (%s) become the leader", ZkTaskConst.SERVICE_SERVER)); } @Override public void notLeader() { log.debug(String.format("The server (%s) has not been the leader", ZkTaskConst.SERVICE_SERVER)); }}/* * Zk Connection 监听器 * 如果 zk client 长连接断开后, 需要重连以保证该客户端仍能参与 Leader 选举. * 对于定时任务级的Leader选举, 这个监听器并不重要. * 对于服务器级别的Leader选举, 这个监听器很重要. */class ZkConnectionStateListener implements ConnectionStateListener { private static final Logger log = LoggerFactory.getLogger(ZkConnectionStateListener.class); @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { log.debug("Zk connection change to " + newState); if (ConnectionState.CONNECTED != newState) { while (true) { try { log.error("Disconnected to the Zk server. Try to reconnect Zk server"); client.getZookeeperClient().blockUntilConnectedOrTimedOut(); log.info("Succeed to reconnect Zk server"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } } }}/* * 定时任务控制器类 */class ZkTaskController { private static final Logger log = LoggerFactory.getLogger(ZkTaskController.class); private CuratorFramework client; private LeaderLatch leaderLatch; private String taskName; public boolean isLeader = false; public ZkTaskController(String taskName) { this.taskName = taskName; } private void start() throws Exception { client = getClient(false); client.getConnectionStateListenable().addListener(new ZkConnectionStateListener()); leaderLatch = new LeaderLatch(client, ZkTaskConst.getZkLatchPath(taskName)); leaderLatch.addListener(new ZkTaskLeaderLatchListener()); client.start(); if (leaderLatch.getState() != State.STARTED) { leaderLatch.start(); } } private void awaitForLeader(long timeout, TimeUnit unit) { try { this.leaderLatch.await(timeout, unit); isLeader = leaderLatch.hasLeadership(); } catch (InterruptedException e) { // log.error(e.getMessage(), e); } } private void stop(boolean closeLeaderLatch) { if (closeLeaderLatch) { CloseableUtils.closeQuietly(leaderLatch); } client.close(); } private static CuratorFramework getClient(boolean autoStart) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(ZkTaskConst.ZK_URL, retryPolicy); if (client.getState() != CuratorFrameworkState.STARTED && autoStart) { client.start(); } return client; } publicvoid runTask(Runnable action) { ZkTaskController zkTaskController = new ZkTaskController(taskName); try { zkTaskController.start(); zkTaskController.awaitForLeader(ZkTaskConst.WAIT_SECONDS_ENSURE_BE_LEADER, TimeUnit.SECONDS); if (zkTaskController.isLeader) { log.info(String.format("The task %s will run on this task's leader server", taskName)); action.run(); } else { log.info(String.format("The task %s will not this task's non-leader server", taskName)); } // 再Hold leader一会儿, 以防止多个服务器时间不准导致作业double run Thread.sleep(1000 * ZkTaskConst.SLEEP_SECONDS_AFTER_TASK); } catch (Exception e) { log.error(e.getMessage(), e); } finally { zkTaskController.stop(true); } }}/* * 定时任务类 */@Componentclass MyTasks { /** * 一个定时任务 reportCurrentTimeTask 方法 (每分钟运行) */ @Scheduled(cron = "0 * * * * *") public void reportCurrentTimeTask() { ZkTaskController zkTaskController = new ZkTaskController("reportCurrentTime"); zkTaskController.runTask(new ReportCurrentTimeTaskInternal()); } /** * 定时任务 reportCurrentTimeTask 真正执行的内容 */ class ReportCurrentTimeTaskInternal implements Runnable { private final Logger log = LoggerFactory.getLogger(ReportCurrentTimeTaskInternal.class); private final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); @Override public void run() { log.info(String.format("The server (%s) is now %s", ZkTaskConst.SERVICE_SERVER, dateFormat.format(new Date()))); } }}
======================参考====================== https://www.cnblogs.com/leesf456/p/6032716.htmlhttps://www.jianshu.com/p/70151fc0ef5dhttps://www.codelast.com/%e5%8e%9f%e5%88%9b-zookeeper%e6%b3%a8%e5%86%8c%e8%8a%82%e7%82%b9%e7%9a%84%e6%8e%89%e7%ba%bf%e8%87%aa%e5%8a%a8%e9%87%8d%e6%96%b0%e6%b3%a8%e5%86%8c%e5%8f%8a%e6%b5%8b%e8%af%95%e6%96%b9%e6%b3%95/http://www.cnblogs.com/francisYoung/p/5464789.htmlhttps://www.cnblogs.com/LiZhiW/p/4930486.html