如何使用Hadoop的Partitioner
Hadoop里面的MapReduce编程模型,非常灵活,大部分环节我们都可以重写它的API,来灵活定制我们自己的一些特殊需求。
今天散仙要说的这个分区函数Partitioner,也是一样如此,下面我们先来看下Partitioner的作用:
对map端输出的数据key作一个散列,使数据能够均匀分布在各个reduce上进行后续操作,避免产生热点区。
Hadoop默认使用的分区函数是Hash Partitioner,源码如下:
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
//默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
大部分情况下,我们都会使用默认的分区函数,但有时我们又有一些,特殊的需求,而需要定制Partition来完成我们的业务,案例如下:
对如下数据,按字符串的长度分区,长度为1的放在一个,2的一个,3的各一个。
河南省;1
河南;2
中国;3
中国人;4
大;1
小;3
中;11
这时候,我们使用默认的分区函数,就不行了,所以需要我们定制自己的Partition,首先分析下,我们需要3个分区输出,所以在设置reduce的个数时,一定要设置为3,其次在partition里,进行分区时,要根据长度具体分区,而不是根据字符串的hash码来分区。
核心代码如下:
/**
* Partitioner
*
*
* */
public static class PPartition extends Partitioner<Text, Text>{
@Override
public int getPartition(Text arg0, Text arg1, int arg2) {
/**
* 自定义分区,实现长度不同的字符串,分到不同的reduce里面
*
* 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
* 有几个分区,就设置为几
* */
String key=arg0.toString();
if(key.length()==1){
return 1%arg2;
}else if(key.length()==2){
return 2%arg2;
}else if(key.length()==3){
return 3%arg2;
}
return 0;
}
}
全部代码如下:
.partition.test;
import java.io.IOException;
.apache.hadoop.fs.FileSystem;
.apache.hadoop.fs.Path;
.apache.hadoop.io.LongWritable;
.apache.hadoop.io.Text;
.apache.hadoop.mapred.JobConf;
.apache.hadoop.mapreduce.Job;
.apache.hadoop.mapreduce.Mapper;
.apache.hadoop.mapreduce.Partitioner;
.apache.hadoop.mapreduce.Reducer;
.apache.hadoop.mapreduce.lib.db.DBConfiguration;
.apache.hadoop.mapreduce.lib.db.DBInputFormat;
.apache.hadoop.mapreduce.lib.input.FileInputFormat;
.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
.qin.operadb.PersonRecoder;
.qin.operadb.ReadMapDB;
/**
* @author qindongliang
*
* 大数据交流群:376932160
*
*
* **/
public class MyTestPartition {
/**
* map任务
*
* */
public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// System.out.println("进map了");
//mos.write(namedOutput, key, value);
String ss[]=value.toString().split(";");
context.write(new Text(ss[0]), new Text(ss[1]));
}
}
/**
* Partitioner
*
*
* */
public static class PPartition extends Partitioner<Text, Text>{
@Override
public int getPartition(Text arg0, Text arg1, int arg2) {
/**
* 自定义分区,实现长度不同的字符串,分到不同的reduce里面
*
* 现在只有3个长度的字符串,所以可以把reduce的个数设置为3
* 有几个分区,就设置为几
* */
String key=arg0.toString();
if(key.length()==1){
return 1%arg2;
}else if(key.length()==2){
return 2%arg2;
}else if(key.length()==3){
return 3%arg2;
}
return 0;
}
}
/***
* Reduce任务
*
* **/
public static class PReduce extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
throws IOException, InterruptedException {
String key=arg0.toString().split(",")[0];
System.out.println("key==> "+key);
for(Text t:arg1){
//System.out.println("Reduce: "+arg0.toString()+" "+t.toString());
arg2.write(arg0, t);
}
}
}
public static void main(String[] args) throws Exception{
JobConf conf=new JobConf(ReadMapDB.class);
//Configuration conf=new Configuration();
conf.set("mapred.job.tracker","192.168.75.130:9001");
//读取person中的数据字段
conf.setJar("tt.jar");
//注意这行代码放在最前面,进行初始化,否则会报
/**Job任务**/
Job job=new Job(conf, "testpartion");
job.setJarByClass(MyTestPartition.class);
System.out.println("模式: "+conf.get("mapred.job.tracker"));;
// job.setCombinerClass(PCombine.class);
job.setPartitionerClass(PPartition.class);
job.setNumReduceTasks(3);//设置为3
job.setMapperClass(PMapper.class);
// MultipleOutputs.addNamedOutput(job, "hebei", TextOutputFormat.class, Text.class, Text.class);
// MultipleOutputs.addNamedOutput(job, "henan", TextOutputFormat.class, Text.class, Text.class);
job.setReducerClass(PReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String path="hdfs://192.168.75.130:9000/root/outputdb";
FileSystem fs=FileSystem.get(conf);
Path p=new Path(path);
if(fs.exists(p)){
fs.delete(p, true);
System.out.println("输出路径存在,已删除!");
}
FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
FileOutputFormat.setOutputPath(job,p );
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}如何使用eclipse调试Hadoop作业
将hadoop开发包里面的相关jar导进工程就行,
至于想调试,就看hadoop计数器返回到eclipse里的内容就可以了.
不过有一点,
如果调试的是MapReduce,速度可能不快.Hadoop,Combiner有什么用?
Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出。
Combiner是用reducer来定义的,多数的情况下Combiner和reduce处理的是同一种逻辑,所以job.setCombinerClass()的参数可以直接使用定义的reduce。
当然也可以单独去定义一个有别于reduce的Combiner,继承Reducer,写法基本上定义reduce一样。
炭云怎么样?炭云(之前的碳云),国人商家,正规公司(哈尔滨桓林信息技术有限公司),主机之家测评介绍过多次。现在上海CN2共享IP的VPS有一款特价,上海cn2 vps,2核/384MB内存/8GB空间/800GB流量/77Mbps端口/共享IP/Hyper-v,188元/年,特别适合电信网络。有需要的可以关注一下。点击进入:炭云官方网站地址炭云vps套餐:套餐cpu内存硬盘流量/带宽ip价格购买上...
TNAHosting是一家成立于2012年的国外主机商,提供VPS主机及独立服务器租用等业务,其中VPS主机基于OpenVZ和KVM架构,数据中心在美国芝加哥机房。目前,商家在LET推出芝加哥机房大硬盘高配VPS套餐,再次刷新了价格底线,基于OpenVZ架构,12GB内存,500GB大硬盘,支持月付仅5美元起。下面列出这款VPS主机配置信息。CPU:4 cores内存:12GB硬盘:500GB月流...
ZJI原名维翔主机,是原来Wordpress圈知名主机商家,成立于2011年,2018年9月更名为ZJI,提供香港、日本、美国独立服务器(自营/数据中心直营)租用及VDS、虚拟主机空间、域名注册业务。ZJI今年全新上架了台湾CN2线路服务器,本月针对香港高主频服务器和台湾CN2服务器提供7折优惠码,其他机房及产品提供8折优惠码,优惠后台湾CN2线路E5服务器月付595元起。台湾一型CPU:Inte...
fileinputformat为你推荐
ostringstreamC++中ostringstream和ostream有什么区别,菜鸟求问视频压缩算法视频压缩原理扫图高清扫图是什么意思,在很多的贴吧里,都有提到一些高清扫图,是自己照杂志上的图片,然后自己再修一下吗搜索引擎的概念什么叫搜索引擎?搜索引擎的类型有哪些?模式识别算法机器学习和模式识别有什么区别?看教材,发现它们的算法都差不多一样啊。。。jstz举手望,草上马跑,打什么数字?微软操作系统下载微软的系统到哪下载谷歌图片识别怎么通过一张GIF图在网上搜索出其出处(你们懂的...)以图搜图那个百度只找到了一模一样的..,有微信智能机器人有没有可以拉进微信群的聊天机器人超级播放器那种电影网站就是那种不需要下载播放器可以直接看
域名空间 河北服务器租用 二级域名申请 cve-2014-6271 鲨鱼机 softbank官网 表格样式 中国电信宽带测速网 免费外链相册 云营销系统 SmartAXMT800 cdn免备案空间 木马检测 宿主机 小米电视主机 电脑主机内部结构 网通ip地址 未注册双拼域名 彩虹云点播点点版 杭州电信宽带测速 更多