【Hadoop】17-在集群上运行MapRedece

2020-12-08 14:25:53 浏览数 (1)

参考链接: Java中的实例初始化块(IIB)

1.打包作业

本地作业运行器使用单JVM运行一个作业,只要作业需要的所有类都在类路径(classpath)上,那么作业就可以正常执行。在分布式的环境中,情况稍微复杂一些。开始的时候作业的类必须打包成一个作业JAR文件并发送给集群。Hadoop通过搜索驱动程序的类路径自动找到该作业JAR文件,该类路径包含JonfConf或Job上的setJarByClass()方法中设置的类。另一种方法,如果你想通过文件路径设置一个指定的JAR文件,可以使用setJar()方法。JAR文件路径可以是本地的,也可以是一个HDFS文件路径。通过使用像Ant或Maven的构建工具可以方便地创建作业的JAR文件。当给定范例所示的POM时,下面的Maven命令将在包含所有已编译的类的工程目录中创建一个名为hadoop-example.jar的JAR文件:

mvn package -DskipTests

如果每个JAR文件都有一个作业,可以在JAR文件的manifest中指定要运行的主类。如果主类不在manifest中,则老须在命令行指定。任何有依赖关系的JAR文件应该打包到作业的JAR文件的lib子目录中。当然也有其他的方法将依赖包含进来,这我们稍后会讨论。类似地,资源文件也可以打包进一个classes子目录。这与Java Web application archive或WAR文件类似,只不过JAR文件是放在WAR文件的WEB-INF/Iib子目录下,而类则是放在WAR文件的WEB-INF/classes子目录中。

1.1客户端的类路径由hadoop jar <jar>设置的用户客户端类路径包括以下几个组成部分:

作业的JAR文件作业JAR文件的目录中的所有JAR文件以及class目录(如果定义)HADOOP_CLASSPH定义的类路径(如果已经设置)顺便说一下,这解释了如果你在没有作业JAR(hadoop CLASSNAME)情况下使用本地作业运行器时,为什么必须设置HADOOP__CLASSPATH来指明依赖类和库。

1.2任务的类路径

在集群上(包括伪分布式模式),map和reduce任务在各自的JVM上运行,它们的类路径不受HADOOP_CLASSPATH控制。HADOOP_CLASSPATH是一项客户端设置,并只针对驱动程序的JVM的类路径进行设置。

反之,用户任务的类路径有以下几个部分组成:

作业的JAR文件作业JAR文件的lib目录中包含的所有JAR文件以及classes目录(如果存在的话)使用-libjars选项(参见表)或DistributedCache的addFileToClassPath()方法(老版本的API)或Job(新版本的API)添加到分布式缓存的所有文件

1.3打包依赖

给定这些不同的方法来控制客户端和类路径上的内容,也有相应的操作处理作业的库依赖:

将库解包和重新打包进作业JAR将作业JAR的目录中的库打包保持库与作业JAR分开,并且通过HADOOP_CLASSPATH将它们添加到客户端的类路径,通过-libjars将它们添加到任务的类路径从创建的角度来看,最后使用分布式缓存的选项是最简单的,因为依赖不需要在作业的JAR中重新创建。同时,使用分布式缓存意味着在集群上更少的JAR文件转移,因为文件可能缓存在任务间的一个节点上了。

1.4任务类路径的优先权用户的JAR文件被添加到客户端类路径和任务类路径的最后,如果Hadoop使用的库版本和你的代码使用的不同或不相容,在某些情况下可能会引发和Hadoop内置库的依赖冲突。有时需要控制任务类路径的次序,这样你的类能够被先提取出来。在客户端,可以通过设置环境变最HADOOP_USER_CLASSPATH_FIRST为true强制使Hadoop将用户的类路径优先放到搜索顺序中。对于任务的类路径,你可以将mapreduce.job.user.classpath.first设为true。注意,设置这些选项就改变了针对Hadoop框架依赖的类(但仅仅对你的作业而言),这可能会引起作业的提交失败或者任务失败,因此请谨慎使用这些选项。

2.启动作业

为了启动作业,我们需要运行驱动程序,使用-conf选项来指定想要运行作业的集群(同样,也可以使用-fs和-jt选项):

unset HADOOP_CLASSPATH

hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver -conf conf/hadoop-cluster.xml input/ncdc/all max-temp

我们不设置HADOOP_CLASSPATH环境变量是因为对于该作业没有任何第三方依赖。如果它被设置为/target/classes/(本章前面的内容),那么Hadoop将找不到作业JAR,Hadoop会从target/classes而不是从JAR装载MaxTempratureDriver类,从而导致作业失败。

Job上的waitForCompletion()方法启动作业并检查进展情况。如果有任何变化,就输出一行map和reduce进度总结。

作业、任务和任务尝试ID

Hadoop2中,MapReduce作业ID由YARN资源管理器创建的YARN应用ID生成。一个应用ID的格式包含两部分:资源管理器(不是应用)开始时间和唯一标识此应用的由资源管理器维护的增量计数器。例如:ID为application_1419459259596_0003的应用是资源管理器运行的第三个应用(0003,应用ID从1开始计数),时间戳1419459259596表示资源管理器开始时间。计数器的数字前面由0开始,以便于ID在目录列表中进行排序·然而,计数器达到10000时,不能重新设置,会导致应用ID更长(这些ID就不能很好地排序了)。

将应用ID的application前缀替换为job前缀即可得到相应的作业ID,如job_14194592595069993。任务属于作业,任务ID是这样形成的,将作业ID的job前缀替换为task前缀,然后加上一个后缀表示是作业里的哪个任务。例如:task_1419459259596_0003_m_000003表示ID为job_1419459259596_0003的作业的第4个map任务(000003,任务ID从0开始计数)。作业的任务ID在作业初始化时产生,因此,任务ID的顺序不必是任务执行的顺序。

由于失败或推测执行,任务可以执行多次,所以,为了标识任务执行的不同实例,任务尝试(task attempt)都会被指定一个唯一的ID。例如:attempt_1419459259596_0003_m_000003_0示正在运行的task_1419459259596_0003_m_000003任务的第一个尝试(0,任务尝试ID从0开始计数)。任务尝试在作业运行时根据需要分配,所以,它们的顺序代表被创建运行的先后顺序。

3.MapReduce的Web界面Hadoop的界面用来浏览作业信息,对于跟踪作业运行进度、查找作业完成后的统计信息和日志非常有用。可以在http://resource-manager-host:8088/找到用户界面信息。

3.1资源管理器页面

下图展示了主页的截屏。"Cluster Metrics”部分给出了集群的概要信息,包括当前集群上处于运行及其他不同状态的应用的数量,集群上可用的资源数量("Memory Total”)及节点管理器的相关信息。

接下来的主表中列出了集群上所有曾经运行或正在运行的应用。有个搜索窗口可以用于过滤寻找所感兴趣的应用。主视图中每页可以显示100个条目,资源管理器在同一时刻能够在内存中保存近10000个已完成的应用(通过设置yarn.resourcemanager.max-completed-applications),随后只能通过作业历史页面获取这些应用信息。注意,作业历史是永久存储的,因此也可以通过作业历史找到资源管理器以前运行过的作业。

作业历史:

作业历史指已完成的MapReduce作业的事件和配置信息。不管作业是否成功执行,作业历史都将保存下来,为运行作业的用户提供有用信息。作业历史文件由MapReduce的applicationmaster存放在HDFS中,通过mapreduce.jobhistory.done-dir属性来设置存放目录。作业的历史文件会保存一周,随后被系统删除。历史日志包括作业、任务和尝试事件,所有这些信息以JSON格式存放在文件中。特定作业的历史可以通过作业历史服务器的web界面(通过资源管理器页面裢接)查看,或在命令行方法下用mapredjob·history(指向作业历史文件中)查看。

3.2MapReduce作业页面

单击"TrackingUI”链接进人application master的web界面(如果应用已经完成则进人历史页面)。在MapReduce中,将进人作业页面,如图所示。

作业运行期间,可以在作业页面监视作业进度。底部的表展示map和reduce进度。"Total”显示该作业map和reduce的总数。其他列显示的是这些任务的状态:pending(等待运行)、Running(运行中)或Complete(成功完成)。

表下面的部分显示的是map或reduce任务中失败和被终止的任务尝试的总数。任务尝试(task attempt)可标记为被终止,如果它们是推测执行的副本,或它们运行的节点已结束,或它们已被用户终止。导航栏中还有许多有用的链接。例如,"Configuration"链接指向作业的统一配置文件,该文件包含了作业运行过程中生效的所有属性及属性值。如果不确定某个属性的设置值,可以通过该链接查看文件。

4.获取结果

一且作业完成,有许多方法可以获取结果。每个reducer产生一个输出文件,因此,在max-temp目录中会有30个部分文件(part file),命名为part-00000到00029。

正如文件名所示,这些"part"文件可以认为是”文件的一部分。如果输出文件很大(本例不是这种情况),那么把文件分为多个part文件很重要,这样才能使多个reducer并行工作。通常情况下,如果文件采用这种分割形式,使用起来仍然很方便:例如作为另一个MapReduce作业的输人。在某些情况下,可以探索多个分割文件的结构来进行map端连接操作。

这个作业产生的输出很少,所以很容易从HDFS中将其复制到开发机器上。hadoopfs命令的-getmerge选项在这时很有用,因为它得到了源模式指定目录下所有的文件,并将其合并为本地文件系统的一个文件:

hadoop fs -getmerge max-temp max-temp-local

sort max-temp-local丨tail

1991 607

1992 605

1993 567

1994 568

1995 567

1996 561

1997 565

1998 568

1999 568

2000 558因为reduce的输出分区是无序的(使用哈希分区函数的缘故),我们对输出进行排序。对MapReduce的数据做些后期处理是很常见的,把这些数据送人分析工具(例如R、电子数据表甚至关系型数据库)进行处理。

如果输出文件比较小,另外一种获取输出的方式是使用-cat选项将输出文件打印到控制台:

hadoop fs -cat max-temp/*

深人分析后,我们发现某些结果看起来似乎没有道理。比如,1951年(此处没有显示)的最高气温是590℃!这个结果是怎么产生的呢?是不正确的输人数据还是程序中的bug?

5.作业调试

最经典的方法通过打印语句来调试程序,这在Hadoop中同样适用。然而,需要考虑复杂的情况:当程序运行在几十台、几百台甚至几千台节点上时,如何找到并检测调试语句分散在这些节点中的输出呢?为了处理我们这种要查找一个不寻常情况的需求,可以用一个调试语句记录到一个标准错误中,同时配合更新任务状态信息以提示我们查看错误日志。我们将看到,webUI简化了这个操作。我们还要创建一个自定义的计数器来统计整个数据集中不合理的气温记录总数。这就提供了很有价值的信息来处理如下情况,如果这种情况经常发生,我们需要从中进一步了解事件发生的条件以及如何提取气温值,而不是简单地丢掉这些记录。事实上,调试一个作业的时候,应当总想是否能够使用计数器来获得需要找出事件发生来源的相关信息。即使需要使用日志或状态信息,但使用计数器来衡量问题的严重程度仍然也是有帮助的。

如果调试期间产生的日志数据规模比较大,可以有多种选择。一种是将这些信息写到map的输出流供reduce任务分析和汇总,而不是写到标准错误流。这种方法通常必须改变程序结构,所以先选用其他技术。另一种是可以写一个程序(当然是MapReduce程序)来分析作业产生的日志。我们把调试加人mapper(版本3),而不是reducer,因为我们希望找到导致这些异常输出的数据源:

public class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

    enum Temperature{

        OVER_100

    }

    private NcdcRecordParser parser=new NcdcRecordParser();

    @Override

    public void map(LongWritabIe key,Text value,Context context) throws IOException,InterruptedException{

        parser.parse(value);

        if(parser.isValidTemperature()){

            int airTemperature=parser.getAirTemperature();

            if(airTemperature>1000){

                System.err.println("Temperature over 100 degrees for input: " value);

                context.setStatus("Detected possibly corrupt record:see logs.");

                context.getCounter(Temperature.OVER_100.increment(1));

                context.write(new Text(parser.getYear()),new IntWritable(airTenperature));

            }

        }

    }

}

如果气温超过100℃(表示为1000,因为气温只保留小数点后一位),我们输出一行到标准错误流以代表有问题的行,同时使用context的setStatus()方法来更新map的状态信息,引导我们查看日志。我们还增加了计数器,在Java中用enum类型的字段表示。在这个程序中,定义一个OVER_100字段来统计气温超过100℃的记录数。完成这些修改,我们重新编译代码,重新创建JAR文件,然后重新运行作业并在运行时进人任务页面。

5.1任务和任务尝试页面

作业页面包含了一些查看作业中任务细节的链接。例如,点击“Map"链接,将进人一个列举了所有map任务的信息的页面。截图显示了一个作业的任务信息页面,该作业带有调试语句,运行时在任务的“Status”列中显示调试信息。

点击任务链接将进人任务尝试页面,页面显示了该任务的每个任务尝试。每个任务尝试页面都有链接指向日志文件和计数器。如果进人成功任务尝试的日志文件链接,将发现所记录的可疑输人记录。这里考虑到篇幅,已经进行了转行和截断处理:

此记录的格式看上去与其他记录不同。可能是因为行中有空格,规范中没有这方面的描述。

作业完成后,查看我们定义的计数器的值,检查在整个数据集中有多少记录超过100℃。通过web界面或命令行,可以查看计数器:

mapred job -counter job_1414234234234_0006 'v3.MaxTemperaturemapper$Temperature' OVER_1003

-counter选项的输人参数包括作业ID,计数器的组名(这里一般是类名)和计数器名称(enum名)。这里,在超过十亿条记录的整个数据集中,只有三个异常记录。直接扔掉不正确的记录,是许多大数据问题中的标准做法。然而,这里我们需要谨慎处理这种情况,因为我们寻找的是一个极限值一最高气温值,而不是一个累计测量值。当然,在本例中,扔掉三个记录可能并不会影响结果。

5.2处理不合理的数据

捕获引发问题的输人数据是很有价值的,因为我们可以在测试中用它来检查mapper的工作是否正常。在这个MRUnit测试中,我们将检查对于不合理的输人计数器是否进行了更新:

@Test

    public void parsesMalformedTemperature() throws IOException,InterruptedException{

        Text value=new Text("9335999999433181957942392995 37959 139117SA0 0904"

                //Year

                "RJSNV02011359003150970356999999433201957010100005 353");

                //Temperature

        Counters counters=new Counters();

        new MapDriver<IongWritable,Text,Text,IntWritabIe()>

                withMapper(newMaxTemperatureMapper())

        withlnput(newLongWritable(0),value)

        withCounters(counters)

        runTest();

        Counter c=ounters.findCounter(MaxTemperatureMapper.Temperature.MALFORMED);

        assertThat(c.getVa1ue(),is(1L));

    }

引发问题的记录与其他行的格式是不同的。范例显示了修改过的程序(版本4),它使用的解析器忽略了那些没有首符号( 或-)气温字段的行。我们还引人一个计数器来统计因为这个原因而被忽略的记录数。

范例,该mapper用于查找最高气温。

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable {

        enum Temperature {

            MALFORMED

        }

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override

        public void map(LongWritabIe key, Text value, Context context) throws IOException, InterruptedException {

            parser.parse(value);

            if (parser.isVaIidTemperature()) {

                intairTemperature = parser.getAirTemperature();

                context.write(newText(parser.getYear()), newIntWritable(airTemperature));

            }

            elseif(parser.isMa1formedTemperature()) {

                System.err.printIn("Ignoring possibly corrupt input: value);

                context.getCounter(Temperature.Temperature).increment(l);

            }

        }

    }

6Hadoop日志针对不同用户,Hadoop在不同的地方生成日志。下表对此进行了总结。

YARN有一个日志聚合(log aggregation)服务,可以取到已完成的应用的任务日志,并把其搬移到HDFS中,在那里任务日志被存储在一个容器文件中用于存档。如果服务已被启用(通过在集群上将yarn.log-aggregation-enable设置为true),可以通过点击任务尝试web界面中logs链接,或使用mapred job -logs命令查看任务日志。默认情况下,日志聚合服务处于关闭状态。此时,可以通过访问节点管理器的界面(http://node-manager-host:8042/logs/userlogs)查看任务日志。

日志主要对象描述更多信息系统守护进程日志管理员每个Hadoop守护进程产生一个日志文件(使用log4j)和另一个(文件合并标准输出和错误)。这些文件分别写入HADOOP_LOG_DIR环境变量定义的目录环境设置日志HDFS审计日志管理员这个日志记录所有HDFS请求,默认是关闭状态。虽然该日志存放位置可以配置,但一般写人namenode的日志日志审计MapReduce作业历史日志用户记录作业运行期间发生的事件(如任务完成)。集中保存在HDFS中MapReducce的Web界面MapReduce任务日志用户每个任务子进程都用10g4j产生一个日志文件(称作syslog),一个保存发到标准输出(stdout)叫数据的文件,一个保存标准错误(stderr)的文件。这些文件写人到YARN_LOG_DIR环境变量定义的目录的userlogs的子目录中本小节对这些日志文件的写操作是很直观的。任何到标准输出或标准错误流的写操作都直接写到相关日志文件。当然,在Streaming方式下,标准输出用于map或reduce的输出,所以不会出现在标准输出日志文件中。

在Java中,如果愿意的话,用Apache Commons LoggingAPI(实际上可以使用任何能写人log4j的日志API)就可以写人任务的系统日志文件中文件),如范例所示。

范例,这个等价的Mapper写到标准输出(使用Apache Commons Logging API)

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.mapreduce.Mapper;

public class LoggingIdentityMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 

        extends Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

    private static final Log LOG=LogFactory.getLog(LoggingIdentityMapper.class);

    @Override

    @SuppressWarnings("unchecked")

    public void map(KEYIN key,VALUEIN value,Context context) throws IOException,InterruptedException{

        //LogtOstdoutfiLe

        System.out.printIn("Mapkey:" key);

        //Logt to syslog file

        LOG.info("Map key:" key);

        if(LOG.isDebugEnabled()){

            LOG.debug("Mapvalue:" value);

        }

        context.write((KEYOUT)key,(VALUEOUT)value);

     }

}默认的日志级别是INFO,因此DEBUG级别的消息不在饭任务日志文件中出现。然而,有时候又希望看到这些消息。这时可以适当设置mapreduce.map.log.level或者mapreduce.reduce.log.level。例如,对于上面的情况你可以为mapper进行如下设置,以便能够看到日志中的map值。

hadoop jar hadoop-examples.jar LoggingDriver -conf conf/hadoop-cluster.xml

    -D mapreduce.map.Iog.IeveI=DEBUG input/ncdc/sample.txt logging-out

有一些控制用于管理任务日志的大小和记录保留时间。在默认情况下,日志最短在3小时后删除(时间可以通过yarn.nodemanager.log.retain-seconds属性来设置,当然,如果日志聚合被激活,这个时间可以被忽略)。也可以用mapreduce.task.userlog.limit.kb属性为每个日志文件的最大规模设置一个阈值,默认值是0,表示没有上限。

有时你可能需要调试一个问题,这个问题你怀疑在运行一个Hadoop命令的JVM上发生,而不是在集群上。可以通过如下调用将DEBUG级别的日志发送给控制台:

HADOOP_ROOT_LOGGER=DEBUG,console hadoop fs -text /foo/bar

7.远程调试

当一个任务失败并且没有足够多的记录信息来诊断错误时,可以选择用调试器运行该任务。在集群上运行作业时,很难使用调试器,因为不知道哪个节点处理哪部分输人,所以不能在错误发生之前安装调试器。然而,有其他一些方法可以用。

在本地重新产生错误:对于特定的输人,失败的任务通常总会失败。你可以尝试通过下载致使任务失败的文件到本地运行重现问题,这可以使用到调试器(如Java的VisualVM)。使用JVM调试选项:失败的常见原因是任务JVM中Java内存溢出。可以将mapred.child.java.opts设为包含-XX:HeapDumpOnOutOfMemoryError-XX:HeapDumpPath=/path/to/dumps。该设置将产生一个堆转储(heap dump),这可以通过jhat或Eclipse Memory Analyzer这样的工具来检查。注意,该JVM选项应当添加到由mapred.child.java.opts指定的已有内存设置中。使用任务分析:Java的profiler提供了很多JVM的内部细节,Hadoop提供了分析作业中部分任务的机制。

在一些情况下保存失败的任务尝试的中间结果文件对于以后的检查是有用的,特别是在任务工作路径中建立转储或配置文件。可以将mapreduce.task.files·preserve.failedtasks设为true来保存失败的任务文件。也可以保存成功任务的中间结果文件,以便解释任务没有失败。这时,将属性mapreduce.task.files.preserve.filepattern设置为一个正则表达式(与保留的任务ID匹配)。

对调试有用的另一个属性是yarn.nodemanager.delete.debug-delay-sec,以秒为单位,表示等待删除本地尝试文件(如用于启动任务容器JVM的脚本)的时间。如果在集群上该属性值被设置为一个比较大的合理值(例如,600,表示10分钟),那么在文件删除前有足够的时间查看。

为了检查任务尝试文件,登录到任务失败的节点并找到该任务尝试的目录。它在一个本地MapReduce目录下,由mapreduce.cluster.local.dir的设置决定。如果这个属性是以逗号分隔的目录列表(在一台机器的物理磁盘上分布负载),在找到那个特定的任务尝试@skattempt)之前,需要搜索整个目录。task attemp的目录在以下位置:mapreduce.cluster.local.dir/usercache/user/appcache/application-ID/output/task-attempt-ID

0 人点赞