序言
在使用大数据的时候,各种不同的数据都要将数据采集同步到数据仓库中,一个是属于业务系统的RDBMS系统,也就是各种关系型数据库,一个是hadoop生态的存储,中间用于传输的数据的工具可以使用sqoop,也就是sql to hadoop。
在数据进入数仓的ODS层的时候,使用sqoop,在进入hadoop之后,就可以使用其他的计算框架进行分析,例如hive,MR,spark等。
sqoop
1 sqoop所处的位置
sqoop是一个用于数据传输的工具,是连接RDBMS和数仓ODS的桥梁:
sqoop是将结构化数据同步到hdfs中,也可以是hive和hbase等,支持不同的数据库,只要将相关的连接数据库驱动放到安装sqoop的lib库中即可,从而能连接,进行数据的导入导出操作。
在进行使用sqoop的时候,考虑到任务数量的众多,需要从不同的业务系统中同步数据,而业务系统使用的数据库又是多种多样的,从数仓的建立来说,需要确定相关的指标,从而需要首先规划好哪些数据库,哪些数据需要同步,任务数量多,从而需要考虑任务的优先级管理,任务的优先级决定了指标的产出时间,可以划分核心指标,从而得到核心任务,也就是需要优先产出的任务,从而定义好相应的任务。
血缘关系,也就是sqoop数据导入任务才是第一步,后面还有数仓中的各种数据清洗,数据统计分析的任务,从而需要划分好任务依赖。
为了方便问题的排查,也就是对于sqoop的导入数据任务来说,每个导入使用一个导入job来实现。
ODS作为第一层,保持业务数据的一致性,基本不会对数据进行任何处理,直接保存在数仓中。从而表命名可以使用ods_databasename_tablename_di,ods表示表所在的层级,数据库名称表示数据的来源,而表名和业务系统一致,di表示每日增量day increase,全量的就不需要标注了,对于任务名称来说,也可以使用表名一致的方式,从而便于问题的排查,见名知意。
2 数据分类
在业务系统的表里面的数据,可以分为三类数据,一种是全量数据,这种表示同步的时候,只要全部同步就好,数据量比较少,而且变化的频率不高,像用户,地区这种维度表。
对于增量数据,也就是这个表里面的数据只会存在insert操作,而不会存在update操作,像交易表,这种只会新增记录,基本不会涉及修改。
对于新增及变化的数据,这种一般就是订单表,不但每天都会有新增的数据,而且会存在修改的动作,也就是修改订单的状态,从而每天都会发生变化。
3 sqoop使用
sqoop的安装很简单,只要下载之后,修改一下简单的配置就可以使用,但是由于sqoop需要使用hadoop的map任务,从而需要提前安装好hadoop,在这个基础上安装sqoop(注意将连接数据库的驱动放到sqoop的lib目录中)。
代码语言:javascript复制[root@KEL1 conf]# cat sqoop-env.sh
# Set Hadoop-specific environment variables here.
#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/opt/module/hadoop-2.7.2
#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/opt/module/hadoop-2.7.2
#set the path to where bin/hbase is available
#export HBASE_HOME=
#Set the path to where bin/hive is available
export HIVE_HOME=/opt/module/apache-hive-1.2.2
#Set the path for where zookeper config dir is
#export ZOOCFGDIR=
[root@KEL1 conf]# which hadoop
/opt/module/hadoop-2.7.2/bin/hadoop
全量导入(使用--query的方式):
代码语言:javascript复制#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' %F)
if [[ -n "$2" ]];then
JOB_DATE=$2
fi
export_date() {
$SQOOP import
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8"
--username root
--password root
--target-dir /user/root/shop/$1/$JOB_DATE
--delete-target-dir
--query "$2 and $CONDITIONS"
--num-mappers 1
--fields-terminated-by ','
--null-string '\N'
--null-non-string '\N'
}
#passowrd表示连接数据库的密码,target-dir表示保存在hdfs的哪个路径
#delete-target-dir表示删除已经存在的目录,否则如果目录存在报错
#query表示查询导入的sql语句,num-mappers表示使用的map个数
#fields-terminated-by表示在hdfs上用什么分割字段
#null-string和null-non-stinrg表示在hdfs统一用N存储
export_date s_auction "select * from s_auction where 1=1"
全量导出:
代码语言:javascript复制#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' %F)
if [[ -n "$2" ]];then
JOB_DATE=$2
fi
export_date() {
$SQOOP export
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8"
--username root
--password root
--table $1
--input-fields-terminated-by "," --fields-terminated-by ','
--map-column-java gmt_create=java.sql.Date
--export-dir /user/hive/warehouse/ods_shop.db/$1/ds=$JOB_DATE
}
#注意数据类型的转换,从而使用了参数--map-column-java
export_date $1
执行之后,可以看到map程序的执行结果:
从文件名也可以看到只有map阶段,查看结果:
使用table的参数进行全量导入:
代码语言:javascript复制[root@KEL1 jobscript]# cat ods_shop_s_sale.sh
#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' %F)
if [[ -n "$2" ]];then
JOB_DATE=$2
fi
export_date() {
$SQOOP import
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8"
--username root
--password root
--target-dir /user/root/shop/$1/$JOB_DATE
--delete-target-dir
--num-mappers 1
--fields-terminated-by ','
--null-string '\N'
--null-non-string '\N'
--table s_sale
}
export_date s_sale
在使用--query的时候,可以指定你想要的列进行导入数据,也就是select指定具体的字段,从而可以进行导入。
在导出的时候,注意字段的对应关系,如果字段不对应,可能导致数据错位从而导致数据错误。
增量导入:
代码语言:javascript复制#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' %F)
if [[ -n "$2" ]];then
JOB_DATE=$2
fi
export_date() {
$SQOOP import
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8"
--username root
--password root
--target-dir /user/root/shop/$1/$JOB_DATE
--num-mappers 1
--fields-terminated-by ','
--null-string '\N'
--null-non-string '\N'
--table s_sale
--incremental append
--check-column id
--last-value 42
}
export_date s_sale
确定增量的时候,主要是根据你给的字段来进行判断是否为insert,从而每次也需要一个last-value来确定一个比较的值,最后会把增量的数据放在单独的文件中。
其中的内容则是新增的数据:
注意在使用这种增量数据导入的时候,数据库中id的类型,否则会报错,无法执行:
代码语言:javascript复制#使用增量导入的时候,checkcolumn必须是自增的rowid,否则导入报错
21/02/28 10:40:04 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `s_sale`
21/02/28 10:40:04 ERROR tool.ImportTool: Import failed: Character column (id) can not be used to determine which rows to incrementally import.
后续的值可以用来重新生成任务:
变化及新增:
代码语言:javascript复制#!/bin/bash
SQOOP=/opt/module/sqoop/bin/sqoop
JOB_DATE=$(date -d '-1 day' %F)
if [[ -n "$2" ]];then
JOB_DATE=$2
fi
export_date() {
$SQOOP import
--connect "jdbc:mysql://kel1:3306/shop?useUnicode=true&characterEncoding=utf-8"
--username root
--password root
--target-dir /user/root/shop/$1/$JOB_DATE
--num-mappers 1
--fields-terminated-by ','
--null-string '\N'
--null-non-string '\N'
--table s_sale
--incremental lastmodified
--check-column starts
--last-value "2018-01-01 00:00:00"
--mergy-key id
}
export_date s_sale
这个执行的时候,最好starts能根据数据的改变自动修改时间,这样就能进行一个时间的对比,并且按照mergykey的设定来进行数据的合并。
总体看来,如果表里面有两个时间字段,那么还是使用--query的形式来选择时间方便点。
3 导入到hive
如果是需要导入到hive里面,也有相关的参数可以用,做了两部分内容,一部分是创建hive的表结构,第二部分是导入数据;如果任务是导入到hdfs,那么还有一个手动load的过程。
在从hive里面导出的时候,需要手动建表,例如mysql里面的表需要提前创建好。
4 可能出现的问题
a 查找相关日志
导入数据的时候,使用sqoop的时候,不会显示详细的报错日志,如下所示,只能看到是export的任务失败,至于失败原因就不清楚了:
可以找到jobhistory的web页面,对比相关的任务号,在sqoop的日志中有相应的打印信息:
点击其中的job id,从而进入到job的详情页面,在application master处点击对应的log信息:
在日志的页面中,可以看到部分日志,需要再次找到full的日志,点击如下的here查看全部日志信息:
可以看到是因为日期的转换出现问题:
在对20190206转换成yyyy-mm-dd的时候,出现错误。
b 任务执行失败,jobhistory服务未启动
代码语言:javascript复制2021-02-25 23:30:50,336 Stage-1 map = 0%, reduce = 0%
java.io.IOException: java.net.ConnectException: Call From KEL1/192.168.1.99 to KEL:10020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at org.apache.hadoop.mapred.ClientServiceDelegate.invoke(ClientServiceDelegate.java:343)
at org.apache.hadoop.mapred.ClientServiceDelegate.getJobStatus(ClientServiceDelegate.java:428)
at org.apache.hadoop.mapred.YARNRunner.getJobStatus(YARNRunner.java:572)
at org.apache.hadoop.mapreduce.Cluster.getJob(Cluster.java:184)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:593)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:591)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.JobClient.getJobUsingCluster(JobClient.java:591)
at org.apache.hadoop.mapred.JobClient.getJobInner(JobClient.java:601)
at org.apache.hadoop.mapred.JobClient.getJob(JobClient.java:631)
sqoop连接10020端口,将相关日志发送到jobhistory服务器。
c 导入的目录已经存在
代码语言:javascript复制//导入的目录已经存在 --delete-target-dir
21/02/26 02:15:21 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
21/02/26 02:15:22 ERROR tool.ImportAllTablesTool: Encountered IOException running import job: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://ns/user/root/BUCKETING_COLS already exists
d 默认4个map任务,无法进行分割,split-by指定,或者使用1一个map任务
代码语言:javascript复制//到处的表没有主键,要么使用-m 1来指定一个map任务,要么使用split-by
21/02/26 02:12:14 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/e4ea9c1fb8c56a292e224d796c6812f8/COMPLETED_TXN_COMPONENTS.jar
21/02/26 02:12:14 ERROR tool.ImportAllTablesTool: Error during import: No primary key could be found for table COMPLETED_TXN_COMPONENTS. Please specify one with --split-by or perform a sequential import with '-m 1'.
e 字符串切割出错
代码语言:javascript复制// 在sqoop import 后面加参数 -Dorg.apache.sqoop.splitter.allow_text_splitter=true
21/02/26 02:19:42 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`TOKEN_IDENT`), MAX(`TOKEN_IDENT`) FROM `DELEGATION_TOKENS`
21/02/26 02:19:42 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/root/.staging/job_1614267326631_0013
21/02/26 02:19:42 ERROR tool.ImportAllTablesTool: Encountered IOException running import job: java.io.IOException: Generating splits for a textual index column allowed only in case of "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" property passed as a parameter
f 转换时间格式
代码语言:javascript复制2021-02-26 10:53:49,020 INFO [IPC Server handler 3 on 41321] org.apache.hadoop.mapred.TaskAttemptListenerImpl: Diagnostics report from attempt_1614267326631_0082_m_000003_0: Error: java.io.IOException: Can't export data, please check failed map task logs
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:122)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.RuntimeException: Can't parse input data: '20190308'
at s_auction.__loadFromFields(s_auction.java:1176)
at s_auction.parse(s_auction.java:958)
at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:89)
... 10 more
Caused by: java.lang.IllegalArgumentException
at java.sql.Date.valueOf(Date.java:143)
at s_auction.__loadFromFields(s_auction.java:1029)
... 12 more
在hive中进行时间转换:
代码语言:javascript复制insert overwrite table ss_pay_order_delta PARTITION(ds='20210222')
select pay_order_id,total_fee,seller_id,buyer_id,
pay_status,from_unixtime(unix_timestamp(pay_time,'yyyymmdd'),
'yyyy-mm-dd') as pay_time,
from_unixtime(unix_timestamp(gmt_create,'yyyymmdd'),'yyyy-mm-dd')
as gmt_create,refund_fee,confirm_paid_fee
from s_pay_order_delta;