处理器协处理器

协处理器  时间:2021-05-24  阅读:()
HBase中加盐(Salting)之后的表如何读取:协处理器篇在《HBaseRowkey设计指南》文章中,我们介绍了避免数据热点的三种比较常见方法:加盐-Salting哈希-Hashing反转-Reversing其中在加盐(Salting)的方法里面是这么描述的:给Rowkey分配一个随机前缀以使得它和之前排序不同.
但是在Rowkey前面加了随机前缀,那么我们怎么将这些数据读出来呢我将分三篇文章来介绍如何读取加盐之后的表,其中每篇文章提供一种方法,主要包括:使用协处理器读取加盐的表使用Spark读取加盐的表使用MapReduce读取加盐的表关于协处理器的入门及实战,请参见这里.
本文使用的各组件版本:hadoop-2.
7.
7,hbase-2.
0.
4,jdk1.
8.
0_201.
测试数据生成在介绍如何查询数据之前,我们先创建一张名为iteblog的HBase表,用于测试.
为了数据均匀和介绍的方便,这里使用了预分区,并设置了27个分区,如下:hbase(main):002:0>create'iteblog','f',SPLITS=>['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z']0row(s)in2.
4880seconds然后我们使用下面方法生成了1000000条测试数据.
RowKey的形式为UID+当前数据生成时间戳;由于UID的长度为4,所以1000000条数据会存在大量的UID一样的数据,所以我们使用加盐方法将这些数据均匀分散到上述27个Region里面(注意,其实第一个Region其实没数据).
具体代码如下:packagecom.
iteblog.
data;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
HConstants;importorg.
apache.
hadoop.
hbase.
TableName;1/12importorg.
apache.
hadoop.
hbase.
client.
*;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
List;importjava.
util.
Random;importjava.
util.
UUID;publicclassHBaseDataGenerator{privatestaticbyte[]FAMILY="f".
getBytes();privatestaticbyte[]QUALIFIER_UUID="uuid".
getBytes();privatestaticbyte[]QUALIFIER_AGE="age".
getBytes();privatestaticchargenerateLetter(){return(char)(Math.
random()*26+'A');}privatestaticlonggenerateUid(intn){return(long)(Math.
random()*9*Math.
pow(10,n-1))+(long)Math.
pow(10,n-1);}publicstaticvoidmain(String[]args)throwsIOException{BufferedMutatorParamsbmp=newBufferedMutatorParams(TableName.
valueOf("iteblog"));bmp.
writeBufferSize(1024*1024*24);Configurationconf=HBaseConfiguration.
create();conf.
set(HConstants.
ZOOKEEPER_QUORUM,"https://www.
iteblog.
com:2181");Connectionconnection=ConnectionFactory.
createConnection(conf);BufferedMutatorbufferedMutator=connection.
getBufferedMutator(bmp);intBATCH_SIZE=1000;intCOUNTS=1000000;intcount=0;ListputList=newArrayList();for(inti=0;i0){bufferedMutator.
mutate(putList);bufferedMutator.
flush();putList.
clear();}}}运行完上面代码之后,会生成1000000条数据(注意,这里其实不严谨,因为Rowkey设计问题,可能会导致重复的Rowkey生成,所以实际情况下可能没有1000000条数据.
).
我们limit10条数据看下长成什么样:hbase(main):001:0>scan'iteblog',{'LIMIT'=>10}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fA-1001-1550572394570column=f:age,timestamp=1549091989341,value=64A-1001-1550572394570column=f:uuid,timestamp=1549091989341,value=c6712a0d-3793-46d5-865bA-1001-1550572405337column=f:age,timestamp=1549092000108,value=96A-1001-1550572405337column=f:uuid,timestamp=1549092000108,value=4bf05d10-bb4d-43e3-99573/12A-1001-1550572419688column=f:age,timestamp=1549092014458,value=8A-1001-1550572419688column=f:uuid,timestamp=1549092014458,value=f04ba835-d8ac-49a3-8f96A-1002-1550572424041column=f:age,timestamp=1549092018816,value=84A-1002-1550572424041column=f:uuid,timestamp=1549092018816,value=99d6c989-afb5-4101-9d95A-1003-1550572431830column=f:age,timestamp=1549092026605,value=21A-1003-1550572431830column=f:uuid,timestamp=1549092026605,value=8c1ff1b6-b97c-4059-9b68A-1004-1550572395399column=f:age,timestamp=1549091990253,value=2A-1004-1550572395399column=f:uuid,timestamp=1549091990253,value=e240aa0f-c044-452f-89c0A-1004-1550572403783column=f:age,timestamp=1549091998555,value=6A-1004-1550572403783column=f:uuid,timestamp=1549091998555,value=e8df15c9-02fa-458e-bd0c10row(s)Took0.
1104seconds使用协处理器查询加盐之后的表现在有数据了,我们需要查询所有UID=1000的用户所有历史数据,那么如何查呢我们知道UID=1000的用户数据是均匀放到上述的27个Region里面的,因为经过加盐了,所以这些数据前缀都是类似于A-,B-,C-等开头的.
其次我们需要知道,每个Region其实是有StartKey和EndKey的,这些StartKey和EndKey其实就是我们创建iteblog表指定的.
如果你看了《HBase协处理器入门及实战》这篇文章,你就知道协处理器的代码其实是在每个Region里面执行的;而这些代码在Region里面执行的时候是可以拿到当前Region的信息,包括了StartKey和EndKey,所以其实我们可以将拿到的StartKey信息和查询的UID进行拼接,这样就可以查询我们要的数据.
协处理器处理篇就是基于这样的思想来查询加盐之后的数据的.
定义proto文件为什么需要定义这个请参见《HBase协处理器入门及实战》这篇文章.
因为我们查询的时候需要传入查询的参数,比如tableName、StartKey、EndKey以及是否加盐等标记;同时当查询到结果的时候,我们还需要将数据返回,所以我们定义的proto文件如下:optionjava_package="com.
iteblog.
data.
coprocessor.
generated";optionjava_outer_classname="DataQueryProtos";optionjava_generic_services=true;optionjava_generate_equals_and_hash=true;optionoptimize_for=SPEED;4/12messageDataQueryRequest{optionalstringtableName=1;optionalstringstartRow=2;optionalstringendRow=3;optionalboolincluedEnd=4;optionalboolisSalting=5;}messageDataQueryResponse{messageCell{requiredbytesvalue=1;requiredbytesfamily=2;requiredbytesqualifier=3;requiredbytesrow=4;requiredint64timestamp=5;}messageRow{optionalbytesrowKey=1;repeatedCellcellList=2;}repeatedRowrowList=1;}serviceQueryDataService{rpcqueryByStartRowAndEndRow(DataQueryRequest)returns(DataQueryResponse);}然后我们使用protobuf-maven-plugin插件将上面的proto生成java类,具体如何操作参见《在IDEA中使用Maven编译proto文件》.
我们将生成的DataQueryProtos.
java类拷贝到com.
iteblog.
data.
coprocessor.
generated包里面.
编写协处理器代码有了请求和返回的类,现在我们需要编写协处理器的处理代码了,结合上面的分析,协处理器的代码实现如下:packagecom.
iteblog.
data.
coprocessor;importcom.
google.
protobuf.
ByteString;5/12importcom.
google.
protobuf.
RpcCallback;importcom.
google.
protobuf.
RpcController;importcom.
google.
protobuf.
Service;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importorg.
apache.
hadoop.
hbase.
Cell;importorg.
apache.
hadoop.
hbase.
CoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
client.
Get;importorg.
apache.
hadoop.
hbase.
client.
Result;importorg.
apache.
hadoop.
hbase.
client.
Scan;importorg.
apache.
hadoop.
hbase.
coprocessor.
CoprocessorException;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessor;importorg.
apache.
hadoop.
hbase.
coprocessor.
RegionCoprocessorEnvironment;importorg.
apache.
hadoop.
hbase.
regionserver.
InternalScanner;importorg.
apache.
hadoop.
hbase.
shaded.
protobuf.
ResponseConverter;importorg.
apache.
hadoop.
hbase.
util.
Bytes;importjava.
io.
IOException;importjava.
util.
ArrayList;importjava.
util.
Collections;importjava.
util.
List;publicclassSlatTableDataSearchextendsQueryDataServiceimplementsRegionCoprocessor{privateRegionCoprocessorEnvironmentenv;publicIterablegetServices(){returnCollections.
singleton(this);}@OverridepublicvoidqueryByStartRowAndEndRow(RpcControllercontroller,DataQueryRequestrequest,RpcCallbackdone){DataQueryResponseresponse=null;StringstartRow=request.
getStartRow();StringendRow=request.
getEndRow();StringregionStartKey=Bytes.
toString(this.
env.
getRegion().
getRegionInfo().
getStartKey());if(request.
getIsSalting()){StringstartSalt=null;if(null!
=regionStartKey&®ionStartKey.
length()!
=0){startSalt=regionStartKey;}if(null!
=startSalt&&null!
=startRow){6/12startRow=startSalt+"-"+startRow;endRow=startSalt+"-"+endRow;}}Scanscan=newScan();if(null!
=startRow){scan.
withStartRow(Bytes.
toBytes(startRow));}if(null!
=endRow){scan.
withStopRow(Bytes.
toBytes(endRow),request.
getIncluedEnd());}try(InternalScannerscanner=this.
env.
getRegion().
getScanner(scan)){Listresults=newArrayList();booleanhasMore;DataQueryResponse.
BuilderresponseBuilder=DataQueryResponse.
newBuilder();do{hasMore=scanner.
next(results);DataQueryResponse.
Row.
BuilderrowBuilder=DataQueryResponse.
Row.
newBuilder();if(results.
size()>0){Cellcell=results.
get(0);rowBuilder.
setRowKey(ByteString.
copyFrom(cell.
getRowArray(),cell.
getRowOffset(),cell.
getRowLength()));for(Cellkv:results){buildCell(rowBuilder,kv);}}responseBuilder.
addRowList(rowBuilder);results.
clear();}while(hasMore);response=responseBuilder.
build();}catch(IOExceptione){ResponseConverter.
setControllerException(controller,e);}done.
run(response);}privatevoidbuildCell(DataQueryResponse.
Row.
BuilderrowBuilder,Cellkv){DataQueryResponse.
Cell.
BuildercellBuilder=DataQueryResponse.
Cell.
newBuilder();7/12cellBuilder.
setFamily(ByteString.
copyFrom(kv.
getFamilyArray(),kv.
getFamilyOffset(),kv.
getFamilyLength()));cellBuilder.
setQualifier(ByteString.
copyFrom(kv.
getQualifierArray(),kv.
getQualifierOffset(),kv.
getQualifierLength()));cellBuilder.
setRow(ByteString.
copyFrom(kv.
getRowArray(),kv.
getRowOffset(),kv.
getRowLength()));cellBuilder.
setValue(ByteString.
copyFrom(kv.
getValueArray(),kv.
getValueOffset(),kv.
getValueLength()));cellBuilder.
setTimestamp(kv.
getTimestamp());rowBuilder.
addCellList(cellBuilder);}/***Storesareferencetothecoprocessorenvironmentprovidedbythe*{@linkorg.
apache.
hadoop.
hbase.
regionserver.
RegionCoprocessorHost}fromtheregionwherethis*coprocessorisloaded.
Sincethisisacoprocessorendpoint,italwaysexpectstobeloaded*onatableregion,soalwaysexpectsthistobeaninstanceof*{@linkRegionCoprocessorEnvironment}.
**@paramenvtheenvironmentprovidedbythecoprocessorhost*@throwsIOExceptioniftheprovidedenvironmentisnotaninstanceof*{@codeRegionCoprocessorEnvironment}*/@Overridepublicvoidstart(CoprocessorEnvironmentenv)throwsIOException{if(envinstanceofRegionCoprocessorEnvironment){this.
env=(RegionCoprocessorEnvironment)env;}else{thrownewCoprocessorException("Mustbeloadedonatableregion!
");}}@Overridepublicvoidstop(CoprocessorEnvironmentenv){//nothingtodo}}大家可以看到,这里面的代码框架和《HBase协处理器入门及实战》里面介绍的HBase提供的RowCountEndpoint示例代码很类似.
主要逻辑在queryByStartRowAndEndRow函数实现里面.
我们通过DataQueryRequest拿到客户端查询的表,StartKey和EndKey8/12等数据.
通过this.
env.
getRegion().
getRegionInfo().
getStartKey()可以拿到当前Region的StartKey,然后再和客户端传进来的StartKey和EndKey进行拼接就可以拿到完整的Rowkey前缀.
剩下的查询就是正常的HBaseScan代码了.
现在我们将SlatTableDataSearch类进行编译打包,并部署到HBase表里面去,具体如何部署参见《HBase协处理器入门及实战》协处理器客户端代码编写到这里,我们的协处理器服务器端的代码和部署已经完成了,现在我们需要编写协处理器客户端代码.
其实也很简单,如下:packagecom.
iteblog.
data;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
QueryDataService;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryRequest;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse;importcom.
iteblog.
data.
coprocessor.
generated.
DataQueryProtos.
DataQueryResponse.
*;importorg.
apache.
hadoop.
conf.
Configuration;importorg.
apache.
hadoop.
hbase.
HBaseConfiguration;importorg.
apache.
hadoop.
hbase.
TableName;importorg.
apache.
hadoop.
hbase.
client.
Connection;importorg.
apache.
hadoop.
hbase.
client.
ConnectionFactory;importorg.
apache.
hadoop.
hbase.
client.
HTable;importorg.
apache.
hadoop.
hbase.
ipc.
CoprocessorRpcUtils.
BlockingRpcCallback;importorg.
apache.
hadoop.
hbase.
ipc.
ServerRpcController;importjava.
util.
LinkedList;importjava.
util.
List;importjava.
util.
Map;publicclassDataQuery{privatestaticConfigurationconf=null;static{conf=HBaseConfiguration.
create();conf.
set("hbase.
zookeeper.
quorum","https://www.
iteblog.
com:2181");}staticListqueryByStartRowAndStopRow(StringtableName,StringstartRow,StringstopRow,booleanisIncludeEnd,booleanisSalting){finalDataQueryRequest.
BuilderrequestBuilder=DataQueryRequest.
newBuilder();requestBuilder.
setTableName(tableName);requestBuilder.
setStartRow(startRow);9/12requestBuilder.
setEndRow(stopRow);requestBuilder.
setIncluedEnd(isIncludeEnd);requestBuilder.
setIsSalting(isSalting);try{Connectionconnection=ConnectionFactory.
createConnection(conf);HTabletable=(HTable)connection.
getTable(TableName.
valueOf(tableName));Map>result=table.
coprocessorService(QueryDataService.
class,null,null,counter->{ServerRpcControllercontroller=newServerRpcController();BlockingRpcCallbackcall=newBlockingRpcCallback();counter.
queryByStartRowAndEndRow(controller,requestBuilder.
build(),call);DataQueryResponseresponse=call.
get();if(controller.
failedOnException()){throwcontroller.
getFailedOn();}returnresponse.
getRowListList();});Listlist=newLinkedList();for(Map.
Entry>entry:result.
entrySet()){if(null!
=entry.
getKey()){list.
addAll(entry.
getValue());}}returnlist;}catch(Throwablee){e.
printStackTrace();}returnnull;}publicstaticvoidmain(String[]args){Listrows=queryByStartRowAndStopRow("iteblog","1000","1001",false,true);if(null!
=rows){System.
out.
println(rows.
size());for(DataQueryResponse.
Rowrow:rows){ListcellListList=row.
getCellListList();for(DataQueryResponse.
Cellcell:cellListList){System.
out.
println(row.
getRowKey().
toStringUtf8()+"\t"+"column="+cell.
getFamily().
toStringUtf8()+":"+cell.
getQualifier().
toStringUtf8("timestamp="+cell.
getTimestamp(10/12"value="+cell.
getValue().
toStringUtf8());}}}}}我们运行上面的代码,可以得到如下的输出:A-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9fB-1000-1550572388491column=f:age,timestamp=1549091983276,value=1B-1000-1550572388491column=f:uuid,timestamp=1549091983276,value=cf720efe-2ad2-48d6-81b8B-1000-1550572392922column=f:age,timestamp=1549091987701,value=7B-1000-1550572392922column=f:uuid,timestamp=1549091987701,value=8a047118-e130-48cb-adfehbase(main):020:0>scan'iteblog',{STARTROW=>'A-1000',ENDROW=>'A-1001'}ROWCOLUMN+CELLA-1000-1550572395399column=f:age,timestamp=1549091990253,value=54A-1000-1550572395399column=f:uuid,timestamp=1549091990253,value=e9b10a9f-1218-43fd-bd01A-1000-1550572413799column=f:age,timestamp=1549092008575,value=4A-1000-1550572413799column=f:uuid,timestamp=1549092008575,value=181aa91e-5f1d-454c-959cA-1000-1550572414761column=f:age,timestamp=1549092009531,value=33A-1000-1550572414761column=f:uuid,timestamp=1549092009531,value=19aad8d3-621a-473c-8f9f3row(s)Took0.
0569seconds可以看到,和我们使用HBaseShell输出的一致,而且我们还把所有的UID=100011/12的数据拿到了.
好了,到这里,使用协处理器查询HBase加盐之后的表已经算完成了,明天我将介绍使用Spark如何查询加盐之后的表.
本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载.
本文链接:【】()PoweredbyTCPDF(www.
tcpdf.
org)12/12

ReadyDedis:VPS全场5折,1G内存套餐月付2美元起,8个机房可选_服务器安装svn

ReadyDedis是一家2018年成立的国外VPS商家,由印度人开设,主要提供VPS和独立服务器租用等,可选数据中心包括美国洛杉矶、西雅图、亚特兰大、纽约、拉斯维加斯、杰克逊维尔、印度和德国等。目前,商家针对全部VPS主机提供新年5折优惠码,优惠后最低套餐1GB内存每月仅需2美元起,所有VPS均为1Gbps端口不限流量方式。下面列出几款主机配置信息。CPU:1core内存:1GB硬盘:25GB ...

raksmart:香港机房服务器实测评数据分享,告诉你raksmart服务器怎么样

raksmart作为一家老牌美国机房总是被很多人问到raksmart香港服务器怎么样、raksmart好不好?其实,这也好理解。香港服务器离大陆最近、理论上是不需要备案的服务器里面速度最快的,被过多关注也就在情理之中了。本着为大家趟雷就是本站的光荣这一理念,拿了一台raksmart的香港独立服务器,简单做个测评,分享下实测的数据,仅供参考!官方网站:https://www.raksmart.com...

10gbiz七月活动首月半价$2.36/月: 香港/洛杉矶CN2 GIA VPS

10gbiz怎么样?10gbiz 美国万兆带宽供应商,主打美国直连大带宽,真实硬防。除美国外还提供线路非常优质的香港、日本等数据中心可供选择,全部机房均支持增加独立硬防。洛杉矶特色线路去程三网直连(电信、联通、移动)回程CN2 GIA优化,全天低延迟。中国大陆访问质量优秀,最多可增加至600G硬防。香港七星级网络,去程回程均为电信CN2 GIA+联通+移动,大陆访问相较其他香港GIA线路平均速度更...

协处理器为你推荐
urlcss伺服器win7绑定ipad模块ios8模块iphone重庆宽带测速重庆市电信网速测试是哪个网站或ipwindows键是哪个windows 快捷键 大全勒索病毒win7补丁我的电脑是windows7系统,为什么打不了针对勒索病毒的补丁(杀毒软件显phpecho为什么在PHP中使用echo FALSE;什么也输出不了?应该如何输出FALSE?谢谢!谷歌sb为什么百度一搜SB是谷歌,谷歌一搜SB是百度?
下载虚拟主机 大庆服务器租用 simcentric 台湾服务器 20g硬盘 表单样式 windows2003iso 搜狗12306抢票助手 网站cdn加速 php空间推荐 ftp免费空间 免费phpmysql空间 优酷黄金会员账号共享 海外空间 秒杀品 个人免费邮箱 杭州电信 免备案jsp空间 葫芦机 nnt 更多