1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| @Slf4j public class ZookeeperSession implements InitializingBean, DisposableBean { private final String nodes; private final int sessionTimeout; private ZooKeeper zookeeper; private final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public ZookeeperSession(ZookeeperProperties properties) { this.sessionTimeout = properties.getSessionTimeout(); this.nodes = StringUtils.join(properties.getNodes(), ","); }
/** * 获取锁 * * @param productId 商品标识 */ public void acquireDistributedLock(Long productId) { String path = getPath(productId); byte[] data = "".getBytes(); //尝试创建锁 int retry = 1; while (true) { try { this.zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); log.info("获取锁成功,尝试{}次", retry); return; } catch (Exception exception) { try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { log.warn("interrupted error"); } retry++; } } }
/** * 释放锁 * * @param productId */ public void releaseDistributedLock(Long productId) { try { this.zookeeper.delete(getPath(productId), -1); log.debug("release distributedLock ,{}", productId); } catch (Exception e) { log.error("release distributedLock error", e); } }
private String getPath(Long productId) { return "/product-lock-" + productId; }
@Override public void afterPropertiesSet() throws Exception { try { this.zookeeper = new ZooKeeper(this.nodes, this.sessionTimeout, watchedEvent -> { Watcher.Event.KeeperState state = watchedEvent.getState(); log.debug("zookeeper state:[{}]", state); if (state == Watcher.Event.KeeperState.SyncConnected) { log.info("zookeeper connected"); this.connectedSemaphore.countDown(); } }); } catch (IOException e) { log.error("zookeeper connect error", e); } this.connectedSemaphore.await(); }
@Override public void destroy() throws Exception { this.zookeeper.close(); } }
|