ZooKeeper 相關概念以及使用小結

ZooKeeper 相關概念以及使用小結

Dubbo 通過註冊中心在分佈式環境中實現服務的註冊與發現,而註冊中心通常采用 ZooKeeper,研究註冊中心相關源碼繞不開 ZooKeeper,所以學習瞭 ZooKeeper 的基本概念以及相關 API 操作。

ZooKeeper 相關概念

session

客戶端與服務端采用 TCP 長連接,服務端在為客戶端創建 Session 會分配一個唯一 sessionId。在 Session timeout 時間內,客戶端可以向服務端發送請求以及接受 watcher 事件通知。

數據結構

Zookeeper 將所有數據存儲在內存中,數據模型是一棵樹(Znode Tree),由斜杠(/)的進行分割的路徑,就是一個Znode,例如/foo/path1。

ZooKeeper 相關概念以及使用小結

Znode

Znode 將會保存數據內容以及相關屬性信息。在 Znode 中使用 Stat 數據結保存相關屬性信息。Stat 屬性中有三種版本信息,分別為 version:當前節點版本信息,cversion:當前節點子節點版本,aversion 當前節點的 ACL 版本。每次發生改動,版本數值將會單調遞增。

更新,刪除 Znode 可以傳入版本數值,如果版本數值不對,將會導致刪除/更新失敗,這個特性類似於 CAS 操作。

Znode 有以下幾種類型:

  1. 永久節點

一旦創建,將會一直存在,除非手動刪除。dubbo 目錄節點為永久節點。

  1. 臨時節點

臨時節點基於客戶端 Session,Session 有效期內將會一直存在,Session 失效,節點將會自動刪除。

利用這個機制,Dubbo 服務者創建的節點就是臨時節點。如果 Dubbo 服務者程序意外宕機,在 Session 超時之後,也能自動刪除服務節點,自動下線有問題的服務。

3 順序節點

創建順序節點將會自動在名字後追加整形數字,默認長度為 10 位。順序節點也分為永久與臨時。

利用臨時順序節點,我們可以用來實現分佈式鎖 https://juejin.im/post/5c01532ef265da61362232ed。

Watcher 機制

客戶端可以在指定節點註冊監聽器(Watcher),在觸發特定事件後,ZooKeeper 服務端會將事件通知到客戶端。在 Dubbo 中消費者基於 watcher 機制可以動態感知到新的服務者加入。

ZooKeeper 可以在三種請求中設置監聽,分別為:

  • getData(),獲取節點數據
  • getChildren() 獲取子節點
  • exists() 判斷節點是否存在

通知事件類型分為,增刪改事件,以及子節點變動事件。

需要註意的是,watcher 通知過一次之後將會失效,若想繼續監聽通知,需要重新註冊。

ZooKeeper 原生 API 操作

ZooKeeper 官方提供 Java API 實現,提供相關操作的方法。

創建連接


ZooKeeper zk=new ZooKeeper("127.0.0.1:2181", 150000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("已經觸發瞭" + watchedEvent.getType() + "事件"+watchedEvent);
}
});

創建連接需要傳入 ZooKeeper 服務端地址,然後設定 session 超時時間,另外還需要創建一個 Watcher,用於監聽連接事件。建立連接之後,就可以使用該客戶端操作。

CURD 操作


// 創建永久節點,需要傳入 ACL 權限列表,以及指定節點類型
zk.create("/test","test".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.PERSISTENT);
// 修改節點值。更新節點值需要傳入節點的版本,如果版本與服務端版本不一致,更新失敗,類似 CAS 機制。-1 代表不比較節點版本
zk.setData("/test","test1".getBytes(),-1);
// 刪除節點.刪除節點也需要傳入節點版本
zk.delete("/test",-1);
// 創建臨時節點
zk.create("/ephemeral","ephemeral".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);

ZooKeeper 客戶端相關 CRUD 操作如上。可以看到相關操作比較繁瑣,需要傳入參數較多。

watcher

 // 在 exists 註冊 watcher,創建節點,刪除節點,改變節點將會觸發回調
zk.exists("/test", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("回調實例,類型為:"+event.getType());
}
});
// 獲取節點數據,可以註冊 watcher,刪除節點以及改變節點數據可以觸發回調
zk.getData("/test", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("回調實例,類型為:"+event.getType());
}
},new Stat());
// 獲取子節點,註冊 watcher,一級子節點變動後將會觸發回調
zk.getChildren("/test", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("回調實例,類型為:"+event.getType());
}
});

ZooKeeper API 可以為三種操作註冊 watcher,一旦相關節點變動將會觸發事件通知。

Curator

從上面代碼示例可以看到 ZooKeeper 提供 API 比較復雜且難用。可以使用 Curator 或者 zkclient 這種第三方框架代替原生 API。這類框架封裝 ZooKeeper 原生 API,抽象化相關接口,簡化操作難度。

dubbo 抽象相關 ZooKeeper 操作,並分別使用 Curator 或者 zkclien 實現。在 dubbo 2.6.1 版本之後將會默認使用 Curator,之前版本默認使用 zkclient 。

下面我們使用 Curator 操作 ZooKeeper 。

建立連接

 // 設置重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 默認 session 超時時間 60 s
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);

Curator 創建連接與原生 API 大致相關,不過需要設置重試策略,第一次連接失敗,Curator 可以重新嘗試連接,直到超過最大連接次數。

節點 CURD 操作

 // 創建目錄節點
client.create().forPath("/test", "123456789".getBytes());
// 創建普通節點
client.create().forPath("/test/normal", "123121".getBytes());
// 修改普通節點內容
client.setData().forPath("/test/normal", "1121231231".getBytes());
// 刪除節點
client.delete().forPath("/test/normal");
// 創建臨時節點
client.create().withMode(CreateMode.EPHEMERAL).forPath("/ephemeral");
// 創建永久順序節點
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/sequential");

// 獲取所有子節點
List nodes = client.getChildren().forPath("/parent");

Curator CRUD 操作比較簡單,無需設置相關屬性參數。

設置監聽

Curator 相關監聽 API 封裝 zookeeper 原生API,內部增加重復註冊等功能,從而使監聽可以重復使用。

Curator 存在三種類型 API。

  • NodeCache:針對節點增刪改操作。
  • PathChildrenCache:針對節點一級目錄下節點增刪改監聽
  • TreeCache:結合 NodeCache 與 PathChildrenCache 操作,不僅可以監聽當前節點,還可以監聽節點下任意子節點(支持多級)變動。
 // `NodeCache`使用方式
NodeCache nodeCache=new NodeCache(client,"/test1",false);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("當前節點:"+nodeCache.getCurrentData());
}
});
nodeCache.start();
// PathChildrenCache 使用方式
PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/test2",false);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println(event);
}
});
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println("註冊watcher成功...");
// TreeCache 使用方式
TreeCache treeCache=new TreeCache(client,"/tree");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println(event);
}
});
treeCache.start();
System.out.println("註冊watcher成功...");

Published in News by Awesome.

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *