前言:以下所有知识仅限于入门,对自己代码的一个记录,有不详细之处留到日后补录。
Hadoop
map的输入固定是LongWritable和Text,可理解为偏移量和String类型的数据。 核心:map的输出的key和value是reduce的输入的key和value
1、求和
代码语言:javascript复制主类
public static void main(String[] args) throws Exception{
// 1、初始化配置,告诉取得程序
Configuration configuration = new Configuration();
// 2、初始化任务
Job job = Job.getInstance(configuration);
job.setJarByClass(PriceSumMain.class);
// 3、开始配置map 所有的map都在做切割文件
job.setMapperClass(PriceSumMapper.class);
// 设置map阶段的输入和输出阶段
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 4、开始配置reduce
job.setReducerClass(PriceSumReduce.class);
// 设置reduce的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 5、设置输入输出结果集
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 6、提交任务
System.exit(job.waitForCompletion(true)?0:1);
}
代码语言:javascript复制mapper
// 求价格总和
// 偏移量 数据 输出的key 输出的value
public class PriceSumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 取得每一行数据
String line=value.toString();
// 对每一行数据进行切割
String[] items = line.split(",");
// 取得第一列的数据
int num = Integer.parseInt(items[1]);
context.write(new Text("num"), new LongWritable(num));
}
}
代码语言:javascript复制reduce
public class PriceSumReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
long sum =0L;
for(LongWritable a:values){
sum =a.get();
}
context.write(new Text("最后求出的数据:"), new LongWritable(sum));
}
}
2、排序
排序分为俩种,数字排序和字母排序。排序是对key的排序 根据数字排序,默认升序。
代码语言:javascript复制主类
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(SortSalaryMain.class);
// 根据数字排序
job.setMapperClass(SortSalaryByNumMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// 数字排序无需reduce
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
代码语言:javascript复制mapper
// 员工薪资排序
public class SortSalaryByNumMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取到每一行数据
String line = value.toString();
// 切割数据
String[] emps = line.split(",");
// 获取员工姓名
String name=emps[1];
// 获取员工薪资
int salary =Integer.parseInt(emps[5]);
context.write( new IntWritable(salary),new Text(name));
}
}
根据字母排序
代码语言:javascript复制主类
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(SortSalaryByCharMain.class);
job.setMapperClass(SortSalaryByNumMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
// 配置其他 根据字母排序
job.setSortComparatorClass(SortSalaryComparator.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
代码语言:javascript复制排序
// WritableComparator
public class SortSalaryComparator extends WritableComparator {
//public class SortSalaryComparator implements RawComparator {
// 构造器调用父类方法,告诉Comparator(比较器) 需要比较的数据类型是什么
public SortSalaryComparator(){
super(IntWritable.class,true);
}
// 重写比较方法,默认升序,实现降序 自定义比较规则
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 向下转型
IntWritable xa = (IntWritable) a;
IntWritable xb = (IntWritable) b;
// 也执行成功
return -super.compare(a, b);
// 可行
// return -xa.compareTo(xb);
}
}
3、去重
代码语言:javascript复制主类
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(DistinctJobMain.class);
job.setMapperClass(DistinctJobMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setReducerClass(DistinctJobReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
代码语言:javascript复制mapper
public class DistinctJobMapper extends Mapper <LongWritable, Text, Text, NullWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line=value.toString();
String[] items = line.split(",");
// 获取员工的职位
String job = items[2];
// 因为原语,所以将所有的key都设置为相同的
context.write(new Text(job), NullWritable.get());
}
}
代码语言:javascript复制reduce
public class DistinctJobReduce extends Reducer <Text, NullWritable, Text, NullWritable> {
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 利用原语,相同的key为一组,调用一个或一次reduce,过滤重复的工作
context.write(key, NullWritable.get());
}
}
4、分区
需求:根据薪资进行分区 (0,1500,5000,10000,以上)
代码语言:javascript复制主类
// 根据员工薪资分区
public static void main(String[] args) throws Exception{
// 1、初始化配置,告诉取得程序
Configuration configuration = new Configuration();
// 2、初始化任务
Job job = Job.getInstance(configuration);
job.setJarByClass(PartitionBySalaryMain.class);
// 3、开始配置map 所有的map都在做切割文件
job.setMapperClass(PartitionMapper.class);
// 设置map阶段的输入和输出阶段
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4、开始配置reduce
job.setReducerClass(PartitionReduce.class);
// 设置reduce的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置分区
job.setPartitionerClass(EmpPartitionBySalary.class);
// 设置分区数量 按工资分区0,1500,5000,10000
job.setNumReduceTasks(4);
// 5、设置输入输出结果集
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 6、提交任务
System.exit(job.waitForCompletion(true)?0:1);
}
代码语言:javascript复制mapper
public class PartitionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] items = line.split(",");
// 获取员工的薪资
int salary = Integer.parseInt(items[5]);
// 获取员工的姓名
String name = items[1];
context.write(new Text(name), new IntWritable(salary));
}
}
代码语言:javascript复制reduce
public class PartitionReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable value:values){
context.write(key,value);
}
}
}
代码语言:javascript复制分区类
public class EmpPartitionBySalary extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text name, IntWritable salary, int numPartition) {
// 根据薪资进行分区 (0,1500,5000,10000,以上)
int sal = salary.get();
if(sal<1500){
return 1% numPartition;
}else if(sal < 5000){
return 2%numPartition;
}else if(sal <10000){
return 3%numPartition;
}
return 4%numPartition;
}
}
5、合并
代码语言:javascript复制主类
/**
* 合并员工信息
*/
public class JoinInfoMain {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(JoinInfoMain.class);
job.setMapperClass(JoinInfoMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(JoinInfoReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
代码语言:javascript复制mapper
//在做多个文档联合数据分析的时候,一定要注意你join的点在哪地方
public class JoinInfoMapper extends Mapper<LongWritable, Text, Text, Text> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] items = line.split(",");
// 根据每一行的长度来判断是哪一个文档,根据不同的文档来取值
if(items.length==8){
String name = items[1];
String job = items[2];
context.write(new Text(job),new Text(name));
}else if(items.length==3){
String job = items[0];
String desc=items[2]; // 获取岗位的描述
context.write(new Text(job), new Text("--" desc));
}
// 打包,去分析,现在输出的是什么,然后在分析reduce
// 合并的话,要知道那个字段相同,他们两个数据集的key相同,原语,去重,合并
// context.write(new Text(job),new Text(name)); 第一个数据集
// context.write(new Text(job), new Text("--" desc)); 第二个数据集
}
}
代码语言:javascript复制reduce
public class JoinInfoReduce extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 创建一个变量,用于存储该岗位下的所有员工姓名
StringBuffer names = new StringBuffer();
// 岗位信息
String jobInfo = "";
for(Text value:values){
// 取得mapper传过来的所有数据
String msg = value.toString();
// 判断是否包含-- 是的话,则为岗位描述
boolean flag = msg.contains("--");
if(flag){
jobInfo=msg;
}else {
// 将所有的values组合起来
names.append(msg "、");
}
}
// 输出join之后的数据集
context.write(new Text(jobInfo),new Text(names.toString()));
}
}
6、topN
待后期完善
导入依赖
代码语言:javascript复制 <dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
使用
代码语言:javascript复制Hadoop节点的顺序:NameNode---DataNode---SecondaryNameNode
1、将文件从Windows本地传到Linux本地,使用winscp即可
2、将文件从Linux本地上传到hdfs上
hdfs dfs -put 本地文件 hdfs上的路径
3、执行
hadoop jar jar的路径 Java类的包名 主类名 数据集在hdfs的位置 hdfs的输出路径
例如:hadoop jar /usr/mydata/restuemp-1.0.0.jar com.mypartition.PartitionBySalaryMain /data/newemp.csv /out1754
可能会遇到的问题,解决方案
Hadoop离开安全模式
hadoop dfsadmin -safemode leave
Hive
1、常用命令
代码语言:javascript复制(-e)不进入hive,执行命令:hive -e "select id from student;"
(-f)执行脚本SQL语句:touch hivef.sql
执行文件中的SQL hive -f hivef.sql
执行文件中的SQL,并将结果写入到文件中
hive -f hivejs.sq > result-hive.txt
可通过hive查看hdfs文件
dfs -ls /
退出:quit;(直接退出)或者exit;(先隐型提交数据,在退出)
在hive中查看本地文件:
! ls /root/tools
在~目录下,输入 cat .hivehistory 可查看所有hive的历史命令
经典语句:row format delimited fields terminated by ‘t’
2、数据操作
代码语言:javascript复制创建分区表
注意:分区字段不能是表中已经存在的数据,可以将分区字段看作表的伪列。
create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (month string)
row format delimited fields terminated by 't';
加载数据到分区表
注意:分区表加载数据时,必须指定分区
load data local inpath '/root/tools/dept.txt' into table dept_partition partition(month='20210722');
查询分区数据
select * from dept_partition where month='20210722';
多个分区联合查询
select * from dept_partition where month='20210721'
union
select * from dept_partition where month='20210722'
union
select * from dept_partition where month='20210723';
创建二级分区表
create table dept_partition2(
deptno int, dname string, loc string
)
partitioned by (month string, day string)
row format delimited fields terminated by 't';
代码语言:javascript复制加载本地文件到hive表里面
load data local inpath '/root/emp.txt' into table emp;
加载HDFS文件到hive里面---加载数据覆盖本地已有数据
load data inpath '/dara/student.txt' overwrite into table student;
通过查询语句向表中插入数据
将结果文件写入本地 去掉local 则为写到hdfs路径下
insert overwrite local directory '/usr/local/distribute-result' ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' select * from emp distribute by deptno sort by idno desc;
import数据到指定hive表中
import table student2 partition(month='201709') from
'/user/hive/warehouse/export/student';
代码语言:javascript复制 数据导出
insert导出 导出到hdfs 下没有local
insert overwrite local directory '/opt/module/datas/export/student1'
ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' select * from student;
hadoop命令,从hdfs下载到本地 get的使用
hdfs dfs -get resource_path_name target_path_name
hive shell导出
hive -e 'select * from default.student;' > /data/student4.txt;
export 导出到hdfs上
export table default.student to '/data/student';
export和import主要用于两个Hadoop平台集群之间Hive表迁移。
3、表操作
代码语言:javascript复制创建表时,location指定加载数据路径
create external table if not exists student5(
id int, name string
)row format delimited fields terminated by 't'
location '/student';
重命名表
ALTER TABLE table_name RENAME TO new_table_name
删除表
drop table table_name
清除表中数据(Truncate):Truncate只能删除管理表,不能删除外部表中数据
truncate table student;
增加单个分区
alter table dept_partition add partition(month='201706') ;
同时创建多个分区
alter table dept_partition add partition(month='201705') partition(month='201704');
删除分区 删除多个分区逗号隔开
alter table dept_partition drop partition (month='201704');
查看分区表有多少分区
show partitions dept_partition;
查看分区表结构
desc formatted dept_partition;
ADD是代表新增一字段,字段位置在所有列后面(partition列前),REPLACE则是表示替换表中所有字段。
添加字段
alter table dept_partition add columns(deptdesc string);
替换字段
alter table dept_partition replace columns(deptno string, dname
string, loc string);
更新列
alter table dept_partition change column deptdesc desc int;
sqoop
Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
代码语言:javascript复制从hive写到MySQL 即从hdfs-->mysql 我的文件名是course
sqoop export
--connect jdbc:mysql://192.168.233.133:3306/company
--username root
--password Su#2021Gjj
--table deptsumsalary
--num-mappers 1
--export-dir /output/emp/course
--input-fields-terminated-by ","
代码语言:javascript复制MySQL到hdfs
sqoop import
--connect jdbc:mysql://192.168.233.133:3306/xyxy
--username root
--password Su#2021Gjj
--table tbl_student
--target-dir /output/tbl_student
--delete-target-dir
--num-mappers 1
--fields-terminated-by ","
代码语言:javascript复制mysql到hive 默认写到default数据库 其他数据的话,前面加数据库名.
sqoop import
--connect jdbc:mysql://192.168.233.133:3306/xyxy
--username root
--password Su#2021Gjj
--table tbl_student
--num-mappers 1
--hive-import
--fields-terminated-by ","
--hive-overwrite
--hive-table tbl_student_hive
逗号和制表符都是同样的效果
scala
spark也是基于scala开发的
spark
sparkSql
代码语言:javascript复制import java.text.SimpleDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// 创建样例类 将数据集和样例类做一个映射
case class userview(userid:String,productid:String,ptypeid:String,beh:String,time:String)
/**
* 利用spark SQL 分析用户行为
*/
object SparkDemo {
def main(args: Array[String]): Unit = {
val appName = "demo" // 给当前任务取一个名字
val master = "local[*]" //初始化核数 :*代表根据具体的电脑CPU来模拟核数 有几个核心就起几个线程来处理当前任务
// 初始化spark配置环境
val conf = new SparkConf().setAppName(appName).setMaster(master)
// 初始化spark session
val spark = SparkSession.builder().config(conf).getOrCreate()
// 初始化spark context
val sc = spark.sparkContext
// 1、读取数据
val data = sc.textFile("E:\java\workplace2018\studyknowlege\sparkdemo\src\main\resources\UserBehavior.csv")
// 导入隐式转换 就可以使用toDF()了
import spark.implicits._
// 2、将数据集和样例类进行映射
val userDF = data.map{
line =>
// 将行数据用"," 切割,形成数组
val array = line.split(",")
// 返回一个样例对象:对象使用数组作为数据填充
userview(array(0),array(1),array(2),array(3),array(4))
}.toDF()
// 3、统计用户id为1的所有用户的行为
userDF.createOrReplaceTempView("uv")
// spark.sql("select * from uv where userid = 1").show()
// show方法是打印到控制台
// spark.sql("select count(distinct(userid)) as alltotal from uv").show()
// spark.sql("select beh count(beh) from uv group by beh ").show()
// 统计某一个月用户的行为是什么样的
// 自定义一个函数:用于处理时间数据
def formateDate(timestamp:String):String = {
val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
simpleDateFormat.format(timestamp.toLong * 1000).split(" ")(0)
}
// 将自定义函数注册 _
spark.udf.register("formateDate",formateDate _)
// 将自定义匿名(处理时间)函数注册到spark当中去 可用
// spark.udf.register("formateDate",(timestamp: String) => {
// val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// simpleDateFormat.format(timestamp.toLong * 1000).split(" ")(0)
// })
// 重新通过formatDate函数处理数据集
val datanew = spark.sql("select userid,productid,ptypeid,beh,formateDate(time) from uv where userid = 1").show()
// 问题:使用group by 和 order by 统计每日用户活跃量
// 如何将统计好的数据存储到mysql中
val userbehDF = spark.sql("select beh,count(beh) from uv group by beh")
userbehDF.write.format("jdbc") // 告诉spark 存储的时候格式化数据的方式
.option("url","jdbc:mysql://192.168.233.133:3306/company") // 告诉spark存储的路径和数据库
.option("dbtable","userbeh") // 告诉spark 要操作的表(无需在数据库创建,直接创建并插入值)
.option("user","root")
.option("password","Su#2021Gjj")
.mode("Overwrite") // Overwrite覆盖、Append追加还是其他
.save() // 开始存储
sc.stop()
spark.stop()
}
}
sparkStream
代码语言:javascript复制package com.sparkStream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* sprakStreaming
*/
object SparkStreamDemo {
def main(args: Array[String]): Unit = {
// 1、告诉spark 我的配置 核心数 本次运行驱动的名字
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// 2、初始化StreamingContext 告诉spark 每五秒读取一次数据
val ssc = new StreamingContext(conf, Seconds(10)) // ,每五秒读取一次
// 3、监控端口,开始读取数据
// val lines = ssc.socketTextStream("192.168.233.133", 9999) // Linux启动9999端口 nc -lk 9999
val lines = ssc.textFileStream("hdfs://192.168.233.133:9000/sparkStream/")
// 可以合一句话
// val words = lines.flatMap(_.split(" "))
// val pairs = words.map(word => (word, 1))
// val wordCounts = pairs.reduceByKey(_ _)
// 4、将读到的数据可以做Wordcount的处理 可按照RDD的处理算子来进行调用
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ _)
// val wordCounts = lines.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey( (a,b) => a b)
// 5、输出数据集
wordCounts.print()
// 6、将sparkstream启动 开始监控
ssc.start()
ssc.awaitTermination()
}
}
附录
操作完数据之后,进行数据渲染
jQuery关键性代码
代码语言:javascript复制<script src="./static/jquery.min.js" charset="utf-8"></script>
代码语言:javascript复制 $(function () {
$.get("./servletdemo",function(data,status){
var result = JSON.parse(data);
var x = new Array();
var y = new Array();
for(var i = 0; i< result.length; i ){
x.push(result[i].deptno);
y.push(result[i].sumsalary);
}
console.log(x);
console.log(y);
});
另一个案例
// 组成这样的形式{value: 60, name: '访问'}, 即 records =[{...},{},{}]
var records = Array();
for(var i=0;i<result.length;i ){
records.push({"value":result[i].counts,"name":result[i].beh})
}
console.log("新的data记录集:",records)
echarts关键代码
可参考菜鸟教程地址:学习echarts
代码语言:javascript复制<div id="main1" style="width: 600px;height:400px;"></div>
<script src="./static/echarts.min.js" charset="utf-8"></script>
代码语言:javascript复制 $(function () {
$.get("./servletdemo",function(data,status){
var result = JSON.parse(data);
var x = new Array();
var y = new Array();
for(var i = 0; i< result.length; i ){
x.push(result[i].deptno);
y.push(result[i].sumsalary);
}
console.log(x);
console.log(y);
// 基于准备好的dom,初始化echarts实例
var myChart1 = echarts.init(document.getElementById('main1'));
// 指定图表的配置项和数据
var option1 = {
title: {
text: '部门员工薪资总和'
},
tooltip: {},
legend: {
data:['薪资总和']
},
xAxis: {
data: x
},
yAxis: {},
series: [{
name: '销量',
type: 'bar',
data: y
}]
};
// 使用刚指定的配置项和数据显示图表。
myChart1.setOption(option1);
});
});