【編者按】毋庸置疑,Hadoop已成為當下最流行的大數據處理平臺,讓機構可以用更低廉的價格對海量數據進行深度挖掘,同時,在YARN出現后,其生態圈也愈加繁榮;然而,Hadoop原生數據庫HBase卻因眾多問題飽受詬病,比如部署難、以Java為中心等工程問題,以及故障轉移、面向主從設計的架構問題,這直接導致了HBase人氣甚至不如同為列存儲類型的Cassandra。幸運的是,在我們之前有很多先行者對Hadoop進入了深入的研究,本次即為大家帶來@無塵道長 的心得分享――HBase寫數據過程。
CSDN推薦:歡迎免費訂閱《Hadoop與大數據周刊》獲取更多Hadoop技術文獻、大數據技術分析、企業實戰經驗,生態圈發展趨勢。
以下為原文
博文說明:1、研究版本HBase 0.94.12;2、貼出的源代碼可能會有刪減,只保留關鍵的代碼。
從client和server兩個方面探討HBase的寫數據過程。
一、client端
1、寫數據API
寫數據主要是HTable的單條寫和批量寫兩個API,源碼如下:
//單條寫API public void put(final Put put) throws IOException { doPut(put); if (autoFlush) { flushCommits(); } } //批量寫API public void put(final List<Put> puts) throws IOException { for (Put put : puts) { doPut(put); } if (autoFlush) { flushCommits(); } } //具體的put實現 private void doPut(Put put) throws IOException{ validatePut(put); writeBuffer.add(put); currentWriteBufferSize += put.heapSize(); if (currentWriteBufferSize > writeBufferSize) { flushCommits(); } } public void close() throws IOException { if (this.closed) { return; } flushCommits(); …. } |
通過兩個put API可以看出如果autoFlush為false,則無論是否是批量寫效果均是相同,均是等待寫入的數據超過配置的writeBufferSize(通過hbase.client.write.buffer配置,默認為2M)時才提交寫數據請求,如果最后的寫入數據沒有超過2M,則在調用close方法時會進行最后的提交,當然,如果使用批量的put方法時,自己控制flushCommits則效果不同,比如每隔1000條進行一次提交,如果1000條數據的總大小超過了2M,則實際上會發生多次提交,導致最終的提交次數多過只由writeBufferSize控制的提交次數,因此在實際的項目中,如果對寫性能的要求比對數據的實時可查詢和不可丟失的要求更高則可以設置autoFlush為false并采用單條寫的put(final Put put)API,這樣即可以簡化寫操作數據的程序代碼,寫入效率也更優,需要注意的是如果對數據的實時可查詢和不可丟失有較高的要求則應該設置autoFlush為true并采用單條寫的API,這樣可以確保寫一條即提交一條。
2、關于多線程寫
在0.94.12這個版本中,對于寫操作,HBase內部就是多線程,線程數量與批量提交的數據涉及的region個數相同,通常情況下不需要再自己寫多線程代碼,自己寫的多線程代碼主要是解決數據到HTable的put這個過程中的性能問題,數據進入put的緩存,當達到writeBufferSize設定的大小后才會真正發起寫操作(如果不是自己控制flush),這個過程的線程數與這批數據涉及的region個數相同,會并行寫入所有相關region,一般不會出現性能問題,當涉及的region個數過多時會導致創建過多的線程,消耗大量的內存,甚至會出現線程把內存耗盡而導致OutOfMemory的情況,比較理想的寫入場景是調大writeBufferSize,并且一次寫入適量的不同regionserver的region,這樣可以充分把寫壓力分攤到多個服務器。
HBase寫數據的客戶端核心方法是HConnectionManager的processBatchCallback方法,相關源碼如下:
public void flushCommits() throws IOException { try { Object[] results = new Object[writeBuffer.size()]; try { this.connection.processBatch(writeBuffer, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { … } finally { … } } public void processBatch(List<? extends Row> list, final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { … processBatchCallback(list, tableName, pool, results, null); } public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { …. HRegionLocation [] lastServers = new HRegionLocation[results.length]; for (int tries = 0; tries < numRetries && retry; ++tries) { … // step 1: break up into regionserver-sized chunks and build the data structs Map<HRegionLocation, MultiAction<R>> actionsByServer = new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { HRegionLocation loc = locateRegion(tableName, row.getRow()); byte[] regionName = loc.getRegionInfo().getRegionName(); MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); actionsByServer.put(loc, actions); //每一個region對應一個MultiAction對象,每個MultiAction對象持有該region所有的put Action } Action<R> action = new Action<R>(row, i); lastServers[i] = loc; actions.add(regionName, action); } } // step 2: make the requests,每個region 開啟一個線程 Map<HRegionLocation, Future<MultiResponse>> futures = new HashMap<HRegionLocation, Future<MultiResponse>>(actionsByServer.size()); for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3: collect the failures and successes and prepare for retry … // step 4: identify failures and prep for a retry (if applicable). … } … } |
3、在寫入數據前,需要先定位具體的數據應該寫入的region,核心方法:
//從緩存中定位region,通過NavigableMap實現,如果沒有緩存則需查詢.META.表 HRegionLocation getCachedLocation(final byte [] tableName, final byte [] row) { SoftValueSortedMap<byte [], HRegionLocation> tableLocations = getTableLocations(tableName); … //找到小于rowKey并且最接近rowKey的startKey對應的region,通過NavigableMap實現 possibleRegion = tableLocations.lowerValueByKey(row); if (possibleRegion == null) { return null; } //表的最末一個region的endKey是空字符串,如果不是最末一個region,則只有當rowKey小于endKey才返回region。 byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || KeyValue.getRowComparator(tableName).compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { return possibleRegion; } return null; } |
二、服務端
服務端寫數據的主要過程是:寫WAL日志(如果沒有關閉寫WAL日志)-》寫memstore-》觸發flush memstore(如果memstore大小超過hbase.hregion.memstore.flush.size的設置值),在flush memstore過程中可能會觸發compact和split操作,在以下內容會對寫put方法、flush memstore、compact和split進行講解。
1、HTableInterface接口操作HBase數據的API對應的服務端是由HRegionServer類實現,源代碼如下:
//單條put public void put(final byte[] regionName, final Put put) throws IOException { HRegion region = getRegion(regionName); if (!region.getRegionInfo().isMetaTable()) { //檢查HRegionServer的memstore總內存占用量是否已經超過了hbase.regionserver.global.memstore.upperLimit(默認值是0.4)或者hbase.regionserver.global.memstore.lowerLimit(默認值是0.35)的限制,如果超過了則會在flush隊列中添加一個任務,其中如果是超過了upper的限制則會阻塞所有的寫memstore的操作,直到內存降至lower限制以下。 this.cacheFlusher.reclaimMemStoreMemory(); } boolean writeToWAL = put.getWriteToWAL(); //region會調用Store的add()方法把數據保存到相關Store的memstore中 //region在保存完數據后,會檢查是否需要flush memstore,如果需要則發出flush請求,由HRegionServer的flush守護線程異步執行。 region.put(put, getLockFromId(put.getLockId()), writeToWAL); } //批量put public int put(final byte[] regionName, final List<Put> puts) throws IOException { region = getRegion(regionName); if (!region.getRegionInfo().isMetaTable()) { this.cacheFlusher.reclaimMemStoreMemory(); } OperationStatus codes[] = region.batchMutate(putsWithLocks); for (i = 0; i < codes.length; i++) { if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { return i; } } return -1; } |
2、Flush Memstore
memstore的flush過程由類MemStoreFlusher控制,該類是Runnable的實現類,在HRegionServer啟動時會啟動一個MemStoreFlusher的守護線程,每隔10s從flushQueue中獲取flush任務進行刷新,如果需要flush memstore時,只需調用MemStoreFlusher的requestFlush或者requestDelayedFlush方法把flush請求加入到flush隊列中即可,具體的flush是異步執行的。
memstore的大小有兩個控制級別:
1)Region級
a、hbase.hregion.memstore.flush.size:默認值128M,超過將被flush到磁盤
b、hbase.hregion.memstore.block.multiplier:默認值2,如果memstore的內存大小已經超過了hbase.hregion.memstore.flush.size的2倍,則會阻塞該region的寫操作,直到內存大小降至該值以下
2)RegionServer級
a、hbase.regionserver.global.memstore.lowerLimit:默認值0.35,HRegionServer的所有memstore占用內存在HRegionServer總內存中占的lower比例,當達到該值,則會觸發整個RegionServer的flush(并不會真正flush所有的region,關于該點請參看后續內容),直到總內存比例降至該數限制以下
b、hbase.regionserver.global.memstore.upperLimit:默認值0.4,HRegionServer的所有memstore占用內存在總內存中的upper比例,當達到該值,則會觸發整個RegionServer的flush,直到總內存比例降至該數限制以下,并且在降至限制比例以下前將阻塞所有的寫memstore的操作
在對整個HRegionServer進行flush操作時,并不會刷新所有的region,而是每次均會根據region的memstore大小、storeFile數量等因素找出最需要flush的region進行flush,flush完成后再進行內存總比例的判斷,如果還未降至lower限制以下則會再尋找新的region進行flush。
在flush region時會flush該region下所有的store,雖然可能某些store的memstore內容很少。
在flush memstore時會產生updatesLock(HRegion類的一個屬性,采用jdk的ReentrantReadWriteLock實現)的排它鎖write lock,當獲取完memstore的快照后釋放updatesLock的write lock,在釋放之前,所有的需要獲取updatesLock的write、read lock的操作均會被阻塞,該影響是整個HRegion范圍,因此如果表的HRegion數量過少,或者數據寫入時熱點在一個region時會導致該region不斷flush memstore,由于該過程會產生write排他鎖(雖然進行memstore快照的時間會很快),因此會影響region 的整體寫能力。
3、Compact操作
HBase有兩種compact:minor和major,minor通常會把若干個小的storeFile合并成一個大的storeFile,minor不會刪除標示為刪除的數據和過期的數據,major則會刪除這些數據,major合并之后,一個store只有一個storeFile文件,這個過程對store的所有數據進行重寫,有較大的資源開銷,major 合并默認1天執行一次,可以通過hbase.hregion.majorcompaction配置執行周期,通常是把該值設置為0進行關閉,采用手工執行,這樣可以避免當集群繁忙時執行整個集群的major合并,major合并是必須執行的操作,因為刪除標示為刪除和過期的數據操作是在該合并過程中進行的。通過merge可以對表的兩個region進行合并,以減少region的數量,執行命令:
$ bin/hbase org.apache.hadoop.hbase.util.Merge <tablename> <region1> <region2>
參數<region1>需要寫region的名稱,比如:
gd500M,4-605-52-78641,1384227418983.ccf74696ef8a241088356039a65e1aca
執行該操作時需要先停止運行HBase集群,并且如果hdfs不是與HBase擁有相同的用戶組和用戶且hdfs配置為需要進行權限控制(由配置項dfs.permissions控制,默認為true)時需要切換linux用戶到hdfs用戶下執行該操作,執行完成后,需要通過hadoop dfs