日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > php教程 > ActiveMq--學習日記

ActiveMq--學習日記

來源:程序員人生   發布時間:2016-07-04 12:08:21 閱讀次數:2440次

activeMq 簡介

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是1個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,雖然JMS規范出臺已是很久的事情了,但是JMS在現今的J2EE利用中間依然扮演著特殊的地位。

特性:

  • 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。利用協議:OpenWire,Stomp REST,WS Notification,XMPP,AMQP完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)

  • 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性

  • 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5
    resource adaptors的配置,可讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器

  • 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  • 支持通過JDBC和journal提供高速的消息持久化

  • 從設計上保證了高性能的集群,客戶端-服務器,點對點

  • 支持Ajax

  • 支持與Axis的整合

  • 可以很容易得調用內嵌JMS provider,進行測試

JMS簡介

JMS源于企業利用對消息中間件的需求,使利用程序可以通過消息進行異步處理而互不影響。Sun公司和它的合作火伴設計的JMS API定義了1組公共的利用程序接口和相應語法,使得Java程序能夠和其他消息組件進行通訊。JMS有4個組成部份:JMS服務提供者、消息管理對象、消息的生產者消費者和消息本身。

1. JMS服務提供者

JMS服務提供者實現消息隊列和通知,同時實現消息管理的API。JMS已是J2EE API的1部份,J2EE服務器都提供JMS服務。

2. 消息管理對象

消息管理對象提供對消息進行操作的API。JMS AP中有兩個消息管理對象:創建jms連接使用的工廠(ConnectionFactory)和目的地(Destination),根據消息的消費方式的不同ConnectionFactory可以分為QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分為隊列(Queue)和主題(Topic)兩種。

3. 消息的生產者消費者

消息的產生由JMS的客戶端完成,JMS服務提供者負責管理這些消息,消息的消費者可以接收消息。消息的生產者可以分為――點對點消息發布者(P2P)和主題消息發布者(TopicPublisher)。所以,消息的消費者分為兩類:主題消息的定閱者(TopicSubscriber)和點對點消息的接收者(queue receiver)。

4. JMS消息

JMS消息是服務提供者和客戶端之間傳遞信息所使用的信息單元。JMS消息由以下3部份組成:消息頭(header)、屬性(property)和消息體(body)。

5. 消息標頭

消息標頭是消息的信封,包括為使消息到達目的地所需要的所有信息,可以直接控制其中1些字段的值,其它值則由JMS提供程序填寫。

JMS消息頭包括了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,并且為消息肯定路由。

JMSDestination: 由Send方法設置。指定消息的目的地,由JMS提供程序填寫

JMSDeliveryMode: 由Send方法設置。提交消息的模式-延續或非延續。發送消息后JMS提供程序填寫該字段。

JMSMessageID: 由Send方法設置。包括消息的唯1標識符。發送進程中由JMS提供程序填寫

JMSTimeStamp: 由Send 方法設置。記錄消息被傳遞給send方法的時間。發送進程中由JMS提供程序填寫

JMSCorrelationID: 由客戶端設置。包括用于將消息連接在1起的ID??蛻舳?般將其置為所援用消息的ID

JMSReplyTo: 由客戶端設置。響應消息的目的地,如果客戶端期望得到響應消息,則填寫該字段

JMSRedelivered: 由JMS提供程序設置。指出該消息先前被發送過

JMSType: 由客戶端設置。包括由客戶端提供的消息類型標識符。是不是需要該字段,不同的提供程序有不同要求

JMSExpiration: Send 方法設置。1個根據客戶端提供的年齡計算出來的值,如果GMT比該過期值晚,則燒毀消息

JMSPriority: Send 方法設置。包括客戶端在發送消息時所設置有限級值

5. 消息屬性

消息屬性,用來添加刪除消息頭之外的附加信息。除上面的屬性,還可以自定義屬性,以便進行消息的選擇 。1般通過setXXXProperty方法來定義消息屬性,XXX取值為:Boolean、Byte、Double、Float、Int、Long、Object、Short及String。每屬性均由字符串名字和相干的值組成 ,例如:

TextMessage msg = tsession.createTextMessage();

msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

String customer = msg.getStringProperty(“CUSTOMER_NAME”);

其中的”CUSTOMER_NAME”和”MyCustomer”就是消息當中對應的key和value。

6. 消息主體

消息主體包括了消息的核心數據。

JMS 定義了5中消息類型: TextMessage、MapMessage、BytesMessage、

StreamMessage和ObjectMessage

選擇最適合的消息類型可使JMS最有效 的處理消息。

  • TextMessage(文本消息)

將數據作為簡單字符串寄存在主體中(XML就能夠作為字符串發)

TextMessage msg = session.createTextMessage();

msg.setText(text);

有些廠商支持1種XML專用的消息格式,帶來了便利,但是否是標準的JMS類型,影響移植性。只自己定義了兩個方法setText(String s)、getText()

  • MapMessage(映照消息)

使用1張映照表來寄存其主體內容(參照Jms API)

MapMessage msg = session.createMapMessage();

msg.setString(“CUSTOMER_NAME”,”John”);

msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);

int age = msg.getInt(“CUSTOMER_AGE”);

  • BytesMessage(字節消息)

將字節流寄存在消息主體中。合適于以下情況:必須緊縮發送的大量數據、需要與現有

消息格式保持1致等(參照Jms API)

byte[] data;

BytesMessage msg = session.createBytesMessage();

msg.wirte(data);

byte[] msgData = new byte[256];

int bytesRead = msg.readBytes(msgData);

  • StreamMessage(流消息)

用于處理原語類型。這里也支持屬性字段和MapMessage所支持的數據類型。使用這類

消息格式時,收發雙方事前協商好字段的順序,以保證寫讀順序相同(參照Jms API)

StringMessage msg = session.createStreamMessage();

msg.writeString(“John”);

msg.writeInt(12);

String s = msg.readString();

Int age = msg.readInt();

(PS:個人認為有點像socket的信息收發)

  • ObjectMessage(對象消息)

用于往消息中寫入可序列化的對象。

消息中可以寄存1個對象,如果要寄存多個對象,需要建立1個對象集合,然后把這個集合寫入消息。

客戶端接收到1個ObjectMessage時,是read-only模式。如果1個客戶端試圖寫message,將會拋出MessageNotWriteableException。如果調用了clearBody方法,message既可以讀又可以寫自己只單獨定義了兩個方法:getObject()和setObject(Serializable s)

ObjectMessage包括的只是object的1個快照,set以后object的修改對ObjectMessage的body無效 (從兩個方法可以看出,這類消息已強迫要你實現java.io. Serializable接口)

Message只讀時被set拋出MessageNotWriteableException;

set和get時,如果對象序列化失敗拋出MessageFormatException

7. 消息的通訊方式(點對點通訊和發布/定閱方式)

點對點方式(point-to-point)

點對點的消息發送方式主要建立在 Message Queue、Sender、Receiver上,

Message Queue 存貯消息,Sender 發送消息,Receiver接收消息.具體點就是Sender Client發送Message Queue ,而 Receiver Client從Queue中接收消息和”發送消息已接受”到Queue,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在任什么時候刻發送信息到Queue,而不需要知道接收客戶端是否是在運行。
發布/定閱方式(publish/subscriber Messaging)

發布/定閱方式用于多接收客戶真個方式.作為發布定閱的方式,可能存在多個接收客戶

端,并且接收端客戶端與發送客戶端存在時間上的依賴。1個接收端只能接收他創建以后發送客戶端發送的信息。作為subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。

編程模式

消息產生者向JMS發送消息的步驟

  1. 創建連接使用的工廠類JMS ConnectionFactory
  2. 使用管理對象JMS ConnectionFactory建立連接Connection
  3. 使用連接Connection建立會話Session
  4. 使用會話Session和管理對象Destination創建消息生產者MessageSender
  5. 使用消息生產者MessageSender發送消息

消息消費者從JMS接受消息的步驟

  1. 創建連接使用的工廠類JMS ConnectionFactory
  2. 使用管理對象JMS ConnectionFactory建立連接Connection
  3. 使用連接Connection 建立會話Session
  4. 使用會話Session和管理對象Destination創建消息消費者MessageReceiver
  5. 使用消息消費者MessageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver 消息消費者必須實現了MessageListener接口,需要定義onMessage事件方法。

ActiveMQ運行

ActiveMQ5.3版本默許啟動時,啟動了內置的jetty服務器,提供1個demo利用和用于監控ActiveMQ的admin利用。運行%activemq_home%bin/目錄下的 activemq.bat , 以后你會看見以下1段話表示啟動成功。

打開http://localhost:8161/admin/ ,可以查看消息隊列的管理控臺,以下截圖:
這里寫圖片描述

點對點通訊模式demo

消息發送者:

package com.ainong.demo.p2pqueue; import javax.jms.DeliveryMode; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.alibaba.fastjson.JSONObject; /** * * <b>function:</b> Queue 方式消息發送者 * * @author dong.gang * */ public class QueueSender { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制臺創建 http://localhost:8161/admin/queues.jsp public static final String DESTINATION_PAYMENT = "mq.queue.payment"; public static final String DESTINATION_QUERY = "mq.queue.query"; public QueueSender() { } /** * * <b>function:</b> 發送消息 * * @throws Exception */ public void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception { for (int i = 0; i < SEND_NUM; i++) { JSONObject msgJson = new JSONObject(); msgJson.put("seqId", i+1); msgJson.put("content", "發送第" + (i + 1) + "條消息"); MapMessage map = session.createMapMessage(); map.setString("msg", msgJson.toJSONString()); map.setLong("time", System.currentTimeMillis()); System.out.println(map); sender.send(map); } } public void run(String mode) throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 創建鏈接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通過工廠創建1個連接 connection = factory.createQueueConnection(); // 啟動連接 connection.start(); // 創建1個session會話 session = connection.createQueueSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); String queueDis = "1".equals(mode) ? DESTINATION_PAYMENT : DESTINATION_QUERY; // 創建1個消息隊列 Queue queue = session.createQueue(queueDis); // 創建消息發送者 javax.jms.QueueSender sender = session.createSender(queue); // 設置持久化模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, sender); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueSender sender = new QueueSender(); // sender.run("1"); sender.run("2"); } }

消息接收者:

package com.ainong.demo.p2pqueue; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.alibaba.fastjson.JSONObject; /** * * <b>function:</b> 消息接收者 * * @author donggang * */ public class QueueReceiver { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制臺創建 http://localhost:8161/admin/queues.jsp public static final String TARGET_PAYMENT = "mq.queue.payment"; public static final String TARGET_QUERY = "mq.queue.query"; public void run(String mode) throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 創建鏈接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 通過工廠創建1個連接 connection = factory.createQueueConnection(); // 啟動連接 connection.start(); // 創建1個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建1個消息隊列 String queueTarg = "1".equals(mode) ? TARGET_PAYMENT : TARGET_QUERY; Queue queue = session.createQueue(queueTarg); // 創建消息制作者 javax.jms.QueueReceiver receiver = session.createReceiver(queue); receiver.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { JSONObject jsonObj = JSONObject.parseObject(map.getString("msg")); if ("3".equals(jsonObj.getString("seqId"))) { System.out.println(map.getLong("time") + "接收#" + map.getString("msg")); } } catch (JMSException e) { e.printStackTrace(); } } } }); // 提交會話 session.commit(); // 休眠1s再關閉 Thread.sleep(1000); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueReceiver receiver = new QueueReceiver(); // receiver.run("1"); receiver.run("2"); } }

其實上邊的消息接收者已集成了消息監聽類,如果我們需要分離業務操作,可以 receiver.setMessageListener()參數中設為我們的業務監聽處理類(需要實現MessageListener類):

package com.ainong.demo.p2pqueue; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; public class QueueMsgListenner implements MessageListener { @Override public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "接收#" + map.getString("id000")); } catch (JMSException e) { e.printStackTrace(); } } } }

發布/定閱模式 demo

消息發布者:

package com.ainong.demo.publish_subscrib; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import com.alibaba.fastjson.JSONObject; public class Publisher { protected static String brokerURL = "tcp://localhost:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; protected transient Session session; protected transient MessageProducer producer; public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { // 持久化定閱(消息會保存,特定的消費者可以1段時間落后行消費) connection.setClientID("client1"); connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); // 發送消息時用使用持久模式(不設置,默許就是持久的) producer.setDeliveryMode(DeliveryMode.PERSISTENT); } public void close() throws JMSException { if (connection != null) { connection.close(); } } protected void sendMessage(JSONObject msg) throws JMSException { Destination destination = session.createTopic("demo"); Message message = createMessage(msg, session); System.out.println("消息發送: " + ((ActiveMQMapMessage) message).getContentMap() + " on destination: " + destination); producer.send(destination, message); } protected Message createMessage(JSONObject msg, Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("id", msg.getString("id")); message.setString("content", msg.getString("content")); return message; } public static void main(String[] args) throws JMSException, InterruptedException { Publisher publisher = new Publisher(); for (int i = 0; i < 3; i++) { JSONObject msgObject = new JSONObject(); msgObject.put("id", "msg00" + i); msgObject.put("content", "第" + i + "條消息"); publisher.sendMessage(msgObject); Thread.sleep(3000); } publisher.close(); } }

消息定閱者1(持久化定閱)

package com.ainong.demo.publish_subscrib; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 持久化定閱模式(消息會保存,消費者可以在任意時間消費消息) * * @author DG * */ public class Consumer1 { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer1() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.setClientID("client1"); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public Session getSession() { return session; } public static void main(String[] args) throws JMSException { Consumer1 consumer = new Consumer1(); Topic topic = consumer.getSession().createTopic("demo"); // 普通定閱 // MessageConsumer messageConsumer = // consumer.getSession().createConsumer( // destination); // 持久化定閱 MessageConsumer messageConsumer =consumer.getSession().createDurableSubscriber(topic,"client1"); //持久定閱 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MapMessage map = (MapMessage) message; String id = map.getString("id"); String content = map.getString("content"); System.out.println("消費者1,消息接收:id = " + id + ";content = " + content); } catch (Exception e) { e.printStackTrace(); } } }); } }

常規定閱者:

package com.ainong.demo.publish_subscrib; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; /** * 常規定閱模式(消費者必須在消息生產前就已啟動監聽消息,否則錯過消息以后,消費就會失效,不會被處理) * * @author DG * */ public class Consumer2 { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer2() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public Session getSession() { return session; } public static void main(String[] args) throws JMSException { Consumer2 consumer = new Consumer2(); Destination destination = consumer.getSession().createTopic("demo"); MessageConsumer messageConsumer = consumer.getSession().createConsumer( destination); messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MapMessage map = (MapMessage) message; String id = map.getString("id"); String content = map.getString("content"); System.out.println("消費者2:消息接收:id = " + id + ";content = " + content); } catch (Exception e) { e.printStackTrace(); } } }); } }

上面只是我初識activemq時的1些心得,附代碼源碼:
http://download.csdn.net/detail/donggang1992/9561041

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 九九九精品视频 | 台湾av| 午夜伦理影院 | 亚洲成人av一区二区 | 曰批视频在线观看 | 国产一区二区免费在线观看 | 色婷婷综合成人 | 九九热在线免费视频 | 美女网站视频在线观看 | 叼嘿视频91 | 久久久久久久成人 | 天堂av资源网| 免费国产精品视频 | 午夜精品美女久久久久av福利 | 2015成人永久免费视频 | 日韩精品久久一区 | 日韩在线观看中文字幕 | 日韩欧美一级片 | 国产一区二区三区在线 | 国产精品久久久久久久久 | 国产精品久久久久久久9999 | 日韩成人在线视频观看 | 日韩中文在线 | 免费福利在线视频 | 成人综合久久 | 三级毛片黄色 | 91久久 | 国产做爰全过程免费的视频 | 在线免费观看污 | a级高清免费毛片av在线 | 日日操夜夜 | 欧美色久| 国产视频二区 | 免费不卡视频 | www久久| 国产精品久久久久久久妇 | 精品国产一区av | 视频一区二区在线 | 欧美成人精品二区三区99精品 | 国产成人视屏 | 麻豆乱码国产一区二区三区 |