ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是1個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,雖然JMS規范出臺已是很久的事情了,但是JMS在現今的J2EE利用中間依然扮演著特殊的地位。
特性:
多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。利用協議:OpenWire,Stomp REST,WS Notification,XMPP,AMQP完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5
resource adaptors的配置,可讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通過JDBC和journal提供高速的消息持久化
從設計上保證了高性能的集群,客戶端-服務器,點對點
支持Ajax
支持與Axis的整合
可以很容易得調用內嵌JMS provider,進行測試
JMS源于企業利用對消息中間件的需求,使利用程序可以通過消息進行異步處理而互不影響。Sun公司和它的合作火伴設計的JMS API定義了1組公共的利用程序接口和相應語法,使得Java程序能夠和其他消息組件進行通訊。JMS有4個組成部份:JMS服務提供者、消息管理對象、消息的生產者消費者和消息本身。
JMS服務提供者實現消息隊列和通知,同時實現消息管理的API。JMS已是J2EE API的1部份,J2EE服務器都提供JMS服務。
消息管理對象提供對消息進行操作的API。JMS AP中有兩個消息管理對象:創建jms連接使用的工廠(ConnectionFactory)和目的地(Destination),根據消息的消費方式的不同ConnectionFactory可以分為QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分為隊列(Queue)和主題(Topic)兩種。
消息的產生由JMS的客戶端完成,JMS服務提供者負責管理這些消息,消息的消費者可以接收消息。消息的生產者可以分為――點對點消息發布者(P2P)和主題消息發布者(TopicPublisher)。所以,消息的消費者分為兩類:主題消息的定閱者(TopicSubscriber)和點對點消息的接收者(queue receiver)。
JMS消息是服務提供者和客戶端之間傳遞信息所使用的信息單元。JMS消息由以下3部份組成:消息頭(header)、屬性(property)和消息體(body)。
消息標頭是消息的信封,包括為使消息到達目的地所需要的所有信息,可以直接控制其中1些字段的值,其它值則由JMS提供程序填寫。
JMS消息頭包括了許多字段,它們是消息發送后由JMS提供者或消息發送者產生,用來表示消息、設置優先權和失效時間等等,并且為消息肯定路由。
JMSDestination: 由Send方法設置。指定消息的目的地,由JMS提供程序填寫
JMSDeliveryMode: 由Send方法設置。提交消息的模式-延續或非延續。發送消息后JMS提供程序填寫該字段。
JMSMessageID: 由Send方法設置。包括消息的唯1標識符。發送進程中由JMS提供程序填寫
JMSTimeStamp: 由Send 方法設置。記錄消息被傳遞給send方法的時間。發送進程中由JMS提供程序填寫
JMSCorrelationID: 由客戶端設置。包括用于將消息連接在1起的ID??蛻舳?般將其置為所援用消息的ID
JMSReplyTo: 由客戶端設置。響應消息的目的地,如果客戶端期望得到響應消息,則填寫該字段
JMSRedelivered: 由JMS提供程序設置。指出該消息先前被發送過
JMSType: 由客戶端設置。包括由客戶端提供的消息類型標識符。是不是需要該字段,不同的提供程序有不同要求
JMSExpiration: Send 方法設置。1個根據客戶端提供的年齡計算出來的值,如果GMT比該過期值晚,則燒毀消息
JMSPriority: Send 方法設置。包括客戶端在發送消息時所設置有限級值
消息屬性,用來添加刪除消息頭之外的附加信息。除上面的屬性,還可以自定義屬性,以便進行消息的選擇 。1般通過setXXXProperty方法來定義消息屬性,XXX取值為:Boolean、Byte、Double、Float、Int、Long、Object、Short及String。每屬性均由字符串名字和相干的值組成 ,例如:
TextMessage msg = tsession.createTextMessage();
msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);
String customer = msg.getStringProperty(“CUSTOMER_NAME”);
其中的”CUSTOMER_NAME”和”MyCustomer”就是消息當中對應的key和value。
消息主體包括了消息的核心數據。
JMS 定義了5中消息類型: TextMessage、MapMessage、BytesMessage、
StreamMessage和ObjectMessage
選擇最適合的消息類型可使JMS最有效 的處理消息。
將數據作為簡單字符串寄存在主體中(XML就能夠作為字符串發)
TextMessage msg = session.createTextMessage();
msg.setText(text);
有些廠商支持1種XML專用的消息格式,帶來了便利,但是否是標準的JMS類型,影響移植性。只自己定義了兩個方法setText(String s)、getText()
使用1張映照表來寄存其主體內容(參照Jms API)
MapMessage msg = session.createMapMessage();
msg.setString(“CUSTOMER_NAME”,”John”);
msg.setInt(“CUSTOMER_AGE”,12);
String s = msg.getString(“CUSTOMER_NAME”);
int age = msg.getInt(“CUSTOMER_AGE”);
將字節流寄存在消息主體中。合適于以下情況:必須緊縮發送的大量數據、需要與現有
消息格式保持1致等(參照Jms API)
byte[] data;
BytesMessage msg = session.createBytesMessage();
msg.wirte(data);
byte[] msgData = new byte[256];
int bytesRead = msg.readBytes(msgData);
用于處理原語類型。這里也支持屬性字段和MapMessage所支持的數據類型。使用這類
消息格式時,收發雙方事前協商好字段的順序,以保證寫讀順序相同(參照Jms API)
StringMessage msg = session.createStreamMessage();
msg.writeString(“John”);
msg.writeInt(12);
String s = msg.readString();
Int age = msg.readInt();
(PS:個人認為有點像socket的信息收發)
用于往消息中寫入可序列化的對象。
消息中可以寄存1個對象,如果要寄存多個對象,需要建立1個對象集合,然后把這個集合寫入消息。
客戶端接收到1個ObjectMessage時,是read-only模式。如果1個客戶端試圖寫message,將會拋出MessageNotWriteableException。如果調用了clearBody方法,message既可以讀又可以寫自己只單獨定義了兩個方法:getObject()和setObject(Serializable s)
ObjectMessage包括的只是object的1個快照,set以后object的修改對ObjectMessage的body無效 (從兩個方法可以看出,這類消息已強迫要你實現java.io. Serializable接口)
Message只讀時被set拋出MessageNotWriteableException;
set和get時,如果對象序列化失敗拋出MessageFormatException
點對點方式(point-to-point)
點對點的消息發送方式主要建立在 Message Queue、Sender、Receiver上,
Message Queue 存貯消息,Sender 發送消息,Receiver接收消息.具體點就是Sender Client發送Message Queue ,而 Receiver Client從Queue中接收消息和”發送消息已接受”到Queue,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在任什么時候刻發送信息到Queue,而不需要知道接收客戶端是否是在運行。
發布/定閱方式(publish/subscriber Messaging)
發布/定閱方式用于多接收客戶真個方式.作為發布定閱的方式,可能存在多個接收客戶
端,并且接收端客戶端與發送客戶端存在時間上的依賴。1個接收端只能接收他創建以后發送客戶端發送的信息。作為subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。
消息產生者向JMS發送消息的步驟
消息消費者從JMS接受消息的步驟
ActiveMQ5.3版本默許啟動時,啟動了內置的jetty服務器,提供1個demo利用和用于監控ActiveMQ的admin利用。運行%activemq_home%bin/目錄下的 activemq.bat , 以后你會看見以下1段話表示啟動成功。
打開http://localhost:8161/admin/ ,可以查看消息隊列的管理控臺,以下截圖:
消息發送者:
package com.ainong.demo.p2pqueue;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> Queue 方式消息發送者
*
* @author dong.gang
*
*/
public class QueueSender {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制臺創建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION_PAYMENT = "mq.queue.payment";
public static final String DESTINATION_QUERY = "mq.queue.query";
public QueueSender() {
}
/**
*
* <b>function:</b> 發送消息
*
* @throws Exception
*/
public void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
JSONObject msgJson = new JSONObject();
msgJson.put("seqId", i+1);
msgJson.put("content", "發送第" + (i + 1) + "條消息");
MapMessage map = session.createMapMessage();
map.setString("msg", msgJson.toJSONString());
map.setLong("time", System.currentTimeMillis());
System.out.println(map);
sender.send(map);
}
}
public void run(String mode) throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建1個連接
connection = factory.createQueueConnection();
// 啟動連接
connection.start();
// 創建1個session會話
session = connection.createQueueSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
String queueDis = "1".equals(mode) ? DESTINATION_PAYMENT : DESTINATION_QUERY;
// 創建1個消息隊列
Queue queue = session.createQueue(queueDis);
// 創建消息發送者
javax.jms.QueueSender sender = session.createSender(queue);
// 設置持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交會話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueSender sender = new QueueSender();
// sender.run("1");
sender.run("2");
}
}
消息接收者:
package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> 消息接收者
*
* @author donggang
*
*/
public class QueueReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標,在ActiveMQ管理員控制臺創建 http://localhost:8161/admin/queues.jsp
public static final String TARGET_PAYMENT = "mq.queue.payment";
public static final String TARGET_QUERY = "mq.queue.query";
public void run(String mode) throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創建1個連接
connection = factory.createQueueConnection();
// 啟動連接
connection.start();
// 創建1個session會話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創建1個消息隊列
String queueTarg = "1".equals(mode) ? TARGET_PAYMENT : TARGET_QUERY;
Queue queue = session.createQueue(queueTarg);
// 創建消息制作者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
JSONObject jsonObj = JSONObject.parseObject(map.getString("msg"));
if ("3".equals(jsonObj.getString("seqId"))) {
System.out.println(map.getLong("time") + "接收#" + map.getString("msg"));
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 提交會話
session.commit();
// 休眠1s再關閉
Thread.sleep(1000);
} catch (Exception e) {
throw e;
} finally {
// 關閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueReceiver receiver = new QueueReceiver();
// receiver.run("1");
receiver.run("2");
}
}
其實上邊的消息接收者已集成了消息監聽類,如果我們需要分離業務操作,可以 receiver.setMessageListener()參數中設為我們的業務監聽處理類(需要實現MessageListener類):
package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
public class QueueMsgListenner implements MessageListener {
@Override
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getLong("time") + "接收#" + map.getString("id000"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消息發布者:
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import com.alibaba.fastjson.JSONObject;
public class Publisher {
protected static String brokerURL = "tcp://localhost:61616";
protected static transient ConnectionFactory factory;
protected transient Connection connection;
protected transient Session session;
protected transient MessageProducer producer;
public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
try {
// 持久化定閱(消息會保存,特定的消費者可以1段時間落后行消費)
connection.setClientID("client1");
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
// 發送消息時用使用持久模式(不設置,默許就是持久的)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
protected void sendMessage(JSONObject msg) throws JMSException {
Destination destination = session.createTopic("demo");
Message message = createMessage(msg, session);
System.out.println("消息發送: "
+ ((ActiveMQMapMessage) message).getContentMap()
+ " on destination: " + destination);
producer.send(destination, message);
}
protected Message createMessage(JSONObject msg, Session session)
throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("id", msg.getString("id"));
message.setString("content", msg.getString("content"));
return message;
}
public static void main(String[] args) throws JMSException,
InterruptedException {
Publisher publisher = new Publisher();
for (int i = 0; i < 3; i++) {
JSONObject msgObject = new JSONObject();
msgObject.put("id", "msg00" + i);
msgObject.put("content", "第" + i + "條消息");
publisher.sendMessage(msgObject);
Thread.sleep(3000);
}
publisher.close();
}
}
消息定閱者1(持久化定閱)
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 持久化定閱模式(消息會保存,消費者可以在任意時間消費消息)
*
* @author DG
*
*/
public class Consumer1 {
private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
public Consumer1() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.setClientID("client1");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public Session getSession() {
return session;
}
public static void main(String[] args) throws JMSException {
Consumer1 consumer = new Consumer1();
Topic topic = consumer.getSession().createTopic("demo");
// 普通定閱
// MessageConsumer messageConsumer =
// consumer.getSession().createConsumer(
// destination);
// 持久化定閱
MessageConsumer messageConsumer =consumer.getSession().createDurableSubscriber(topic,"client1"); //持久定閱
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消費者1,消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
常規定閱者:
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 常規定閱模式(消費者必須在消息生產前就已啟動監聽消息,否則錯過消息以后,消費就會失效,不會被處理)
*
* @author DG
*
*/
public class Consumer2 {
private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
public Consumer2() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public Session getSession() {
return session;
}
public static void main(String[] args) throws JMSException {
Consumer2 consumer = new Consumer2();
Destination destination = consumer.getSession().createTopic("demo");
MessageConsumer messageConsumer = consumer.getSession().createConsumer(
destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消費者2:消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
上面只是我初識activemq時的1些心得,附代碼源碼:
http://download.csdn.net/detail/donggang1992/9561041
上一篇 從1到n整數中1出現的次數