hadoop怎么重新启动tasktracker
如果是某个TaskTracker挂掉了想要重启,比较稳妥的方式是在master直接运行一次start-mapred.sh(或者start-all.sh也可以)脚本。
脚本会忽略正在正常运行的hadoop进程,并在没有运行TaskTracker的节点上启动TaskTracker。
如果是想停止某个节点上的Taskracker再启动,直接登录到该节点,kill掉TaskTracker进程,再返回master运行start-mapred.sh。
Hadoop,怎么实现多个输入路径的输入?
1.多路径输入
1)FileInputFormat.addInputPath 多次调用加载不同路径
.apache.hadoop.mapreduce.lib.input.FileInputFormat;
.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
String?in0?=?args[0];
String?in1?=?args[1];
String?out?=?args[2];
FileInputFormat.addInputPath(job,new?Path(in0));
FileInputFormat.addInputPath(job,new?Path(in1));
FileOutputFormat.setOutputPath(job,new?Path(out));2)FileInputFormat.addInputPaths一次调用加载 多路径字符串用逗号隔开
FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");
2.多种输入
MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的maper
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);Hadoop是怎么分块的
hadoop的分块有两部分,其中第一部分更为人熟知一点。
第一部分就是数据的划分(即把File划分成Block),这个是物理上真真实实的进行了划分,数据文件上传到HDFS里的时候,需要划分成一块一块,每块的大小由hadoop-default.xml里配置选项进行划分。
<property>
<name>dfs.block.size</name>
<value>67108864</value>
<description>The default block size for new files.</description>
</property>
这个就是默认的每个块64MB。
数据划分的时候有冗余,个数是由
<property>
<name>dfs.replication</name>
<value>3</value>
<description>Default block replication.
The actual number of replications can be specified when the file is created.
The default is used if replication is not specified in create time.
</description>
</property>
指定的。
具体的物理划分步骤要看Namenode,这里要说的是更有意思的hadoop中的第二种划分。
在hadoop中第二种划分是由InputFormat这个接口来定义的,其中有个getSplits方法。
这里就有了一个新的不为人熟知的概念:Split。
Split的作用是什么,Split和Block是什么关系,下面就可以说明清楚。
在Hadoop0.1中,split划分是在JobTracker端完成的,发生在JobInitThread对JobInProgress调用inittasks()的时候;而在0.18.3中是由JobClient完成的,JobClient划分好后,把split.file写入hdfs里,到时候jobtracker端只需要读这个文件,就知道Split是怎么划分的了。
第二种划分只是一种逻辑上划分,目的是为了让Map Task更好的获取数据输入,仔细分析如下这个场景:
File 1 : Block11, Block 12, Block 13, Block 14, Block 15
File 2 : Block21, Block 22, Block 23
File1有5个Block,最后一个Block当然可能小于64MB;File2有3个Block
如果用户在程序中指定map tasks的个数,比如说是2(如果不指定的话maptasks个数默认是1),那么在
FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(可以对照源码看看,注意getSplits这个函数里的计量单位是Block个数,而不是Byte个数,后面有个变量叫bytesremaining仍然表示剩余的Block个数,有些变量名让人无语),然后会计算goalSize=totalSize/numSplits=4,对于File1,计算一个Split有多少个Block是这样计算的
long splitSize =puteSplitSize(goalSize, minSize, blockSize);
protected puteSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
这里minSize是1(说明了一个Split至少包含一个Block,不会出现一个Split包含零点几个Block的情况),计算得出splitSize=4,所以接下来Split划分是这样分的:
Split 1: Block11, Block12, Block13,Block14
Split 2: Block15
Split 3: Block21, Block22, Block23
那用户指定的map个数是2,出现了三个split怎么办?在JobInProgress里其实maptasks的个数是根据Splits的长度来指定的,所以用户指定的map个数只是个参考。
可以参看JobInProgress: initTasks()
里的代码:
try {
splits = JobClient.readSplitFile(splitFile);
} finally {
splitFile.close();
}
numMapTasks = splits.length;
maps = new TaskInProgress[numMapTasks];
所以问题就很清晰了,还如果用户指定了20个map作业,那么最后会有8个Split(每个Split一个Block),所以最后实际上就有8个MapTasks,也就是说maptask的个数是由splits的长度决定的。
几个简单的结论:
1. 一个split不会包含零点几或者几点几个Block,一定是包含大于等于1个整数个Block
2. 一个split不会包含两个File的Block,不会跨越File边界
3. split和Block的关系是一对多的关系
4. maptasks的个数最终决定于splits的长度
还有一点需要说明,在FileSplit类中,有一项是private String[] hosts;
看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。
比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其冗余所在的机器列表,也就是说Block12,Block13,Block14存在哪些机器上没有在FileSplit中记录。
FileSplit中的这个属性有利于调度作业时候的数据本地性问题。
如果一个tasktracker前来索取task,jobtracker就会找个task给他,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。
但总之,只能牵就一个Block,其他Block就从网络上传罢。
mapreduce怎么导入hbace
1、先看一个标准的hbase作为数据读取源和输出源的样例:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
job.waitForCompletion(true);
首先创建配置信息和作业对象,设置作业的类。
这些和正常的mapreduce一样,
[java] view plaincopy在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
job.waitForCompletion(true);
唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。
用如上代码:
数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是ImmutableBytesWritable和Put类型,最后一个参数是作业对象。
需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数据用,其中scan可以配置参数,这里为了例子简单不再详述。
数据输出目标是hbase的outputTable表,输出执行的reduce过程是reducer.class类,操作的作业目标是job。
与map比缺少输出类型的标注,因为他们不是必要的,看过源代码就知道mapreduce的TableRecordWriter中write(key,value)方法中,key值是没有用到的。
value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。
接下来就是mapper类:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
public class mapper extends
TableMapper {
public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
context.write(key, value);
}
}
}
继承的是hbase中提供的TableMapper类,其实这个类也是继承的MapReduce类。
后边跟的两个泛型参数指定类型是mapper输出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。
需要注意的是要和initTableMapperJob方法指定的数据类型一直。
该过程会自动从指定hbase表内一行一行读取数据进行处理。
然后reducer类:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
public class countUniteRedcuer extends
TableReducer {
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
//reducer逻辑
context.write(null, put or delete);
}
}
reducer继承的是TableReducer类。
后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个必须是put或者delete。
write的时候可以把key写null,它是不必要的。
这样reducer输出的数据会自动插入outputTable指定的表内。
2、有时候我们需要数据源是hdfs的文本,输出对象是hbase。
这时候变化也很简单:
[java] view plaincopy
你会发现只需要像平常的mapreduce的作业声明过程一样,指定mapper的执行类和输出key/value类型,指定FileInputFormat.setInputPaths的数据源路径,输出声明不变。
便完成了从hdfs文本读取数据输出到hbase的命令声明过程。
mapper和reducer如下:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
job.setMapperClass(mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, path);
TableMapReduceUtil.initTableReducerJob(tableName,
reducer.class, job);
[java] view plaincopy在CODE上查看代码片派生到我的代码片
public class mapper extends Mapper {
public void map(LongWritable key, Text line, Context context) {
//mapper逻辑
context.write(k, one);
}
}
public class redcuer extends
TableReducer {
public void reduce(Writable key, Iterable values, Context context)
throws IOException, InterruptedException {
//reducer逻辑
context.write(null, put or delete);
}
}
[java] view plaincopy
mapper还依旧继承原来的MapReduce类中的Mapper即可。
同样注意这前后数据类型的key/value一直性。
[java] view plaincopy
3、最后就是从hbase中的表作为数据源读取,hdfs作为数据输出,简单的如下:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
job.setOutputKeyClass(Writable.class);
job.setOutputValueClass(Writable.class);
FileOutputFormat.setOutputPath(job, Path);
job.waitForCompletion(true);
mapper和reducer简单如下:
[java] view plaincopy在CODE上查看代码片派生到我的代码片Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByCl ass(test.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
job.waitForCompletion(true); Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "job name ");
job.setJarByClass(test.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
Writable.class, Writable.class, job);
TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
job.waitForCompletion(true);
唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。
public class mapper extends
TableMapper {
public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
context.write(key, value);
}
}
} public class countUniteRedcuer extends
TableReducer {
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
//reducer逻辑
context.write(null, put or delete);
}
}mapreduce 键值对怎么定义的
一般情况下Mapreduce输出的键值对是以制表符 为分隔符的,如下图所示:
但有时候我们像将其设置为其它的分隔符输出,比如",",如下图所示:
此时可以在Mapreduce的主函数中添加如下的两行代码:
[java] view plain copy print?
conf.set("mapred.textoutputformat.ignoreseparator","true");
conf.set("mapred.textoutputformat.separator",",");
具体如下的WordCount程序:
[java] view plain copy print?
import java.io.IOException;
import java.util.StringTokenizer;
.apache.hadoop.conf.Configuration;
.apache.hadoop.fs.Path;
.apache.hadoop.io.IntWritable;
.apache.hadoop.io.Text;
.apache.hadoop.mapreduce.Job;
.apache.hadoop.mapreduce.Mapper;
.apache.hadoop.mapreduce.Reducer;
.apache.hadoop.mapreduce.lib.input.FileInputFormat;
.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//设置MapReduce的输出的分隔符为逗号
conf.set("mapred.textoutputformat.ignoreseparator", "true");
conf.set("mapred.textoutputformat.separator", ",");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
鲨鱼机房(Sharktech)我们也叫它SK机房,是一家成立于2003年的老牌国外主机商,提供的产品包括独立服务器租用、VPS主机等,自营机房在美国洛杉矶、丹佛、芝加哥和荷兰阿姆斯特丹等,主打高防产品,独立服务器免费提供60Gbps/48Mpps攻击防御。机房提供1-10Gbps带宽不限流量服务器,最低丹佛/荷兰机房每月49美元起,洛杉矶机房最低59美元/月起。下面列出部分促销机型的配置信息。机房...
官方网站:点击访问酷番云官网活动方案:优惠方案一(限时秒杀专场)有需要海外的可以看看,比较划算29月,建议年付划算,月付续费不同价,这个专区。国内节点可以看看,性能高IO为主, 比较少见。平常一般就100IO 左右。优惠方案二(高防专场)高防专区主要以高防为主,节点有宿迁,绍兴,成都,宁波等,节点挺多,都支持防火墙自助控制。续费同价以下专场。 优惠方案三(精选物理机)西南地区节点比较划算,赠送5...
今天有看到Raksmart账户中有一台VPS主机即将到期,这台机器之前是用来测试评测使用的。这里有不打算续费,这不面对万一导致被自动续费忘记,所以我还是取消自动续费设置。如果我们也有类似的问题,这里就演示截图设置Raksmart取消自动续费。这里我们可以看到上图,在对应VPS主机的【其余操作】中可以看到默认已经是不自动续费,所以我们也不要担心被自动续费的。当然,如果有被自动续费,我们确实不想续费的...
fileinputformat为你推荐
草莓派怎么做草莓派?qq博客怎样开通QQ博客?vga接口定义主板上的VGA接口有什么用?搜索引擎的概念7 什么是搜索引擎?如何在Internet上搜索图片和文字资料的?seo优化技术什么是SEO优化,seo优化有什么用?awvawv转换器哪里下?12种颜色水粉颜料调色过程十二种颜色assemblyinfoasp.net这几个文件是干什么的?jstz请帮忙翻译asp大马黑帽seo的webshell中,什么是大马和小马
河北服务器租用 广东vps cve-2014-6271 kdata 韩国加速器 正版win8.1升级win10 网站被封 丹弗 太原联通测速平台 七夕促销 789电视剧 网站在线扫描 香港亚马逊 空间租赁 谷歌台湾 xuni 锐速 accountsuspended 第八届中美互联网论坛 西部数码主机 更多