Akka提供的非常吸引人的特性之1就是輕松構建自定義集群,這也是我要選擇Akka的最基本緣由之1。如果你不想敲太多代碼,也能夠通過簡單的配置構建1個非常簡單的集群。本文為說明Akka集群構建的學習本錢低廉,以Akka官網的例子代碼動身,進行簡單改造后與Spring集成,有關Spring集成的信息你可以選擇瀏覽《Spring與Akka的集成》1文。本文所講述的是1款10分簡便的集群監聽器,它通過定閱集群成員的消息,對全部集群的成員進行管理(管理的方式只是打印1行日志)。
根據Akka官網的描寫——Akka集群特性提供了容錯的、去中心化的、基于集群成員關系點對點的,不存在單點問題、單點瓶頸的服務。其實現原理為閑談協議和失敗檢查。
這里以Akka官網提供的成員狀態狀態圖為例,如圖1所示。
圖1
圖1展現了狀態轉換的兩個因素:動作和狀態。
本節將要展現構建集群所需要的最基本的配置,幾近不會引入過量的開發本錢,1個集群就構建完成了。application.conf文件的內容以下:
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://metadataAkkaSystem@127.0.0.1:2551", "akka.tcp://metadataAkkaSystem@127.0.0.1:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s # Disable legacy metrics in akka-cluster. metrics.enabled=off } }此配置文件與我在《使用Akka的遠程調用》1文中的配置有很多不同:
代碼清單1
@Named("SimpleClusterListener") @Scope("prototype") public class SimpleClusterListener extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); // subscribe to cluster changes @Override public void preStart() { // #subscribe cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class); // #subscribe } // re-subscribe when restart @Override public void postStop() { cluster.unsubscribe(getSelf()); } @Override public void onReceive(Object message) { if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; log.info("Member is Up: {}", mUp.member()); } else if (message instanceof UnreachableMember) { UnreachableMember mUnreachable = (UnreachableMember) message; log.info("Member detected as unreachable: {}", mUnreachable.member()); } else if (message instanceof MemberRemoved) { MemberRemoved mRemoved = (MemberRemoved) message; log.info("Member is Removed: {}", mRemoved.member()); } else if (message instanceof MemberEvent) { // ignore } else { unhandled(message); } } }
logger.info("Start simpleClusterListener"); final ActorRef simpleClusterListener = actorSystem.actorOf(springExt.props("SimpleClusterListener"), "simpleClusterListener"); actorMap.put("simpleClusterListener", simpleClusterListener); logger.info("Started simpleClusterListener");
我們首先啟動第1個種子節點,配置跟第1小節完全1致。我們視察SimpleClusterListener的日志輸出以下圖所示。
我們再啟動第2個種子節點,其配置的akka.remote.netty.tcp.port為2552,我們視察SimpleClusterListener的日志輸出以下圖所示。
我們再啟動1個非種子節點,沒有為其指定akka.remote.netty.tcp.port,我們視察SimpleClusterListener的日志輸出以下圖所示。
可以看到新加入的節點信息被SimpleClusterListener打印出來了,仔細的同學可能發現了1些Akka集群中各個節點的狀態遷移信息,第1個種子節點正在加入本身創建的集群時的狀態時JOINING,由于第1個種子節點將自己率先選舉為Leader,因此它還將自己的狀態改變成Up。后面它還將第2個種子節點和第3個節點從JOINING轉換到Up狀態。
我們停止第3個加入的節點,我們視察SimpleClusterListener的日志輸出以下圖所示。
可以看到其狀態首先被標記為Down,最后被轉換為Removed。
總結
通過以上介紹相信大家對使用Akka構建集群有了基本的認識,是否是很輕松?
其它Akka利用的博文以下:
后記:個人總結整理的《深入理解Spark:核心思想與源碼分析》1書現在已正式出版上市,目前京東、鐺鐺、天貓等網站均有銷售,歡迎感興趣的同學購買。
京東:http://item.jd.com/11846120.html
鐺鐺:http://product.dangdang.com/23838168.html
下一篇 CDQ分治&&整體二分