0%

zookeeper 实现配置中心、分布式锁

Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>

客户端连接工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ZkUtil {
private static volatile ZooKeeper zk;
/**
* zk服务端地址,/可以添加上级节点目录,之后客户端的所有操作都基于此目录
*/
private static String connectAddr = "192.168.142.121:2181,192.168.142.122:2181,192.168.142.123:2181,192.168.142.124:2181/myLock";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static DefaultWatch watch = new DefaultWatch();
public static ZooKeeper getZk(){
if(zk == null){
synchronized (ZooKeeper.class){
if(zk == null){
try {
zk = new ZooKeeper(connectAddr, 3000, watch);
watch.setCountDownLatch(countDownLatch);
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return zk;
}
}

配置中心

监控回调类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* 监控回调类
* 实现Watcher,process(WatchedEvent watchedEvent)方法可以获取节点的事件状态进行监控,例如节点的创建、改变、删除
* 实现AsyncCallback.StatCallback的processResult方法可以对操作节点的状态进行监控,判断是否存在的方法触发
* 实现AsyncCallback.DataCallback的processResult方法可以监控节点,获取节点修改数据
* @author yrl
* @date 2021/5/18
*/
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
/**
* 客户端连接
*/
private ZooKeeper zk;
/**
* 用于存放节点名和节点内容
*/
private MyContent myContent;
/**
*
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);

public ZooKeeper getZk() {
return zk;
}

public void setZk(ZooKeeper zk) {
this.zk = zk;
}

public MyContent getMyContent() {
return myContent;
}

public void setMyConfig(MyContent myContent) {
this.myContent = myContent;
}

public void await(){
//判断节点是否存在
//Watcher
//StatCallback
zk.exists(myContent.getPathName(),this,this,"ctx-exists");
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* AsyncCallback.DataCallback
* 数据回调
*/
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if(data != null){
myContent.setContent(new String(data));
countDownLatch.countDown();
}
}

/**
* AsyncCallback.StatCallback
* 状态回调
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat != null){
zk.getData(myContent.getPathName(),this,this,"ctx-StatCallback");
}
}

/**
* Watcher
* 节点事件回调
*/
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
switch (type) {
case None:
break;
case NodeCreated:
//没有创建
zk.getData(myContent.getPathName(),this,this,"ctx-Watcher-nodeCreate");
break;
case NodeDeleted:
myContent.setContent("");
countDownLatch = new CountDownLatch(1);
break;
case NodeDataChanged:
//节点数据改变
zk.getData(myContent.getPathName(),this,this,"ctx-Watcher-nodeChanged");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
}

测试运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
  public class Test {

private ZooKeeper zk;

@Before
public void connect(){
zk = ZkUtil.getZk();
}

@After
public void close(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 尝试使用zk实现发现配置功能
* 当node节点被修改时,客户端客户监控更新本地配置
*
*
* 当获取不到数据时调用await()
* 调用zk的exists方法,使用Watcher和AsyncCallback.StatCallback两种监控
* Watcher
* =>监控节点是否被删除、创建、改变
* => 删除
* => myConfig.setContent("")、重新创建闭锁对象,让客户端进入等待中
* => 创建、改变
* => 调用getData方法,使用watch和AsyncCallback.DataCallback
* => watch继续监控节点变化
* => DataCallback,修改数据,放行闭锁
* AsyncCallback.StatCallback
* => 调用getData方法,使用watch和AsyncCallback.DataCallback
* => watch继续监控节点变化
* => DataCallback,修改数据,放行闭锁
*
* 调用countDownLatch.await()
*/
@org.junit.Test
public void test(){
WatchCallBack watchCallBack = new WatchCallBack();
watchCallBack.setZk(zk);
//用于存放节点名和节点内容
MyContent myConfig = new MyContent();
myConfig.setPathName("/xxx");

watchCallBack.setMyConfig(myConfig);
watchCallBack.await();

while (true){
//内容为空
if(StringUtils.isBlank(myConfig.getContent())){
System.out.println("no data");
watchCallBack.await();
}else {
System.out.println(myConfig.getContent());
}

//控制获取节点内容输出的速度,方便查看
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

分布式锁

监控回调类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/**
* tryLock()
* zk.create()
* countDownLatch.await()
* AsyncCallback.StringCallback.processResult
* zk.getChildren()
* AsyncCallback.Children2Callback.processResult
* zk.setData()和countDown()
* zk.exists()
* watch.process()
* 当nodeDelete时zk.getChildren()重复上面步骤
*
*
* @author yrl
* @date 2021/5/19
*/
public class WatchCallback implements Watcher, AsyncCallback.StringCallback , AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
private ZooKeeper zk;
private MyContent myContent;
private String curPathName;
private CountDownLatch countDownLatch = new CountDownLatch(1);

public ZooKeeper getZk() {
return zk;
}

public void setZk(ZooKeeper zk) {
this.zk = zk;
}

public MyContent getMyContent() {
return myContent;
}

public void setMyContent(MyContent myContent) {
this.myContent = myContent;
}

public void tryLock(){
try {
System.out.println("lock---->" + myContent.getThreadName());
//TODO 此处可增加节点内容数据判断,实现可重入操作
//当前线程创建临时序列节点
zk.create(myContent.getPathName(),myContent.getContent().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL,this,"ctx");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void unLock(){
//删除当前节点,让其他监控该节点的线程执行
try {
System.out.println("unlock---->" + myContent.getThreadName());
zk.delete(curPathName,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}


/**
* 前一个节点发生事件时,通过此方法回调当前节点
* watch监控回调
*/
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
//前一个节点被删除,获取父节点下的所有子节点 => Children2Callback中判断当前节点是否为第一个节点
//非第一个节点挂了,当前节点也能收到回调,获取父节点下的所有子节点,重新监控当前节点前面的节点
zk.getChildren("/",false,this,"ctx");
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}


/**
* AsyncCallback.StringCallback
* create操作的回调函数
*/
@Override
public void processResult(int rc, String pathName, Object ctx, String nodeName) {
//创建的节点名称不能为空
if(!StringUtils.isBlank(nodeName)){
//获取根节点下的子节点集合,并在Children2Callback回调中操作
//当前操作是为了操作子节点,不需要监控当前根节点,所以watch设置为false
System.out.println(myContent.getThreadName() + " 创建 " + nodeName);
curPathName = nodeName;
zk.getChildren("/",false,this,"ctx");
}
}

/**
* AsyncCallback.Children2Callback
* getChildren操作的回调函数
*/
@Override
public void processResult(int rc, String pathName, Object ctx, List<String> childrenList, Stat stat) {
//获取的节点列表是乱序的,需要将其进行排序操作
Collections.sort(childrenList);
//截取pathName“/”后面的数据
//判断当前节点是否为最靠前的节点,是的话才允许往后进行操作
int index = childrenList.indexOf(curPathName.substring(1));
if(index == 0){
//将抢到锁的线程名称放入节点中,在创建节点的地方判断放行,实现可重入机制
try {
System.out.println(myContent.getThreadName() + "---have lock " + curPathName);
zk.setData("/",myContent.getThreadName().getBytes(),-1);
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}else {
System.out.println("no first");
//非第一个节点时,判断当前节点的前一节点是否存在,当前节点通过watch监控
zk.exists("/" + childrenList.get(index - 1),this,this,"ctx");
}
}

/**
* 判断是否存在的节点状态回调
* AsyncCallback.StatCallback
*/
@Override
public void processResult(int rc, String pathName, Object ctx, Stat stat) {

}
}

测试运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class TestLock {
private ZooKeeper zk;

@Before
public void connect(){
zk = ZkUtil.getZk();
}

@After
public void close(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Test
public void test(){
for (int i = 0; i < 10; i++) {
new Thread(){
@Override
public void run() {
WatchCallback watchCallback = new WatchCallback();
MyContent myContent = new MyContent();
myContent.setContent("");
myContent.setPathName("/test");
myContent.setThreadName(Thread.currentThread().getName());
watchCallback.setMyContent(myContent);
watchCallback.setZk(zk);

watchCallback.tryLock();
//不睡眠的话,可能第一个节点已经删除,第二个节点还没监控,等第二个节点开启监控时,第一个节点已经没了
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("doing。。。");
watchCallback.unLock();
}
}.start();
}

while (true){

}
}
}