大数据迅速发展,但是Hadoop的基础地位一直没有改变。理解并掌握Hadoop相关知识对于之后的相关组件学习有着地基的作用。本文整理了Hadoop基础理论知识与常用组件介绍,虽然有一些组件已经不太常用。但是理解第一批组件的相关知识对于以后的学习很有帮助,未来的很多组件也借鉴了之前的设计理念。
文章较长,建议收藏后阅读。
相关学习资料可以通过下面的方式下载,本文只是整理了大数据Hadoop基本知识,有精力的同学可以通过相关书籍进行更深入的学习。
一个最简单的大数据系统就是通过,zookeeper进行协调服务,并通过任务调度对hive或者mr进行计算任务执行,通过数据传输与外部系统建立联系。当然架构在不变化,最新的大数据架构远不止于此。但这些基本的组件对于理解大数据的原理非常的有帮助。
这些组件互相配合,最终形成了Hadoop的生态体系。
正文开始~
一、大数据发展史
信息时代数据量的爆炸性增长,让大数据的发展异常迅速。简单来说大数据是:
1、有海量的数据
2、有对海量数据进行挖掘的需求
3、有对海量数据进行挖掘的软件工具(hadoop、spark、flink......)
Hadoop与大数据
HADOOP最早起源于Nutch项目。Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题——如何解决数十亿网页的存储和索引问题。
2003年、2004年谷歌发表的两篇论文为该问题提供了可行的解决方案。
——分布式文件系统(GFS),可用于处理海量网页的存储。
——分布式计算框架MAPREDUCE,可用于处理海量网页的索引计算问题。
Nutch的开发人员完成了相应的开源实现HDFS和MAPREDUCE,并从Nutch中剥离成为独立项目HADOOP,到2008年1月,HADOOP成为Apache顶级项目,迎来了它的快速发展期。
大数据组件
在大数据的发展中,组件化一直都是一个非常大的趋势。屏蔽复杂的底层研发,只关注数据工程与数据分析本身,让大数据得以迅速地发展。而开源的技术发展更是让大数据的发展得到了长足的进步,大量的公司及个人贡献了很多的开源方案。这也让数据采集,清洗,分析,应用都变得轻而易举。
Hadoop,Hive,Spark,Flink等等开源框架不断的发展出现。
这些组件相互配合,共同构建起了大数据的平台体系。所以学习好大数据的相关组件知识就非常的重要,也是做好大数据应用的基础。
大数据架构
大数据的技术与应用的发展同步进行,催生着架构的不断演变。
从离线到实时,从数据仓库到数据湖,从大数据平台到数据中台。有人会说大数据有点夸大,大屏泛滥没有实际应用。但是事物的发展正是经过了从概念到实践到落地的过程。不得不承认,大数据的架构在不断的向更好的方向演变。
大数据发展
大数据的应用范围在逐渐的扩大,用户画像,推荐系统等等领域都是大数据在支撑。而数据治理的发展让数据安全,数据质量也得到了重视。
未来的大数据,将是大数据 数据分析 人工智能的结合体,架构和技术都将不断的演进,越来越影响并改变我们的生活。
大数据的发展让大数据相关岗位的需求猛增,大数据工程师,架构师,数据分析师,大数据运维等等都是非常不错的职业选择。不过要提醒的是大数据的技术发展迅速,要保持学习,不断的获取新的知识。
二、分布式协调服务——Zookeeper
在学习hadoop组件之前,要先了解下zookeeper。zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务。
简单的说zk解决了分布式系统的一致性问题,可以将需要一致性的数据放在zk中,同时zk也提供了监听等机制。zk为hadoop分布式的实现提供了保证,所以大家之后不用纠结hadoop很多的操作是如何实现的,很多都依赖了zk。
zk是什么?
1、Zookeeper是为别的分布式程序服务的
2、Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
3、Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
4、虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:
a、管理(存储,读取)用户程序提交的数据;
b、并为用户程序提供数据节点监听服务;
不仅是大数据领域,在很多分布式系统中,zk都有着非常大的应用。
Zookeeper工作机制
1、Zookeeper:一个leader,多个follower组成的集群
2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
3、分布式读写,更新请求转发,由leader实施
4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
5、数据更新原子性,一次数据更新要么成功(半数以上节点成功),要么失败
6、实时性,在一定时间范围内,client能读到最新数据
Zookeeper数据结构
1、层次化的目录结构,命名符合常规文件系统规范(见下图)
2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
3、节点Znode可以包含数据(只能存储很小量的数据,<1M;最好是1k字节以内)和子节点
4、客户端应用可以在节点上设置监视器
zookeeper的选举机制
(1)Zookeeper集群中只有超过半数以上的服务器启动,集群才能正常工作;
(2)在集群正常工作之前,myid小的服务器给myid大的服务器投票,直到集群正常工作,选出Leader;
(3)选出Leader之后,之前的服务器状态由Looking改变为Following,以后的服务器都是Follower。
zk命令行操作
运行 zkCli.sh –server进入命令行工具
查看znode路径 ls /aaa
获取znode数据 get /aaa
zk客户端API
org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话
它提供以下几类主要方法 :
功能 | 描述 |
---|---|
create | 在本地目录树中创建一个节点 |
delete | 删除一个节点 |
exists | 测试本地是否存在目标节点 |
get/set data | 从目标节点上读取 / 写数据 |
get/set ACL | 获取 / 设置目标节点访问控制列表信息 |
get children | 检索一个子节点上的列表 |
sync | 等待要被传送的数据 |
三、分布式文件系统——HDFS
HDFS概念
分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析;
HDFS是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件;
HDFS是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;
重要特性:
HDFS中的文件在物理上是分块存储(block),块的大小可以配置;
HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir/file;
目录结构及文件分块位置信息(元数据)的管理由namenode节点承担——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息;
文件的各个block的存储管理由datanode节点承担——datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本;
HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改(适合用来做数据分析,并不适合用来做网盘应用,因为,不便修改,延迟大,网络开销大,成本太高)
HDFS基本操作
不同的hadoop版本,hdfs操作命令不同。下面是hadoop3的操作命令,如果是其他版本要查询对应的操作命令,可以使用-help 来查看帮助。
1、查询命令 hdfs dfs -ls / 查询/目录下的所有文件和文件夹
hdfs dfs -ls -R 以递归的方式查询/目录下的所有文件
2、创建文件夹 hdfs dfs -mkdir /test 创建test文件夹
3、创建新的空文件 hdfs dfs -touchz /aa.txt 在/目录下创建一个空文件aa.txt
4、增加文件 hdfs dfs -put aa.txt /test 将当前目录下的aa.txt文件复制到/test目录下(把-put换成-copyFromLocal效果一样-moveFromLocal会移除本地文件)
5、查看文件内容 hdfs dfs -cat /test/aa.txt 查看/test目录下文件aa.txt的内容(将-cat 换成-text效果一样)
6、复制文件 hdfs dfs -copyToLocal /test/aa.txt . 将/test/aa.txt文件复制到当前目录(.是指当前目录,也可指定其他的目录)
7、删除文件或文件夹 hdfs dfs -rm -r /test/aa.txt 删除/test/aa.txt文件(/test/aa.txt可以替换成文件夹就是删除文件夹)
8、重命名文件 hdfs dfs -mv /aa.txt /bb.txt 将/aa.txt文件重命名为/bb.txt
9、将源目录中的所有文件排序合并到一个本地文件 hdfs dfs -getmerge / local-file 将/目录下的所有文件合并到本地文件local-file中
可以访问web端对文件操作有一个直观的认识。访问NameNode Web UI进行查看。
我们可以理解为我们通过命令对文件及文件夹进行了操作,但这都是hdfs给我们提供的服务,而hdfs底层会将我们的文件分布式存储。
HDFS工作机制
可以通过hdfs的工作机制来理解一下原理。来了解一下hdfs是如何通过指令完成文件存取工作的。
- HDFS集群分为两大角色:NameNode、DataNode (Secondary Namenode)
- NameNode负责管理整个文件系统的元数据
- DataNode 负责管理用户的文件数据块
- 文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
- 每一个文件块可以有多个副本,并存放在不同的datanode上
- Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
- HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行
写数据
客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按顺序将文件逐个block传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本。
读数据
客户端将要读取的文件路径发送给namenode,namenode获取文件的元信息(主要是block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获得整个文件。
我们要理解的是namenode的工作机制尤其是元数据管理机制,这对于以后做数据治理也非常的有帮助。
Namenode的工作机制
1、namenode职责:负责客户端请求的响应,元数据的管理(查询,修改)。
2、namenode对数据的管理采用了三种存储形式:
内存元数据(NameSystem)
磁盘元数据镜像文件
数据操作日志文件(可通过日志运算出元数据)
3、元数据存储方式:
内存中有一份完整的元数据(内存meta data)
磁盘有一个“准完整”的元数据镜像(fsimage)文件(在namenode的工作目录中)
用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件)
4、checkpoint:每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint)
Datanode工作机制
1、Datanode工作职责:
存储管理用户的文件块数据
定期向namenode汇报自身所持有的block信息(通过心跳信息上报)
2、Datanode掉线判断
datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间。
客户端操作
hdfs提供了对外的api,可以进行客户端的操作。我们只需要引入相关依赖就可以进行操作了。
这里是java示例,也有其他语言的操作。这种基本的操作后期的新组件也都有替代的方案,这里主要是熟悉为主。
代码语言:javascript复制<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</dependency>
如果是windows下研发,要指定hadoop安装包位置,才能引入相关包操作,安装包可以去资料中查看。
示例代码如下:
代码语言:javascript复制Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://192.168.137.101:9820"), conf, "root");
fs.copyFromLocalFile(new Path("D:\aaa.txt"), new Path("/aaa"));
fs.close();
四、分布式运算框架——Mapreduce
Mapreduce是一个分布式运算程序的编程框架。大概的意思可以理解为对于hdfs中的分布式数据,可以通过Mapreduce这种分布式的框架方式来进行复杂的运算。试想一下,如果手写分布式运算,要进行任务分配,分批执行,再汇总。这是非常复杂的工程,分布式运算框架的作用就是简化这个过程。
Mapreduce是偏底层的技术,后期的Hive框架将sql语句转化成Mapreduce语句进行执行,来简化操作。后期的spark,flink也都是支持sql语句的。不过这种分布式预算的思想还是非常的重要,也影响了后来很多框架的运算原理。实际工作中不会遇到,但是要对原理有一个了解。
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、mapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
执行流程
1、 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程
2、 maptask进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数据处理,主体流程为:
a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对
b) 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
3、 MRAppMaster监控到所有maptask进程任务完成之后(真实情况是,某些maptask进程处理完成后,就会开始启动reducetask去已完成的maptask处fetch数据),会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)
4、Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
mapreduce的shuffle机制
mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle;
具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
随后将mr的程序开发好,并运行即可,这就涉及到一个问题。如何运行。
五、资源调度——Yarn
在hadoop最开始的版本中,mapreduce的程序要想运行必须自己进行调度,调配资源。这就导致管理越老越混乱,Yarn就出现了。
Apache Hadoop YARN:Yet Another Resource Negotiator,另一种资源协调者。
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。随着hadoop的发展,yarn一直是最核心的资源调度中心,未来我们写的spark,flink程序都可以通过Yarn来进行调度。
YARN的重要概念
1、 yarn并不清楚用户提交的程序的运行机制
2、 yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)
3、 yarn中的主管角色叫ResourceManager
4、 yarn中具体提供运算资源的角色叫NodeManager
5、 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、spark,flink……
6、 所以,spark等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可
Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。
ResourceManager
ResourceManager是YARN中的主节点服务,它负责集群中所有资源的统一管理和作业调度。
简单来讲,ResourceManager主要完成的功能包括:
- 与客户端交互,处理来自客户端的请求;
- 启动和管理ApplicationMaster,并在它运行失败时重新启动它;
- 管理NodeManager,接收来自NodeManager的资源汇报信息,并向NodeManager下达管理指令(比如杀死container等);
- 资源管理与调度,接收来自ApplicationMaster的资源申请请求,并为之分配资源。
NodeManager
NodeManager是YARN集群中的每个具体节点的资源和任务管理者。NodeManager的主要功能包括:
- 定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;
- 接收并处理ApplicationMaster对container的启动、停止等各种请求;
- 管理Container的生命周期,监控Container的资源使用;
- 管理任务日志和不同应用程序用到的附属服务(auxiliary service)。
ApplicationMaster
用户提交的每个应用程序均包含一个ApplicationMaster,主要功能包括:
- 与ResourceManager调度器协商以获取资源;
- 将得到的资源进一步分配给内部的任务;
- 与NodeManager通信以启动或停止任务;
- 监控所有任务的运行状态,并在任务运行失败时负责进行容错处理。
Container
Container是YARN中的资源抽象,它封装了某个节点上的多个维度的资源,如CPU、内存、磁盘、网络等。当ApplicationMaster向ResourceManager申请资源时,ResourceManager为ApplicationMaster 返回的资源是用Container表示的。
当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:
第一阶段:启动ApplicationMaster;
第二阶段:由ApplicationMaster创建应用程序;为它申请资源,并监控它的整个运行过程,直到运行完成。
第1步:
client 读取作业配置信息并创建Job的环境,调用job.waitForCompletion 方法,向集群提交一个MapReduce 作业 。
第2步:
资源管理器给任务分配一个新的作业ID 。
第3步:
作业的client核实作业的输出路径,计算输入文件的分片,将作业的资源 (包括:Jar包、配置文件,split信息等) 拷贝到HDFS集群上的作业提交目录。
第4步:
通过调用资源管理器的submitApplication()来提交作业。
第5步:
当资源管理器收到submitApplciation()的请求时,就将该请求发给调度器 (scheduler),调度器向NodeManager发送一个启动container的请求。
第6步:
节点管理器NodeManager启动container,内部运行着一个主类为 MRAppMaster的Java应用。其通过创造一些对象来监控作业的进度,得到各个task的进度和完成报告 。
第7步:
然后其通过分布式文件系统HDFS来获取由客户端提前计算好的输入split,然后为每个输入split创建一个map任务,根据mapreduce.job.reduces创建 reduce任务对象。
第8步:
如果不是小作业,那应用管理器向资源管理器请求container来运行所有的map和reduce任务 。
这些请求是通过心跳来传输的,包括每个map任务的数据位置。比如:存放输入split的主机名和机架(rack),调度器利用这些信息来调度任务,尽量将任务分配给存储数据的节点或相同机架的节点。
第9步:
当一个任务由资源管理器的调度器分配给一个container后,AppMaster通过联系NodeManager来启动container。
第10步:
任务由一个主类为YarnChild的Java应用执行,在运行任务之前首先本地化任务需要的资源。比如:作业配置、JAR文件以及分布式缓存的所有依赖文件 。
第11步:
最后,启动并运行map或reduce任务 。
同理在向yarn提交spark程序时也会按这种方式进行。这就让资源的调度与程序本身分离。
六、数仓工具——Hive
Hive是基于Hadoop的一个数据仓库工具(离线),可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
Hive解决了MapReduce的复杂研发问题,采用类SQL语法学习成本低。
Hive需要有一个存储元数据的数据库,可以用mysql等等。
简单来说,通过Hive就可以与hdfs文件建立映射关系。我们只需要通过开发hivesql语句,就可以对hdfs上的文件进行操作了。
Hive基本操作
hive中有一个默认的库:
库名:default
库目录:hdfs://ip:9000/user/hive/warehouse
新建库:
create database db_order;
库建好后,在hdfs中会生成一个库目录:
hdfs://hdp20-01:9000/user/hive/warehouse/db_order.db
建表:
use db_order;
create table t_order(id string,create_time string,amount float,uid string);
表建好后,会在所属的库目录中生成一个表目录
/user/hive/warehouse/db_order.db/t_order
只是,这样建表的话,hive会认为表数据文件中的字段分隔符为 ^A
正确的建表语句为:
create table t_order(id string,create_time string,amount float,uid string)
row format delimited
fields terminated by ',';
这样就指定了,我们的表数据文件中的字段分隔符为 ","
删除表:
drop table t_order;
删除表的效果是:
hive会从元数据库中清除关于这个表的信息;
hive还会从hdfs中删除这个表的表目录;
内部表与外部表
内部表(MANAGED_TABLE):表目录按照hive的规范来部署,位于hive的仓库目录/user/hive/warehouse中
外部表(EXTERNAL_TABLE):表目录由建表用户自己指定
create external table t_access(ip string,url string,access_time string)
row format delimited
fields terminated by ','
location '/access/log';
外部表和内部表的特性差别:
1、内部表的目录在hive的仓库目录中 VS 外部表的目录由用户指定
2、drop一个内部表时:hive会清除相关元数据,并删除表数据目录
3、drop一个外部表时:hive只会清除相关元数据;
一个hive的数据仓库,最底层的表,一定是来自于外部系统,为了不影响外部系统的工作逻辑,在hive中可建external表来映射这些外部系统产生的数据目录;
然后,后续的etl操作,产生的各种表建议用managed_table
分区表
分区表的实质是:在表目录中为数据文件创建分区子目录,以便于在查询时,MR程序可以针对分区子目录中的数据进行处理,缩减读取数据的范围。
比如,网站每天产生的浏览记录,浏览记录应该建一个表来存放,但是,有时候,我们可能只需要对某一天的浏览记录进行分析
这时,就可以将这个表建为分区表,每天的数据导入其中的一个分区;
当然,每日的分区目录,应该有一个目录名(分区字段)
示例:
代码语言:javascript复制create table t_access(ip string,url string,access_time string)
partitioned by(dt string)
row format delimited
fields terminated by ',';
数据导入导出
方式1:导入数据的一种方式:
手动用hdfs命令,将文件放入表目录;
方式2:在hive的交互式shell中用hive命令来导入本地数据到表目录
hive>load data local inpath '/root/order.data.2' into table t_order;
方式3:用hive命令导入hdfs中的数据文件到表目录
hive>load data inpath '/access.log' into table t_access partition(dt='20210806');
文件格式
HIVE支持很多种文件格式:SEQUENCE FILE | TEXT FILE | PARQUET FILE | RC FILE
create table t_pq(movie string,rate int) stored as textfile;
create table t_pq(movie string,rate int) stored as sequencefile;
create table t_pq(movie string,rate int) stored as parquetfile;
七、任务调度——azkaban
azkaban是一个工作流调度系统。与之类似的还有oozie,airflow等等。
一个完整的数据分析系统通常都是由大量任务单元组成:
shell脚本程序,java程序,mapreduce程序、hive脚本等;
各任务单元之间存在时间先后及前后依赖关系;
为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
在实际工作中,绝不是一个程序就能搞定一切的。需要分为多个程序运行,还有前后顺序,所以任务调度系统一直存在。也在不断的发展。
简单的任务调度:直接使用linux的crontab来定义;
复杂的任务调度:开发调度平台
或使用现成的开源调度系统,比如ooize、azkaban等。
Azkaban介绍
Azkaban是由Linkedin开源的一个批量工作流任务调度器。用于在一个工作流内以一个特定的顺序运行一组工作和流程。Azkaban定义了一种KV文件格式来建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。
地址:https://github.com/azkaban/azkaban
Azkaban使用
Azkaba内置的任务类型支持command、java
1、创建job描述文件
vi command.job
command.jobtype=command command=echo 'hello'
2、将job资源文件打包成zip文件
zip command.job
3、通过azkaban的web管理平台创建project并上传job压缩包
首先创建project
上传zip包
4、启动执行该job
Command类型多job工作流flow
1、创建有依赖关系的多个job描述
第一个job:foo.job
foo.jobtype=commandcommand=echo foo
第二个job:bar.job依赖foo.job
bar.jobtype=commanddependencies=foocommand=echo bar
2、将所有job资源文件打到一个zip包中
3、在azkaban的web管理界面创建工程并上传zip包
4、启动工作流flow
HDFS操作任务
1、创建job描述文件
fs.jobtype=commandcommand=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz
2、将job资源文件打包成zip文件
3、通过azkaban的web管理平台创建project并上传job压缩包
4、启动执行该job
八、数据传输Sqoop
Sqoop是一个用于在Hadoop和关系型数据库之间流转数据的一个工具。可以使用Sqoop将数据从关系型数据库系统(RDBMS)比如MySQL或者Oracle导入到hadoop分布式文件系统(HDFS)上,然后数据在Hadoop MapReduce上转换,以及将数据导出到RDBMS中。 Sqoop自动实现了上面提到的很多过程,Sqoop使用MapReduce来导入和导出数据,这样既可以提供并行化操作又可以提高容错能力。
Sqoop是Apache软件基金会的一个开源项目。可以访问http://Sqoop.apache.org获取,sqoop目前已经趋于稳定,从apache退休了。
在每天定时定时调度把mysql数据传到大数据集群中,或者把hive中数据传走时会用到。不过随时数据实时化的要求变高,sqoop的作用小了很多。但是一些历史数据的导入还是需要的。
Sqoop使用
Sqoop提供了一系列的操作工具,使用Sqoop需要指定你想要使用的具体工具,以及提供对应的一些参数,使用方式如下。
代码语言:javascript复制sqoop tool-name [tool-arguments]
可以使用sqoop help命令查看帮助信息
代码语言:javascript复制sqoop help
Available commands:
codegen 生成Java代码
create-hive-table 根据表结构生成hive表
eval 执行SQL语句并返回结果
export 导出HDFS文件到数据库表
help 帮助
import 从数据库导入数据到HDFS
import-all-tables 导入数据库所有表到HDFS
list-databases 列举所有的database
list-tables 列举数据库中的所有表
version 查看版本信息
可以看到,sqoop提供的操作工具有10个。具体工具的使用帮助可以sqoop help (tool-name)或者sqoop tool-name --help进行查看。
sqoop-import
import工具可以用于从RDBMS中导入一张表到HDFS。表中的每一条记录对应生成HDFS文件中的每一行。这些记录可以以text files或者Avro或者SequenceFiles格式进行存储。
使用方法如下
代码语言:javascript复制$ sqoop-import (generic-args) (import-args)
参数列表-import基本参数
参数 | 描述 |
---|---|
–connect < jdbc-uri > | JDBC连接串 |
–connection-manager < class-name > | 连接管理类 |
–driver < class-name > | 手动指定JDBC驱动类 |
–hadoop-mapred-home < dir > | 可以覆盖$HADOOP_MAPRED_HOME |
–help | 使用帮助 |
–password-file | 指定包含密码的文件 |
-P | 执行import时会暂停,等待用户手动输入密码 |
–password < password > | 直接将密码写在命令行中 |
–username < username > | 指定用户名 |
–verbose | 显示Sqoop任务更多执行信息 |
–connection-param-file < filename > | 可选的参数,用于提供连接参数 |
–relaxed-isolation | 设置每个mapmer的连接事务隔离 |
Hive参数
以下是导入到 Hive 中时可选的参数:
代码语言:javascript复制--hive-home <dir>:覆盖 $HIVE_HOME。
--hive-import:将表导入Hive(如果没有设置,则使用Hive的默认分隔符。)
--hive-overwrite:覆盖Hive表中的现有数据。
--create-hive-table:如果设置,那么如果存在目标hivetable,作业将失败。默认情况下,此属性为false。
--hive-table <table-name>:设置导入到Hive时要使用的表名。
--hive-drop-import-delims:导入到Hive时,从字符串字段中删除n、r和 1。
--hive-delims-replacement:在导入到Hive时,将字符串字段中的n、r和 1替换为用户定义的字符串。
--hive-partition-key:分配到分区的Hive字段的名称。
--hive-partition-value <v>:作为该任务导入到Hive中的分区键的字符串值。
示例:
代码语言:javascript复制bin/sqoop import
--connect jdbc:mysql://hostname:3306/mydb
--username root
--password root
--table mytable
--num-mappers 1
--hive-import
--hive-database mydb
--hive-table mytable
--fields-terminated-by "t"
--delete-target-dir
--hive-overwrite
sqoop-export
Sqoop的export工具可以从HDFS同步一系列文件数据到RDBMS中。使用这个工具的前提是导出目标表在数据库必须存在。导出文件根据用户指定的分隔符转化成一系列的输出记录。 默认的导出操作会将这些记录转化成一系列的INSERT语句,根据这些语句将记录插入到关系型数据库中。而在update模式下,Sqoop会生成一系列的UPDATE语句,将数据库中已经存在的记录进行更新。在call模式下,Sqoop会为每一条记录调用一个存储过程来处理。
代码语言:javascript复制$ sqoop-export (generic-args) (export-args)
基本参数
参数** | 描述** |
---|---|
–connect < jdbc-uri > | JDBC连接串 |
–connection-manager < class-name > | 连接管理类 |
–driver < class-name > | 手动指定JDBC驱动类 |
–hadoop-mapred-home < dir > | 可以覆盖$HADOOP_MAPRED_HOME |
–help | 使用帮助 |
–password-file | 指定包含密码的文件 |
-P | 执行import时会暂停,等待用户手动输入密码 |
–password < password > | 直接将密码写在命令行中 |
–username < username > | 指定用户名 |
示例:
代码语言:javascript复制$ bin/sqoop export
--connect jdbc:mysql://hostname:3306/mydb
--username root
--password root
--table mytable
--num-mappers 1
--export-dir /user/hive/warehouse/mydb.db/mytable
--input-fields-terminated-by "t"
九、数据收集-Flume
- Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
- 支持在日志系统中定制各类数据发送方,用于收集数据
- Flume提供对数据进行简单处理,并写到各种数据接收方
Flume是成熟的开源日志采集系统,且本身就是hadoop生态体系中的一员,与hadoop体系中的各种框架组件具有天生的亲和力,可扩展性强。
相对于用Shell脚本和Java的收集方式,规避了对日志采集过程中的容错处理不便控制,减少了开发工作量。
例如对于实时的日志分析这种场景中,对数据采集部分的可靠性、容错能力要求通常不会非常严苛,因此使用通用的flume日志采集框架完全可以满足需求。
Flume的配置
安装好flume以后需要对其进行配置。
flume通过事件(agent)进行运作,事件下包含如下的概念。
Source: 用来定义采集系统的源头
Channel: 把Source采集到的日志进行传输,处理
Sink:定义数据的目的地
下面是一个示例。
有一个概念就是,我们定义了agent1这个agent。
定义了agent1.sources的系列设置去执行tail -F实时的采集日志数据。
通过Channel传输,最后指定Sink将日志存入hdfs。
代码语言:javascript复制agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
#使用exec作为数据源source组件
agent1.sources.source1.type = exec
#使用tail -F命令实时收集新产生的日志数据
agent1.sources.source1.command = tail -F /var/logs/nginx/access_log
agent1.sources.source1.channels = channel1
#configure host for source
#配置一个拦截器插件
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
#使用拦截器插件获取agent所在服务器的主机名
agent1.sources.source1.interceptors.i1.hostHeader = hostname
#配置sink组件为hdfs
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
#agent1.sinks.sink1.hdfs.path=hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H%M%S
#指定文件sink到hdfs上的路径
agent1.sinks.sink1.hdfs.path=
hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M_%hostname
#指定文件名前缀
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
#指定每批下沉数据的记录条数
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
#指定下沉文件按1G大小滚动
agent1.sinks.sink1.hdfs.rollSize = 1024*1024*1024
#指定下沉文件按1000000条数滚动
agent1.sinks.sink1.hdfs.rollCount = 1000000
#指定下沉文件按30分钟滚动
agent1.sinks.sink1.hdfs.rollInterval = 30
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
#使用memory类型channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
随后将flume用指定的配置文件启动即可。
代码语言:javascript复制bin/flume-ng agent --conf ./conf -f ./conf/weblog.properties.2 -n agent
注意:启动命令中的 -n 参数要给配置文件中配置的agent名称
目前市面针对日志采集的有 Flume,Logstash,Filebeat,Fluentd ,rsyslog 很多种。但基本的原理是相同的,要根据公司的情况进行选择。
本文从大数据理论到常用的基础组件进行的笔记的整理,更深入的hadoop理论知识建议通过书籍进行深入的阅读学习。而Spark,Flink等组件的学习将会通过单独的文章进行笔记整理。希望对大家有所帮助。
代码语言:javascript复制最近整理了大数据的相关的书籍,包含python、linux、java、大数据、数据分析、数据治理、人工智能等等。(持续更新中~)获取方式:关注#公众号:大数据流动,回复:福利,就可以获得!