storm DRPC例子
來源:程序員人生 發(fā)布時間:2015-02-26 21:08:23 閱讀次數(shù):3346次
1,DRPC原理
客戶端給DRPC服務(wù)器發(fā)送要履行的方法的名字,和這個方法的參數(shù)。實(shí)現(xiàn)了這個函數(shù)的topology使用DRPCSpout從DRPC服務(wù)器接收函 數(shù)調(diào)用流。每一個函數(shù)調(diào)用被DRPC服務(wù)器標(biāo)記了1個唯1的id。 這個topology然后計(jì)算結(jié)果,在topology的最后1個叫做ReturnResults的bolt會連接到DRPC服務(wù)器,并且把這個調(diào)用的結(jié)
果發(fā)送給DRPC服務(wù)器(通過那個唯1的id標(biāo)識)。DRPC服務(wù)器用那個唯1id來跟等待的客戶端匹配上,喚醒這個客戶端并且把結(jié)果發(fā)送給它。
2,例子
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class ManualDRPC {
public static class ExclamationBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("result", "return-info"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String arg = tuple.getString(0);
Object retInfo = tuple.getValue(1);
collector.emit(new Values(arg + "!!!", retInfo));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("exclaim", conf, builder.createTopology());
System.out.println(drpc.execute("exclamation", "aaa"));
System.out.println(drpc.execute("exclamation", "bbb"));
}
}
備注:DRPCSpout的名字與drpc.execute指定運(yùn)行的名字1致
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈