前面七讲我们基本上介绍完了 Zookeeper C API 的所有内容,本文将结合一个小例子讲讲如何在你的实际项目中使用 Zookeeper 服务。
设想如下场景:
假设程序 A 需要 7* 24 小时在线对外提供服务,但是 A 程序在生产环境下总是不稳定,时常崩溃,不过幸运的是解决方案很简单,在 A 程序崩溃以后只需要重启它就可以了。当然如此简单的问题你可以提出多种解决方案,比方说自己实现一个服务程序,每隔一定时间去轮询 A 的状态,如果发现 A 崩溃了,立即重启它,并向管理人员报告问题。不过我们并不打算这么做,毕竟本文主题是讲 Zookeeper C API 的应用,所以我们采用 Zookeeper 服务来解决该问题。
若采用 Zookeeper 服务可以按照如下方案解决问题,程序 A 在启动时创建一个临时 (ZOO_EPHEMERAL) znode 节点 /A,然后按照正常流程对外提供服务。另外监控程序对 /A 节点设置监视,当 /A 节点消失(说明 A 程序已经崩溃) 时,重启 A 程序。假设 A 的名称是 QueryServer,即对外提供查询服务的程序,具体提供什么查询服务由应用自身决定,我们这里只是简单地模拟一下。QueryServer 在启动时创建一个 /QueryServer 的临时节点 (ZOO_EPHEMERAL),然后,程序 QueryServerd 监控 /QueryServer 节点,当 /QueryServer 节点消失(说明 A 程序已经崩溃) 时,重启 QueryServer 程序。
下面是 QueryServer 的实现代码:
- /*
- * =============================================================================
- *
- * Filename: QueryServer.c
- *
- * Description: QueryServer
- *
- * Created: 02/15/2013 08:48:49 PM
- *
- * Author: Fu Haiping (forhappy), haipingf@gmail.com
- * Company: ICT ( Institute Of Computing Technology, CAS )
- *
- * =============================================================================
- */
- #include#include#include#include < string.h > #include#include
- void QueryServer_watcher_g(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx) {
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_CONNECTED_STATE) {
- printf("[[[QueryServer]]] Connected to zookeeper service successfully!n");
- } else if (state == ZOO_EXPIRED_SESSION_STATE) {
- printf("Zookeeper session expired!n");
- }
- }
- }
- void QueryServer_string_completion(int rc, const char * name, const void * data) {
- fprintf(stderr, "[%s]: rc = %dn", (char * )(data == 0 ? "null": data), rc);
- if (!rc) {
- fprintf(stderr, "tname = %sn", name);
- }
- }
- void QueryServer_accept_query() {
- printf("QueryServer is running...n");
- }
- int main(int argc, const char * argv[]) {
- const char * host = "127.0.0.1:2181,127.0.0.1:2182,""127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
- int timeout = 30000;
- zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zhandle_t * zkhandle = zookeeper_init(host, QueryServer_watcher_g, timeout, 0, "hello zookeeper.", 0);
- if (zkhandle == NULL) {
- fprintf(stderr, "Error when connecting to zookeeper servers...n");
- exit(EXIT_FAILURE);
- }
- // struct ACL ALL_ACL[] = {{ZOO_PERM_ALL, ZOO_ANYONE_ID_UNSAFE}};
- // struct ACL_vector ALL_PERMS = {1, ALL_ACL};
- int ret = zoo_acreate(zkhandle, "/QueryServer", "alive", 5, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, QueryServer_string_completion, "zoo_acreate");
- if (ret) {
- fprintf(stderr, "Error %d for %sn", ret, "acreate");
- exit(EXIT_FAILURE);
- }
- do {
- // 模拟 QueryServer 对外提供服务.
- // 为了简单起见, 我们在此调用一个简单的函数来模拟 QueryServer.
- // 然后休眠 5 秒,程序主动退出(即假设此时已经崩溃).
- QueryServer_accept_query();
- sleep(5);
- } while ( false );
- zookeeper_close(zkhandle);
- }
Makefile 如下:
- all: QueryServer
- QueryServer: QueryServer.o gcc - L / usr / local / lib / -lzookeeper_mt - o $@$ ^
- QueryServer.o: QueryServer.c gcc - DTHREADED - I / usr / local / include / zookeeper - o $@ - c $ ^
- .PHONY: clean
- clean: rm QueryServer.o QueryServer
QueryServerd 代码如下:
- /*
- * =============================================================================
- *
- * Filename: QueryServerd.c
- *
- * Description: QueryServer daemon using zookeeper.
- *
- * Created: 02/15/2013 08:48:49 PM
- *
- * Author: Fu Haiping (forhappy), haipingf@gmail.com
- * Company: ICT ( Institute Of Computing Technology, CAS )
- *
- * =============================================================================
- */
- #include#include#include#include < string.h > #include#include#include#include
- void QueryServerd_watcher_global(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx);
- static void QueryServerd_dump_stat(const struct Stat * stat);
- void QueryServerd_stat_completion(int rc, const struct Stat * stat, const void * data);
- void QueryServerd_watcher_awexists(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx);
- static void QueryServerd_awexists(zhandle_t * zh);
- void QueryServerd_watcher_global(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx) {
- if (type == ZOO_SESSION_EVENT) {
- if (state == ZOO_CONNECTED_STATE) {
- printf("Connected to zookeeper service successfully!n");
- } else if (state == ZOO_EXPIRED_SESSION_STATE) {
- printf("Zookeeper session expired!n");
- }
- }
- }
- static void QueryServerd_dump_stat(const struct Stat * stat) {
- char tctimes[40];
- char tmtimes[40];
- time_t tctime;
- time_t tmtime;
- if (!stat) {
- fprintf(stderr, "nulln");
- return;
- }
- tctime = stat - >ctime / 1000;
- tmtime = stat - >mtime / 1000;
- ctime_r( & tmtime, tmtimes);
- ctime_r( & tctime, tctimes);
- fprintf(stderr, "tctime = %stczxid=%llxn""tmtime=%stmzxid=%llxn""tversion=%xtaversion=%xn""tephemeralOwner = %llxn", tctimes, stat - >czxid, tmtimes, stat - >mzxid, (unsigned int) stat - >version, (unsigned int) stat - >aversion, stat - >ephemeralOwner);
- }
- void QueryServerd_stat_completion(int rc, const struct Stat * stat, const void * data) {
- // fprintf(stderr, "%s: rc = %d Stat:n", (char *) data, rc);
- // QueryServerd_dump_stat(stat);
- }
- void QueryServerd_watcher_awexists(zhandle_t * zh, int type, int state, const char * path, void * watcherCtx) {
- if (state == ZOO_CONNECTED_STATE) {
- if (type == ZOO_DELETED_EVENT) {
- printf("QueryServer gone away, restart now...n");
- // re-exists and set watch on /QueryServer again.
- QueryServerd_awexists(zh);
- pid_t pid = fork();
- if (pid < 0) {
- fprintf(stderr, "Error when doing fork.n");
- exit(EXIT_FAILURE);
- }
- if (pid == 0) {
- /* child process */
- // 重启 QueryServer 服务.
- execl("/tmp/QueryServer/QueryServer", "QueryServer", NULL);
- exit(EXIT_SUCCESS);
- }
- sleep(1);
- /* sleep 1 second for purpose. */
- } else if (type == ZOO_CREATED_EVENT) {
- printf("QueryServer started...n");
- }
- }
- // re-exists and set watch on /QueryServer again.
- QueryServerd_awexists(zh);
- }
- static void QueryServerd_awexists(zhandle_t * zh) {
- int ret = zoo_awexists(zh, "/QueryServer", QueryServerd_watcher_awexists, "QueryServerd_awexists.", QueryServerd_stat_completion, "zoo_awexists");
- if (ret) {
- fprintf(stderr, "Error %d for %sn", ret, "aexists");
- exit(EXIT_FAILURE);
- }
- }
- int main(int argc, const char * argv[]) {
- const char * host = "127.0.0.1:2181,127.0.0.1:2182,""127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
- int timeout = 30000;
- zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- zhandle_t * zkhandle = zookeeper_init(host, QueryServerd_watcher_global, timeout, 0, "QueryServerd", 0);
- if (zkhandle == NULL) {
- fprintf(stderr, "Error when connecting to zookeeper servers...n");
- exit(EXIT_FAILURE);
- }
- QueryServerd_awexists(zkhandle);
- // Wait for asynchronous zookeeper call done.
- getchar();
- zookeeper_close(zkhandle);
- return 0;
- }
Makefile 如下:
- all: QueryServerd
- QueryServerd: QueryServerd.o gcc - L / usr / local / lib / -lzookeeper_mt - o $@$ ^
- QueryServerd.o: QueryServerd.c gcc - g - DTHREADED - I / usr / local / include / zookeeper - o $@ - c $ ^
- .PHONY: clean
- clean: rm QueryServerd.o QueryServerd
首先执行 QueryServerd,
- forhappy@haiping - ict: /tmp/QueryServerd$. / QueryServerd Connected to zookeeper service successfully !
然后执行 QueryServer,
- forhappy@haiping - ict: /tmp/QueryServer$. / QueryServer QueryServer is running... [[[QueryServer]]] Connected to zookeeper service successfully ! [zoo_acreate] : rc = 0 name = /QueryServer/
可见 Queryerver 创建了 /QueryServer 节点,5 秒后 QueryServer 模拟程序崩溃而退出,那么此时在 QueryServerd 端输出如下:
- Connected to zookeeper service successfully ! QueryServer started...#QueryServerd感知到QueryServer已正常启动.QueryServer gone away,
- restart now...#5秒钟后,QueryServer崩溃,QueryServerd准备重启QueryServer.QueryServer is running...# QueryServer正在运行,以下3行是QueryServer输出结果。 [[[QueryServer]]] Connected to zookeeper service successfully ! [zoo_acreate] : rc = 0 name = /QueryServer
- QueryServer started... # QueryServerd 感知到 QueryServer 已正常启动.
- QueryServer gone away, restart now...# 又过了 5 秒钟后,QueryServer 崩溃,QueryServerd 准备重启 QueryServer.
- QueryServer is running... # QueryServer 再次运行,以下 3 行是 QueryServer 输出结果。
- [[[QueryServer]]] Connected to zookeeper service successfully!
- [zoo_acreate]: rc = 0
- name = /QueryServer QueryServer started...# QueryServerd再次感知到QueryServer已正常启动,如此反复.QueryServer gone away,
- restart now...QueryServer is running... [[[QueryServer]]] Connected to zookeeper service successfully ! [zoo_acreate] : rc = 0 name = /QueryServer
- QueryServer started.../
即 QueryServer 每 5 秒钟崩溃一次,然后又被 QueryServerd 重启,模拟了上面的应用场景。
好了 Zookeeper C API 的应用小示例讲完了,可能应用场景选取的不好,不过大致可以一些说明问题吧,如果你想看 Zookeeper 更贴近现实的应用场景,可以参考淘宝的一篇文章《ZooKeeper 典型应用场景一览》和 IBM developerWorks 的一篇博文《分布式服务框架 Zookeeper -- 管理分布式环境中的数据》。
来源: http://it.taocms.org/03/21.htm