fileinputformat不同mapreduce程序可以连续运行吗?比如说多个这样的程序,用上一个的输出作为下一个的输入,求

fileinputformat  时间:2021-06-08  阅读:()

mapreduce中reducenum数量对程序有什么影响

mapreduce中的reduce数量是由什么来进行控制的呢? 1、numReduceTasks 如下是用来进行测试的一段wordcount的代码 import java.io.IOException; import java.util.StringTokenizer; .apache.hadoop.fs.Path; .apache.hadoop.io.IntWritable; .apache.hadoop.io.LongWritable; .apache.hadoop.io.Text; .apache.hadoop.mapred.JobConf; .apache.hadoop.mapreduce.Job; .apache.hadoop.mapreduce.Mapper; .apache.hadoop.mapreduce.Partitioner; .apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PartTest { public static void main(String[] args){ Path inFile = new Path(args[0]); Path outFile = new Path(args[1]); Job job; try { job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); try { job.waitForCompletion(true); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } /** * InputFormat描述map-reduce中对job的输入定义 * setInputPaths():为map-reduce job设置路径数组作为输入列表 * setInputPath():为map-reduce job设置路径数组作为输出列表 */ } } class PartTestmapper extends Mapper{ private final IntWritable one = new IntWritable(1); //private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub /* String line = value.toString(); for(String s : line.split("\s+")){ //if(s.length() > 0){ context.write(new Text(s), one); //} } */ String[] line = value.toString().split("\W+"); for(int i = 0 ; i<= line.length-1 ;i++){ String s = line[i]; context.write(new Text(s), new IntWritable(1)); } /* String line = value.toString(); Text word = new Text(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } */ } } class PartTestreducer extends Reducer{ @Override protected void reduce(Text arg0, Iterable arg1, Reducer.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sum = 0; for(IntWritable i : arg1){ sum += i.get(); } context.write(arg0, new IntWritable(sum)); } } 将上述代码打包成 parttest.jar,并上传到服务器的opt目录 创建文件/opt/test.txt,并上传到hdfs的/tmp目录下 文本内容如下: hello world hello test test hadoop hadoop hdfs hive sql sqoop 在服务器上执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out1" 我们可以看到日志输出文件: 在这里可以看到只启动了一个reduce任务 然后使用 hadoop fs -ls /tmp/part/out1 可以看到只生成了一个分区文件part-r-00000: 如果我们把上述代码进行修改: job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setNumReduceTasks(3); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 我们在代码里新加了一行 :job.setNumReduceTasks(3); 将代码重新打包上传,执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out2" 将结果输出到/tmp/part/out2目录 可以看到启动了3个reduce任务。

然后使用 hadoop fs -ls /tmp/part/out2 可以看到/tmp/part/out2文件夹中生成了3个part文件: 所以可以使用 setNumReduceTasks 来设置reduce的数量 2、mapreduce的分区 我们在原来的代码的最后一段加上如下代码: class PartTestPartitioner extends Partitioner{ @Override //参数含义:第一个参数为map任务的outputkey。

class,第二个参数为map任务的alue。

class,第三个参数为分区的数量,默认为1 public int getPartition(Text key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub if(key.toString().startsWith("h")){ return 0%numPartitions; } else if(key.toString().startsWith("s")){ return 1%numPartitions; } else{ return 2%numPartitions; } } } 这段代码的含义是: 将以h开头的统计结果输出到part-r-00000 将以s开头的统计结果输出到part-r-00001 将以其他字母开头的统计结果输出到part-r-00002 对原有代码进行如下修改: job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 新加了一行代码:job.setPartitionerClass(PartTestPartitioner.class); 将代码重新打包上传,执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out3" 将结果输出到/tmp/part/out3目录 可以看到启动了3个reduce任务。

然后使用 hadoop fs -ls /tmp/part/out3 可以看到/tmp/part/out3文件夹中生成了3个part文件: 分别查看三个文件: 可以看到输出结果已经分别输出到对应的分区文件。

注意: job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class); NumReduceTasks的数量不能小于partitioner的数量,否则结果会写到part-r-00000中 分类: HADOOP

mapreduce 怎么查看每个reducer处理的数据量

您好,第一种方法是用Mapper读取文本文件用StringTokenizer对读取文件内的每一行的数字(Hadoop处理文本文件时,处理时是一行一行记取的)进行分隔,获取每一个数字,然后求和,再将求得的值按Key/Value格式写入Context,最后用Reducer对求得中间值进行汇总求和,得出整个文件所有数字的和。

第二种方法是用Mapper读取文本文件用StringTokenizer对文件内的数字进行分隔,获取每一个数字,并救出文件中该数字有多少个,在合并过程中,求出每个数字在文件中的和,最后用Reducer对求得每个数字求得的和进行汇总求和,得出整个文件所有数字的和。

.hadoop; import java.io.IOException; import java.util.StringTokenizer; .apache.hadoop.conf.Configuration; .apache.hadoop.fs.Path; .apache.hadoop.io.LongWritable; .apache.hadoop.io.Text; .apache.hadoop.mapreduce.Job; .apache.hadoop.mapreduce.Mapper; .apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; .apache.hadoop.util.GenericOptionsParser; public class NumberSum { //对每一行数据进行分隔,并求和 public static class SumMapper extends Mapper<Object, Text, Text, LongWritable> { private Text word = new Text("sum"); private static LongWritable numValue = new LongWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); long sum = 0; while (itr.hasMoreTokens()) { String s = itr.nextToken(); long val = Long.parseLong(s); sum += val; } numValue.set(sum); context.write(word, numValue); } } // 汇总求和,输出 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("sum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { long v = val.get(); sum += v; } result.set(sum); context.write(k, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: numbersum <in> <out>"); System.exit(2); } Job job = new Job(conf, "number sum"); job.setJarByClass(NumberSum.class); job.setMapperClass(SumMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("ok"); } } 第一种实现的方法相对简单,第二种实现方法用到了Combiner类,Hadoop 在 对中间求的的数值进行Combiner时,是通过递归的方式不停地对 Combiner方法进行调用,进而合并数据的。

从两种方法中,我们可以看出Map/Reduce的核心是在怎样对输入的数据按照何种方式是进行Key/Value分对的,分的合理对整个结果输出及算法实现有很大的影响。

spark 读取多个文件支持通配符吗?为什么用通配符提示找不到文件

val wc = sc.textFile("/user/boco/yy/_*").flatMap(_.split()).map((_,1)).groupByKey 直接用*代替,不用加“/”,刚我试过了。

而且就算加,怎么会加到*后面啊,加到后面就是找"_*"文件夹了

Hadoop中mapred包和mapreduce包的区别

今天写了段代码突然发现,很多类在mapred和mapreduce中分别都有定义,下面是小菜写的一段代码: public class MyJob extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {// public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { output.collect(value, key); } } public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String csv = ""; while (values.hasNext()) { csv += csv.length() > 0 ? "," : ""; csv += values.next().toString(); } output.collect(key, new Text(csv)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, MyJob.class); //JobConf job = new JobConf(conf, MyJob.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("MyJob"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.set("key.value.separator.in.input.line", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int res = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(res); } } 主要看run方法: 上面代码中的Jobconf无可厚非,只有在mapred包中有定义,这个没问题。

但是FileInputFormat和FileOutputFormat在mapred和mapreduce中都有定义,刚开始脑海里对这些都没有概念,就引用了mapreduce中的FileInputFormat和FIleOutputFormat。

这样操作就带来了后面的问题 FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); 这两条语句不能通过编译,为什么呢,因为FileInputFormat.setInputPaths和FileOutputFormat.setOutputPath的第一个参数都是Job,而不是JobConf,找了很多资料,由于对Hadoop了解少,所以找资料没有方向感,这也是学习新东西效率低下的原因,如果有哪位大牛,知道怎么克服效率低下的问题,请不吝赐教! 后来,无意中,看到mapred包中也有这两个类的定义,于是火箭速度修改为mapred下的包,OK,顺利通过编译! 下面还有 job.setOutputFormat(TextOutputFormat.class);语句编译不同通过,提示参数需要扩展。





的参数;于是小菜也去mapred下面查找是否存在此类,正如期望,也存在此类,当即立段,修改为此包下的类,顺利编译通过,此时,颇有成就感! 可是现在小菜发现,mapred包下和mapreduce包下同时都存在又相应的类,不知道是为什么,那么下面就有目标的请教搜索引擎啦,呵呵,比刚才有很大进步。

结果令小菜很失望,就找到了一个符合理想的帖子。

但是通过这个帖子,小菜知道了,mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API。

OK,小菜在google输入框中输入“hadoop新旧API的区别”,结果很多。

看了之后,又结合权威指南归结如下: 1. 首先第一条,也是小菜今天碰到这些问题的原因,新旧API不兼容。

所以,以前用旧API写的hadoop程序,如果旧API不可用之后需要重写,也就是上面我的程序需要重写,如果旧API不能用的话,如果真不能用,这个有点儿小遗憾! 2. 新的API倾向于使用抽象类,而不是接口,使用抽象类更容易扩展。

例如,我们可以向一个抽象类中添加一个方法(用默认的实现)而不用修改类之前的实现方法。

因此,在新的API中,Mapper和Reducer是抽象类。

3. 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。

例如,在新的API中,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。

4. 新的API同时支持"推"和"拉"式的迭代。

在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。

分批处理记录是应用"拉"式的一个例子。

5. 新的API统一了配置。

旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。

在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。

作业控制的执行由Job类来负责,而不是JobClient,并且JobConf和JobClient在新的API中已经荡然无存。

这就是上面提到的,为什么只有在mapred中才有Jobconf的原因。

6. 输出文件的命名也略有不同,map的输出命名为part-m-nnnnn,而reduce的输出命名为part-r-nnnnn,这里nnnnn指的是从0开始的部分编号。

这样了解了二者的区别就可以通过程序的引用包来判别新旧API编写的程序了。

小菜建议最好用新的API编写hadoop程序,以防旧的API被抛弃!!!

fileinputformat是抽象类吗

不是抽象类,你看看API啊,继承于InputStream是抽象类, FileInputStream不是,它有3个构造器 FileInputStream(File file) Creates a FileInputStream by opening a connection to an actual file, the file named by the File object file in the file system. FileInputStream(FileDescriptor fdObj) Creates a FileInputStream by using the file descriptor fdObj, which represents an existing connection to an actual file in the file system. FileInputStream(String name) Creates a FileInputStream by opening a connection to an actual file, the file named by the path name name in the file system.

不同mapreduce程序可以连续运行吗?比如说多个这样的程序,用上一个的输出作为下一个的输入,求

可以的。

最简单的方式就是在一个类里定义 map1 reduce1 map2 reduce2 map3 reduce3 map2 的输入设为map1的输出 map3 的输入可以设为map2和map3的输入。

青云互联-洛杉矶CN2弹性云限时五折,9.5元/月起,三网CN2gia回程,可选Windows,可自定义配置

官方网站:点击访问青云互联官网优惠码:五折优惠码:5LHbEhaS (一次性五折,可月付、季付、半年付、年付)活动方案:的套餐分为大带宽限流和小带宽不限流两种套餐,全部为KVM虚拟架构,而且配置都可以弹性设置1、洛杉矶cera机房三网回程cn2gia 洛杉矶cera机房                ...

FBICDN,0.1元解决伪墙/假墙攻击,超500 Gbps DDos 防御,每天免费流量高达100G,免费高防网站加速服务

最近很多网站都遭受到了伪墙/假墙攻击,导致网站流量大跌,间歇性打不开网站。这是一种新型的攻击方式,攻击者利用GWF规则漏洞,使用国内服务器绑定host的方式来触发GWF的自动过滤机制,造成GWF暂时性屏蔽你的网站和服务器IP(大概15分钟左右),使你的网站在国内无法打开,如果攻击请求不断,那么你的网站就会是一个一直无法正常访问的状态。常规解决办法:1,快速备案后使用国内服务器,2,使用国内免备案服...

Hostodo:$34.99/年KVM-2.5GB/25G NVMe/8TB/3个数据中心

Hostodo在九月份又发布了两款特别套餐,开设在美国拉斯维加斯、迈阿密和斯波坎机房,基于KVM架构,采用NVMe SSD高性能磁盘,最低1.5GB内存8TB月流量套餐年付34.99美元起。Hostodo是一家成立于2014年的国外VPS主机商,主打低价VPS套餐且年付为主,基于OpenVZ和KVM架构,美国三个地区机房,支持支付宝或者PayPal、加密货币等付款。下面列出这两款主机配置信息。CP...

fileinputformat为你推荐
Honeypotnc如何使用mac地址克隆路由器的Mac地址克隆有什么作用?awvAWV的转换器 要免费的 看好是AWV不是AMV鄂n鄂N的车牌是那里的?遗传算法实例如何用C语言实现遗传算法的实际应用?棋牌论坛三个知名的游戏相关论坛,要求有网址的!labelforandroid:labelfor是什么意思腾讯贴吧腾讯论坛里找自己发的帖网络备份win7中如何备份网络设置tokenstream微信公众平台上,Token是什么?怎么填写?
万网域名查询 西安服务器租用 香港vps主机 重庆服务器托管 buyvm vpsio 鲨鱼机 60g硬盘 mysql主机 免费个人网站申请 北京双线 域名dns 申请网站 腾讯总部在哪 中国电信测速器 群英网络 广东服务器托管 mteam 谷歌搜索打不开 服务器机柜 更多