How to Read a Salted Table in HBase: the Coprocessor Approach

2021-05-24
In the article 《HBase Rowkey设计指南》 we introduced three fairly common ways to avoid data hotspots: salting (Salting), hashing (Hashing), and reversing (Reversing). The salting approach was described as follows: assign a random prefix to the Rowkey so that the data sorts differently than it otherwise would.
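To make the idea concrete, here is a minimal sketch of what salting looks like on the write path. The class and method names are illustrative only, but the one-letter A to Z prefix matches the scheme used for the test data later in this post:

import java.util.concurrent.ThreadLocalRandom;

public class SaltedKeyExample {
    // Prepend a random one-letter salt so that writes spread across regions
    // pre-split on 'A'..'Z'. The price: a reader no longer knows which prefix
    // a given key ended up with.
    static String saltedRowKey(String originalKey) {
        char salt = (char) ('A' + ThreadLocalRandom.current().nextInt(26));
        return salt + "-" + originalKey;
    }

    public static void main(String[] args) {
        System.out.println(saltedRowKey("1000-1550572395399")); // e.g. "Q-1000-1550572395399"
    }
}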
But once a random prefix has been added to the Rowkey, how do we read the data back out? I will cover this in three articles, one method per article: reading a salted table with a coprocessor, reading a salted table with Spark, and reading a salted table with MapReduce. For an introduction to coprocessors and hands-on examples, see the earlier article 《HBase协处理器入门及实战》.
The component versions used in this post: hadoop-2.7.7, hbase-2.0.4, jdk1.8.0_201.
Generating test data

Before discussing how to query the data, let's create an HBase table named iteblog for testing. To keep the data evenly distributed and the discussion simple, the table is pre-split into 27 regions:

hbase(main):002:0> create 'iteblog', 'f', SPLITS => ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']
0 row(s) in 2.4880 seconds

We then generate 1,000,000 rows of test data with the program below. The RowKey has the form UID + data generation timestamp; since the UID is only 4 digits long, many of the 1,000,000 rows share the same UID, so we salt the keys to spread the rows evenly across the 27 regions above (note that the first region actually ends up empty).
The full code is as follows:

package com.iteblog.data;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class HBaseDataGenerator {
    private static byte[] FAMILY = "f".getBytes();
    private static byte[] QUALIFIER_UUID = "uuid".getBytes();
    private static byte[] QUALIFIER_AGE = "age".getBytes();

    // Random salt letter 'A'..'Z'
    private static char generateLetter() {
        return (char) (Math.random() * 26 + 'A');
    }

    // Random n-digit UID
    private static long generateUid(int n) {
        return (long) (Math.random() * 9 * Math.pow(10, n - 1)) + (long) Math.pow(10, n - 1);
    }

    public static void main(String[] args) throws IOException {
        BufferedMutatorParams bmp = new BufferedMutatorParams(TableName.valueOf("iteblog"));
        bmp.writeBufferSize(1024 * 1024 * 24);

        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.ZOOKEEPER_QUORUM, "https://www.iteblog.com:2181");
        Connection connection = ConnectionFactory.createConnection(conf);
        BufferedMutator bufferedMutator = connection.getBufferedMutator(bmp);

        int BATCH_SIZE = 1000;
        int COUNTS = 1000000;
        int count = 0;
        List<Put> putList = new ArrayList<>();
        Random random = new Random();

        for (int i = 0; i < COUNTS; i++) {
            // Loop body restored from context: build a Put with RowKey <salt>-<uid>-<timestamp>
            // and the f:uuid / f:age columns that appear in the scan output below.
            String rowKey = generateLetter() + "-" + generateUid(4) + "-" + System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(FAMILY, QUALIFIER_UUID, Bytes.toBytes(UUID.randomUUID().toString()));
            put.addColumn(FAMILY, QUALIFIER_AGE, Bytes.toBytes(String.valueOf(random.nextInt(100))));
            putList.add(put);
            count++;
            if (count % BATCH_SIZE == 0) {
                bufferedMutator.mutate(putList);
                bufferedMutator.flush();
                putList.clear();
            }
        }

        if (putList.size() > 0) {
            bufferedMutator.mutate(putList);
            bufferedMutator.flush();
            putList.clear();
        }

        bufferedMutator.close();
        connection.close();
    }
}

After running this code we end up with about 1,000,000 rows. Strictly speaking this is not guaranteed: because of how the RowKey is constructed, duplicate RowKeys can be generated, so the actual number of rows may be somewhat lower.
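If you want to check how many rows actually made it into the table, the HBase shell count command is a quick way to do it (shown here as an illustration; INTERVAL and CACHE only control progress reporting and scanner caching, and the exact count will differ from run to run because of the duplicate-RowKey caveat above):

hbase(main):003:0> count 'iteblog', INTERVAL => 100000, CACHE => 10000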
Let's scan the first 10 rows to see what the data looks like:

hbase(main):001:0> scan 'iteblog', {'LIMIT' => 10}
ROW                     COLUMN+CELL
 A-1000-1550572395399   column=f:age, timestamp=1549091990253, value=54
 A-1000-1550572395399   column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
 A-1000-1550572413799   column=f:age, timestamp=1549092008575, value=4
 A-1000-1550572413799   column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
 A-1000-1550572414761   column=f:age, timestamp=1549092009531, value=33
 A-1000-1550572414761   column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
 A-1001-1550572394570   column=f:age, timestamp=1549091989341, value=64
 A-1001-1550572394570   column=f:uuid, timestamp=1549091989341, value=c6712a0d-3793-46d5-865b
 A-1001-1550572405337   column=f:age, timestamp=1549092000108, value=96
 A-1001-1550572405337   column=f:uuid, timestamp=1549092000108, value=4bf05d10-bb4d-43e3-9957
 A-1001-1550572419688   column=f:age, timestamp=1549092014458, value=8
 A-1001-1550572419688   column=f:uuid, timestamp=1549092014458, value=f04ba835-d8ac-49a3-8f96
 A-1002-1550572424041   column=f:age, timestamp=1549092018816, value=84
 A-1002-1550572424041   column=f:uuid, timestamp=1549092018816, value=99d6c989-afb5-4101-9d95
 A-1003-1550572431830   column=f:age, timestamp=1549092026605, value=21
 A-1003-1550572431830   column=f:uuid, timestamp=1549092026605, value=8c1ff1b6-b97c-4059-9b68
 A-1004-1550572395399   column=f:age, timestamp=1549091990253, value=2
 A-1004-1550572395399   column=f:uuid, timestamp=1549091990253, value=e240aa0f-c044-452f-89c0
 A-1004-1550572403783   column=f:age, timestamp=1549091998555, value=6
 A-1004-1550572403783   column=f:uuid, timestamp=1549091998555, value=e8df15c9-02fa-458e-bd0c
10 row(s)
Took 0.1104 seconds

Querying the salted table with a coprocessor

Now that we have data, suppose we need to query the entire history of the user with UID = 1000. How do we do that? We know that the rows for UID = 1000 are spread evenly across the 27 regions above; because the keys are salted, these rows have prefixes such as A-, B-, C-, and so on.
Next, recall that every Region has a StartKey and an EndKey, and these are exactly the split points we specified when creating the iteblog table. If you have read 《HBase协处理器入门及实战》, you know that coprocessor code runs inside each Region, and while running there it can access the current Region's information, including its StartKey and EndKey. We can therefore concatenate the Region's StartKey with the UID we are querying to build the complete RowKey prefix and fetch exactly the data we want. That is the idea this coprocessor-based approach to querying salted data is built on.
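In other words, inside each Region the endpoint only needs to prepend that Region's own salt prefix to the row range requested by the client before scanning. Below is a minimal sketch of that step; the class and method names are illustrative, and it assumes the Region StartKey is the one-letter salt used by the iteblog table (the real implementation later in this post also handles the first Region, whose StartKey is empty):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

class SaltedScanSketch {
    // Build the Region-local scan range for a UID range by prepending this
    // Region's StartKey (its salt letter) to the client-supplied rows.
    static Scan scanForUidRange(RegionCoprocessorEnvironment env, String startUid, String stopUid) {
        String salt = Bytes.toString(env.getRegion().getRegionInfo().getStartKey()); // e.g. "B"
        return new Scan()
                .withStartRow(Bytes.toBytes(salt + "-" + startUid))   // e.g. "B-1000"
                .withStopRow(Bytes.toBytes(salt + "-" + stopUid));    // e.g. "B-1001"
    }
}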
Defining the proto file

For why a proto file is needed at all, see 《HBase协处理器入门及实战》.
Because the query needs input parameters such as the tableName, StartKey, EndKey, and a flag indicating whether the table is salted, and because the matching rows need to be returned to the caller, the proto file is defined as follows:

option java_package = "com.iteblog.data.coprocessor.generated";
option java_outer_classname = "DataQueryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message DataQueryRequest {
  optional string tableName = 1;
  optional string startRow = 2;
  optional string endRow = 3;
  optional bool incluedEnd = 4;
  optional bool isSalting = 5;
}

message DataQueryResponse {
  message Cell {
    required bytes value = 1;
    required bytes family = 2;
    required bytes qualifier = 3;
    required bytes row = 4;
    required int64 timestamp = 5;
  }
  message Row {
    optional bytes rowKey = 1;
    repeated Cell cellList = 2;
  }
  repeated Row rowList = 1;
}

service QueryDataService {
  rpc queryByStartRowAndEndRow(DataQueryRequest)
      returns (DataQueryResponse);
}

We then use the protobuf-maven-plugin to generate the Java classes from this proto file; see 《在IDEA中使用Maven编译proto文件》 for the details. The generated DataQueryProtos.java is copied into the com.iteblog.data.coprocessor.generated package.
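If you would rather not use the Maven plugin, the plain protoc compiler can generate the same class. This is only a sketch under assumptions not stated in the original post: the proto file is saved as DataQuery.proto in the current directory, and the installed protoc matches the protobuf 2.5.x line that HBase 2.x coprocessor endpoints are compiled against:

protoc --proto_path=. --java_out=src/main/java DataQuery.proto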
Writing the coprocessor code

With the request and response classes in place, we can now write the coprocessor itself. Based on the analysis above, the implementation looks like this:

package com.iteblog.data.coprocessor;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.QueryDataService;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryRequest;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SlatTableDataSearch extends QueryDataService implements RegionCoprocessor {

    private RegionCoprocessorEnvironment env;

    public Iterable<Service> getServices() {
        return Collections.singleton(this);
    }

    @Override
    public void queryByStartRowAndEndRow(RpcController controller,
                                         DataQueryRequest request,
                                         RpcCallback<DataQueryResponse> done) {
        DataQueryResponse response = null;

        String startRow = request.getStartRow();
        String endRow = request.getEndRow();
        String regionStartKey = Bytes.toString(this.env.getRegion().getRegionInfo().getStartKey());

        // If the table is salted, prepend this Region's StartKey (its salt) to the
        // client-supplied start and stop rows.
        if (request.getIsSalting()) {
            String startSalt = null;
            if (null != regionStartKey && regionStartKey.length() != 0) {
                startSalt = regionStartKey;
            }
            if (null != startSalt && null != startRow) {
                startRow = startSalt + "-" + startRow;
                endRow = startSalt + "-" + endRow;
            }
        }

        Scan scan = new Scan();
        if (null != startRow) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (null != endRow) {
            scan.withStopRow(Bytes.toBytes(endRow), request.getIncluedEnd());
        }

        try (InternalScanner scanner = this.env.getRegion().getScanner(scan)) {
            List<Cell> results = new ArrayList<>();
            boolean hasMore;
            DataQueryResponse.Builder responseBuilder = DataQueryResponse.newBuilder();
            do {
                hasMore = scanner.next(results);
                DataQueryResponse.Row.Builder rowBuilder = DataQueryResponse.Row.newBuilder();
                if (results.size() > 0) {
                    Cell cell = results.get(0);
                    rowBuilder.setRowKey(ByteString.copyFrom(cell.getRowArray(),
                            cell.getRowOffset(), cell.getRowLength()));
                    for (Cell kv : results) {
                        buildCell(rowBuilder, kv);
                    }
                }
                responseBuilder.addRowList(rowBuilder);
                results.clear();
            } while (hasMore);
            response = responseBuilder.build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        }
        done.run(response);
    }

    private void buildCell(DataQueryResponse.Row.Builder rowBuilder, Cell kv) {
        DataQueryResponse.Cell.Builder cellBuilder = DataQueryResponse.Cell.newBuilder();
        cellBuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()));
        cellBuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
        cellBuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
        cellBuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
        cellBuilder.setTimestamp(kv.getTimestamp());
        rowBuilder.addCellList(cellBuilder);
    }

    /**
     * Stores a reference to the coprocessor environment provided by the
     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
     * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
     * on a table region, so always expects this to be an instance of
     * {@link RegionCoprocessorEnvironment}.
     *
     * @param env the environment provided by the coprocessor host
     * @throws IOException if the provided environment is not an instance of
     *                     {@code RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment env) {
        // nothing to do
    }
}

As you can see, the overall structure of this code is very similar to the RowCountEndpoint example that ships with HBase and that is covered in 《HBase协处理器入门及实战》.
The main logic lives in the queryByStartRowAndEndRow method. From the DataQueryRequest we get the table being queried along with the StartKey and EndKey sent by the client. Through this.env.getRegion().getRegionInfo().getStartKey() we can get the StartKey of the current Region, and by concatenating it with the client-supplied StartKey and EndKey we obtain the complete RowKey prefixes. The rest is just ordinary HBase Scan code.

We now compile the SlatTableDataSearch class, package it into a jar, and deploy it to the HBase table; see 《HBase协处理器入门及实战》 for the deployment steps, for example via the table_att attribute in the HBase shell, as sketched below.
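A common way to attach the endpoint is the HBase shell's table_att attribute. The commands below are a sketch only: the HDFS jar path and the priority value 1001 are assumptions for illustration, not taken from the original post:

hbase(main):010:0> disable 'iteblog'
hbase(main):011:0> alter 'iteblog', METHOD => 'table_att', 'coprocessor' => 'hdfs:///user/iteblog/hbase-coprocessor.jar|com.iteblog.data.coprocessor.SlatTableDataSearch|1001|'
hbase(main):012:0> enable 'iteblog'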
Writing the coprocessor client code

At this point the server-side coprocessor code is written and deployed, so we need to write the client code.
It is actually quite simple:

package com.iteblog.data;

import com.iteblog.data.coprocessor.generated.DataQueryProtos.QueryDataService;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryRequest;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse;
import com.iteblog.data.coprocessor.generated.DataQueryProtos.DataQueryResponse.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class DataQuery {

    private static Configuration conf = null;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "https://www.iteblog.com:2181");
    }

    static List<Row> queryByStartRowAndStopRow(String tableName,
                                               String startRow, String stopRow,
                                               boolean isIncludeEnd, boolean isSalting) {

        final DataQueryRequest.Builder requestBuilder = DataQueryRequest.newBuilder();
        requestBuilder.setTableName(tableName);
        requestBuilder.setStartRow(startRow);
        requestBuilder.setEndRow(stopRow);
        requestBuilder.setIncluedEnd(isIncludeEnd);
        requestBuilder.setIsSalting(isSalting);

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            HTable table = (HTable) connection.getTable(TableName.valueOf(tableName));
            // Invoke the endpoint on every Region of the table and collect the results.
            Map<byte[], List<Row>> result = table.coprocessorService(QueryDataService.class,
                    null, null, counter -> {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<DataQueryResponse> call = new BlockingRpcCallback<>();
                        counter.queryByStartRowAndEndRow(controller, requestBuilder.build(), call);
                        DataQueryResponse response = call.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return response.getRowListList();
                    });

            List<Row> list = new LinkedList<>();
            for (Map.Entry<byte[], List<Row>> entry : result.entrySet()) {
                if (null != entry.getKey()) {
                    list.addAll(entry.getValue());
                }
            }
            return list;
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        List<Row> rows = queryByStartRowAndStopRow("iteblog", "1000", "1001", false, true);
        if (null != rows) {
            System.out.println(rows.size());
            for (DataQueryResponse.Row row : rows) {
                List<DataQueryResponse.Cell> cellListList = row.getCellListList();
                for (DataQueryResponse.Cell cell : cellListList) {
                    System.out.println(row.getRowKey().toStringUtf8() + "\t" +
                            "column=" + cell.getFamily().toStringUtf8() + ":" + cell.getQualifier().toStringUtf8() + ", " +
                            "timestamp=" + cell.getTimestamp() + ", " +
                            "value=" + cell.getValue().toStringUtf8());
                }
            }
        }
    }
}

Running the code above produces output like the following:

A-1000-1550572395399	column=f:age, timestamp=1549091990253, value=54
A-1000-1550572395399	column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
A-1000-1550572413799	column=f:age, timestamp=1549092008575, value=4
A-1000-1550572413799	column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
A-1000-1550572414761	column=f:age, timestamp=1549092009531, value=33
A-1000-1550572414761	column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
B-1000-1550572388491	column=f:age, timestamp=1549091983276, value=1
B-1000-1550572388491	column=f:uuid, timestamp=1549091983276, value=cf720efe-2ad2-48d6-81b8
B-1000-1550572392922	column=f:age, timestamp=1549091987701, value=7
B-1000-1550572392922	column=f:uuid, timestamp=1549091987701, value=8a047118-e130-48cb-adfe

For comparison, here is the same range scanned directly in the HBase shell:

hbase(main):020:0> scan 'iteblog', {STARTROW => 'A-1000', ENDROW => 'A-1001'}
ROW                     COLUMN+CELL
 A-1000-1550572395399   column=f:age, timestamp=1549091990253, value=54
 A-1000-1550572395399   column=f:uuid, timestamp=1549091990253, value=e9b10a9f-1218-43fd-bd01
 A-1000-1550572413799   column=f:age, timestamp=1549092008575, value=4
 A-1000-1550572413799   column=f:uuid, timestamp=1549092008575, value=181aa91e-5f1d-454c-959c
 A-1000-1550572414761   column=f:age, timestamp=1549092009531, value=33
 A-1000-1550572414761   column=f:uuid, timestamp=1549092009531, value=19aad8d3-621a-473c-8f9f
3 row(s)
Took 0.0569 seconds

As you can see, the result matches the HBase shell output, and we have retrieved all of the data for UID = 1000. That wraps up querying a salted HBase table with a coprocessor; tomorrow I will cover how to query a salted table with Spark.
Unless otherwise stated, all articles on this blog are original. The copyright of original articles belongs to 过往记忆大数据 (过往记忆); reproduction without permission is prohibited.
