How to Read a Salted HBase Table: the Coprocessor Approach

In the article "HBase Rowkey Design Guide" we introduced three common ways to avoid data hotspots: salting, hashing, and reversing. Salting was described there as: assign a random prefix to the rowkey so that rows sort differently than they otherwise would. But once a random prefix has been prepended to the rowkey, how do we read the data back out? I will cover this in three articles, each presenting one approach:

- reading a salted table with a coprocessor
- reading a salted table with Spark
- reading a salted table with MapReduce

For an introduction to coprocessors and hands-on examples, see the earlier article "HBase Coprocessor Introduction and Practice".
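To make the idea concrete, here is a minimal sketch of what salting a rowkey looks like. The one-letter salt and the uid-timestamp layout match the test data generated below, but the SaltingSketch class and its saltedRowKey helper are illustrative names of mine, not code from the original post:

```java
// A minimal salting sketch: prepend a random capital letter so that writes
// spread across the 26 pre-split regions used later in this article.
public class SaltingSketch {
    static String saltedRowKey(String uid, long ts) {
        char salt = (char) ('A' + (int) (Math.random() * 26)); // random salt bucket
        return salt + "-" + uid + "-" + ts;                    // e.g. "A-1000-1550572395399"
    }

    public static void main(String[] args) {
        System.out.println(saltedRowKey("1000", System.currentTimeMillis()));
    }
}
```

The price of the random prefix is exactly the problem this series addresses: rows for one UID now live under 26 different prefixes, so a single naive scan can no longer retrieve them.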
Component versions used in this post: hadoop-2.7.7, hbase-2.0.4, jdk1.8.0_201.
Generating Test Data

Before looking at how to query the data, let's create an HBase table named iteblog for testing. To keep the data evenly distributed and the discussion simple, the table is pre-split into 27 regions:

```
hbase(main):002:0> create 'iteblog', 'f', SPLITS => ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']
0 row(s) in 2.4880 seconds
```

We then generate 1,000,000 test rows with the code below. The rowkey has the form UID + generation timestamp; since the UID is only 4 digits long, those 1,000,000 rows contain many duplicate UIDs, so we use salting to spread them evenly across the 27 regions above. (Note that the first region, the one whose StartKey is empty, actually holds no data, because every salted rowkey starts with a letter between 'A' and 'Z'.)
The code is as follows:

```java
package com.iteblog.data;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class HBaseDataGenerator {
    private static byte[] FAMILY = "f".getBytes();
    private static byte[] QUALIFIER_UUID = "uuid".getBytes();
    private static byte[] QUALIFIER_AGE = "age".getBytes();
    private static final Random RANDOM = new Random();

    // Random salt: a capital letter between 'A' and 'Z'.
    private static char generateLetter() {
        return (char) (Math.random() * 26 + 'A');
    }

    // Random n-digit UID.
    private static long generateUid(int n) {
        return (long) (Math.random() * 9 * Math.pow(10, n - 1)) + (long) Math.pow(10, n - 1);
    }

    public static void main(String[] args) throws IOException {
        BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf("iteblog"));
        bmp.writeBufferSize(1024 * 1024 * 24);

        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "https://www.iteblog.com:2181");
        Connection connection = ConnectionFactory.createConnection(conf);
        BufferedMutator bufferedMutator = connection.getBufferedMutator(bmp);

        int BATCH_SIZE = 1000;
        int COUNTS = 1000000;
        int count = 0;
        List<Put> putList = new ArrayList<>();

        for (int i = 0; i < COUNTS; i++) {
            // Salted rowkey: <salt letter>-<4-digit uid>-<timestamp in ms>
            String rowKey = generateLetter() + "-" + generateUid(4) + "-" + System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(FAMILY, QUALIFIER_UUID, Bytes.toBytes(UUID.randomUUID().toString()));
            put.addColumn(FAMILY, QUALIFIER_AGE, Bytes.toBytes(String.valueOf(RANDOM.nextInt(100))));
            putList.add(put);
            count++;

            // Write in batches of BATCH_SIZE through the BufferedMutator.
            if (count % BATCH_SIZE == 0) {
                bufferedMutator.mutate(putList);
                bufferedMutator.flush();
                putList.clear();
            }
        }

        // Flush whatever is left in the last, possibly partial batch.
        if (putList.size() > 0) {
            bufferedMutator.mutate(putList);
            bufferedMutator.flush();
            putList.clear();
        }

        bufferedMutator.close();
        connection.close();
    }
}
```

After running the code above, roughly 1,000,000 rows are generated. (Strictly speaking this is not exact: the rowkey design can produce duplicates, for example when two rows happen to get the same salt letter, UID, and millisecond timestamp, so the table may end up with slightly fewer than 1,000,000 rows.)
Let's scan the first 10 rows to see what the data looks like:

```
hbase(main):001:0> scan 'iteblog', {'LIMIT' => 10}
ROW                     COLUMN+CELL
 A-1000-1550572395399   column=f:age, timestamp=1549091990253, value=54
 A-1000-1550572395399   column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
 A-1000-1550572413799   column=f:age, timestamp=1549092008575, value=4
 A-1000-1550572413799   column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
 A-1000-1550572414761   column=f:age, timestamp=1549092009531, value=33
 A-1000-1550572414761   column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
 A-1001-1550572394570   column=f:age, timestamp=1549091989341, value=64
 A-1001-1550572394570   column=f:uuid, timestamp=1549091989341, value=c6712a0d-3793-46d5-865b
 A-1001-1550572405337   column=f:age, timestamp=1549092000108, value=96
 A-1001-1550572405337   column=f:uuid, timestamp=1549092000108, value=4bf05d10-bb4d-43e3-9957
 A-1001-1550572419688   column=f:age, timestamp=1549092014458, value=8
 A-1001-1550572419688   column=f:uuid, timestamp=1549092014458, value=f04ba835-d8ac-49a3-8f96
 A-1002-1550572424041   column=f:age, timestamp=1549092018816, value=84
 A-1002-1550572424041   column=f:uuid, timestamp=1549092018816, value=99d6c989-afb5-4101-9d95
 A-1003-1550572431830   column=f:age, timestamp=1549092026605, value=21
 A-1003-1550572431830   column=f:uuid, timestamp=1549092026605, value=8c1ff1b6-b97c-4059-9b68
 A-1004-1550572395399   column=f:age, timestamp=1549091990253, value=2
 A-1004-1550572395399   column=f:uuid, timestamp=1549091990253, value=e240aa0f-c044-452f-89c0
 A-1004-1550572403783   column=f:age, timestamp=1549091998555, value=6
 A-1004-1550572403783   column=f:uuid, timestamp=1549091998555, value=e8df15c9-02fa-458e-bd0c
10 row(s)
Took 0.1104 seconds
```

Querying the Salted Table with a Coprocessor

Now that we have data, suppose we need to query the full history of the user with UID = 1000. How do we do that? We know the rows for UID = 1000 are spread evenly across the 27 regions above; because of salting, their rowkeys begin with prefixes such as A-, B-, C-, and so on. Next, recall that every region has a StartKey and an EndKey, and here those are exactly the split points we specified when creating the iteblog table. If you have read "HBase Coprocessor Introduction and Practice", you know that coprocessor code runs inside each region, and while running there it can access the current region's metadata, including its StartKey and EndKey. So inside each region we can concatenate that region's StartKey with the UID being queried to obtain the complete rowkey prefix to scan for. This is the idea behind the coprocessor approach to reading salted data.
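Condensed to its essence, the per-region logic looks roughly like the sketch below. SaltedRangeSketch and its helper are illustrative names of mine; the real endpoint implementation appears in full later in this post:

```java
import org.apache.hadoop.hbase.util.Bytes;

// Sketch only: how one region turns the client's uid range into a salted
// scan range, given that region's StartKey. Inside the coprocessor the key
// comes from env.getRegion().getRegionInfo().getStartKey().
public class SaltedRangeSketch {
    static String[] saltedRange(byte[] regionStartKey, String startRow, String endRow) {
        String salt = Bytes.toString(regionStartKey);
        if (salt == null || salt.isEmpty()) {
            // The first region has an empty StartKey and, with this rowkey
            // layout, holds no salted rows; leave the range untouched.
            return new String[]{startRow, endRow};
        }
        // E.g. region "B": uid range ["1000", "1001") -> ["B-1000", "B-1001")
        return new String[]{salt + "-" + startRow, salt + "-" + endRow};
    }

    public static void main(String[] args) {
        String[] range = saltedRange(Bytes.toBytes("B"), "1000", "1001");
        System.out.println(range[0] + " .. " + range[1]); // B-1000 .. B-1001
    }
}
```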
Defining the proto File

For why this file is needed, see "HBase Coprocessor Introduction and Practice". When we issue a query we must pass in parameters such as the table name, the StartKey and EndKey, and a flag indicating whether the table is salted; and when results come back, we need a structure to carry the data in the response. So the proto file is defined as follows:

```proto
option java_package = "com.iteblog.data.coprocessor.generated";
option java_outer_classname = "DataQueryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message DataQueryRequest {
  optional string tableName = 1;
  optional string startRow = 2;
  optional string endRow = 3;
  optional bool incluedEnd = 4;
  optional bool isSalting = 5;
}

message DataQueryResponse {
  message Cell {
    required bytes value = 1;
    required bytes family = 2;
    required bytes qualifier = 3;
    required bytes row = 4;
    required int64 timestamp = 5;
  }
  message Row {
    optional bytes rowKey = 1;
    repeated Cell cellList = 2;
  }
  repeated Row rowList = 1;
}

service QueryDataService {
  rpc queryByStartRowAndEndRow(DataQueryRequest)
      returns (DataQueryResponse);
}
```

We then use the protobuf-maven-plugin to generate Java classes from this proto; for the details see "Compiling proto Files with Maven in IDEA". Copy the generated DataQueryProtos.java class into the com.iteblog.data.coprocessor.generated package.
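As an alternative to the Maven plugin, the class can also be generated by invoking the protobuf compiler directly. A minimal sketch, assuming a protoc 2.5.x binary on the PATH (matching the non-shaded protobuf API that these endpoints compile against) and the definition above saved as DataQuery.proto:

```bash
# Generates DataQueryProtos.java under src/main/java; the package
# directories are derived from the java_package option in the proto file.
protoc --java_out=src/main/java DataQuery.proto
```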
Writing the Coprocessor Code

With the request and response classes in place, we can now write the coprocessor itself. Based on the analysis above, the implementation looks like this:

```java
package com.iteblog.data.coprocessor;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.QueryDataService;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryRequest;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SlatTableDataSearch extends QueryDataService implements RegionCoprocessor {
    private RegionCoprocessorEnvironment env;

    @Override
    public Iterable<Service> getServices() {
        return Collections.singleton(this);
    }

    @Override
    public void queryByStartRowAndEndRow(RpcController controller,
                                         DataQueryRequest request,
                                         RpcCallback<DataQueryResponse> done) {
        DataQueryResponse response = null;

        String startRow = request.getStartRow();
        String endRow = request.getEndRow();
        String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey());

        if (request.getIsSalting()) {
            // Prepend this region's StartKey (the salt) to the client's range.
            String startSalt = null;
            if (null != regionStartKey && regionStartKey.length() != 0) {
                startSalt = regionStartKey;
            }
            if (null != startSalt && null != startRow) {
                startRow = startSalt + "-" + startRow;
                endRow = startSalt + "-" + endRow;
            }
        }

        Scan scan = new Scan();
        if (null != startRow) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (null != endRow) {
            scan.withStopRow(Bytes.toBytes(endRow), request.getIncluedEnd());
        }

        try (InternalScanner scanner = this.env.getRegion().getScanner(scan)) {
            List<Cell> results = new ArrayList<>();
            boolean hasMore;
            DataQueryResponse.Builder responseBuilder = DataQueryResponse.newBuilder();
            do {
                hasMore = scanner.next(results);
                DataQueryResponse.Row.Builder rowBuilder = DataQueryResponse.Row.newBuilder();
                if (results.size() > 0) {
                    Cell cell = results.get(0);
                    rowBuilder.setRowKey(ByteString.copyFrom(cell.getRowArray(),
                            cell.getRowOffset(), cell.getRowLength()));
                    for (Cell kv : results) {
                        buildCell(rowBuilder, kv);
                    }
                }
                responseBuilder.addRowList(rowBuilder);
                results.clear();
            } while (hasMore);
            response = responseBuilder.build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        }
        done.run(response);
    }

    private void buildCell(DataQueryResponse.Row.Builder rowBuilder, Cell kv) {
        DataQueryResponse.Cell.Builder cellBuilder = DataQueryResponse.Cell.newBuilder();
        cellBuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()));
        cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
        cellBuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        cellBuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
        cellBuilder.setTimestamp(kv.getTimestamp());
        rowBuilder.addCellList(cellBuilder);
    }

    /**
     * Stores a reference to the coprocessor environment provided by the
     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
     * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
     * on a table region, so always expects this to be an instance of
     * {@link RegionCoprocessorEnvironment}.
     *
     * @param env the environment provided by the coprocessor host
     * @throws IOException if the provided environment is not an instance of
     *                     {@code RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) {
        // nothing to do
    }
}
```

As you can see, the overall structure is very similar to the RowCountEndpoint example that ships with HBase, which was introduced in "HBase Coprocessor Introduction and Practice".
The main logic is implemented in the queryByStartRowAndEndRow method. From the DataQueryRequest we obtain the table to query, the StartKey and EndKey, and the other parameters sent by the client. Calling this.env.getRegion().getRegionInfo().getStartKey() gives us the StartKey of the current region, and concatenating it with the client-supplied StartKey and EndKey yields the complete rowkey prefix. The rest is ordinary HBase Scan code.

Now compile the SlatTableDataSearch class, package it into a jar, and deploy it to the HBase table. For the full deployment procedure see "HBase Coprocessor Introduction and Practice"; a shell sketch follows below.
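For reference, one common way to attach an endpoint is through the HBase shell's table attribute mechanism. A minimal sketch, where the HDFS jar path is hypothetical (use wherever you uploaded the jar) and '1001' is the coprocessor priority:

```
hbase(main):001:0> alter 'iteblog', METHOD => 'table_att',
  'coprocessor' => 'hdfs:///user/iteblog/iteblog-coprocessor.jar|com.iteblog.data.coprocessor.SlatTableDataSearch|1001|'
```

After the alter completes, a `describe 'iteblog'` should list the coprocessor in the table attributes.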
Writing the Coprocessor Client Code

At this point the server-side coprocessor code is written and deployed, so now we need to write the client-side code. It is actually quite simple:

```java
package com.iteblog.data;

import com.iteblog.data.coprocessor.generated.DataQueryProtos.QueryDataService;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryRequest;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class DataQuery {
    private static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "https://www.iteblog.com:2181");
    }

    static List<Row> queryByStartRowAndStopRow(String tableName,
                                               String startRow, String stopRow,
                                               boolean isIncludeEnd, boolean isSalting) {
        final DataQueryRequest.Builder requestBuilder = DataQueryRequest.newBuilder();
        requestBuilder.setTableName(tableName);
        requestBuilder.setStartRow(startRow);
        requestBuilder.setEndRow(stopRow);
        requestBuilder.setIncluedEnd(isIncludeEnd);
        requestBuilder.setIsSalting(isSalting);

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            HTable table = (HTable) connection.getTable(TableName.valueOf(tableName));
            // Run the endpoint on every region of the table (null start/end keys),
            // collecting each region's rows into a map keyed by region name.
            Map<byte[], List<Row>> result = table.coprocessorService(QueryDataService.class,
                    null, null,
                    counter -> {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<DataQueryResponse> call = new BlockingRpcCallback<>();
                        counter.queryByStartRowAndEndRow(controller, requestBuilder.build(), call);
                        DataQueryResponse response = call.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return response.getRowListList();
                    });

            List<Row> list = new LinkedList<>();
            for (Map.Entry<byte[], List<Row>> entry : result.entrySet()) {
                if (null != entry.getKey()) {
                    list.addAll(entry.getValue());
                }
            }
            return list;
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        List<Row> rows = queryByStartRowAndStopRow("iteblog", "1000", "1001", false, true);
        if (null != rows) {
            System.out.println(rows.size());
            for (DataQueryResponse.Row row : rows) {
                List<DataQueryResponse.Cell> cellListList = row.getCellListList();
                for (DataQueryResponse.Cell cell : cellListList) {
                    System.out.println(row.getRowKey().toStringUtf8() + "\t" +
                            "column=" + cell.getFamily().toStringUtf8() +
                            ":" + cell.getQualifier().toStringUtf8() +
                            ", timestamp=" + cell.getTimestamp() +
                            ", value=" + cell.getValue().toStringUtf8());
                }
            }
        }
    }
}
```

Running the code above produces output like the following:

```
A-1000-1550572395399   column=f:age, timestamp=1549091990253, value=54
A-1000-1550572395399   column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
A-1000-1550572413799   column=f:age, timestamp=1549092008575, value=4
A-1000-1550572413799   column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
A-1000-1550572414761   column=f:age, timestamp=1549092009531, value=33
A-1000-1550572414761   column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
B-1000-1550572388491   column=f:age, timestamp=1549091983276, value=1
B-1000-1550572388491   column=f:uuid, timestamp=1549091983276, value=cf720efe-2ad2-48d6-81b8
B-1000-1550572392922   column=f:age, timestamp=1549091987701, value=7
B-1000-1550572392922   column=f:uuid, timestamp=1549091987701, value=8a047118-e130-48cb-adfe
```

Let's verify against the HBase shell:

```
hbase(main):020:0> scan 'iteblog', {STARTROW => 'A-1000', ENDROW => 'A-1001'}
ROW                     COLUMN+CELL
 A-1000-1550572395399   column=f:age, timestamp=1549091990253, value=54
 A-1000-1550572395399   column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
 A-1000-1550572413799   column=f:age, timestamp=1549092008575, value=4
 A-1000-1550572413799   column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
 A-1000-1550572414761   column=f:age, timestamp=1549092009531, value=33
 A-1000-1550572414761   column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
3 row(s)
Took 0.0569 seconds
```

As you can see, the coprocessor output matches what the HBase shell returns, and on top of that we retrieved the UID = 1000 rows under every salt prefix, not just the A- prefix.
That wraps up querying a salted HBase table with a coprocessor. Tomorrow I will show how to query a salted table with Spark.
Unless otherwise stated, all articles on this blog are original. The copyright of original articles belongs to 过往记忆大数据 (iteblog) and they may not be reproduced without permission.