日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php框架 > 框架設(shè)計(jì) > RocketMQ(二)集群配置

RocketMQ(二)集群配置

來源:程序員人生   發(fā)布時(shí)間:2016-07-02 13:12:06 閱讀次數(shù):6338次

RocketMQ網(wǎng)絡(luò)部署圖

這里寫圖片描述

RocketMQ 網(wǎng)絡(luò)部署特點(diǎn)

  • Name Server 是1個(gè)幾近無狀態(tài)節(jié)點(diǎn),可集群部署,節(jié)點(diǎn)之間無任何信息同步。

  • Broker 部署相對(duì)復(fù)雜,Broker 分為Master 與Slave,1個(gè)Master 可以對(duì)應(yīng)多個(gè)Slave,但是1個(gè)Slave > 只能對(duì)應(yīng)1個(gè)Master,Master 與Slave 的對(duì)應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定> 義,BrokerId為0 表示Master,非0 表示Slave。Master 也能夠部署多個(gè)。每一個(gè)Broker 與Name

  • Producer 與Name Server 集群中的其中1個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master 建立長連接,且定時(shí)向Master 發(fā)送心跳。Producer 完全無> 狀態(tài),可集群部署。

  • Consumer 與Name Server 集群中的其中1個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master、Slave 建立長連接,且定時(shí)向Master、Slave 發(fā)送心跳。Consumer既可以從Master 定閱消息,也能夠從Slave 定閱消息,定閱規(guī)則由Broker 配置決定。

Broker集群部署方式主要有以下幾種:(Slave 不可寫,但可讀)

單個(gè)Master

這類方式風(fēng)險(xiǎn)較大,1旦Broker 重啟或宕機(jī)時(shí),會(huì)致使全部服務(wù)不可用,不建議線上環(huán)境使用。


多Master模式

1個(gè)集群無 Slave,全是 Master,例如 2 個(gè) Master 或 3 個(gè) Master。

優(yōu)點(diǎn):配置簡單,單個(gè)Master 宕機(jī)或重啟保護(hù)對(duì)利用無影響,在磁盤配置為 RAID10 時(shí),即便機(jī)器宕機(jī)不可恢復(fù)情況下,由于RAID10 磁盤非常可靠,消息也不會(huì)丟(異步刷盤丟失少許消息,同步刷盤1條不丟)。性能最高。

缺點(diǎn):單臺(tái)機(jī)器宕機(jī)期間,這臺(tái)機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可定閱,消息實(shí)時(shí)性會(huì)遭到遭到影響。

先啟動(dòng) NameServer,例如機(jī)器 IP 為:192.168.1.101:9876

nohup sh mqnamesrv &
  • 在機(jī)器 A,啟動(dòng)第1個(gè) Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
  • 在機(jī)器 B,啟動(dòng)第2個(gè) Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

多Master多Slave模式,異步復(fù)制

每一個(gè) Master 配置1個(gè) Slave,有多對(duì)Master-Slave,HA 采取異步復(fù)制方式,主備有短暫消息延遲,毫秒級(jí)。

優(yōu)點(diǎn):即便磁盤破壞,消息丟失的非常少,且消息實(shí)時(shí)性不會(huì)受影響,由于 Master 宕機(jī)后,消費(fèi)者依然可以從 Slave 消費(fèi),此進(jìn)程對(duì)利用透明。不需要人工干預(yù)。性能同多 Master 模式幾近1樣。

缺點(diǎn):Master宕機(jī),磁盤破壞情況,會(huì)丟失少許消息。

先啟動(dòng)兩臺(tái)服務(wù)器的NameServer,例如機(jī)器 IP 為:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
  • 在機(jī)器 A,啟動(dòng)第1個(gè) Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 B,啟動(dòng)第2個(gè) Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 C,啟動(dòng)第1個(gè) Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 D,啟動(dòng)第2個(gè) Slave
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &

多Master多Slave模式,同步雙寫

每一個(gè) Master 配置1個(gè) Slave,有多對(duì)Master-Slave,HA 采取同步雙寫方式,主備都寫成功,向利用返回成功。

優(yōu)點(diǎn):數(shù)據(jù)與服務(wù)都無單點(diǎn),Master宕機(jī)情況下,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高

缺點(diǎn):性能比異步復(fù)制模式略低,大約低10%左右,發(fā)送單個(gè)消息的 RT 會(huì)略高。目前主宕機(jī)后,備機(jī)不能自動(dòng)切換為主機(jī),后續(xù)會(huì)支持自動(dòng)切換功能。

先啟動(dòng)兩臺(tái)服務(wù)器的NameServer,例如機(jī)器 IP 為:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
  • 在機(jī)器 A,啟動(dòng)第1個(gè) Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 B,啟動(dòng)第2個(gè) Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 C,啟動(dòng)第1個(gè) Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機(jī)器 D,啟動(dòng)第2個(gè) Slave
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &

以上 Broker 與 Slave 配對(duì)是通過指定相同的brokerName 參數(shù)來配對(duì),Master 的 BrokerId 必須是
0,Slave 的BrokerId 必須是大與 0 的數(shù)。另外1個(gè) Master 下面可以掛載多個(gè) Slave,同1 Master 下的多個(gè)
Slave 通過指定不同的 BrokerId 來辨別。

除此以外,nameserver也需要集群。

下面以配置1主1備(同步),2個(gè)nameserver為例測試。

1、環(huán)境兩臺(tái)機(jī)器:

  • 192.168.36.101 為主
  • 192.168.36.102 為備

同時(shí)在2臺(tái)機(jī)器個(gè)啟動(dòng)1個(gè)nameserver。安裝RocketMq請(qǐng)參考:
http://blog.csdn.net/zhu_tianwei/article/details/40948447

2、修改配置

(1)創(chuàng)建目錄

mkdir /usr/local/alibaba-rocketmq/log #創(chuàng)建日志目錄 mkdir -p /usr/local/alibaba-rocketmq/data/store/commitlog #創(chuàng)建數(shù)據(jù)存儲(chǔ)目錄

更改日志目錄

cd /usr/local/alibaba-rocketmq/conf sed -i 's#${user.home}#${user.home}/alibaba-rocketmq#g' *.xml

(2)修改主配置

vi ./conf/2m-2s-sync/broker-a.properties
#Broker所屬哪一個(gè)集群,默許【DefaultCluster】 brokerClusterName=DefaultCluster #本機(jī)主機(jī)名 brokerName=broker-a #BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個(gè)Master可以掛多個(gè)Slave,Master與Slave通過BrokerName來配對(duì),默許【0】 brokerId=0 #刪除文件時(shí)間點(diǎn),默許清晨4點(diǎn) deleteWhen=04 #文件保存時(shí)間,默許48小時(shí) fileReservedTime=48 #Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE brokerRole=SYNC_MASTER #刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #Name Server地址 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876 #Broker對(duì)外服務(wù)的監(jiān)聽端口,默許【10911】 listenPort=10911 defaultTopicQueueNums=4 #是不是允許Broker自動(dòng)創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateTopicEnable=true #是不是允許Broker自動(dòng)創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateSubscriptionGroup=true mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/usr/local/alibaba-rocketmq/data/store storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

(3)修改備配置

vi ./conf/2m-2s-sync/broker-a-s.properties
#Broker所屬哪一個(gè)集群,默許【DefaultCluster】 brokerClusterName=DefaultCluster #本機(jī)主機(jī)名 brokerName=broker-a #BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個(gè)Master可以掛多個(gè)Slave,Master與Slave通過BrokerName來配對(duì),默許【0】 brokerId=1 #刪除文件時(shí)間點(diǎn),默許清晨4點(diǎn) deleteWhen=04 #文件保存時(shí)間,默許48小時(shí) fileReservedTime=48 #Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE brokerRole=SLAVE #刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #Name Server地址 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876 #Broker對(duì)外服務(wù)的監(jiān)聽端口,默許【10911】 listenPort=10911 defaultTopicQueueNums=4 #是不是允許Broker自動(dòng)創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateTopicEnable=true #是不是允許Broker自動(dòng)創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateSubscriptionGroup=true mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/usr/local/alibaba-rocketmq/data/store storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

實(shí)例:

1.生產(chǎn)者Producer.java ,TransactionMQProducer使用

package com.somnus.rocketmq; import java.util.concurrent.TimeUnit; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 1個(gè)利用創(chuàng)建1個(gè)Producer,由利用來保護(hù)此對(duì)象,可以設(shè)置為全局對(duì)象或單例<br> * 注意:ProducerGroupName需要由利用來保證唯1,1類Producer集合的名稱,這類Producer通常發(fā)送1類消息, * 且發(fā)送邏輯1致<br> * ProducerGroup這個(gè)概念發(fā)送普通的消息時(shí),作用不大,但是發(fā)送散布式事務(wù)消息時(shí),比較關(guān)鍵, * 由于服務(wù)器會(huì)回查這個(gè)Group下的任意1個(gè)Producer */ final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName"); // nameserver服務(wù) producer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876"); producer.setInstanceName("Producer"); /** * Producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化1次便可<br> * 注意:切記不可以在每次發(fā)送消息時(shí),都調(diào)用start方法 */ producer.start(); // 服務(wù)器回調(diào)Producer,檢查本地事務(wù)分支成功還是失敗 producer.setTransactionCheckListener(new TransactionCheckListener() { public LocalTransactionState checkLocalTransactionState( MessageExt msg) { System.out.println("checkLocalTransactionState --" + new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }); /** * 下面這段代碼表明1個(gè)Producer對(duì)象可以發(fā)送多個(gè)topic,多個(gè)tag的消息。 * 注意:send方法是同步調(diào)用,只要不拋異常就標(biāo)識(shí)成功。但是發(fā)送成功也可會(huì)有多種狀態(tài),<br> * 例如消息寫入Master成功,但是Slave不成功,這類情況消息屬于成功,但是對(duì)個(gè)別利用如果對(duì)消息可靠性要求極高,<br> * 需要對(duì)這類情況做處理。另外,消息可能會(huì)存在發(fā)送失敗的情況,失敗重試由利用來處理。 */ for (int i = 0; i < 10; i++) { try { { Message msg = new Message("TopicTest1", // topic "TagA", // tag "OrderID001", // key消息關(guān)鍵詞,多個(gè)Key用KEY_SEPARATOR隔開(查詢消息使用) ("Hello MetaQA").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } { Message msg = new Message("TopicTest2", // topic "TagB", // tag "OrderID0034", // key 消息關(guān)鍵詞,多個(gè)Key用KEY_SEPARATOR隔開(查詢消息使用) ("Hello MetaQB").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } { Message msg = new Message("TopicTest3", // topic "TagC", // tag "OrderID061", // key ("Hello MetaQC").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 利用退出時(shí),要調(diào)用shutdown來清算資源,關(guān)閉網(wǎng)絡(luò)連接,從MetaQ服務(wù)器上注銷自己 * 注意:我們建議利用在JBOSS、Tomcat等容器的退出鉤子里調(diào)用shutdown方法 */ // producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } // 履行本地事務(wù),由客戶端回調(diào) }

2、消費(fèi)者Consumer.java ,采取主動(dòng)拉取方式消費(fèi)。

package com.somnus.rocketmq; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; public class Consumer { // Java緩存 private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); /** * 主動(dòng)拉取方式消費(fèi) * * @throws MQClientException */ public static void main(String[] args) throws MQClientException { /** * 1個(gè)利用創(chuàng)建1個(gè)Consumer,由利用來保護(hù)此對(duì)象,可以設(shè)置為全局對(duì)象或單例<br> * 注意:ConsumerGroupName需要由利用來保證唯1 ,最好使用服務(wù)的包名辨別同1服務(wù),1類Consumer集合的名稱, * 這類Consumer通常消費(fèi)1類消息,且消費(fèi)邏輯1致 * PullConsumer:Consumer的1種,利用通常主動(dòng)調(diào)用Consumer的拉取消息方法從Broker拉消息,主動(dòng)權(quán)由利用控制 */ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName"); // //nameserver服務(wù) consumer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876"); consumer.setInstanceName("Consumber"); consumer.start(); // 拉取定閱主題的隊(duì)列,默許隊(duì)列大小是4 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); List<MessageExt> list = pullResult.getMsgFoundList(); if (list != null && list.size() < 100) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } } System.out.println(pullResult.getNextBeginOffset()); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) { System.out.println(offset); return offset; } return 0; } }
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: a级片免费视频 | 久久精品中文 | 久久精品日| 国产乱码精品一区二区三 | 欧美福利网站 | 亚洲一区二区三区在线电影 | 色综合欧美 | 亚洲女人天堂成人av在线 | 在线亚洲网站 | 国产日本一区二区 | 久久久久久亚洲精品视频 | 免费成人黄色 | 一区二区三区视频在线播放 | 成人福利| 综合精品久久 | 国产精一区 | 天堂a√在线 | 中文字幕日本在线观看 | 亚洲国产精品第一区二区 | 国内精品久久久 | 97av中文字幕 | 国产精品爱久久久久久久 | 国产一区二区免费播放 | 久久久国产一区二区三区四区小说 | 午夜免费激情 | 中文字幕黄色 | 午夜精品av | 第四色中文综合网 | 99re6这里只有精品视频在线观看 | 欧美视频一区二区 | 国产精品久久精品 | 成人免费在线观看 | 在线一区视频 | 午夜国产精品视频 | 午夜精品久久久久99热蜜桃导演 | 免费观看亚洲 | 日韩99 | 国内精品视频 | 欧美日韩国产大片 | 男女毛片 | 日韩在线视频一区 |