日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁(yè) > php開(kāi)源 > 綜合技術(shù) > java后端IM消息推送服務(wù)開(kāi)發(fā)——協(xié)議

java后端IM消息推送服務(wù)開(kāi)發(fā)——協(xié)議

來(lái)源:程序員人生   發(fā)布時(shí)間:2016-07-04 16:58:46 閱讀次數(shù):3516次

最近在1家saas企業(yè)使用Mqtt開(kāi)發(fā)IM消息推送服務(wù),把開(kāi)發(fā)中的1些問(wèn)題記錄下來(lái),項(xiàng)目仍在商用中,完全的消息服務(wù)包括4個(gè)模塊---協(xié)議protocol,信令Signal,規(guī)則Rule,狀態(tài)Status,這個(gè)主題主要是協(xié)議protocol部份。

主要技術(shù)觸及到MongoDB,webservice,httpclient,Mqtt等

protocol分為4個(gè)模塊類來(lái)實(shí)現(xiàn),固然這是為了以后的擴(kuò)大性比較好

首先看1下我們的主類,主要是mqtt基礎(chǔ)方法的1個(gè)框架

public class MqttProtocol { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static final String HOST = "tcp://xx.xx.xx.xx:1883"; private static final String CLIENTID = "yyyy"; private MqttClient client; private MqttConnectOptions options = new MqttConnectOptions(); //private String userName = "admin"; //private String passWord = "public"; public MqttMessage message; private PushCallback callback; /** * 用于初始化mqttclient客戶端,設(shè)置回調(diào)函數(shù),同時(shí)連接mqtt服務(wù)器 * @throws MqttException */ public MqttProtocol() throws MqttException { //MemoryPersistence設(shè)置clientid的保存情勢(shì),默許為之內(nèi)存保存 client = new MqttClient(HOST, CLIENTID, new MemoryPersistence()); callback = new PushCallback(); client.setCallback(callback); options = new MqttConnectOptions(); options.setCleanSession(false); options.setKeepAliveInterval(60); connect(); } /** * 連接mqtt消息服務(wù)器,同時(shí)設(shè)置了斷開(kāi)重連的功能,主要是為了高可用性斟酌,在斷網(wǎng)服務(wù)器崩潰時(shí)候我們的程序依然不會(huì)終止 */ private void connect() { SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); boolean tryConnecting = true; while (tryConnecting) { try { client.connect(options); } catch (Exception e1) { System.out.println("Connection attempt failed with '"+e1.getCause()+ "'. Retrying."); } if (client.isConnected()) { System.out.println("Connected."); tryConnecting = false; } else { pause(); } } } private void pause() { try { Thread.sleep(1000); } catch (InterruptedException e) { // Error handling goes here... } } /** * * @param topic * @param qos * @throws MqttPersistenceException * @throws MqttException * 定閱相干主題 */ public void subscribe(String topic , int qos) throws MqttPersistenceException, MqttException { client.subscribe(topic, qos); } /** * * @throws MqttPersistenceException * @throws MqttException * 斷開(kāi)連接服務(wù)器 */ public void disconnect() throws MqttPersistenceException, MqttException { client.disconnect(); } /** * * @author binshi *實(shí)現(xiàn)mqttcallback接口,主要用于接收消息后的處理方法 */ private class PushCallback implements MqttCallback { /** * 斷開(kāi)后 系統(tǒng)會(huì)自動(dòng)調(diào)用這個(gè)函數(shù),同時(shí)在這個(gè)函數(shù)里進(jìn)行重連操作 */ public void connectionLost(Throwable cause) { // 連接丟失后,1般在這里面進(jìn)行重連 System.out.println("連接斷開(kāi),可以做重連"); connect(); try { subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 消息成功傳送后,系統(tǒng)會(huì)自動(dòng)調(diào)用此函數(shù),表明成功向topic發(fā)送消息 */ @Override public void deliveryComplete(IMqttDeliveryToken arg0) { // TODO Auto-generated method stub System.out.println("deliveryComplete---------" + arg0.isComplete()); } /** * 連接mongo數(shù)據(jù)庫(kù),返回關(guān)于具體collection的Mongocollection * @param collectionname * @return */ public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(topic); SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); System.out.println(sdf.format(System.currentTimeMillis())); System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息內(nèi)容 : " + new String(message.getPayload())); //1 抽取事件信令消息 String messagejudge=new String(message.getPayload()); System.out.println("疏忽所有robot消息和offline離線消息"); JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(messagejudge); } catch (Exception e) { e.printStackTrace(); } String from=jo.getString("from"); System.out.println("取得from"+from); System.out.println("肯定消息是不是包括offline,如果包括獲得offline,為1就不處理"); String offline=null; if(messagejudge.contains("offline")) { offline=jo.getString("offline"); } if((offline==null)&&(!from.contains("robot"))) { System.out.println("處理非系統(tǒng)消息和非離線消息"); String type=jo.getString("type"); System.out.println("取得type"+type); if(type.equals("shakehand")) { System.out.println("處理shakehand消息"); String admin="doyounkowwhy"; if(jo.toString().contains("admin")) { admin=jo.getString("admin"); } System.out.println("獲得admin 如果為1定義為客服,否則為普通用戶 admin為"+admin); if(admin.equals("1")) { System.out.println("處理客服握手消息"); System.out.println("發(fā)送握手成功消息"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic); System.out.println("向客戶端發(fā)送離線未接收的消息"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection); } else { System.out.println("處理普通用戶的握手消息"); String appid=jo.getString("appid"); String pageid=jo.getString("pageid"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic); } } else if(type.equals("text")||type.equals("image")) { System.out.println("處理圖片和文字消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); MqttTopic retopic=client.getTopic(topic); MsgOperation.getTextMsg( tmpindex, from, convid, retopic); System.out.println("保存圖片文字消息"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo); } else if(type.equals("ack")) { System.out.println("處理ack消息"); String tmpindex=jo.getString("tmpindex"); String convid=jo.getString("convid"); String database="dolina"; String collection="messages"; MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection); } } } } /** * * @param args * @throws MqttException * 全部工程從這里開(kāi)始履行,生成可履行jar包,這個(gè)設(shè)置為主類。 */ public static void main(String[] args) throws MqttException { MqttProtocol signal = new MqttProtocol(); signal.message = new MqttMessage(); /** server.message.setQos(2); server.message.setRetained(false); server.message.setPayload("給客戶端124推送的信息".getBytes()); server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2); */ signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2); System.out.println(signal.message.isRetained() + "------ratained狀態(tài)"); } }
接下來(lái)使我們的遠(yuǎn)程連接模塊,主要是通過(guò)給定的url調(diào)用遠(yuǎn)程接口

public class RemoteOperation { private static Logger logger = Logger.getLogger(MqttProtocol.class); public static JSONObject remoteCall(String url) throws HttpException, IOException { HttpClient httpClient = new HttpClient(); GetMethod method =null ; method=new GetMethod(url); int retcode = httpClient.executeMethod(method); if (retcode != HttpStatus.SC_OK) {// 發(fā)送不成功 logger.info("遠(yuǎn)程調(diào)用出錯(cuò)"); return null; } else { String body = method.getResponseBodyAsString(); logger.info(body+"遠(yuǎn)程調(diào)用php成功"); JSONObject jsonObject=new JSONObject(); try { jsonObject=JSONObject.fromObject(body); } catch (Exception e) { e.printStackTrace(); } if (method != null) { method.releaseConnection(); } return jsonObject; } } }

下面是Mongo數(shù)據(jù)庫(kù)的相干操作的1個(gè)封裝,設(shè)計(jì)為單例模式,相當(dāng)于每次都使用同1個(gè)client打開(kāi)連接,類似于連接池的概念,固然業(yè)務(wù)邏輯部份可以更換

public class MongoDBDao { private static Logger logger = Logger.getLogger(MongoDBDao.class); /** * MongoClient的實(shí)例代表數(shù)據(jù)庫(kù)連接池,是線程安全的,可以被多線程同享,客戶端在多線程條件下僅保持1個(gè)實(shí)例便可 * Mongo是非線程安全的,目前mongodb API中已建議用MongoClient替換Mongo */ private MongoClient mongoClient = null; /** * * 私有的構(gòu)造函數(shù) * 作者:shibin */ private MongoDBDao(){ if(mongoClient == null){ String url = Constant.MONGO_MQTT_URL; String user = Constant.MONGO_MQTT_USER; String password = Constant.MONGO_MQTT_PASSWORD; String database = Constant.MONGO_MQTT_DATABASE; int port = 27017; ServerAddress serverAddress = new ServerAddress(url, port); List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>(); serverAddresses.add(serverAddress); MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray()); List<MongoCredential> credentials = new ArrayList<MongoCredential>(); credentials.add(credential); mongoClient = new MongoClient(serverAddresses, credentials); System.out.println(mongoClient); System.out.println("初始化client完成"); } } /********單例模式聲明開(kāi)始,采取餓漢式方式生成,保證線程安全********************/ //類初始化時(shí),自行實(shí)例化,餓漢式單例模式 private static final MongoDBDao mongoDBDao = new MongoDBDao(); /** * * 方法名:getMongoDBDaoImplInstance * 作者:shibin * * 描寫(xiě):?jiǎn)卫撵o態(tài)工廠方法 * @return */ public static MongoDBDao getMongoDBDaoInstance(){ return mongoDBDao; } public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException { System.out.println("取得message的連接"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); System.out.println("獲得convid所對(duì)應(yīng)的msg列表"); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); FindIterable<Document> iterable=null; iterable = mongoCollection.find(query); if(iterable.first()!=null) { System.out.println(iterable.first()); String res= iterable.first().toJson(); JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(res); } catch (Exception e) { e.printStackTrace(); } JSONArray jsonArray=jo.getJSONArray("msg"); for(int i=0;i<jsonArray.length();i++) { String read=jsonArray.getJSONObject(i).getString("read"); System.out.println("取得msg對(duì)應(yīng)的第"+i+"條記錄的read信息"+read); System.out.println("判斷read是不是包括from的信息,如果不包括且這條消息不是他自己發(fā)的就給她發(fā)送這條消息"); if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from)) { System.out.println("取得這條消息的原型,然后加上offline=1并發(fā)送消息"); JSONObject msg=jsonArray.getJSONObject(i); msg.put("offline", "1"); retopic.publish(msg.toString().getBytes(), 0, false); } else { System.out.println("no offline message for "+from); } } } } public void saveTextMsg(String database,String collection,JSONObject jo) { MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); String convid=jo.getString("convid"); query.put("_id", convid); FindIterable iterable; iterable = mongoCollection.find(query); System.out.println("更新message之前的值"+iterable.first()); Bson filter = Filters.eq("_id", convid); Document content = new Document(); String type=jo.getString("type"); if(type.equals("text")) { String contentMsg=jo.getJSONObject("content").getString("content"); content.put("content", contentMsg); } else { String url=jo.getJSONObject("content").getString("url"); content.put("url", url); } String admin=jo.getJSONObject("extra").getString("admin"); String headimgurl=jo.getJSONObject("extra").getString("headimgurl"); String nickname=jo.getJSONObject("extra").getString("nickname"); String from=jo.getString("from"); String tmpindex=jo.getString("tmpindex"); Document extra = new Document(); extra.put("nickname", nickname); Document doc = new Document(); doc.put("from",from ); ArrayList<String> read=new ArrayList<String>(); doc.put("read", read); Document tdoc = new Document(); tdoc.put("msg", doc); UpdateOptions updateOptions=new UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions); iterable = mongoCollection.find(query); System.out.println("更新message以后的值"+iterable.first()); } public void getAck(String tmpindex,String convid,String from,String database,String collection) { System.out.println("接收到ack消息后更新message中的read字段"); MongoDatabase mongoDatabase = mongoClient.getDatabase(database); MongoCollection mongoCollection = mongoDatabase.getCollection(collection); BasicDBObject query = new BasicDBObject(); query.put("_id", convid); query.put("msg.tmpindex", tmpindex); BasicDBObject query1 = new BasicDBObject(); query1.put("_id", convid); FindIterable iterable; FindIterable iterable2; iterable = mongoCollection.find(query1); iterable2 = mongoCollection.find(query); System.out.println("更新message滿足id過(guò)濾條件之前的值"+iterable.first()); System.out.println("更新message滿足id和tmpindex過(guò)濾條件之前的值"+iterable2.first()); if(iterable2.first()!=null) { Document doc = new Document(); doc.put("msg.$.read", from); UpdateOptions updateOptions=new UpdateOptions(); updateOptions.upsert(true); mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions); } iterable = mongoCollection.find(query1); System.out.println("更新messages以后的值"+iterable.first()); } }

剩下的關(guān)于業(yè)務(wù)邏輯方面的就不多說(shuō)了,主要是關(guān)于mqtt高可用性斷開(kāi)重連的功能和mongo相干的操作

生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 中文字幕精品一区二区三区精品 | 欧美一区二区三区四区视频 | 日韩中文字幕视频 | 黄色片网站 | 国产一区二区在线精品 | 欧美激情欧美激情在线五月 | 亚洲 欧美 日韩 在线 | 超碰成人91 | 国产亚洲精品久久久优势 | av一区在线| 最近的中文字幕在线看 | 一区二区三区精品 | 国产黄色免费网站 | 亚洲色图综合网 | 欧美日韩1区2区3区 久久五月天婷婷 | 国产黄色在线播放 | 日韩精品视频免费观看 | 亚洲一区二区三区免费视频 | 欧美日韩中文字幕一区二区 | 黄色aaa大片 | 久久久久久一区二区三区四区别墅 | 欧美香蕉网 | 国产精品国产三级国产普通话三级 | 国产理论在线观看 | 欧美不卡一二三 | 国产精品一区二区三区久久久 | www.国产91| 成人福利网站在线观看 | 日韩精品色网 | 国产精品亚洲精品 | 精品国产一二区 | 精品美女久久久久久免费 | 欧美日韩三级在线 | 午夜精品久久久久久久久久久久久 | swag国产精品一区二区 | 日韩欧美高清视频 | 成人免费福利视频 | 蜜桃久久av | 国产伦精品一区二区三区视频孕妇 | 久久久久久久久久久一区二区 | 欧美一区一区 |