shell脚本监控文件夹文件实现自动上传数据到hive表

2021-04-27 11:53:05 浏览数 (1)

脚本说明  

createtb.sh:创建hive表,触发监控目录脚本 monitor.sh:监控目录,根据文件变化自动触发导入hive表 loadtb_all.sh:第一次将文件导入hive表(第一次建表时触发) loadtb_mid.sh:第N此件文件导入hive表(监控文件添加时触发)

脚本上传

## 给 /home/hive/ 目录下的所有脚本赋予执行权限 chmod x /home/hive/*.sh ## 如果脚本在windowns环境下编辑,需要将脚本转化为unix格式 sed -i "s/r$//" /home/hive/*.sh

监控说明

监控说明 监控文件的目录:/home/hive/observation/tables 监控文件日期目录:/home/log/hive/observation/tables.log 第一次建表 createtb.sh 脚本根据传入的参数在hive建立对应的分区表,并在/home/log/hive/observation/tables.log文件写入十行初始日期 接着 loadtb_all.sh 脚本会把/home/hive/observation/tables目录下的所有以.txt为后缀的文件写入hive表(文件名必须以分区名区分并且以下划线隔开) 并且把/home/hive/observation/ 第N次更新表 之后如果有新文件写入/home/hive/observation/tables目录,则监控脚本 monitor.sh 会把当前监控目录下的所有文件日期和/home/log/hive/observation/tables.log下的最新日期进行对比 通过排序遍历判断日志目录下的最新日期和当前目录下的日期相等找到比日志目录下所有更新的文件 最后把监控到的最新文件导入hive表,并把这些文件的最新日期追加到日志目录(先删掉第一行,再追加最后一行)

参数说明

sh createtb.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2" 数据库名:observation (脚本里写死了) 表名:tablename 指定名:field1,field2,field3,field4,field5,field6,field7 分区名:partition1,partition2

文件说明

本地上传的文件:/home/hive/observation/$table/*.txt 文件名格式:xx01_xx02_xxxx.txt (xx01为第一个分区名称,xx02为第二个分区名称) 比如文件名为 2019_10_02.txt(/home/hive/observation/tablename目录下),则脚本生成的hive导入语句为 load data local inpath '/home/hive/observation/tablename/2019_10_02.txt' into table observation.tablename partition(partition1='2019',partition2='10');

 脚本代码

1、输入字段,建hive表脚本 createtb.sh

代码语言:javascript复制
#!/bin/bash

echo "---------createtb.sh--------------"
#string="tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"
string=$*
group=(${string//;/ })
for i in ${group[@]};do
    ((m  ))
    if [[ m -eq 1 ]];then
        tables=$i
    elif [[ m -eq 2 ]];then
        fields=$i
    else partitions=$i
    fi
done
# echo "tables:" $tables
# echo "fileds:" $fields
# echo "partitions:" $partitions
# echo "-----------------"
field=(${fields//,/ })
partition=(${partitions//,/ })

# 创建表目录
$(mkdir -p /home/hive/observation/$tables ; chmod 777 /home/hive/observation/$tables)
 
# 数据库名称,这里不作为参数用自变量写了
database="observation"
# hive 拼接语句,分为h1,h2,h3
# h1是建表语句的前半部,h2是参数列表中间部分,h3是建表语句的后半部
h1="create table $database.$tables"
# 在当前目录下创建文本文件temp,如果文件存在则清空文件
$(> temp)
# for 循环将参数追加到当前目录的temp文件,逗号分隔,echo -n 不换行
for i in ${field[@]};do
	echo -n $i" varchar(255)," >> temp
done
# h2取temp文本里的字符串
temp=$(cat temp)
# 将字符串最后的一个逗号去掉
h2="(${temp%*,})"
# 在当前目录下创建文本文件tmp,如果文件存在则清空文件
$(> tmp)
# for 循环将参数追加到当前目录的temp文件,逗号分隔,echo -n 不换行
for i in ${partition[@]};do
	echo -n $i" varchar(255)," >> tmp
done
# h3取temp文本里的字符串
tmp=$(cat tmp)
# 将字符串最后的一个逗号去掉
# h3是建表语句的后半部
h3="
partitioned by
(${tmp%*,})
row format delimited 
fields terminated by 't'
lines terminated by 'n'
stored as textfile
location '/user/hive/warehouse/$database.db/$tables';
"
echo $h1$h2$h3
$(hive -e "$h1$h2$h3")
$(rm -rf tmp temp)

echo "-------create hive table successfully--------"

# 创建数据日期日志目录
> /home/log/hive/observation/$tables.log

#/bin/bash /home/hive/loadtb.sh
#exec /home/hive/loadtb.sh
#source /home/hive/loadtb.sh
#fork /home/hive/loadtb.sh
# 第一个参数为表名,第二个参数为分区字段
#/home/hive/loadtb.sh $tables $partitions
/home/hive/monitor.sh $*

echo "* * * * * su - root /home/hive/monitor.sh "$*"" >> /var/spool/cron/root
# echo "* * * * * root /home/hive/monitor.sh "$*"" >> /etc/crontab

# /home/hive/createtb.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"

2、监控目录脚本 monitor.sh

代码语言:javascript复制
#!/bin/bash

# -------------接收参数,解析字符串--------------
echo "---------monitor.sh--------------"
string=$*
group=(${string//;/ })
for i in ${group[@]};do
    ((m  ))
    if [[ m -eq 1 ]];then
        tables=$i
    elif [[ m -eq 2 ]];then
        fields=$i
    else partitions=$i
    fi
done
tables=(${tables//;/ })
field=(${fields//,/ })
partition=(${partitions//,/ })
# -------------接收参数,解析字符串--------------

# 第一个参数为表名,也是监控的文件夹名称
tables=$tables
echo "tables:$tables"
echo "----------监控目录----------"
# 获取当前数据文件夹下的文件数量
let "total=$(ls /home/hive/observation/$tables | wc -l)"
echo "数据文件数量:${total}"

let "logline=$(cat /home/log/hive/observation/$tables.log | wc -l)"
echo "日志文件行数:${logline}"

# 如果文件为空,则初始化log文件
if (( logline==0 ));then
    > /home/log/hive/observation/$tables.log
    # 初始化log文件,赋予10行默认时间值
    for (( i=0;i<10;i   ));do
        $(echo "2020-01-01 00:00:00.00000000$i" >> /home/log/hive/observation/$tables.log)
    done
fi

# 获取log文件中的最后一个日期
logdate=$(tail -n 1 /home/log/hive/observation/$tables.log)
echo "log 日志最新日期:$logdate"

# 获取数据目录下的最新的日期
newdate=$(ls --full-time -lt /home/hive/observation/$tables | tail -n -$total | awk '{print $6,$7}' | head -n 1)
echo "数据目录最新日期:$newdate"

# 将两个时间转为时间戳
LOGDATE=`date -d "$logdate"  %s`
NEWDATE=`date -d "$newdate"  %s`
echo "log 日志最新时间戳:$LOGDATE"
echo "数据目录最新时间戳:$NEWDATE"

# 获取数据目录所有的日期信息
alldate=$(ls --full-time -lt /home/hive/observation/$tables | tail -n -$total | awk '{print $6,$7}')
echo "数据目录所有日期信息:"
echo "$alldate"

# 数据目录日期字符串长度
let len=${#alldate}
echo "数据目录日期字符串长度:$len"

# 由于日期信息是空格连续的,单个日期也有空格,所以需要特殊处理
for((i=0;i> /home/hive/judge.log
    # 删除log第一行数据
    $(sed -i '1d' /home/log/hive/observation/$tables.log)
    # 把之前读取的最新日期数据加到log文件下
    echo "${date[0]}" >> /home/log/hive/observation/$tables.log
    # echo "" >> /home/log/hive/observation/$tables.log
    /home/hive/loadtb_all.sh $tables $partitions
# 如果n==0表示没有最新数据(n=0为最新的数据),既不需要处理
elif (( n == 0));then
    # 删除log第一行数据
    # $(sed -i '1d' /home/log/hive/observation/$tables.log)
    echo "n == 0 , 没有最新数据,${date[0]} "
    # echo "n == 0 , 没有最新数据,${date[0]} " >> /home/hive/judge.log
else 
    echo "最新数据:${date[0]}"
    # 删除log第一行数据
    $(sed -i '1d' /home/log/hive/observation/$tables.log)
    # 把之前读取的最新日期数据加到log文件下
    echo "${date[0]}" >> /home/log/hive/observation/$tables.log
    #echo "" >> /home/log/hive/observation/$tables.log
    #$(sed -i '1s/$/2020-04-16 17:03:48.000000000/' /home/log/hive/observation/tablename.log)
    # 调用加载数据脚本,第一个参数为表名,第二个参数为分区字段,第三个为最新的第n个数据
    echo "传递的第一个参数是:$tables, 传递的第二个参数是:$partitions, 传递的第三个参数是:$n, 最新数据:${date[0]}" >> /home/hive/judge.log
    /home/hive/loadtb_mid.sh $tables $partitions $n
fi

# /home/hive/monitor.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"

3、第一次 load into hive表脚本 loadtb_all.sh

代码语言:javascript复制
#!/bin/bash

echo "------loadtb_all.sh--start------"
# 第一个参数为表名
table=$1
echo "表名参数:$table"
# 第二个参数为分区字段
partitions=$2
echo "分区参数:$partitions"
# load data directory 
DIR=/home/hive/observation/$table
partkey=(${partitions//,/ })
# 分区数为$m
let m=${#partkey[@]}

h1="load data local inpath '/home/hive/observation/$table"
h3="' into table observation.$table partition"
allfiles=$(ls /home/hive/observation/$table/*.txt)
echo "allfiles:$allfiles"
# echo "allfiles:$allfiles" >> /home/hive/allfiles_all.log
# 遍历目录,得到具体分区名称
for file in ${allfiles};do
	> part_all
	echo "$file" >> /home/hive/allfiles_all.log
	# 取最后一个/后的文件名称
	h2="${file##*/}"
	# echo "h2:$h2" >> /home/hive/allfiles_all.log
	# 去掉文件名后缀.txt
	str=${h2%%.*}
	# echo "str:$str" >> /home/hive/allfiles_all.log
	# 分区名称分割,遍历文件名
	partvalue=(${str//_/ })
	# 分区拼接字符串
	for (( i=0;i> part_all
	done
	h4_tmp=$(cat part_all)
	h4="(${h4_tmp%*,});"
	echo "$h1/$h2$h3$h4"
	$(hive -e "$h1/$h2$h3$h4")
	#hive -e "load data local inpath '/home/hive/observation/$table' into table observation.$table partition(partition1=$i,partition2=$i);"
done

$(rm -rf part_all)

echo "------loadtb_all.sh--end------"

3、第N次 load into hive表脚本 loadtb_mid.sh

代码语言:javascript复制
#!/bin/bash

echo "--------loadtb_mid.sh--start------"
# 第一个参数为表名
table=$1
echo "表名参数:$table"
# 第二个参数为分区字段
partitions=$2
echo "分区参数:$partitions"
# 第三个为最新的第n个数据
echo "第三个参数:$3"
echo "全部参数:$*"
N=$(($3))
echo "N为:$N"
partkey=(${partitions//,/ })
# 分区数为$m
let m=${#partkey[@]}

h1="load data local inpath '/home/hive/observation/$table"
h3="' into table observation.$table partition"

allfiles=$(ls --full-time -lt /home/hive/observation/$table/*.txt | awk '{print $9}' | head -n ${N})
#allfiles=$(ls --full-time -lt /home/hive/observation/$table/*.txt | head -n ${N})
# allfiles=$(ls /home/hive/observation/$table/*.txt | tail -n ${N})

echo "$allfiles, ${N}个文件, $(date)" >> /home/hive/n_mid.log
echo "allfiles:$allfiles"
# echo "allfiles:$allfiles" >> /home/hive/allfiles_mid.log

## 逆序数组转为正序数组
files=(${allfiles// / })
let fileN=${#files[@]}-1
echo ${fileN} ${files[*]}

# 遍历目录,得到具体分区名称(逆序循环)
#for file in ${allfiles};do
for i in $(seq $fileN -1 0);do
	> part_mid
	file=${files[$i]}
	echo "$file" >> /home/hive/allfiles_mid.log
	# 取最后一个/后的文件名称
	h2="${file##*/}"
	# echo "h2: $h2" > /home/hive/allfiles_mid.log
	# 去掉文件名后缀.txt
	str=${h2%%.*}
	# echo "str: $str" > /home/hive/allfiles_mid.log
	# 分区名称分割,遍历文件名
	partvalue=(${str//_/ })
	# 分区拼接字符串
	for (( i=0;i> part_mid
	done
	h4_tmp=$(cat part_mid)
	h4="(${h4_tmp%*,});"
	echo "$h1/$h2$h3$h4"
	#echo "$h1/$h2$h3$h4" >> /home/hive/hive_mid.log
	$(hive -e "$h1/$h2$h3$h4")
	#hive -e "load data local inpath '/home/hive/observation/$table' into table observation.$table partition(partition1=$i,partition2=$i);"
done

$(rm -rf part_mid)

echo "--------loadtb_mid.sh--end------"

执行命令

代码语言:javascript复制
sh createtb.sh "tablename;field1,field2,field3,field4,field5,field6,field7;partition1,partition2"

运行效果

0 人点赞