日本搞逼视频_黄色一级片免费在线观看_色99久久_性明星video另类hd_欧美77_综合在线视频

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 互聯網 > 利用Hadoop Streaming處理二進制格式文件

利用Hadoop Streaming處理二進制格式文件

來源:程序員人生   發布時間:2014-09-10 17:18:42 閱讀次數:2242次

編者按:Streaming是Hadoop的一個工具,用來創建和運行一類特殊的Map/Reduce作業。Streaming使用“標準輸入”和“標準輸出”與我們編寫的Map和Reduce進行數據的交換。由此可知,任何能夠使用“標準輸入”和“標準輸出”的編程語言都可以用來編寫MapReduce程序。今天給大家分享一篇來自董西成的博文“利用Hadoop Streaming處理二進制格式文件”,文中介紹了如何使用Streaming處理二進制格式的文件。


CSDN推薦:歡迎免費訂閱《Hadoop與大數據周刊》獲取更多Hadoop技術文獻、大數據技術分析、企業實戰經驗,生態圈發展趨勢。


Hadoop Streaming是Hadoop提供的多語言編程工具,用戶可以使用自己擅長的編程語言(比如python、php或C#等)編寫Mapper和Reducer處理文本數據。Hadoop Streaming自帶了一些配置參數可友好地支持多字段文本數據的處理,參與Hadoop Streaming介紹和編程,可參考我的這篇文章:“Hadoop Streaming編程實例”。然而,隨著Hadoop應用越來越廣泛,用戶希望Hadoop Streaming不局限在處理文本數據上,而是具備更加強大的功能,包括能夠處理二進制數據;能夠支持多語言編寫Combiner等組件。隨著Hadoop 2.x的發布,這些功能已經基本上得到了完整的實現,本文將介紹如何使用Hadoop Streaming處理二進制格式的文件,包括SequenceFile,HFile等。

注:本文用到的程序實例可在百度云:hadoop-streaming-binary-examples 下載。

在詳細介紹操作步驟之前,先介紹本文給出的實例。假設有這樣的SequenceFile,它保存了手機通訊錄信息,其中,key是好友名,value是描述該好友的一個結構體或者對象,為此,本文使用了google開源的protocol buffer這一序列化/反序列化框架,protocol buffer結構體定義如下:

option java_package = "";
option java_outer_classname="PersonInfo";
message Person {
  optional string name = 1;
  optional int32 age = 2;
  optional int64 phone = 3;
  optional string address = 4;
}

SequenceFile文件中的value便是保存的Person對象序列化后的字符串,這是典型的二進制數據,不能像文本數據那樣可通過換行符解析出每條記錄,因為二進制數據的每條記錄中可能包含任意字符,包括換行符。

一旦有了這樣的SequenceFile之后,我們將使用Hadoop Streaming編寫這樣的MapReduce程序:這個MapReduce程序只有Map Task,任務是解析出文件中的每條好友記錄,并以name age,phone,address的文本格式保存到HDFS上。

1. 準備數據

首先,我們需要準備上面介紹的SequenceFile數據,生成數據的核心代碼如下:

final SequenceFile.Writer out =
        SequenceFile.createWriter(fs, getConf(), new Path(args[0]),
                Text.class, BytesWritable.class);
Text nameWrapper = new Text();
BytesWritable personWrapper = new BytesWritable();
System.out.println("Generating " + num + " Records......");
for(int i = 0; i < num; i++) {
  genOnePerson(nameWrapper, personWrapper);
  System.out.println("Generating " + i + " Records," + nameWrapper.toString() + "......");
  out.append(nameWrapper, personWrapper);
}
out.close();

當然,為了驗證我們產生的數據是否正確,需要編寫一個解析程序,核心代碼如下:

Reader reader = new Reader(fs, new Path(args[0]), getConf());
Text key = new Text();
BytesWritable value = new BytesWritable();
while(reader.next(key, value)) {
  System.out.println("key:" + key.toString());
  value.setCapacity(value.getSize()); // Very important!!! Very Tricky!!!
  PersonInfo.Person person = PersonInfo.Person.parseFrom(value.getBytes());
  System.out.println("age:" + person.getAge()
          + ",address:" + person.getAddress()
          +",phone:" + person.getPhone());
}
reader.close();

需要注意的,Value保存類型為BytesWritable,使用這個類型非常容易犯錯誤。當你把一堆byte[]數據保存到BytesWritable后,通過BytesWritable.getBytes()再讀到的數據并不一定是原數據,可能變長了很多,這是因為BytesWritable采用了自動內存增長算法,你保存的數據長度為size時,它可能將數據保存到了長度為capacity(capacity>size)的buffer中,這時候,你通過BytesWritable.getBytes()得到的數據最后一些字符是多余的,如果里面保存的是protocol buffer序列化后的字符串,則無法反序列化,這時候可以使用BytesWritable.setCapacity (value.getSize())將后面多余空間剔除掉。

2. 使用Hadoop Streaming編寫C++程序

為了說明Hadoop Streaming如何處理二進制格式數據,本文僅僅以C++語言為例進行說明,其他語言的設計方法類似。

先簡單說一下原理。當輸入數據是二進制格式時,Hadoop Streaming會對輸入key和value進行編碼后,通過標準輸入傳遞給你的Hadoop Streaming程序,目前提供了兩種編碼格式,分別是rawtypes和  typedbytes,你可以設計你想采用的格式,這兩種編碼規則如下(具體在文章“Hadoop Streaming高級編程”中已經介紹了):

rawbytes:key和value均用【4個字節的長度+原始字節】表示

typedbytes:key和value均用【1字節類型+4字節長度+原始字節】表示

本文將采用第一種編碼格式進行說明。采用這種編碼意味著你不能想文本數據那樣一次獲得一行內容,而是依次獲得key和value序列,其中key和value都由兩部分組成,第一部分是長度(4個字節),第二部分是字節內容,比如你的key是dongxicheng,value是goodman,則傳遞給hadoop streaming程序的輸入數據格式為11 dongxicheng 7 goodman。為此,我們編寫下面的Mapper程序解析這種數據:

int main() {
 string key, value;
 while(!cin.eof()) {
  if(!FileUtil::ReadString(key, cin))
   break;
  FileUtil::ReadString(value, cin);
  Person person;
  ProtoUtil::ParseFromString(value, person);
  cout << person.name() << " " << person.age()
       << "," << person.address()
       << "," << person.phone() << endl;
 }
 return 0;
}

其中,輔助函數實現如下:

class ProtoUtil {
 public:
  static bool ParseFromString(const string& str, Person &person) {
   if(person.ParseFromString(str))
    return true;
   return false;
  }
};
class FileUtil {
 public:
  static bool ReadInt(unsigned int *len, istream &stream) {
   if(!stream.read((char *)len, sizeof(unsigned int)))
    return false;
   *len = bswap_32(*len);
   return true;
  }
  static bool ReadString(string &str, istream &stream) {
   unsigned int len;
   if(!ReadInt(&len, stream))
    return false;
   str.resize(len);
   if(!ReadBytes(&str[0], len, stream))
    return false;
   return true;
  }
  static bool ReadBytes(char *ptr, unsigned int len, istream &stream) {
   stream.read(ptr, sizeof(unsigned char) * len);
   if(stream.eof()) return false;
   return true;
  }
};

該程序需要注意以下幾點:

(1)注意大小端編碼規則,解析key和value長度時,需要對長度進行字節翻轉。

(2)注意循環結束條件,僅僅靠!cin.eof()判定是不夠的,僅靠這個判定會導致多輸出一條重復數據。

(3)本程序只能運行在linux系統下,windows操作系統下將無法運行,因為windows下的標準輸入cin并直接支持二進制數據讀取,需要將其強制以二進制模式重新打開后再使用。

3. 程序測試與運行

程序寫好后,第一步是編譯C++程序。由于該程序需要運行在多節點的Hadoop集群上,為了避免部署或者分發動態庫帶來的麻煩,我們直接采用靜態編譯方式,這也是編寫Hadoop C++程序的基本規則。為了靜態編譯以上MapReduce程序,安裝protocol buffers時,需采用以下流程(強調第一步),

./configure
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 99草在线| 国产精品成人一区 | 一级毛片在线观看网站 | 久久久久国产精品免费免费搜索 | 国产日韩欧美91 | 亚洲欧美综合一区 | 久久久91精品国产一区二区三区 | 日韩精品久久久久久久电影99爱 | 毛片久久久 | 欧美日韩免费视频 | 色综合久久久久 | 欧美电影一区二区三区 | 久久精品亚洲 | 亚洲国产精品成人 | 久热综合| 日韩欧美一二三 | 毛片在线免费观看网站 | 国产精品成人一区二区 | 99精品一区二区 | 欧美一区二区三区公司 | 国产伦精品一区二区三区免费迷 | 日本在线观看一区 | 日韩三区 | 精品一区二区三区久久 | 免费毛片大全 | 999精品视频 | 国产精品一区二区久久久 | 天天综合国产 | 久久久久久国产一区二区三区 | 黄色成年人网站在线观看 | 97久久人人超碰caoprom欧美 | 欧美激情精品久久久久久变态 | 国产在线看 | 日韩一区不卡 | 久久国产在线观看 | 亚洲第一网站 | 久久久www成人免费精品 | 色婷婷成人做爰视频免费 | 69视频免费 | 91久久精品人人做人人爽综合 | 九九av|