日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php開源 > 綜合技術(shù) > 深入學(xué)習(xí)RabbitMQ(一):mandatory標(biāo)志的作用

深入學(xué)習(xí)RabbitMQ(一):mandatory標(biāo)志的作用

來源:程序員人生   發(fā)布時(shí)間:2017-02-23 09:30:07 閱讀次數(shù):2786次

        在生產(chǎn)者通過channel的basicPublish方法發(fā)布消息時(shí),通常有幾個(gè)參數(shù)需要設(shè)置,為此我們有必要了解清楚這些參數(shù)代表的具體含義及其作用,查看Channel接口,會(huì)發(fā)現(xiàn)存在3個(gè)重載的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
        他們共有的參數(shù)分別是:
        exchange:交換機(jī)名稱
        routingKey:路由鍵
        props:消息屬性字段,比如消息頭部信息等等
        body:消息主體部份
        除此以外,還有mandatory和immediate這兩個(gè)參數(shù),鑒于RabbitMQ3.0不再支持immediate標(biāo)志,因此我們重點(diǎn)討論mandatory標(biāo)志
        mandatory的作用:

        當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)本身類型和消息routingKey沒法找到1個(gè)適合的queue存儲(chǔ)消息,那末broker會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者;當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情況broker會(huì)直接將消息拋棄;通俗的講,mandatory標(biāo)志告知broker代理服務(wù)器最少將消息route到1個(gè)隊(duì)列中,否則就將消息return給發(fā)送者;

       下面我們通過幾個(gè)實(shí)例測(cè)試下mandatory標(biāo)志的作用:
        測(cè)試1:設(shè)置mandatory標(biāo)志,且exchange未綁定隊(duì)列

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創(chuàng)建生產(chǎn)者
		Sender producer = new Sender(factory, count, exchangeName, routingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String routingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創(chuàng)建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//發(fā)送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個(gè)參數(shù)是exchangeName(默許情況下代理服務(wù)器端是存在1個(gè)""名字的exchange的,因此如果不創(chuàng)建exchange的話我們可以直接將該參數(shù)設(shè)置成"",如果創(chuàng)建了exchange的話我們需要將該參數(shù)設(shè)置成創(chuàng)建的exchange的名字),第2個(gè)參數(shù)是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

       第45行我們將basicPublish的第3個(gè)參數(shù)mandatory設(shè)置成了true,表示開啟了mandatory標(biāo)志,但我們沒有為當(dāng)前exchange綁定任何隊(duì)列;

       通過wireshark抓包看到下面輸出: 

       可以看到最后履行了basic.return方法,將發(fā)布者發(fā)出的消息返還給了發(fā)布者,查看協(xié)議的Arguments參數(shù)部份可以看到,Reply-Text字段值為:NO_ROUTE,表示消息并沒有路由到適合的隊(duì)列中;

       那末我們?cè)撛鯓荧@得到?jīng)]有被正確路由到適合隊(duì)列的消息呢?這時(shí)候候可以通過為channel信道設(shè)置ReturnListener監(jiān)聽器來實(shí)現(xiàn),具體實(shí)現(xiàn)代碼見下:

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創(chuàng)建生產(chǎn)者
		Sender producer = new Sender(factory, count, exchangeName, routingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String routingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創(chuàng)建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//發(fā)送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個(gè)參數(shù)是exchangeName(默許情況下代理服務(wù)器端是存在1個(gè)""名字的exchange的,
				//因此如果不創(chuàng)建exchange的話我們可以直接將該參數(shù)設(shè)置成"",如果創(chuàng)建了exchange的話
				//我們需要將該參數(shù)設(shè)置成創(chuàng)建的exchange的名字),第2個(gè)參數(shù)是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
			channel.addReturnListener(new ReturnListener() {
				
				@Override
				public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
						throws IOException {
					//此處便是履行Basic.Return以后回調(diào)的地方
					String message = new String(arg5);
					System.out.println("Basic.Return返回的結(jié)果:  "+message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
       在設(shè)置了ReturnListener監(jiān)聽器以后,broker(代理服務(wù)器)發(fā)出basic.return方法以后,就會(huì)回調(diào)第52行的handleReturn方法,在這個(gè)方法里面我們就能夠進(jìn)行消息的重新發(fā)布操作啦;

       測(cè)試2:設(shè)置mandatory標(biāo)志,且為exchange綁定隊(duì)列(路由鍵和綁定鍵1致)

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmRoutingKey";
		//String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創(chuàng)建生產(chǎn)者
		Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String 	queueName;
	private String routingKey;
	private String bindingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.queueName = queueName;
		this.routingKey = routingKey;
		this.bindingKey = bindingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創(chuàng)建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//創(chuàng)建隊(duì)列
			channel.queueDeclare(queueName, true, false, false, null);
			//綁定exchange和queue
			channel.queueBind(queueName, exchangeName, bindingKey);
			//發(fā)送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個(gè)參數(shù)是exchangeName(默許情況下代理服務(wù)器端是存在1個(gè)""名字的exchange的,
				//因此如果不創(chuàng)建exchange的話我們可以直接將該參數(shù)設(shè)置成"",如果創(chuàng)建了exchange的話
				//我們需要將該參數(shù)設(shè)置成創(chuàng)建的exchange的名字),第2個(gè)參數(shù)是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
			channel.addReturnListener(new ReturnListener() {
				
				@Override
				public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
						throws IOException {
					//此處便是履行Basic.Return以后回調(diào)的地方
					String message = new String(arg5);
					System.out.println("Basic.Return返回的結(jié)果:  "+message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

        通過抓包發(fā)現(xiàn)其實(shí)不會(huì)有basic.return方法被調(diào)用,查看RabbitMQ管理界面發(fā)現(xiàn)消息已到達(dá)了隊(duì)列;



        測(cè)試3:設(shè)置mandatory標(biāo)志,且exchange綁定隊(duì)列(路由鍵和綁定鍵不1致)

        代碼就是把測(cè)試2中第6行注釋,第7行注釋打開,注意到此時(shí)的routingKey和bindingKey是不1致的,此時(shí)我們運(yùn)行程序,同時(shí)抓包得到下面截圖:


       注意1點(diǎn),我們發(fā)送了3條消息,那末相應(yīng)的應(yīng)當(dāng)履行3次basic.return,其中第1次和第2次basic.return顯示在1行上了,第3次是單唯一行,不要誤認(rèn)為只履行了兩次,從協(xié)議的具體返回內(nèi)容里我們一樣看到了Reply-Text字段值是NO_ROUTE,這類現(xiàn)象在測(cè)試1中已見過了;

       到此,我們明白了mandatory標(biāo)志的作用:在消息沒有被路由到適合隊(duì)列情況下會(huì)將消息返還給消息發(fā)布者,同時(shí)我們測(cè)試了哪些情況下消息不會(huì)到達(dá)適合的隊(duì)列,測(cè)試1演示的是創(chuàng)建了exchange但是沒有為他綁定隊(duì)列致使的消息未到達(dá)適合隊(duì)列,測(cè)試3演示的是創(chuàng)建了exchange同時(shí)創(chuàng)建了queue,但是在將二者綁定的時(shí)候,使用的bindingKey和消息發(fā)布者使用的rountingKey不1致致使的消息未到達(dá)適合隊(duì)列;

       參考資料:

       RabbitMQ(2) AMQP協(xié)議mandatory和immediate標(biāo)志位區(qū)分

       RabbitMQ之mandatory




生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 疯狂做受xxxx国产 | 久久久久久久女女女又又 | 精品一区二区三区久久久 | av日韩 | 18久久| 黄a大片| 精品免费视频 | 激情毛片| 另类专区亚洲 | 黄色福利视频 | 国产日韩欧美视频 | 国产在线一二 | 黄色大片在线播放 | 国产不卡视频在线 | 三级网站免费播放 | 中文字幕一区二区在线播放 | 福利片网站 | 久久久精品一区二区三区 | 亚洲欧美中文日韩在线v日本 | 黄色大片免费看 | 国产精品美女久久 | 玖玖色在线 | 欧洲一二三区 | 91久久久久久久一区二区 | 欧美二区视频 | 高清国产一区 | 日韩精品在线一区 | 中文字幕av第一页 | 久久久久亚洲综合 | 色综合久久一区二区三区 | 99亚洲精品 | 国产精品久久久久永久免费观看 | 久久久精品免费观看 | 国产视频在线一区二区 | 麻豆一二三区 | 久久精品国产精品 | 亚洲 自拍 另类 欧美 丝袜 | 黄色小视频在线播放 | 亚洲一区二区三区在线 | 免费日韩一区二区 | 成人久久久精品国产乱码一区二区 |