目录
Zookeeper部署与使用
1 分布式安装部署
配置服务器编号
增加zoo.cfg集群配置参数
2 启动集群服务器
3 启动集群客户端和命令
4 ️API的使用
引入依赖
创建客户端
创建节点
获取子节点并监听数据变化
判断节点是否存在
️服务器节点动态上下线案例分析
️代码实现
Zookeeper部署与使用
1 分布式安装部署
配置服务器编号
在.../zkData文件目录下创建myid文件,并写入服务器编号
[root@hadoop101 apache-zookeeper-3.5.10-bin]# cd zkData/
[root@hadoop101 zkData]# 1 > myid
bash: 1: 未找到命令...
[root@hadoop101 zkData]# echo 1 > myid
其他的两台服务器分别写为2 3
增加zoo.cfg集群配置参数
##############cluster##############
server.1=192.168.60.101:2888:3888
server.2=192.168.60.102:2888:3888
server.3=192.168.60.103:2888:3888
配置参数解读 server.A=B:C:D
A是一个数字,表示是第几号服务器。
Zookeeper启动的时候会先读取dataDir目录下的myid文件,拿到里面的数据与zoo.cfg里面的配置信息对比从而判断到底是那个server
B是这个服务器的ip地址。
C是这个服务器与Leader服务器交换同步副本信息的端口号。
D是当Leader宕机后,这个服务器与其他服务器通信交换选举信息的端口号。
2 启动集群服务器
先启动一台服务器,会发现由于半数机制,Zookeeper没有启动成功
[root@hadoop101 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@hadoop101 bin]# jps
33904 QuorumPeerMain
34066 Jps
[root@hadoop101 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
这时候再启动二号机,一号机就会变成Follower,二号机成为Leader
[root@hadoop101 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
3 启动集群客户端和命令
./zkcli.sh
shell命令
显示所有操作: help
查看当前znode中包含的内容:ls
查看当前znode中包含的详细内容:ls2
创建普通节点:create
[zk: localhost:2181(CONNECTED) 5] create /sanguo
Created /sanguo
[zk: localhost:2181(CONNECTED) 6] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
[zk: localhost:2181(CONNECTED) 7] get /sanguo
null
[zk: localhost:2181(CONNECTED) 8] get /sanguo/shuguo
liubei
获取节点内容:get
创建短暂节点:create -e,客户端退出后则消失:
[zk: localhost:2181(CONNECTED) 9] create -e /sanguo/wuguo "sunquan"
Created /sanguo/wuguo
[zk: localhost:2181(CONNECTED) 10] get /sanguo/wuguo
sunquan
[zk: localhost:2181(CONNECTED) 11] quit
[zk: localhost:2181(CONNECTED) 0] ls /sanguo
[shuguo]
创建带序号的节点:create -s
[zk: localhost:2181(CONNECTED) 0] ls /sanguo
[shuguo]
[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"
Created /sanguo/weiguo
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo "caocao"
Created /sanguo/weiguo0000000003
[zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo "caocao"
Created /sanguo/weiguo0000000004
[zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo "caocao"
Created /sanguo/weiguo0000000005
[zk: localhost:2181(CONNECTED) 5] ls /sanguo
[shuguo, weiguo, weiguo0000000003, weiguo0000000004, weiguo0000000005]
会自动为节点添加编号,没有则从0开始,否则编号顺序添加后有几个节点则编号就为几
修改节点的值:set
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
[zk: localhost:2181(CONNECTED) 7] get /sanguo/weiguo
weiguo weiguo0000000003 weiguo0000000004 weiguo0000000005
[zk: localhost:2181(CONNECTED) 7] get /sanguo/weiguo
simayi
节点值的变化监听:get -w
[zk: localhost:2181(CONNECTED) 4] get -w /sanguo
null
[zk: localhost:2181(CONNECTED) 5]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
watch监听只能生效一次
节点路径监听:ls -w
[zk: localhost:2181(CONNECTED) 8] ls -w /sanguo
[shuguo, weiguo, weiguo0000000003, weiguo0000000004, weiguo0000000005, wuguo]
[zk: localhost:2181(CONNECTED) 9]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
如果只创建节点而不存值的话是监听不到的
删除节点:delete
[zk: localhost:2181(CONNECTED) 11] delete /sanguo/weiguo
weiguo weiguo0000000003 weiguo0000000004 weiguo0000000005
[zk: localhost:2181(CONNECTED) 11] delete /sanguo/weiguo
递归删除:deleteall
[zk: localhost:2181(CONNECTED) 12] delete /sanguo
Node not empty: /sanguo
[zk: localhost:2181(CONNECTED) 13] rmr /sanguo
The command 'rmr' has been deprecated. Please use 'deleteall' instead.
[zk: localhost:2181(CONNECTED) 14] deleteall /sanguo
Node does not exist: /sanguo
[zk: localhost:2181(CONNECTED) 15] ls /
[zookeeper]
4 ️API的使用
引入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.1</version>
</dependency>
</dependencies>
创建客户端
使用构造函数创建Zookeeper客户端对象,参数为:
- ip地址+端口号(集群之间使用逗号隔开,不能添加空格)
会话超时时间
watcher匿名内部类,包含一个process方法,即一个
监听器对象
public class ZookeeperTest {
// Zookeeper客户端器的ip和端口号
private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
// 会话超时时间
private int sessionTimeOut = 2000;
@Test
public void test1() throws IOException {
ZooKeeper zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
20:08:20.001 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181 sessionTimeout=2000 watcher=com.hikaru.ZookeeperTest$1@50cbc42f
创建节点
public class ZookeeperTest {
// Zookeeper客户端器的ip和端口号
private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
// 会话超时时间
private int sessionTimeOut = 2000;
private ZooKeeper zkCli;
@Before
public void test1() throws IOException {
zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
@Test
public void createNodeTest() throws KeeperException, InterruptedException {
String path = zkCli.create("/zhanguo", "ZhenTianXingCun".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
}
[zk: localhost:2181(CONNECTED) 11] get /zhanguo
ZhenTianXingCun
获取子节点并监听数据变化
查询节点
@Test
public void getDataAndWatchTest() throws KeeperException, InterruptedException {
List<String> children = zkCli.getChildren("/", false);
for(String child : children) {
System.out.println(child);
}
}
zookeeper
zhanguo
查询并监控
@Test
public void getDataAndWatchTest() throws KeeperException, InterruptedException {
List<String> children = zkCli.getChildren("/", true);
for(String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
21:09:45.568 [main-SendThread(192.168.60.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification session id: 0x100001a97080003
21:09:45.569 [main-SendThread(192.168.60.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ for session id 0x100001a97080003
判断节点是否存在
@Test
public void exist() throws KeeperException, InterruptedException {
Stat exists = zkCli.exists("/zhanguo", false);
System.out.println(exists);
}
️服务器节点动态上下线案例分析
需求:客户端能实时洞察到服务器上下线的变化
分析:相对于Zookeeper集群,服务器端和客户端都是“客户端”:服务端需要向集群写数据,客户端则需要监视数据
服务器启动的时候就去注册信息,此时在Zookeeper创建的都是临时节点
客户端启动就去getChildren,获取当前在线服务器列表
,并注册监听(注册实际上就是创建一个节点)
服务器下线的时候就会通知客户端
️代码实现
如上面所说的,这里的服务器和客户端是相对于我们开发人员接触到的分布式微服务而言的,下面实现的本质是依靠Zookeeper客户端实现server注册(本质即在Zookeeper集群创建节点)和数据监听(本质监听Zookeeper集群)
服务器注册代码实现,比较简单不再赘述,需要注意一点创建的节点应该是带序号的临时节点
public class DistributeServer {
// Zookeeper客户端器的ip和端口号
private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
// 会话超时时间
private int sessionTimeOut = 2000;
private ZooKeeper zkCli;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServer distributeServer = new DistributeServer();
// 1 server连接集群
distributeServer.connect();
// 2 server注册
distributeServer.register(args);
// 3 业务逻辑
distributeServer.services();
}
private void services() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String[] hosts) throws KeeperException, InterruptedException {
for(String host : hosts) {
zkCli.create("/servers/server", host.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
private void connect() throws IOException {
this.zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
客户端监听代码实现
- 首先客户端连接Zookeeper集群,并进行注册监听
注册监听会首先利用
Zookeeper客户端
获取并监听持久节点/services下面的子节点,子节点的值对应一台服务器,如此便获取到全部节点信息每当节点信息发生变动则会触发
process()
方法,继续注册一次监听并实时获取全部节点信息(这样做的目的是监听只能生效一次)public class DistributeClient {
// Zookeeper客户端器的ip和端口号
private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
// 会话超时时间
private int sessionTimeOut = 3000;
private ZooKeeper zkCli;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient distributeClient = new DistributeClient();
// 1 客户端连接集群
distributeClient.connect();
// 2 Client注册监听
distributeClient.watch();
// 3 业务逻辑
distributeClient.services();
}
private void services() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void watch() throws KeeperException, InterruptedException {
List<String> children = zkCli.getChildren("/servers", true);
List<String> serversOnline = new ArrayList<>();
for(String child : children) {
byte[] host = zkCli.getData("/servers/" + child, false, null);
serversOnline.add(new String(host));
}
System.out.println(serversOnline);
}
private void connect() throws IOException {
this.zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
watch();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
}
}