烂笔头

不积跬步无以至千里

0%

zookeeper-实现分布式锁

分布式锁可以避免不同节点重复执行相同的工作,也可以用于协调分布式节点之间的任务。
实现分布式锁的方式有很多,基于 ZooKeeper 的实现也比较简单。

1
2
3
4
5
6
7
8
9
10
11
<!-- ZooKeeper Java client, pinned to version 3.5.8. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<!-- Keeps the slf4j-log4j12 binding off the classpath; presumably to avoid a clash with the application's own SLF4J binding (confirm). -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<version>3.5.8</version>
</dependency>

将zookeeper配置外置

1
2
3
4
5
6
7
8
9
10
11
/**
* Externalized ZooKeeper connection settings, bound from configuration keys
* under the {@code zoo} prefix.
*/
@ConfigurationProperties(prefix = ZookeeperProperties.PREFIXX)
public class ZookeeperProperties {
// Configuration key prefix referenced by the @ConfigurationProperties annotation on this class.
static final String PREFIXX = "zoo";

// ZooKeeper server addresses; presumably "host:port" entries (confirm against the configuration file).
private List<String> nodes;

// Session timeout for the ZooKeeper client; defaults to 5000 (presumably milliseconds - confirm).
private int sessionTimeout = 5000;

...getter
...setter
}

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Slf4j
public class ZookeeperSession implements InitializingBean, DisposableBean {
private final String nodes;
private final int sessionTimeout;
private ZooKeeper zookeeper;
private final CountDownLatch connectedSemaphore = new CountDownLatch(1);

public ZookeeperSession(ZookeeperProperties properties) {
this.sessionTimeout = properties.getSessionTimeout();
this.nodes = StringUtils.join(properties.getNodes(), ",");
}

/**
* 获取锁
*
* @param productId 商品标识
*/
public void acquireDistributedLock(Long productId) {
String path = getPath(productId);
byte[] data = "".getBytes();
//尝试创建锁
int retry = 1;
while (true) {
try {
this.zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info("获取锁成功,尝试{}次", retry);
return;
} catch (Exception exception) {
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
log.warn("interrupted error");
}
retry++;
}
}
}

/**
* 释放锁
*
* @param productId
*/
public void releaseDistributedLock(Long productId) {
try {
this.zookeeper.delete(getPath(productId), -1);
log.debug("release distributedLock ,{}", productId);
} catch (Exception e) {
log.error("release distributedLock error", e);
}
}

private String getPath(Long productId) {
return "/product-lock-" + productId;
}


@Override
public void afterPropertiesSet() throws Exception {
try {
this.zookeeper = new ZooKeeper(this.nodes, this.sessionTimeout, watchedEvent -> {
Watcher.Event.KeeperState state = watchedEvent.getState();
log.debug("zookeeper state:[{}]", state);
if (state == Watcher.Event.KeeperState.SyncConnected) {
log.info("zookeeper connected");
this.connectedSemaphore.countDown();
}
});
} catch (IOException e) {
log.error("zookeeper connect error", e);
}
this.connectedSemaphore.await();
}

@Override
public void destroy() throws Exception {
this.zookeeper.close();
}
}

注册为spring bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Spring configuration that enables binding of {@link ZookeeperProperties} and
* exposes a {@link ZookeeperSession} bean built from them.
*/
@Configuration
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperSessionConf {

// Connection settings received through the constructor and handed to the session bean.
private final ZookeeperProperties properties;

/**
* @param properties ZooKeeper connection settings used to build the session bean
*/
public ZookeeperSessionConf(ZookeeperProperties properties) {
this.properties = properties;
}

/**
* Creates the {@link ZookeeperSession} bean from the stored properties.
*/
@Bean
ZookeeperSession zookeeperSession() {
return new ZookeeperSession(properties);
}
}
1
2
3
4
5
# ZooKeeper server addresses, read under the "zoo" configuration prefix.
zoo:
  nodes:
    - 192.168.0.201:2181
    - 192.168.0.202:2181
    - 192.168.0.203:2181

注入ZookeeperSession进行锁定和释放