HBase中加盐(Salting)之后的表如何读取:协处理器篇在《HBaseRowkey设计指南》文章中,我们介绍了避免数据热点的三种比较常见方法:加盐-Salting哈希-Hashing反转-Reversing其中在加盐(Salting)的方法里面是这么描述的:给Rowkey分配一个随机前缀以使得它和之前排序不同.
但是在Rowkey前面加了随机前缀,那么我们怎么将这些数据读出来呢我将分三篇文章来介绍如何读取加盐之后的表,其中每篇文章提供一种方法,主要包括:使用协处理器读取加盐的表使用Spark读取加盐的表使用MapReduce读取加盐的表关于协处理器的入门及实战,请参见这里.
本文使用的各组件版本:hadoop-2.
7.
7,hbase-2.
0.
4,jdk1.
8.
0_201.
测试数据生成在介绍如何查询数据之前,我们先创建一张名为iteblog的HBase表,用于测试.
为了数据均匀和介绍的方便,这里使用了预分区,并设置了27个分区,如下:hbase(main):002:0>create'iteblog','f',SPLITS=>['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']0row(s)in2.
4880seconds然后我们使用下面方法生成了1000000条测试数据.
RowKey的形式为UID+当前数据生成时间戳;由于UID的长度为4,所以1000000条数据会存在大量的UID一样的数据,所以我们使用加盐方法将这些数据均匀分散到上述27个Region里面(注意,其实第一个Region其实没数据).
具体代码如下:packagecom.
iteblog.
data;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
HConstants;importorg.
apache.
hadoop.
hbase.
TableName;1/12importorg.
apache.
hadoop.
hbase.
client.
*;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
List;importjava.
util.
Random;importjava.
util.
UUID;publicclassHBaseDataGenerator{privatestaticbyte[]FAMILY="f".
getBytes();privatestaticbyte[]QUALIFIER_UUID="uuid".
getBytes();privatestaticbyte[]QUALIFIER_AGE="age".
getBytes();privatestaticchargenerateLetter(){return(char)(Math.
random()*26+'A');}privatestaticlonggenerateUid(intn){return(long)(Math.
random()*9*Math.
pow(10,n-1))+(long)Math.
pow(10,n-1);}publicstaticvoidmain(String[]args)throwsIOException{BufferedMutatorParamsbmp=newBufferedMutatorParams(TableName.
valueOf("iteblog"));bmp.
writeBufferSize(1024*1024*24);Configurationconf=HBaseConfiguration.
create();conf.
set(HConstants.
ZOOKEEPER_QUORUM,"https://www.
iteblog.
com:2181");Connectionconnection=ConnectionFactory.
createConnection(conf);BufferedMutatorbufferedMutator=connection.
getBufferedMutator(bmp);intBATCH_SIZE=1000;intCOUNTS=1000000;intcount=0;ListputList=newArrayList();for(inti=0;i0){bufferedMutator.
mutate(putList);bufferedMutator.
flush();putList.
clear();}}}运行完上面代码之后,会生成1000000条数据(注意,这里其实不严谨,因为Rowkey设计问题,可能会导致重复的Rowkey生成,所以实际情况下可能没有1000000条数据.
).
我们limit10条数据看下长成什么样:hbase(main):001:0>scan'iteblog',{'LIMIT'=>10}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fA-1001-1550572394570column=f:age,timestamp=1549091989341,value=64A-1001-1550572394570column=f:uuid,timestamp=1549091989341,value=c6712a0d-3793-46d5-865bA-1001-1550572405337column=f:age,timestamp=1549092000108,value=96A-1001-1550572405337column=f:uuid,timestamp=1549092000108,value=4bf05d10-bb4d-43e3-99573/12A-1001-1550572419688column=f:age,timestamp=1549092014458,value=8A-1001-1550572419688column=f:uuid,timestamp=1549092014458,value=f04ba835-d8ac-49a3-8f96A-1002-1550572424041column=f:age,timestamp=1549092018816,value=84A-1002-1550572424041column=f:uuid,timestamp=1549092018816,value=99d6c989-afb5-4101-9d95A-1003-1550572431830column=f:age,timestamp=1549092026605,value=21A-1003-1550572431830column=f:uuid,timestamp=1549092026605,value=8c1ff1b6-b97c-4059-9b68A-1004-1550572395399column=f:age,timestamp=1549091990253,value=2A-1004-1550572395399column=f:uuid,timestamp=1549091990253,value=e240aa0f-c044-452f-89c0A-1004-1550572403783column=f:age,timestamp=1549091998555,value=6A-1004-1550572403783column=f:uuid,timestamp=1549091998555,value=e8df15c9-02fa-458e-bd0c10row(s)Took0.
1104seconds使用协处理器查询加盐之后的表现在有数据了,我们需要查询所有UID=1000的用户所有历史数据,那么如何查呢我们知道UID=1000的用户数据是均匀放到上述的27个Region里面的,因为经过加盐了,所以这些数据前缀都是类似于A-,B-,C-等开头的.
其次我们需要知道,每个Region其实是有StartKey和EndKey的,这些StartKey和EndKey其实就是我们创建iteblog表指定的.
如果你看了《HBase协处理器入门及实战》这篇文章,你就知道协处理器的代码其实是在每个Region里面执行的;而这些代码在Region里面执行的时候是可以拿到当前Region的信息,包括了StartKey和EndKey,所以其实我们可以将拿到的StartKey信息和查询的UID进行拼接,这样就可以查询我们要的数据.
协处理器处理篇就是基于这样的思想来查询加盐之后的数据的.
定义proto文件为什么需要定义这个请参见《HBase协处理器入门及实战》这篇文章.
因为我们查询的时候需要传入查询的参数,比如tableName、StartKey、EndKey以及是否加盐等标记;同时当查询到结果的时候,我们还需要将数据返回,所以我们定义的proto文件如下:optionjava_package="com.
iteblog.
data.
coprocessor.
generated";optionjava_outer_classname="DataQueryProtos";optionjava_generic_services=true;optionjava_generate_equals_and_hash=true;optionoptimize_for=SPEED;4/12messageDataQueryRequest{optionalstringtableName=1;optionalstringstartRow=2;optionalstringendRow=3;optionalboolincluedEnd=4;optionalboolisSalting=5;}messageDataQueryResponse{messageCell{requiredbytesvalue=1;requiredbytesfamily=2;requiredbytesqualifier=3;requiredbytesrow=4;requiredint64timestamp=5;}messageRow{optionalbytesrowKey=1;repeatedCellcellList=2;}repeatedRowrowList=1;}serviceQueryDataService{rpcqueryByStartRowAndEndRow(DataQueryRequest)returns(DataQueryResponse);}然后我们使用protobuf-maven-plugin插件将上面的proto生成java类,具体如何操作参见《在IDEA中使用Maven编译proto文件》.
我们将生成的DataQueryProtos.
java类拷贝到com.
iteblog.
data.
coprocessor.
generated包里面.
编写协处理器代码有了请求和返回的类,现在我们需要编写协处理器的处理代码了,结合上面的分析,协处理器的代码实现如下:packagecom.
iteblog.
data.
coprocessor;importcom.
google.
protobuf.
ByteString;5/12importcom.
google.
protobuf.
RpcCallback;importcom.
google.
protobuf.
RpcController;importcom.
google.
protobuf.
Service;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importorg.
apache.
hadoop.
hbase.
Cell;importorg.
apache.
hadoop.
hbase.
CoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
client.
Get;importorg.
apache.
hadoop.
hbase.
client.
Result;importorg.
apache.
hadoop.
hbase.
client.
Scan;importorg.
apache.
hadoop.
hbase.
coprocessor.
CoprocessorException;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessor;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
regionserver.
InternalScanner;importorg.
apache.
hadoop.
hbase.
shaded.
protobuf.
ResponseConverter;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
Collections;importjava.
util.
List;publicclassSlatTableDataSearchextendsQueryDataServiceimplementsRegionCoprocessor{privateRegionCoprocessorEnvironmentenv;publicIterablegetServices(){returnCollections.
singleton(this);}@OverridepublicvoidqueryByStartRowAndEndRow(RpcControllercontroller,DataQueryRequestrequest,RpcCallbackdone){DataQueryResponseresponse=null;StringstartRow=request.
getStartRow();StringendRow=request.
getEndRow();StringregionStartKey=Bytes.
toString(this.
env.
getRegion().
getRegionInfo().
getStartKey());if(request.
getIsSalting()){StringstartSalt=null;if(null!
=regionStartKey&®ionStartKey.
length()!
=0){startSalt=regionStartKey;}if(null!
=startSalt&&null!
=startRow){6/12startRow=startSalt+"-"+startRow;endRow=startSalt+"-"+endRow;}}Scanscan=newScan();if(null!
=startRow){scan.
withStartRow(Bytes.
toBytes(startRow));}if(null!
=endRow){scan.
withStopRow(Bytes.
toBytes(endRow),request.
getIncluedEnd());}try(InternalScannerscanner=this.
env.
getRegion().
getScanner(scan)){Listresults=newArrayList();booleanhasMore;DataQueryResponse.
BuilderresponseBuilder=DataQueryResponse.
newBuilder();do{hasMore=scanner.
next(results);DataQueryResponse.
Row.
BuilderrowBuilder=DataQueryResponse.
Row.
newBuilder();if(results.
size()>0){Cellcell=results.
get(0);rowBuilder.
setRowKey(ByteString.
copyFrom(cell.
getRowArray(),cell.
getRowOffset(),cell.
getRowLength()));for(Cellkv:results){buildCell(rowBuilder,kv);}}responseBuilder.
addRowList(rowBuilder);results.
clear();}while(hasMore);response=responseBuilder.
build();}catch(IOExceptione){ResponseConverter.
setControllerException(controller,e);}done.
run(response);}privatevoidbuildCell(DataQueryResponse.
Row.
BuilderrowBuilder,Cellkv){DataQueryResponse.
Cell.
BuildercellBuilder=DataQueryResponse.
Cell.
newBuilder();7/12cellBuilder.
setFamily(ByteString.
copyFrom(kv.
getFamilyArray(),kv.
getFamilyOffset(),kv.
getFamilyLength()));cellBuilder.
setQualifier(ByteString.
copyFrom(kv.
getQualifierArray(),kv.
getQualifierOffset(),kv.
getQualifierLength()));cellBuilder.
setRow(ByteString.
copyFrom(kv.
getRowArray(),kv.
getRowOffset(),kv.
getRowLength()));cellBuilder.
setValue(ByteString.
copyFrom(kv.
getValueArray(),kv.
getValueOffset(),kv.
getValueLength()));cellBuilder.
setTimestamp(kv.
getTimestamp());rowBuilder.
addCellList(cellBuilder);}/***Storesareferencetothecoprocessorenvironmentprovidedbythe*{@linkorg.
apache.
hadoop.
hbase.
regionserver.
RegionCoprocessorHost}fromtheregionwherethis*coprocessorisloaded.
Sincethisisacoprocessorendpoint,italwaysexpectstobeloaded*onatableregion,soalwaysexpectsthistobeaninstanceof*{@linkRegionCoprocessorEnvironment}.
**@paramenvtheenvironmentprovidedbythecoprocessorhost*@throwsIOExceptioniftheprovidedenvironmentisnotaninstanceof*{@codeRegionCoprocessorEnvironment}*/@Overridepublicvoidstart(CoprocessorEnvironmentenv)throwsIOException{if(envinstanceofRegionCoprocessorEnvironment){this.
env=(RegionCoprocessorEnvironment)env;}else{thrownewCoprocessorException("Mustbeloadedonatableregion!
");}}@Overridepublicvoidstop(CoprocessorEnvironmentenv){//nothingtodo}}大家可以看到,这里面的代码框架和《HBase协处理器入门及实战》里面介绍的HBase提供的RowCountEndpoint示例代码很类似.
主要逻辑在queryByStartRowAndEndRow函数实现里面.
我们通过DataQueryRequest拿到客户端查询的表,StartKey和EndKey8/12等数据.
通过this.
env.
getRegion().
getRegionInfo().
getStartKey()可以拿到当前Region的StartKey,然后再和客户端传进来的StartKey和EndKey进行拼接就可以拿到完整的Rowkey前缀.
剩下的查询就是正常的HBaseScan代码了.
现在我们将SlatTableDataSearch类进行编译打包,并部署到HBase表里面去,具体如何部署参见《HBase协处理器入门及实战》协处理器客户端代码编写到这里,我们的协处理器服务器端的代码和部署已经完成了,现在我们需要编写协处理器客户端代码.
其实也很简单,如下:packagecom.
iteblog.
data;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse.
*;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
TableName;importorg.
apache.
hadoop.
hbase.
client.
Connection;importorg.
apache.
hadoop.
hbase.
client.
ConnectionFactory;importorg.
apache.
hadoop.
hbase.
client.
HTable;importorg.
apache.
hadoop.
hbase.
ipc.
CoprocessorRpcUtils.
BlockingRpcCallback;importorg.
apache.
hadoop.
hbase.
ipc.
ServerRpcController;importjava.
util.
LinkedList;importjava.
util.
List;importjava.
util.
Map;publicclassDataQuery{privatestaticConfigurationconf=null;static{conf=HBaseConfiguration.
create();conf.
set("hbase.
zookeeper.
quorum","https://www.
iteblog.
com:2181");}staticListqueryByStartRowAndStopRow(StringtableName,StringstartRow,StringstopRow,booleanisIncludeEnd,booleanisSalting){finalDataQueryRequest.
BuilderrequestBuilder=DataQueryRequest.
newBuilder();requestBuilder.
setTableName(tableName);requestBuilder.
setStartRow(startRow);9/12requestBuilder.
setEndRow(stopRow);requestBuilder.
setIncluedEnd(isIncludeEnd);requestBuilder.
setIsSalting(isSalting);try{Connectionconnection=ConnectionFactory.
createConnection(conf);HTabletable=(HTable)connection.
getTable(TableName.
valueOf(tableName));Map>result=table.
coprocessorService(QueryDataService.
class,null,null,counter->{ServerRpcControllercontroller=newServerRpcController();BlockingRpcCallbackcall=newBlockingRpcCallback();counter.
queryByStartRowAndEndRow(controller,requestBuilder.
build(),call);DataQueryResponseresponse=call.
get();if(controller.
failedOnException()){throwcontroller.
getFailedOn();}returnresponse.
getRowListList();});Listlist=newLinkedList();for(Map.
Entry>entry:result.
entrySet()){if(null!
=entry.
getKey()){list.
addAll(entry.
getValue());}}returnlist;}catch(Throwablee){e.
printStackTrace();}returnnull;}publicstaticvoidmain(String[]args){Listrows=queryByStartRowAndStopRow("iteblog","1000","1001",false,true);if(null!
=rows){System.
out.
println(rows.
size());for(DataQueryResponse.
Rowrow:rows){ListcellListList=row.
getCellListList();for(DataQueryResponse.
Cellcell:cellListList){System.
out.
println(row.
getRowKey().
toStringUtf8()+"\t"+"column="+cell.
getFamily().
toStringUtf8()+":"+cell.
getQualifier().
toStringUtf8("timestamp="+cell.
getTimestamp(10/12"value="+cell.
getValue().
toStringUtf8());}}}}}我们运行上面的代码,可以得到如下的输出:A-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fB-1000-1550572388491column=f:age,timestamp=1549091983276,value=1B-1000-1550572388491column=f:uuid,timestamp=1549091983276,value=cf720efe-2ad2-48d6-81b8B-1000-1550572392922column=f:age,timestamp=1549091987701,value=7B-1000-1550572392922column=f:uuid,timestamp=1549091987701,value=8a047118-e130-48cb-adfehbase(main):020:0>scan'iteblog',{STARTROW=>'A-1000',ENDROW=>'A-1001'}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9f3row(s)Took0.
0569seconds可以看到,和我们使用HBaseShell输出的一致,而且我们还把所有的UID=100011/12的数据拿到了.
好了,到这里,使用协处理器查询HBase加盐之后的表已经算完成了,明天我将介绍使用Spark如何查询加盐之后的表.
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载.
本文链接:【】()PoweredbyTCPDF(www.
tcpdf.
org)12/12
有一段时间没有分享Gcore(gcorelabs)的信息了,这是一家成立于2011年的国外主机商,总部位于卢森堡,主要提供VPS主机和独立服务器租用等,数据中心包括俄罗斯、美国、日本、韩国、新加坡、荷兰、中国(香港)等多个国家和地区的十几个机房,商家针对不同系列的产品分为不同管理系统,比如VPS(Hosting)、Cloud等都是独立的用户中心体系,部落分享的主要是商家的Hosting(Virtu...
韩国云服务器哪个好?韩国云服务器好用吗?韩国是距离我国很近的一个国家,很多站长用户在考虑国外云服务器时,也会将韩国云服务器列入其中。绝大部分用户都是接触的免备案香港和美国居多,在加上服务器确实不错,所以形成了习惯性依赖。但也有不少用户开始寻找其它的海外免备案云服务器,比如韩国云服务器。下面云服务器网(yuntue.com)就推荐最好用的韩国cn2云服务器,韩国CN2云服务器租用推荐。为什么推荐租用...
sparkedhost怎么样?sparkedhost主机。Sparkedhost于2017年7月注册在美国康涅狄格州,2018年收购了ClynexHost,2019年8月从Taltum Solutions SL收购了The Beast Hosting,同年10月从Reilly Bauer收购了OptNode Hosting。sparkedhost当前的业务主要为:为游戏“我的世界”提供服务器、虚拟...
协处理器为你推荐
Committeesios11支持ipad支持ipad支持ipad支持ipadtcpip上的netbios怎么启用TCP/IP上的NetBIOSicloudiphone没开启icloud的iphone怎么用find my iphone找回win7关闭135端口win7系统怎么关闭135端口?网上很多方法都不好用!google搜图google的直接搜索图片的功能为什么没了ipad无法加入网络为什么我的ipad加入网络没法用
网站空间申请 如何查询域名备案号 bandwagonhost 好看的桌面背景图 台湾谷歌网址 北京双线机房 seednet cdn联盟 共享主机 泉州移动 中国电信测速网 t云 免费网页申请 流媒体加速 中国电信网络测速 服务器论坛 攻击服务器 privatetracker 镇江高防服务器 美国vpn服务器 更多