日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁(yè) > php開源 > php教程 > ActiveMQ源碼解析(四):聊聊消息的可靠傳輸機(jī)制和事務(wù)控制

ActiveMQ源碼解析(四):聊聊消息的可靠傳輸機(jī)制和事務(wù)控制

來(lái)源:程序員人生   發(fā)布時(shí)間:2016-06-30 08:57:02 閱讀次數(shù):4571次

在消息傳遞的進(jìn)程中,某些情況下比如網(wǎng)絡(luò)閃斷、丟包等會(huì)致使消息永久性丟失,這時(shí)候消費(fèi)者是接收不到消息的,這樣就會(huì)造成數(shù)據(jù)不1致的問(wèn)題。那末我們?cè)鯓硬拍鼙WC消息1定能發(fā)送給消費(fèi)者呢?怎樣才能避免數(shù)據(jù)不1致呢?又比如我們發(fā)送多條消息,有時(shí)候我們期望都發(fā)送成功但實(shí)際上其中1部份發(fā)送成功,另外一部份發(fā)送失敗了,沒到達(dá)我們的預(yù)期效果,那末我們?cè)鯓咏鉀Q這個(gè)問(wèn)題呢?

前1種問(wèn)題我們通過(guò)消息確認(rèn)機(jī)制來(lái)解決,它分為幾種模式,需要在創(chuàng)建session時(shí)指定是不是開啟事務(wù)和確認(rèn)模式,像下面這樣:

<span style="font-size:12px;">ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);</span>
        然后我們來(lái)看在PubSubscribe模式下消息的全部從發(fā)送到消費(fèi)確認(rèn)的流程來(lái)了解消息的確認(rèn)機(jī)制和事務(wù)。首先看看producer怎樣發(fā)送消息的:

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { //檢查session狀態(tài),如果session已關(guān)閉則拋出狀態(tài)異常 checkClosed(); //檢查destination類型,如果不符合要求就轉(zhuǎn)變成ActiveMQDestination if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } if (transformer != null) { //把各種不同的message轉(zhuǎn)換成ActiveMQMessage Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //發(fā)送消息到broker中的topic this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); //消息計(jì)數(shù) stats.onMessage(); }

        我們以ActiveMQSession的send為例再來(lái)看看session是怎樣發(fā)送消息的:

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { //檢查session狀態(tài)如果closed拋出狀態(tài)異常 checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //競(jìng)爭(zhēng)鎖(互斥信號(hào)量),如果1個(gè)session的多個(gè)producer發(fā)送消息這里會(huì)保證有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction //告知broker開始1個(gè)新事務(wù),只有session的確認(rèn)模式是SESSION_TRANSACTED時(shí)事務(wù)上下網(wǎng)才會(huì)開啟事務(wù) doStartTransaction(); //從事務(wù)上下文中獲得事務(wù)id TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 //在jms協(xié)議頭中設(shè)置傳輸模式即消息是不是需要持久化 message.setJMSDeliveryMode(deliveryMode); long expiration = 0L; //檢查producer中的message是不是過(guò)期 if (!producer.getDisableMessageTimestamp()) { //message獲得時(shí)間戳 long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); //設(shè)置過(guò)期時(shí)間 if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } //設(shè)置消息過(guò)期時(shí)間 message.setJMSExpiration(expiration); //設(shè)置消息優(yōu)先級(jí) message.setJMSPriority(priority); //設(shè)置消息是非重發(fā)的 message.setJMSRedelivered(false); // transform to our own message format here //將消息轉(zhuǎn)化成ActiveMQMessage,message針對(duì)不同的數(shù)據(jù)格式有很多種,比如map message,blob message ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); //設(shè)置目的地,這里是1個(gè)topic msg.setDestination(destination); //設(shè)置消息id msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. //如果消息是經(jīng)過(guò)轉(zhuǎn)換的,那末原消息更新新的id if (msg != message) { message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. //設(shè)置目的地 message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message //清除brokerpath msg.setBrokerPath(null); //設(shè)置事務(wù)id msg.setTransactionId(txid); // if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } //設(shè)置連接器 msg.setConnection(connection); //把消息的屬性和消息體都設(shè)置為只讀,避免被修改 msg.onSend(); //生產(chǎn)者id msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //如果onComplete沒設(shè)置,且發(fā)送超時(shí)時(shí)間小于0,且消息不需要反饋,且連接器不是同步發(fā)送模式,且(消息非持久化或連接器是異步發(fā)送模式或存在事務(wù)id的情況下)異步發(fā)送,否則同步發(fā)送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { //異步發(fā)送走transport的oneway通道 this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { //同步發(fā)送走transport的request和asyncrequest通道 this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } } }

       這樣消息就被發(fā)送到broker的topic中了,接下來(lái)broker中會(huì)根據(jù)topic下的subscriber的id找出定閱者,并向這些消費(fèi)者發(fā)送消息,消費(fèi)者接收到消息后會(huì)消費(fèi)消息,我們接下來(lái)看看消費(fèi)者怎樣消費(fèi)消息的。

      下面是ActiveMQConsumer和ActiveMQSession中的方法,session沒創(chuàng)建1個(gè)consumer便可能會(huì)重啟session線程,session線程的run中會(huì)調(diào)用message的listener中的onMessage方法       

public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); //停止session線程 if (wasRunning) { session.stop(); } this.messageListener.set(listener); //session重新分發(fā)未消費(fèi)的message session.redispatch(this, unconsumedMessages); //開啟session線程 if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
public void run() { MessageDispatch messageDispatch; while ((messageDispatch = executor.dequeueNoWait()) != null) { final MessageDispatch md = messageDispatch; final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); MessageAck earlyAck = null; //如果消息過(guò)期創(chuàng)建新的確認(rèn)消息 if (message.isExpired()) { earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); earlyAck.setFirstMessageId(message.getMessageId()); } else if (connection.isDuplicate(ActiveMQSession.this, message)) { LOG.debug("{} got duplicate: {}", this, message.getMessageId()); earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); earlyAck.setFirstMessageId(md.getMessage().getMessageId()); earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); } //如果消息已過(guò)期,或消息有沖突則發(fā)送確認(rèn)消息重新開始while循環(huán) if (earlyAck != null) { try { asyncSendPacket(earlyAck); } catch (Throwable t) { LOG.error("error dispatching ack: {} ", earlyAck, t); connection.onClientInternalException(t); } finally { continue; } } //如果是確認(rèn)模式是CLIENT_ACKNOWLEDGE或INDIVIDUAL_ACKONWLEDGE則設(shè)置空回調(diào)函數(shù),這樣consumer確認(rèn)消息后會(huì)履行回調(diào)函數(shù) if (isClientAcknowledge()||isIndividualAcknowledge()) { message.setAcknowledgeCallback(new Callback() { @Override public void execute() throws Exception { } }); } //在發(fā)送前調(diào)用途理函數(shù) if (deliveryListener != null) { deliveryListener.beforeDelivery(this, message); } //設(shè)置delivery id md.setDeliverySequenceId(getNextDeliveryId()); lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); final AtomicBoolean afterDeliveryError = new AtomicBoolean(false); /* * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched. * We dont want the after deliver being called after the redeliver as it may cause some weird stuff. * */ synchronized (redeliveryGuard) { try { ack.setFirstMessageId(md.getMessage().getMessageId()); //如果是事務(wù)模式則開啟事務(wù) doStartTransaction(); ack.setTransactionId(getTransactionContext().getTransactionId()); if (ack.getTransactionId() != null) { //事務(wù)狀態(tài)下添加1個(gè)匿名同步器,用于處理同步事務(wù)比如回滾 getTransactionContext().addSynchronization(new Synchronization() { final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); @Override public void beforeEnd() throws Exception { // validate our consumer so we don't push stale acks that get ignored if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); } LOG.trace("beforeEnd ack {}", ack); sendAck(ack); } @Override public void afterRollback() throws Exception { LOG.trace("rollback {}", ack, new Throwable("here")); // ensure we don't filter this as a duplicate connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect if (clearRequestsCounter.get() > clearRequestCount) { LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); return; } // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); return; } RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { // We need to NACK the messages so that they get // sent to the // DLQ. // Acknowledge the last message. MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); asyncSendPacket(ack); } else { MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); asyncSendPacket(ack); // Figure out how long we should wait to resend // this message. long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); for (int i = 0; i < redeliveryCounter; i++) { redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); } /* * If we are a non blocking delivery then we need to stop the executor to avoid more * messages being delivered, once the message is redelivered we can restart it. * */ if (!connection.isNonBlockingRedelivery()) { LOG.debug("Blocking session until re-delivery..."); executor.stop(); } connection.getScheduler().executeAfterDelay(new Runnable() { @Override public void run() { /* * wait for the first delivery to be complete, i.e. after delivery has been called. * */ synchronized (redeliveryGuard) { /* * If its non blocking then we can just dispatch in a new session. * */ if (connection.isNonBlockingRedelivery()) { ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); } else { /* * If there has been an error thrown during afterDelivery then the * endpoint will be marked as dead so redelivery will fail (and eventually * the session marked as stale), in this case we can only call dispatch * which will create a new session with a new endpoint. * */ if (afterDeliveryError.get()) { ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); } else { executor.executeFirst(md); executor.start(); } } } } }, redeliveryDelay); } md.getMessage().onMessageRolledBack(); } }); } LOG.trace("{} onMessage({})", this, message.getMessageId()); //觸發(fā)消息事件監(jiān)聽函數(shù) messageListener.onMessage(message); } catch (Throwable e) { LOG.error("error dispatching message: ", e); // A problem while invoking the MessageListener does not // in general indicate a problem with the connection to the broker, i.e. // it will usually be sufficient to let the afterDelivery() method either // commit or roll back in order to deal with the exception. // However, we notify any registered client internal exception listener // of the problem. connection.onClientInternalException(e); } finally { //發(fā)送確認(rèn)消息 if (ack.getTransactionId() == null) { try { asyncSendPacket(ack); } catch (Throwable e) { connection.onClientInternalException(e); } } } //觸發(fā)投遞事件監(jiān)聽函數(shù) if (deliveryListener != null) { try { deliveryListener.afterDelivery(this, message); } catch (Throwable t) { LOG.debug("Unable to call after delivery", t); afterDeliveryError.set(true); throw new RuntimeException(t); } } } /* * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway. * It also needs to be outside the redelivery guard. * */ try { executor.waitForQueueRestart(); } catch (InterruptedException ex) { connection.onClientInternalException(ex); } } }

總結(jié)

      消息確認(rèn)機(jī)制

      消費(fèi)者和broker通訊終究實(shí)現(xiàn)消息確認(rèn),消息確認(rèn)機(jī)制1共有5種,4種jms的和1種activemq補(bǔ)充的,AUTO_ACKNOWLEDGE(自動(dòng)確認(rèn))、CLIENT_ACKNOWLEDGE(客戶確認(rèn))、DUPS_OK_ACKNOWLEDGE(批量確認(rèn))、SESSION_TRANSACTED(事務(wù)確認(rèn))、INDIVIDUAL_ACKNOWLEDGE(單條確認(rèn)),consumer在不同的模式下會(huì)發(fā)不同的命令到broker,broker再根據(jù)不同的命令進(jìn)行操作,如果consumer正常發(fā)送ack命令給broker,broker會(huì)從topic移除消息并燒毀,如果未從消費(fèi)者接遭到確認(rèn)命令,broker會(huì)將消息轉(zhuǎn)移到dlq隊(duì)列(dead letter queue),并根據(jù)delivery mode進(jìn)行重試或報(bào)異常。

       消息事務(wù)

       消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer進(jìn)程中同1個(gè)session中產(chǎn)生的,保證幾條消息在發(fā)送進(jìn)程中的原子性??梢栽赾onnection的createSession方法中指定1個(gè)布爾值開啟,如果消息確認(rèn)機(jī)制是事務(wù)確認(rèn),那末在發(fā)送message的進(jìn)程中session就會(huì)開啟事務(wù)(實(shí)際上broker的),不用用戶顯示調(diào)用 beginTransaction,這時(shí)候所有通過(guò)session發(fā)送的消息都被緩存下來(lái),用戶調(diào)用session.commit時(shí)會(huì)發(fā)送所有消息,當(dāng)發(fā)送出現(xiàn)異常時(shí)用戶可以調(diào)用rollback進(jìn)行回滾操作,只有在開啟事務(wù)狀態(tài)下有效。  最后附上1張他人畫的activemq消息處理的流轉(zhuǎn)圖。
  


生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 亚洲三级免费电影 | 激情欧美日韩一区二区 | 最污网站 | 精品在线一区二区三区 | 精品av | 欧美插插视频 | 国产日韩视频 | 亚洲成色999久久网站 | 日本久久精品视频 | 欧美久久成人 | 国产av毛片 | 亚洲欧洲视频在线观看 | 亚洲影院一区 | 爱爱免费 | 色网在线免费观看 | 午夜国产一区 | 国产在线一区二区 | 天堂在线精品 | 一区二区三区在线视频播放 | 国产伦精品一区二区三区视频孕妇 | 国产精品日本一区二区不卡视频 | 黄网址在线免费观看 | 午夜视频在线免费观看 | 中文字幕亚洲一区二区三区 | 亚洲精品日韩综合观看成人91 | 在线理论视频 | 欧美日韩精品在线观看 | 亚洲自拍偷拍网站 | 国产一区av在线 | 欧美乱子伦 | 免费av黄| 成人在线免费电影 | 欧美日韩亚洲成人 | 国产精品久久久久久久免费软件 | 久久国产午夜 | 国产一区二区视频免费观看 | 国产精品一区在线播放 | 久久久久综合 | 国产精品zjzjzj在线观看 | 亚洲综合一区二区 | 贼王1995|