1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
| import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
/** * <pre> * 任务线程 * </pre> * * @author wwh */ @Component public class TaskThread2 implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(TaskThread2.class); /** * 获取锁的超时时间 */ private static final int LOCK_ACQUIRE_TIMEOUT = 1000; /** * 锁路径<br> * 全路径为:zkBasePath + LOCK_PATH + task.getId(); */ private static final String LOCK_PATH = "/lock/"; @Autowired private CuratorFramework curatorClient; /** * 基础路径 */ @Value("${zookeeper.basePath}") private String zkBasePath;
private Thread thread;
/** * 运行标记 */ private boolean runFlag = true;
private final ReentrantLock lock = new ReentrantLock(); private final Condition sysClose = lock.newCondition();
private String task;
@Override public void run() { InterProcessMutex lock = new InterProcessMutex(curatorClient, getLockPath());
while (runFlag) { try { if (lock.acquire(LOCK_ACQUIRE_TIMEOUT, TimeUnit.MILLISECONDS)) { try { logger.debug("当前线程获取到锁,开始处理数据"); doWithLock(); } catch (Exception e) { logger.error("执行任务:{} 处理逻辑时异常,", task, e); } finally { try { lock.release(); } catch (Exception e) { logger.error("任务:{} , 释放锁时:ZK错误或链接中断", task, e); } } } } catch (Exception e) { logger.error("任务:{} 获取锁时:ZK错误或链接中断", task, e); } } logger.info("任务:{} , 处理线程退出", task); // 移除zookeeper上锁节点 destroyZKLockPath(); }
/** * 具体执行代码<br> * 获取到锁的时候才会运行 */ private void doWithLock() { }
/** * 获取锁的路径,以任务ID为锁 * * @return */ private String getLockPath() { // 以任务id作为锁的路径 return zkBasePath + LOCK_PATH + task; }
private void destroyZKLockPath() { // 删除锁的条件是没有其他的节点了 try { String path = getLockPath(); List<String> list = curatorClient.getChildren().forPath(path); if (list == null || list.isEmpty()) { // 如果没有其他节点还在获取锁就删除 curatorClient.delete().forPath(path); } } catch (Exception e) { e.printStackTrace(); } }
/** * 启动任务 * * @param task */ public void start(String task) { // 只能启动一次 if (thread != null) { throw new IllegalStateException("任务线程只能启动一次"); } logger.info("开始处理任务:{}", task); this.task = task; // 启动线程 thread = new Thread(this, "T-" + task); thread.start(); }
/** * 停止任务 */ public void stop() { runFlag = false; // 如果是获取锁时的等待无法唤醒 lock.lock(); try { sysClose.signalAll();// 唤醒空数据时的等待 } finally { lock.unlock(); } } }
|