HBase中加盐(Salting)之后的表如何读取:协处理器篇在《HBaseRowkey设计指南》文章中,我们介绍了避免数据热点的三种比较常见方法:加盐-Salting哈希-Hashing反转-Reversing其中在加盐(Salting)的方法里面是这么描述的:给Rowkey分配一个随机前缀以使得它和之前排序不同.
但是在Rowkey前面加了随机前缀,那么我们怎么将这些数据读出来呢我将分三篇文章来介绍如何读取加盐之后的表,其中每篇文章提供一种方法,主要包括:使用协处理器读取加盐的表使用Spark读取加盐的表使用MapReduce读取加盐的表关于协处理器的入门及实战,请参见这里.
本文使用的各组件版本:hadoop-2.
7.
7,hbase-2.
0.
4,jdk1.
8.
0_201.
测试数据生成在介绍如何查询数据之前,我们先创建一张名为iteblog的HBase表,用于测试.
为了数据均匀和介绍的方便,这里使用了预分区,并设置了27个分区,如下:hbase(main):002:0>create'iteblog','f',SPLITS=>['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']0row(s)in2.
4880seconds然后我们使用下面方法生成了1000000条测试数据.
RowKey的形式为UID+当前数据生成时间戳;由于UID的长度为4,所以1000000条数据会存在大量的UID一样的数据,所以我们使用加盐方法将这些数据均匀分散到上述27个Region里面(注意,其实第一个Region其实没数据).
具体代码如下:packagecom.
iteblog.
data;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
HConstants;importorg.
apache.
hadoop.
hbase.
TableName;1/12importorg.
apache.
hadoop.
hbase.
client.
*;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
List;importjava.
util.
Random;importjava.
util.
UUID;publicclassHBaseDataGenerator{privatestaticbyte[]FAMILY="f".
getBytes();privatestaticbyte[]QUALIFIER_UUID="uuid".
getBytes();privatestaticbyte[]QUALIFIER_AGE="age".
getBytes();privatestaticchargenerateLetter(){return(char)(Math.
random()*26+'A');}privatestaticlonggenerateUid(intn){return(long)(Math.
random()*9*Math.
pow(10,n-1))+(long)Math.
pow(10,n-1);}publicstaticvoidmain(String[]args)throwsIOException{BufferedMutatorParamsbmp=newBufferedMutatorParams(TableName.
valueOf("iteblog"));bmp.
writeBufferSize(1024*1024*24);Configurationconf=HBaseConfiguration.
create();conf.
set(HConstants.
ZOOKEEPER_QUORUM,"https://www.
iteblog.
com:2181");Connectionconnection=ConnectionFactory.
createConnection(conf);BufferedMutatorbufferedMutator=connection.
getBufferedMutator(bmp);intBATCH_SIZE=1000;intCOUNTS=1000000;intcount=0;ListputList=newArrayList();for(inti=0;i0){bufferedMutator.
mutate(putList);bufferedMutator.
flush();putList.
clear();}}}运行完上面代码之后,会生成1000000条数据(注意,这里其实不严谨,因为Rowkey设计问题,可能会导致重复的Rowkey生成,所以实际情况下可能没有1000000条数据.
).
我们limit10条数据看下长成什么样:hbase(main):001:0>scan'iteblog',{'LIMIT'=>10}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fA-1001-1550572394570column=f:age,timestamp=1549091989341,value=64A-1001-1550572394570column=f:uuid,timestamp=1549091989341,value=c6712a0d-3793-46d5-865bA-1001-1550572405337column=f:age,timestamp=1549092000108,value=96A-1001-1550572405337column=f:uuid,timestamp=1549092000108,value=4bf05d10-bb4d-43e3-99573/12A-1001-1550572419688column=f:age,timestamp=1549092014458,value=8A-1001-1550572419688column=f:uuid,timestamp=1549092014458,value=f04ba835-d8ac-49a3-8f96A-1002-1550572424041column=f:age,timestamp=1549092018816,value=84A-1002-1550572424041column=f:uuid,timestamp=1549092018816,value=99d6c989-afb5-4101-9d95A-1003-1550572431830column=f:age,timestamp=1549092026605,value=21A-1003-1550572431830column=f:uuid,timestamp=1549092026605,value=8c1ff1b6-b97c-4059-9b68A-1004-1550572395399column=f:age,timestamp=1549091990253,value=2A-1004-1550572395399column=f:uuid,timestamp=1549091990253,value=e240aa0f-c044-452f-89c0A-1004-1550572403783column=f:age,timestamp=1549091998555,value=6A-1004-1550572403783column=f:uuid,timestamp=1549091998555,value=e8df15c9-02fa-458e-bd0c10row(s)Took0.
1104seconds使用协处理器查询加盐之后的表现在有数据了,我们需要查询所有UID=1000的用户所有历史数据,那么如何查呢我们知道UID=1000的用户数据是均匀放到上述的27个Region里面的,因为经过加盐了,所以这些数据前缀都是类似于A-,B-,C-等开头的.
其次我们需要知道,每个Region其实是有StartKey和EndKey的,这些StartKey和EndKey其实就是我们创建iteblog表指定的.
如果你看了《HBase协处理器入门及实战》这篇文章,你就知道协处理器的代码其实是在每个Region里面执行的;而这些代码在Region里面执行的时候是可以拿到当前Region的信息,包括了StartKey和EndKey,所以其实我们可以将拿到的StartKey信息和查询的UID进行拼接,这样就可以查询我们要的数据.
协处理器处理篇就是基于这样的思想来查询加盐之后的数据的.
定义proto文件为什么需要定义这个请参见《HBase协处理器入门及实战》这篇文章.
因为我们查询的时候需要传入查询的参数,比如tableName、StartKey、EndKey以及是否加盐等标记;同时当查询到结果的时候,我们还需要将数据返回,所以我们定义的proto文件如下:optionjava_package="com.
iteblog.
data.
coprocessor.
generated";optionjava_outer_classname="DataQueryProtos";optionjava_generic_services=true;optionjava_generate_equals_and_hash=true;optionoptimize_for=SPEED;4/12messageDataQueryRequest{optionalstringtableName=1;optionalstringstartRow=2;optionalstringendRow=3;optionalboolincluedEnd=4;optionalboolisSalting=5;}messageDataQueryResponse{messageCell{requiredbytesvalue=1;requiredbytesfamily=2;requiredbytesqualifier=3;requiredbytesrow=4;requiredint64timestamp=5;}messageRow{optionalbytesrowKey=1;repeatedCellcellList=2;}repeatedRowrowList=1;}serviceQueryDataService{rpcqueryByStartRowAndEndRow(DataQueryRequest)returns(DataQueryResponse);}然后我们使用protobuf-maven-plugin插件将上面的proto生成java类,具体如何操作参见《在IDEA中使用Maven编译proto文件》.
我们将生成的DataQueryProtos.
java类拷贝到com.
iteblog.
data.
coprocessor.
generated包里面.
编写协处理器代码有了请求和返回的类,现在我们需要编写协处理器的处理代码了,结合上面的分析,协处理器的代码实现如下:packagecom.
iteblog.
data.
coprocessor;importcom.
google.
protobuf.
ByteString;5/12importcom.
google.
protobuf.
RpcCallback;importcom.
google.
protobuf.
RpcController;importcom.
google.
protobuf.
Service;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importorg.
apache.
hadoop.
hbase.
Cell;importorg.
apache.
hadoop.
hbase.
CoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
client.
Get;importorg.
apache.
hadoop.
hbase.
client.
Result;importorg.
apache.
hadoop.
hbase.
client.
Scan;importorg.
apache.
hadoop.
hbase.
coprocessor.
CoprocessorException;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessor;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
regionserver.
InternalScanner;importorg.
apache.
hadoop.
hbase.
shaded.
protobuf.
ResponseConverter;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
Collections;importjava.
util.
List;publicclassSlatTableDataSearchextendsQueryDataServiceimplementsRegionCoprocessor{privateRegionCoprocessorEnvironmentenv;publicIterablegetServices(){returnCollections.
singleton(this);}@OverridepublicvoidqueryByStartRowAndEndRow(RpcControllercontroller,DataQueryRequestrequest,RpcCallbackdone){DataQueryResponseresponse=null;StringstartRow=request.
getStartRow();StringendRow=request.
getEndRow();StringregionStartKey=Bytes.
toString(this.
env.
getRegion().
getRegionInfo().
getStartKey());if(request.
getIsSalting()){StringstartSalt=null;if(null!
=regionStartKey&®ionStartKey.
length()!
=0){startSalt=regionStartKey;}if(null!
=startSalt&&null!
=startRow){6/12startRow=startSalt+"-"+startRow;endRow=startSalt+"-"+endRow;}}Scanscan=newScan();if(null!
=startRow){scan.
withStartRow(Bytes.
toBytes(startRow));}if(null!
=endRow){scan.
withStopRow(Bytes.
toBytes(endRow),request.
getIncluedEnd());}try(InternalScannerscanner=this.
env.
getRegion().
getScanner(scan)){Listresults=newArrayList();booleanhasMore;DataQueryResponse.
BuilderresponseBuilder=DataQueryResponse.
newBuilder();do{hasMore=scanner.
next(results);DataQueryResponse.
Row.
BuilderrowBuilder=DataQueryResponse.
Row.
newBuilder();if(results.
size()>0){Cellcell=results.
get(0);rowBuilder.
setRowKey(ByteString.
copyFrom(cell.
getRowArray(),cell.
getRowOffset(),cell.
getRowLength()));for(Cellkv:results){buildCell(rowBuilder,kv);}}responseBuilder.
addRowList(rowBuilder);results.
clear();}while(hasMore);response=responseBuilder.
build();}catch(IOExceptione){ResponseConverter.
setControllerException(controller,e);}done.
run(response);}privatevoidbuildCell(DataQueryResponse.
Row.
BuilderrowBuilder,Cellkv){DataQueryResponse.
Cell.
BuildercellBuilder=DataQueryResponse.
Cell.
newBuilder();7/12cellBuilder.
setFamily(ByteString.
copyFrom(kv.
getFamilyArray(),kv.
getFamilyOffset(),kv.
getFamilyLength()));cellBuilder.
setQualifier(ByteString.
copyFrom(kv.
getQualifierArray(),kv.
getQualifierOffset(),kv.
getQualifierLength()));cellBuilder.
setRow(ByteString.
copyFrom(kv.
getRowArray(),kv.
getRowOffset(),kv.
getRowLength()));cellBuilder.
setValue(ByteString.
copyFrom(kv.
getValueArray(),kv.
getValueOffset(),kv.
getValueLength()));cellBuilder.
setTimestamp(kv.
getTimestamp());rowBuilder.
addCellList(cellBuilder);}/***Storesareferencetothecoprocessorenvironmentprovidedbythe*{@linkorg.
apache.
hadoop.
hbase.
regionserver.
RegionCoprocessorHost}fromtheregionwherethis*coprocessorisloaded.
Sincethisisacoprocessorendpoint,italwaysexpectstobeloaded*onatableregion,soalwaysexpectsthistobeaninstanceof*{@linkRegionCoprocessorEnvironment}.
**@paramenvtheenvironmentprovidedbythecoprocessorhost*@throwsIOExceptioniftheprovidedenvironmentisnotaninstanceof*{@codeRegionCoprocessorEnvironment}*/@Overridepublicvoidstart(CoprocessorEnvironmentenv)throwsIOException{if(envinstanceofRegionCoprocessorEnvironment){this.
env=(RegionCoprocessorEnvironment)env;}else{thrownewCoprocessorException("Mustbeloadedonatableregion!
");}}@Overridepublicvoidstop(CoprocessorEnvironmentenv){//nothingtodo}}大家可以看到,这里面的代码框架和《HBase协处理器入门及实战》里面介绍的HBase提供的RowCountEndpoint示例代码很类似.
主要逻辑在queryByStartRowAndEndRow函数实现里面.
我们通过DataQueryRequest拿到客户端查询的表,StartKey和EndKey8/12等数据.
通过this.
env.
getRegion().
getRegionInfo().
getStartKey()可以拿到当前Region的StartKey,然后再和客户端传进来的StartKey和EndKey进行拼接就可以拿到完整的Rowkey前缀.
剩下的查询就是正常的HBaseScan代码了.
现在我们将SlatTableDataSearch类进行编译打包,并部署到HBase表里面去,具体如何部署参见《HBase协处理器入门及实战》协处理器客户端代码编写到这里,我们的协处理器服务器端的代码和部署已经完成了,现在我们需要编写协处理器客户端代码.
其实也很简单,如下:packagecom.
iteblog.
data;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse.
*;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
TableName;importorg.
apache.
hadoop.
hbase.
client.
Connection;importorg.
apache.
hadoop.
hbase.
client.
ConnectionFactory;importorg.
apache.
hadoop.
hbase.
client.
HTable;importorg.
apache.
hadoop.
hbase.
ipc.
CoprocessorRpcUtils.
BlockingRpcCallback;importorg.
apache.
hadoop.
hbase.
ipc.
ServerRpcController;importjava.
util.
LinkedList;importjava.
util.
List;importjava.
util.
Map;publicclassDataQuery{privatestaticConfigurationconf=null;static{conf=HBaseConfiguration.
create();conf.
set("hbase.
zookeeper.
quorum","https://www.
iteblog.
com:2181");}staticListqueryByStartRowAndStopRow(StringtableName,StringstartRow,StringstopRow,booleanisIncludeEnd,booleanisSalting){finalDataQueryRequest.
BuilderrequestBuilder=DataQueryRequest.
newBuilder();requestBuilder.
setTableName(tableName);requestBuilder.
setStartRow(startRow);9/12requestBuilder.
setEndRow(stopRow);requestBuilder.
setIncluedEnd(isIncludeEnd);requestBuilder.
setIsSalting(isSalting);try{Connectionconnection=ConnectionFactory.
createConnection(conf);HTabletable=(HTable)connection.
getTable(TableName.
valueOf(tableName));Map>result=table.
coprocessorService(QueryDataService.
class,null,null,counter->{ServerRpcControllercontroller=newServerRpcController();BlockingRpcCallbackcall=newBlockingRpcCallback();counter.
queryByStartRowAndEndRow(controller,requestBuilder.
build(),call);DataQueryResponseresponse=call.
get();if(controller.
failedOnException()){throwcontroller.
getFailedOn();}returnresponse.
getRowListList();});Listlist=newLinkedList();for(Map.
Entry>entry:result.
entrySet()){if(null!
=entry.
getKey()){list.
addAll(entry.
getValue());}}returnlist;}catch(Throwablee){e.
printStackTrace();}returnnull;}publicstaticvoidmain(String[]args){Listrows=queryByStartRowAndStopRow("iteblog","1000","1001",false,true);if(null!
=rows){System.
out.
println(rows.
size());for(DataQueryResponse.
Rowrow:rows){ListcellListList=row.
getCellListList();for(DataQueryResponse.
Cellcell:cellListList){System.
out.
println(row.
getRowKey().
toStringUtf8()+"\t"+"column="+cell.
getFamily().
toStringUtf8()+":"+cell.
getQualifier().
toStringUtf8("timestamp="+cell.
getTimestamp(10/12"value="+cell.
getValue().
toStringUtf8());}}}}}我们运行上面的代码,可以得到如下的输出:A-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fB-1000-1550572388491column=f:age,timestamp=1549091983276,value=1B-1000-1550572388491column=f:uuid,timestamp=1549091983276,value=cf720efe-2ad2-48d6-81b8B-1000-1550572392922column=f:age,timestamp=1549091987701,value=7B-1000-1550572392922column=f:uuid,timestamp=1549091987701,value=8a047118-e130-48cb-adfehbase(main):020:0>scan'iteblog',{STARTROW=>'A-1000',ENDROW=>'A-1001'}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9f3row(s)Took0.
0569seconds可以看到,和我们使用HBaseShell输出的一致,而且我们还把所有的UID=100011/12的数据拿到了.
好了,到这里,使用协处理器查询HBase加盐之后的表已经算完成了,明天我将介绍使用Spark如何查询加盐之后的表.
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载.
本文链接:【】()PoweredbyTCPDF(www.
tcpdf.
org)12/12
关于HostDare服务商在之前的文章中有介绍过几次,算是比较老牌的服务商,但是商家背景财力不是特别雄厚,算是比较小众的个人服务商。目前主流提供CKVM和QKVM套餐。前者是电信CN2 GIA,不过库存储备也不是很足,这不九月份发布新的补货库存活动,有提供九折优惠CN2 GIA,以及六五折优惠QKVM普通线路方案。这次活动截止到9月30日,不清楚商家这次库存补货多少。比如 QKVM基础的五个方案都...
介绍:819云怎么样?819云创办于2019,由一家从2017年开始从业的idc行业商家创办,主要从事云服务器,和物理机器819云—-带来了9月最新的秋季便宜vps促销活动,一共4款便宜vps,从2~32G内存,支持Windows系统,…高速建站的美国vps位于洛杉矶cera机房,服务器接入1Gbps带宽,采用魔方管理系统,适合新手玩耍!官方网站:https://www.8...
ATCLOUD.NET怎么样?ATCLOUD.NET主要提供KVM架构的VPS产品、LXC容器化产品、权威DNS智能解析、域名注册、SSL证书等海外网站建设服务。 其大部分数据中心是由OVH机房提供,其节点包括美国(俄勒冈、弗吉尼亚)、加拿大、英国、法国、德国以及新加坡。 提供超过480Gbps的DDoS高防保护,杜绝DDoS攻击骚扰,比较适合海外建站等业务。官方网站:点击访问ATCLOUD官网活...
协处理器为你推荐
计算机网络实验系统深圳做网站-确认收货手太快网店发来空箱子支持ipad支持ipad支持ipad重庆宽带测速重庆云阳电信宽带测速网址谁知道,帮个忙?ms17-010win10华为 slatl10是什么型号google图片搜索谁能教我怎么在手机用google的图片搜索啊!!!ipad无法加入网络ipad无法加入网络但是手机能用fastreport2.5护套线BV2.5中的2.5是指什么尺寸,单位是什么,BV又是什么意思?
西安域名注册 idc评测 腾讯云盘 Hello图床 info域名 web服务器架设软件 linux空间 怎么测试下载速度 最好的免费空间 搜索引擎提交入口 免费网页空间 上海联通宽带测速 ca187 闪讯官网 联通网站 网页提速 网通服务器 个人免费邮箱 谷歌台湾 qq金券 更多