浅谈hadoop中mapreduce的文件分发

hadoop中mapreduce的常用类(1)

写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的。以前还一直纳闷儿为什么有时候是jobClient提交任务,有时是Job…不管API是否更新,下面这些类也还是存在于API中的,经过自己跟踪源码,发现原理还是这些。只不过进行了重新组织,进行了一些封装,使得扩展性更好。所以还是把这些东西从记事本贴进来吧。

关于这些类的介绍以及使用,有的是在自己debug中看到的,多数为纯翻译API的注释,但是翻译的过程受益良多。

GenericOptionsParser

parseGeneralOptions(Options opts, Configuration conf, String[]
args)解析命令行参数

GenericOptionsParser是为hadoop框架解析命令行参数的工具类。它能够辨认标准的命令行参数,使app能够轻松指定namenode,jobtracker,以及额外的配置资源或信息等。它支持的功能有:

-conf 指定配置文件;

-D 指定配置信息;

-fs 指定namenode

-jt 指定jobtracker

-files 指定需要copy到MR集群的文件,以逗号分隔

-libjars指定需要copy到MR集群的classpath的jar包,以逗号分隔

-archives指定需要copy到MR集群的压缩文件,以逗号分隔,会自动解压缩

  1. String[] otherArgs = new GenericOptionsParser(job, args)

  2. .getRemainingArgs();

  3. if (otherArgs.length != 2) {

  4. System.err.println(“Usage: wordcount “);

  5. System.exit(2);

  6. }

ToolRunner

用来跑实现Tool接口的工具。它与GenericOptionsParser合作来解析命令行参数,只在此次运行中更改configuration的参数。

Tool

处理命令行参数的接口。Tool是MR的任何tool/app的标准。这些实现应该代理对标准命令行参数的处理。下面是典型实现:

public class MyApp extends Configured implements Tool {              public int run(String[] args) throws Exception {   // 即将被ToolRunner执行的Configuration   Configuration conf = getConf();               // 使用conf建立JobConf   JobConf job = new JobConf(conf, MyApp.class);           // 执行客户端参数   Path in = new Path(args[1]);   Path out = new Path(args[2]);               // 指定job相关的参数        job.setJobName("my-app");   job.setInputPath(in);   job.setOutputPath(out);   job.setMapperClass(MyApp.MyMapper.class);   job.setReducerClass(MyApp.MyReducer.class);   *   // 提交job,然后监视进度直到job完成   JobClient.runJob(job);   }              public static void main(String[] args) throws Exception {   // 让ToolRunner 处理命令行参数    int res = ToolRunner.run(new Configuration(), new Sort(), //这里封装了GenericOptionsParser解析args               System.exit(res);   }   }   

MultipleOutputFormat

自定义输出文件名称或者说名称格式。在jobconf中setOutputFormat(MultipleOutputFormat的子类)就行了。而不是那种part-r-00000啥的了。。。并且可以分配结果到多个文件中。

MultipleOutputFormat继承了FileOutputFormat,
允许将输出数据写进不同的输出文件中。有三种应用场景:

a.
最少有一个reducer的mapreduce任务。这个reducer想要根据实际的key将输出写进不同的文件中。假设一个key编码了实际的key和为实际的key指定的位置

b.
只有map的任务。这个任务想要把输入文件或者输入内容的部分名称设为输出文件名。

c. 只有map的任务。这个任务为输出命名时,需要依赖keys和输入文件名。 

//这里是根据key生成多个文件的地方,可以看到还有value,name等参数   @Override   protected String generateFileNameForKeyValue(Text key,   IntWritable value, String name) {   char c = key.toString().toLowerCase().charAt(0);   if (c >= 'a' && c <= 'z') {   return c + ".txt";   }   return "result.txt";   }   

DistributedCache

在集群中快速分发大的只读文件。DistributedCache是MR用来缓存app需要的诸如text,archive,jar等的文件的。app通过jobconf中的url来指定需要缓存的文件。它会假定指定的这个文件已经在url指定的对应位置上了。在job在node上执行之前,DistributedCache会copy必要的文件到这个slave
node。它的功效就是为每个job只copy一次,而且copy到指定位置,能够自动解压缩。

DistributedCache可以用来分发简单的只读文件,或者一些复杂的例如archive,jar文件等。archive文件会自动解压缩,而jar文件会被自动放置到任务的classpath中(lib)。分发压缩archive时,可以指定解压名称如:dict.zip#dict。这样就会解压到dict中,否则默认是dict.zip中。

文件是有执行权限的。用户可以选择在任务的工作目录下建立指向DistributedCache的软链接。

DistributedCache.createSymlink(conf);     DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法让DistributedCache
在当前工作目录下创建到缓存文件的符号链接。则在task的当前工作目录会有link-name的链接,相当于快捷方法,链接到expr.txt文件,在setup方法使用的情况则要简单许多。或者通过设置配置文件属性mapred.create.symlink为yes。
分布式缓存会截取URI的片段作为链接的名字。 例如,URI是
hdfs://namenode:port/lib.so.1#lib.so,
则在task当前工作目录会有名为lib.so的链接,
它会链接分布式缓存中的lib.so.1


图片 1


)
写这个文章的时候才意识到新旧API是同时存在于1.1.2的hadoop中的。以前还一直纳闷儿为什么有时候是jobClient提交任…

版权声明:本文为博主原创文章,未经博主同意不得转载。

近期在做数据分析的时候。须要在mapreduce中调用c语言写的接口,此时就须要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地。但查询资料后发现hadoop有相应的组件来帮助我们完毕这个操作,这个组件就是DistributedCache,分布式缓存。运用这个东西能够做到第三方文件的分发和缓存功能,下面具体解释:

假设我们须要在map之间共享一些数据,假设信息量不大,我们能够保持在conf中。可是假设我们须要共享一些配置文件,jar包之类的。此时DistributedCache能够满足我们的需求,使用DistributedCache的过程例如以下:

1.正确配置被分发的文件的路径

2.在自己定义的mapper或reducer中获取文件下载到本地后的路径(linux文件系统路径);通常是重写configure或者重写setup

3.在自己定义的mapper或reducer类中读取这些文件的内容

下面以代码说明三个步骤:

1.Configuration conf = new
Configuration();

 
DistributedCache.addCacheFile(new
URI(“/user/tinfo/zhangguochen/libJMeshCalc.so”), conf);

  DistributedCache.addCacheArchive(new
URI(“/user/tinfo/zhangguochen/libJMeshCalc.zip”),conf);
  DistributedCache.addFileToClassPath(new
URI(“/user/tinfo/zhangguochen/libMeshCal.jar”), conf);

    或者

    conf.set(“mapred.cache.files”,
“/myapp/file”);

 
 conf.set(“mapred.cache.
archives”, “/mayapp/file.zip”);

 
 以上是配置须要分发的hdfs上的文件,可是前提是这些文件必须在hdfs上存在,看源代码可知道DistributedCache的静态方法事实上就是封装了conf.set的动作。

2.在自己的mapper类中,使用DistributedCache获取下载到本地的文件,大部分情况下这些操作都是重写configure接口,然后把本地文件路径保存在mapper类的成员变量中,供map方法使用。代码例如以下:

 
 private Path[] localFiles;

 
 public void setup(Context context) {

   
   localFiles =
DistributeCache.getLocalCacheFiles(context.getConfiguration;

   
   for(Path temp:localFiles) {

   
        String path =
temp.toString();//path就是此文件在本地的路径

   
        if(path.contains(“myfileName”))
{//获取到自己须要的文件

   
        }

   
   }

   
}

getLocalCacheFiles返回的是数组(元素类型是Path),数组内容是这个task(map或reduce)所属的job设定的全部须要被分发的文件,假设设置了
   
多个文件,能够遍历Path数组,用String.contains(“KeyWord”)来推断是否是你所须要的文件。

 
 获取压缩包的路径

 
 private  File[] inputFiles;

 
 private Path[] localArchives;

 
 public void setup(Context context) {

   
   localArchives = DistributeCache.getLocalCacheArvhives();

   
   for(Path archive : localArchives) {

   
        if(archive.toString.contains(“mytarName”))
{//找到自己须要的文件

   
             inputFiles = new
File(archive.toString.listFiles();//获取压缩包下的全部 文件

   
        }

   
   }

 
 }

   
也能够用DistributedCache将所使用到的第三方jar包载入到classpath中DistributedCache.addFileToClassPath

  

 
 通过以上代码发现假设要使用这些分发到各个节点上的文件操作比較复杂,DistributedCache也提供一种更方便的用法,即能够为每一个分发的文件创建一个符号链接,然后hadoop就会在当前mapreduce的执行路径下创建一个到源文件的链接,我们就能够在mapreduce中直接使用这些文件,而不必关心这些文件在本地的路径。

 
演示样例:

 
1.把文件分发到缓存中

 
 Configuration conf = new Configuration();

 
 DistributedCache.createSymlink;//创建符号链接
   DistributedCache.addCacheFile(new
URI(“/user/tinfo/zhangguochen/file1#myfile”),
conf);//加入分布式缓存,myfile是符号

 
 2.在mapreduce中使用

 
  public void setup(Context context) {

 
     File myfile = new
File;//在这里就能够直接通过符号myfile使用此文件

 
  }

 
 或者用下面方式:

 
  conf.set(“mapred.cache.files”,
“/data/data#mData”);
 
   conf.set(“mapred.cache.archives”,
“/data/data.zip#mDataZip”);
 
   conf.set(“mapred.create.symlink”, “yes”); // 是yes,不是true
 
   DistributedCache.createSymlink(Configuration)
 
   在map阶段,仅仅须要File file = new
File;就可以获得该文件……

下面资料来自网络,如有雷同,纯属意外

DistributedCache是Hadoop提供的文件缓存工具,它能够自己主动将指定的文件分发到各个节点上。缓存到本地,供用户程序读取使用。它具有下面几个特点:缓存的文件是仅仅读的,改动这些文件内容没有意义;用户能够调整文件可见范围(比方仅仅能用户自己使用,全部用户都能够使用等),进而防止反复拷贝现象。按需拷贝。文件是通过HDFS作为共享数据中心分发到各节点的。且仅仅发给任务被调度到的节点。本文将介绍DistributedCache在Hadoop
1.0和2.0中的用法及实现原理。

Hadoop
DistributedCache有下面几种典型的应用场景:1)分发字典文件。一些情况下Mapper或者Reducer须要用到一些外部字典。比方黑白名单、词表等;2)map-side
join:当多表连接时,一种场景是一个表很大。一个表很小,小到足以载入到内存中。这时能够使用DistributedCache将小表分发到各个节点上,以供Mapper载入使用;3)自己主动化软件部署:有些情况下。MapReduce需依赖于特定版本号的库,比方依赖于某个版本号的PHP解释器。一种做法是让集群管理员把这个版本号的PHP装到各个机器上。这通常比較麻烦,还有一种方法是使用DistributedCache分发到各个节点上。程序执行完后。Hadoop自己主动将其删除。

Hadoop提供了两种DistributedCache使用方式,一种是通过API。在程序中设置文件路径,第二种是通过命令行(-files。-archives或-libjars)參数告诉Hadoop,个人建议使用第二种方式,该方式可使用下面三个參数设置文件:

-files:将指定的本地/hdfs文件分发到各个Task的工作文件夹下。不正确文件进行不论什么处理。

-archives:将指定文件分发到各个Task的工作文件夹下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自己主动解压,默认情况下,解压后的内容存放到工作文件夹下名称为解压前文件名称的文件夹中,比方压缩包为dict.zip,则解压后内容存放到文件夹dict.zip中。为此。你能够给文件起个别名/软链接。比方dict.zip#dict,这样,压缩包会被解压到文件夹dict中。

-libjars:指定待分发的jar包。Hadoop将这些jar包分发到各个节点上后,会将其自己主动加入到任务的CLASSPATH环境变量中。

hadoop jar xxx.jar -files
hdfs://xxx/xx

hadoop jar xxx.jar -libjars
hdfs://xxx/xxx.jar,hdfs://xxx/xx2.jar

前面提到,DistributedCache分发的文件是有可见范围的,有的文件能够仅仅对当前程序可见,程序执行完后。直接删除;有的文件仅仅对当前用户可见(该用户全部程序都能够訪问)。有的文件对全部用户可见。DistributedCache会为每种资源计算一个唯一ID,以识别每一个资源,从而防止资源反复下载,举个样例。假设文件可见范围是全部用户,则在每一个节点上,第一个使用该文件的用户负责缓存该文件,之后的用户直接使用就可以。无需反复下载。那么,Hadoop是如何区分文件可见范围的呢?

在Hadoop
1.0版本号中。Hadoop是以HDFS文件的属性作为标识推断文件可见性的,须要注意的是,待缓存的文件即使是在Hadoop提交作业的client上,也会首先上传到HDFS的某一文件夹下,再分发到各个节点上的,因此。HDFS是缓存文件的必经之路。

对于常常使用的文件或者字典,建议放到HDFS上。这样能够防止每次反复下载。做法例如以下:

比方将数据保存在HDFS的/dict/public文件夹下。并将/dict和/dict/public两层文件夹的可执行权限全部打开(在Hadoop中,可执行权限的含义与linux中的不同,该权限仅仅对文件夹有意义,表示能够查看该文件夹中的子文件夹)。这样,里面全部的资源便是全部用户可用的,而且第一个用到的应用程序会将之缓存到各个节点上,之后全部的应用程序无需反复下载,能够在提交作业时通过下面命令指定:

-files hdfs:///dict/public/blacklist.txt,
hdfs:///dict/public/whilelist.txt

假设有多个HDFS集群能够指定namenode的对外rpc地址:

-files hdfs://host:port/dict/public/blacklist.txt,
hdfs://host:port/dict/public/whilelist.txt

DistributedCache会将blacklist.txt和whilelist.txt两个文件缓存到各个节点的一个公共文件夹下,并在须要时,在任务的工作文件夹下建立一个指向这两个文件的软连接。

假设可执行权限没有打开,则默认仅仅对该应用程序的拥有者可见,该用户全部应用程序可共享这些文件。

一旦你对/dict/public下的某个文件进行了改动,则下次有作业用到相应文件时。会发现文件被改动过了,进而自己主动又一次缓存文件。

对于一些频繁使用的字典。不建议存放在client,每次通过-files指定。这种文件,每次都要经历下面流程:上传到HDFS上—》缓存到各个节点上—》之后不再使用这些文件,直到被清除,也就是说,这种文件,仅仅会被这次执行的应用程序使用,假设再次执行相同的应用程序,即使文件没有被改动,也会又一次经历以上流程,很耗费时间,尤其是字典许多,很大时。

DistributedCache内置缓存置换算法,一旦缓存(文件数目达到一定上限或者文件总大小超过某一上限)满了之后,会踢除最久没有使用的文件。

在Hadopo
2.0中。自带的MapReduce框架仍支持1.0的这种DistributedCache使用方式。但DistributedCache本身是由YARN实现的,不再集成到MapReduce中。YARN还提供了许多相关编程接口供用户调用,有兴趣的能够阅读源代码。

下面介绍Hadoop 2.0中。DistributedCache通过命令行分发文件的基本使用方式:

执行Hadoop自带的example样例,
dict.txt会被缓存到各个Task的工作文件夹下,因此,直接像读取本地文件一样。在Mapper和Reducer中。读取dict.txt就可以:

123456 bin/Hadoopjar \share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar \wordcount \-files hdfs:///dict/public/dict.txt \/test/input\/test/output

Hadoop
Streaming样例,须要通过-files指定mapper和reducer可执行文件或者脚本文件,这些文件就是通过DistributedCache分发到各个节点上的。

123456789101112131415 #!/bin/bashHADOOP_HOME=/opt/yarn-clientINPUT_PATH=/test/input/dataOUTPUT_PATH=/test/output/dataecho"Clearing output path: $OUTPUT_PATH"$HADOOP_HOME/bin/hadoopfs -rmr $OUTPUT_PATH ${HADOOP_HOME}/bin/hadoopjar\   ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\  -D mapred.reduce.tasks=2\  -files mapper,reducer\  -input $INPUT_PATH\  -output $OUTPUT_PATH\  -mapper mapper\  -reducer reducer

接下给出一个缓存压缩文件的样例。假设压缩文件为dict.zip,里面存的数据为:

1234 data/1.txtdata/2.txtmapper.listreducer.list

通过-archives參数指定dict.zip后,该文件被解压后,将被缓存到各个Task的工作文件夹下的dict.zip文件夹下,组织结构例如以下:

123456 dict.zip/    data/        1.txt        2.txt    mapper.list    reducer.list

你能够在Mapper或Reducer程序中,使用相似下面的代码读取解压后的文件:

 

123 File file2 = read(“dict.zip/data/1.txt”, “r”);…….File file3 = read(“dict.zip/mapper.list”, “r”);

假设你想直接将内容解压到Task工作文件夹下。而不是子文件夹dict.zip中,能够用“-files”(注意。不要使用-archives,“-files”指定的文件不会被解压)指定dict.zip,并自己在程序中实现解压缩:

1234 #include <cstdlib>…….system(“unzip –q dict.zip”); //C++代码……

总之,Hadoop
DistributedCache是一个很好用的工具。合理的使用它能够解决许多很困难的问题。

 

 
 总结下面:假设mr程序中须要第三方jar包,能够通过在程序中使用DistributedCache,也能够在命令中使用-libjars来实现。可是这些引入的jar都仅仅能够在mr任务启动之后来使用,假设你在启动MR任务之前调用了第三方jar包的类。那这就会有问题,会在启动任务的时候找不到这个类。此时能够使用例如以下方式解决:

 
 在你的project里面建立一个lib文件夹,然后把全部的第三方jar包放到里面去,hadoop会自己主动载入lib依赖里面的jar。
这样就能够在mr启动之前也能够使用第三方jar了。

 
 方法调用顺序为(以libjars为例): -libjars
—>conf.set(“tmpjars”)—>

DistributedCache.addArchiveToClassPath--->conf.set("mapreduce.job.cache.archives","")

相关文章链接:

   
                 
  

 
            

 

相关文章