Java後端IM消息推送服務開發——協議
來源:程序員人生 發布時間:2016-07-04 16:58:46 閱讀次數:3516次
最近在一家SaaS企業使用MQTT開發IM消息推送服務,把開發中的一些問題記錄下來。項目仍在商用中,完整的消息服務包括4個模塊——協議(protocol)、信令(Signal)、規則(Rule)、狀態(Status),本篇主題主要是協議(protocol)部分。
主要技術涉及MongoDB、WebService、HttpClient、MQTT等。
protocol分為4個模塊類來實現,當然這是為了以後有更好的擴展性。
首先看一下我們的主類,主要是MQTT基礎方法的一個框架:
/**
 * MQTT side of the IM push service.
 *
 * <p>Owns a single {@code MqttClient}, connects it with a blocking retry loop,
 * re-connects and re-subscribes when the connection is lost, and dispatches
 * incoming protocol messages ({@code shakehand}, {@code text}, {@code image},
 * {@code ack}) to {@code MsgOperation} and {@code MongoDBDao}.
 */
public class MqttProtocol
{
    private static Logger logger = Logger.getLogger(MqttProtocol.class);
    public static final String HOST = "tcp://xx.xx.xx.xx:1883";
    private static final String CLIENTID = "yyyy";
    /** Charset used to decode message payloads; fully qualified so no extra import is needed. */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
    /** Mongo database / collection that hold the conversation messages. */
    private static final String MESSAGE_DATABASE = "dolina";
    private static final String MESSAGE_COLLECTION = "messages";

    private MqttClient client;
    private MqttConnectOptions options = new MqttConnectOptions();
    public MqttMessage message;
    private PushCallback callback;

    /**
     * Creates the MQTT client, installs the callback and connects to the broker.
     * Blocks until the connection is established (see {@link #connect()}).
     *
     * @throws MqttException if the client cannot be created
     */
    public MqttProtocol() throws MqttException
    {
        // MemoryPersistence: client persistence is kept in memory only.
        client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
        callback = new PushCallback();
        client.setCallback(callback);
        options.setCleanSession(false);
        options.setKeepAliveInterval(60);
        connect();
    }

    /**
     * Connects to the broker, retrying once per second until the client reports
     * that it is connected. This keeps the process alive across network outages
     * and broker restarts. The loop stops early only if the calling thread is
     * interrupted while waiting between attempts.
     */
    private void connect()
    {
        SimpleDateFormat sdf = new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
        System.out.println(sdf.format(System.currentTimeMillis()));
        boolean tryConnecting = true;
        while (tryConnecting) {
            try {
                client.connect(options);
            } catch (Exception e1) {
                // Print the exception itself: getCause() is frequently null and hid the real failure.
                System.out.println("Connection attempt failed with '" + e1 + "'. Retrying.");
            }
            if (client.isConnected()) {
                System.out.println("Connected.");
                tryConnecting = false;
            } else if (!pause()) {
                System.out.println("Interrupted while waiting to reconnect; giving up.");
                tryConnecting = false;
            }
        }
    }

    /**
     * Sleeps for one second between connection attempts.
     *
     * @return {@code true} if the full pause elapsed, {@code false} if the thread
     *         was interrupted (the interrupt flag is restored for the caller)
     */
    private boolean pause() {
        try {
            Thread.sleep(1000);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Subscribes the client to a topic.
     *
     * @param topic topic filter to subscribe to
     * @param qos   requested quality of service
     * @throws MqttPersistenceException declared by the original signature
     * @throws MqttException            if the subscription fails
     */
    public void subscribe(String topic, int qos) throws MqttPersistenceException,
            MqttException
    {
        client.subscribe(topic, qos);
    }

    /**
     * Disconnects from the broker.
     *
     * @throws MqttPersistenceException declared by the original signature
     * @throws MqttException            if the disconnect fails
     */
    public void disconnect() throws MqttPersistenceException,
            MqttException
    {
        client.disconnect();
    }

    /**
     * MQTT callback: handles connection loss, delivery confirmation and
     * incoming messages.
     *
     * @author binshi
     */
    private class PushCallback implements MqttCallback {

        /**
         * Called by the client when the connection drops; reconnects (blocking)
         * and then re-subscribes to the protocol topic.
         */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("連接斷開(kāi),可以做重連");
            connect();
            try {
                subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
            } catch (MqttException e) {
                // MqttPersistenceException is a subclass and was handled identically.
                e.printStackTrace();
            }
        }

        /**
         * Called by the client once a published message has been delivered.
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            System.out.println("deliveryComplete---------" + arg0.isComplete());
        }

        /**
         * Handles one incoming message.
         *
         * <p>The payload is parsed as JSON. Messages that carry an
         * {@code offline} key, or whose {@code from} contains {@code "robot"},
         * are ignored; everything else is dispatched on its {@code type}.
         *
         * <p>Malformed messages (unparseable JSON, missing {@code from} or
         * {@code type}) are logged and dropped instead of throwing: the Paho
         * documentation states that an exception escaping this method shuts the
         * client down, and with a persistent session the same message could be
         * redelivered after the reconnect.
         *
         * @param topic   topic the message arrived on
         * @param message the received message
         * @throws Exception if a handler (messaging or database call) fails
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception
        {
            // Decode once, explicitly as UTF-8, rather than with the platform default charset.
            String payload = new String(message.getPayload(), UTF8);

            System.out.println(topic);
            SimpleDateFormat sdf = new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
            System.out.println(sdf.format(System.currentTimeMillis()));
            System.out.println("接收消息主題 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            System.out.println("接收消息內(nèi)容 : " + payload);

            System.out.println("疏忽所有robot消息和offline離線消息");
            JSONObject jo;
            String from;
            try {
                jo = JSONObject.fromObject(payload);
                from = jo.has("from") ? jo.getString("from") : null;
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Dropping message that is not a valid JSON object");
                return;
            }
            if (from == null) {
                System.out.println("Dropping message without a 'from' field");
                return;
            }
            System.out.println("取得from" + from);

            System.out.println("肯定消息是不是包括offline,如果包括獲得offline,為1就不處理");
            // Test for the key itself; a substring search on the raw text also
            // matched messages whose content merely mentioned "offline".
            boolean isOffline = jo.has("offline");
            if (isOffline || from.contains("robot")) {
                return;
            }

            System.out.println("處理非系統(tǒng)消息和非離線消息");
            if (!jo.has("type")) {
                System.out.println("Dropping message without a 'type' field");
                return;
            }
            String type = jo.getString("type");
            System.out.println("取得type" + type);

            if ("shakehand".equals(type)) {
                handleShakehand(jo, from, topic);
            } else if ("text".equals(type) || "image".equals(type)) {
                handleChatMessage(jo, from, topic);
            } else if ("ack".equals(type)) {
                handleAck(jo, from);
            }
        }

        /** Answers a handshake: agents (admin == "1") also get their unread offline messages. */
        private void handleShakehand(JSONObject jo, String from, String topic) throws Exception
        {
            System.out.println("處理shakehand消息");
            String admin = "doyounkowwhy";
            // Key lookup instead of substring search on the serialized message.
            if (jo.has("admin")) {
                admin = jo.getString("admin");
            }
            System.out.println("獲得admin 如果為1定義為客服,否則為普通用戶 admin為" + admin);

            MqttTopic retopic = client.getTopic(topic);
            if ("1".equals(admin)) {
                System.out.println("處理客服握手消息");
                System.out.println("發(fā)送握手成功消息");
                MsgOperation.sendSysMsgToClient(from, "0", "1005", "握手成功", null, retopic);
                System.out.println("向客戶端發(fā)送離線未接收的消息");
                String convid = jo.getString("convid");
                MongoDBDao.getMongoDBDaoInstance()
                        .sendOfflineMsgToClient(from, convid, retopic, MESSAGE_DATABASE, MESSAGE_COLLECTION);
            } else {
                System.out.println("處理普通用戶的握手消息");
                String appid = jo.getString("appid");
                String pageid = jo.getString("pageid");
                String convid = jo.getString("convid");
                MsgOperation.sendShakeHandInfo(from, convid, appid, pageid, retopic);
            }
        }

        /** Forwards a text/image message via MsgOperation and persists it. */
        private void handleChatMessage(JSONObject jo, String from, String topic) throws Exception
        {
            System.out.println("處理圖片和文字消息");
            String tmpindex = jo.getString("tmpindex");
            String convid = jo.getString("convid");
            MqttTopic retopic = client.getTopic(topic);
            MsgOperation.getTextMsg(tmpindex, from, convid, retopic);
            System.out.println("保存圖片文字消息");
            MongoDBDao.getMongoDBDaoInstance().saveTextMsg(MESSAGE_DATABASE, MESSAGE_COLLECTION, jo);
        }

        /** Records that {@code from} acknowledged the message identified by tmpindex. */
        private void handleAck(JSONObject jo, String from)
        {
            System.out.println("處理ack消息");
            String tmpindex = jo.getString("tmpindex");
            String convid = jo.getString("convid");
            MongoDBDao.getMongoDBDaoInstance()
                    .getAck(tmpindex, convid, from, MESSAGE_DATABASE, MESSAGE_COLLECTION);
        }
    }

    /**
     * Program entry point; configure this as the main class of the runnable jar.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the subscription fails
     */
    public static void main(String[] args) throws MqttException
    {
        MqttProtocol signal = new MqttProtocol();
        signal.message = new MqttMessage();
        // Example of preparing an outgoing message (kept from the original as a note):
        //   signal.message.setQos(2);
        //   signal.message.setRetained(false);
        //   signal.message.setPayload(<bytes of the text to push>);
        //   signal.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);
        signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
        System.out.println(signal.message.isRetained() + "------ratained狀態(tài)");
    }
}
接下來是我們的遠程連接模塊,主要是通過給定的URL調用遠程接口:
/**
 * Helper for calling a remote HTTP interface and decoding its JSON response.
 */
public class RemoteOperation
{
    // Logger category now names this class (it previously used MqttProtocol.class).
    private static Logger logger = Logger.getLogger(RemoteOperation.class);

    /**
     * Issues an HTTP GET to {@code url} and parses the response body as JSON.
     *
     * @param url address of the remote interface
     * @return the parsed body; an empty {@code JSONObject} if the body is not
     *         valid JSON; {@code null} if the status code is not 200
     * @throws HttpException on an HTTP protocol error
     * @throws IOException   on a transport error
     */
    public static JSONObject remoteCall(String url) throws HttpException, IOException
    {
        HttpClient httpClient = new HttpClient();
        GetMethod method = new GetMethod(url);
        try {
            int retcode = httpClient.executeMethod(method);
            if (retcode != HttpStatus.SC_OK) {
                // Request did not succeed.
                logger.info("遠(yuǎn)程調(diào)用出錯(cuò)");
                return null;
            }

            String body = method.getResponseBodyAsString();
            logger.info(body+"遠(yuǎn)程調(diào)用php成功");

            JSONObject jsonObject = new JSONObject();
            try {
                jsonObject = JSONObject.fromObject(body);
            } catch (Exception e) {
                // Best effort: callers receive an empty object for an unparseable body.
                e.printStackTrace();
            }
            return jsonObject;
        } finally {
            // Release on every path: previously the connection leaked on a
            // non-200 status and whenever executeMethod or the body read threw.
            method.releaseConnection();
        }
    }
}
下面是Mongo數據庫相關操作的一個封裝,設計為單例模式,相當於每次都使用同一個client打開連接,類似於連接池的概念,當然業務邏輯部分可以更換:
public class MongoDBDao
{
private static Logger logger = Logger.getLogger(MongoDBDao.class);
/**
* MongoClient的實(shí)例代表
數(shù)據(jù)庫(kù)連接池,是線程安全的,可以被多線程同享,客戶端在多線程條件下僅保持1個(gè)實(shí)例便可
* Mongo是非線程安全的,目前mongodb API中已建議用MongoClient替換Mongo
*/
private MongoClient mongoClient = null;
/**
*
* 私有的構(gòu)造函數(shù)
* 作者:shibin
*/
private MongoDBDao(){
if(mongoClient == null){
String url = Constant.MONGO_MQTT_URL;
String user = Constant.MONGO_MQTT_USER;
String password = Constant.MONGO_MQTT_PASSWORD;
String database = Constant.MONGO_MQTT_DATABASE;
int port = 27017;
ServerAddress serverAddress = new ServerAddress(url, port);
List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>();
serverAddresses.add(serverAddress);
MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());
List<MongoCredential> credentials = new ArrayList<MongoCredential>();
credentials.add(credential);
mongoClient = new MongoClient(serverAddresses, credentials);
System.out.println(mongoClient);
System.out.println("初始化client完成");
}
}
/********單例模式聲明開(kāi)始,采取餓漢式方式生成,保證線程安全********************/
//類初始化時(shí),自行實(shí)例化,餓漢式單例模式
private static final MongoDBDao mongoDBDao = new MongoDBDao();
/**
*
* 方法名:getMongoDBDaoImplInstance
* 作者:shibin
*
* 描寫(xiě):?jiǎn)卫撵o態(tài)工廠方法
* @return
*/
public static MongoDBDao getMongoDBDaoInstance(){
return mongoDBDao;
}
public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException
{
System.out.println("取得message的連接");
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
System.out.println("獲得convid所對(duì)應(yīng)的msg列表");
BasicDBObject query = new BasicDBObject();
query.put("_id", convid);
FindIterable<Document> iterable=null;
iterable = mongoCollection.find(query);
if(iterable.first()!=null)
{
System.out.println(iterable.first());
String res= iterable.first().toJson();
JSONObject jo=new JSONObject();
try {
jo=JSONObject.fromObject(res);
} catch (Exception e) {
e.printStackTrace();
}
JSONArray jsonArray=jo.getJSONArray("msg");
for(int i=0;i<jsonArray.length();i++)
{
String read=jsonArray.getJSONObject(i).getString("read");
System.out.println("取得msg對(duì)應(yīng)的第"+i+"條記錄的read信息"+read);
System.out.println("判斷read是不是包括from的信息,如果不包括且這條消息不是他自己發(fā)的就給她發(fā)送這條消息");
if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from))
{
System.out.println("取得這條消息的原型,然后加上offline=1并發(fā)送消息");
JSONObject msg=jsonArray.getJSONObject(i);
msg.put("offline", "1");
retopic.publish(msg.toString().getBytes(), 0, false);
}
else
{
System.out.println("no offline message for "+from);
}
}
}
}
public void saveTextMsg(String database,String collection,JSONObject jo)
{
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
BasicDBObject query = new BasicDBObject();
String convid=jo.getString("convid");
query.put("_id", convid);
FindIterable iterable;
iterable = mongoCollection.find(query);
System.out.println("更新message之前的值"+iterable.first());
Bson filter = Filters.eq("_id", convid);
Document content = new Document();
String type=jo.getString("type");
if(type.equals("text"))
{
String contentMsg=jo.getJSONObject("content").getString("content");
content.put("content", contentMsg);
}
else
{
String url=jo.getJSONObject("content").getString("url");
content.put("url", url);
}
String admin=jo.getJSONObject("extra").getString("admin");
String headimgurl=jo.getJSONObject("extra").getString("headimgurl");
String nickname=jo.getJSONObject("extra").getString("nickname");
String from=jo.getString("from");
String tmpindex=jo.getString("tmpindex");
Document extra = new Document();
extra.put("nickname", nickname);
Document doc = new Document();
doc.put("from",from );
ArrayList<String> read=new ArrayList<String>();
doc.put("read", read);
Document tdoc = new Document();
tdoc.put("msg", doc);
UpdateOptions updateOptions=new UpdateOptions();
updateOptions.upsert(true);
mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);
iterable = mongoCollection.find(query);
System.out.println("更新message以后的值"+iterable.first());
}
public void getAck(String tmpindex,String convid,String from,String database,String collection)
{
System.out.println("接收到ack消息后更新message中的read字段");
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
BasicDBObject query = new BasicDBObject();
query.put("_id", convid);
query.put("msg.tmpindex", tmpindex);
BasicDBObject query1 = new BasicDBObject();
query1.put("_id", convid);
FindIterable iterable;
FindIterable iterable2;
iterable = mongoCollection.find(query1);
iterable2 = mongoCollection.find(query);
System.out.println("更新message滿足id過(guò)濾條件之前的值"+iterable.first());
System.out.println("更新message滿足id和tmpindex過(guò)濾條件之前的值"+iterable2.first());
if(iterable2.first()!=null)
{
Document doc = new Document();
doc.put("msg.$.read", from);
UpdateOptions updateOptions=new UpdateOptions();
updateOptions.upsert(true);
mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);
}
iterable = mongoCollection.find(query1);
System.out.println("更新messages以后的值"+iterable.first());
}
}
剩下的關於業務邏輯方面的就不多說了,主要是關於MQTT高可用性斷開重連的功能和Mongo相關的操作。
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈