Import dependencies
Import the key dependency packages
compile("org.scala-lang:scala-library:$scalaVersion")
compile("org.scala-lang:scala-reflect:$scalaVersion")
compile("org.scala-lang:scala-compiler:$scalaVersion")
compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")
Enable Hive support
import java.io.File
import org.apache.spark.sql.SparkSession

val warehouseLocation = new File("spark-warehouse").getAbsolutePath
// Configure the SparkSession
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.master("local[2]")
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
.config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
.config("hive.input.dir.recursive", "true")
.config("hive.mapred.supports.subdirectories", "true")
.config("hive.supports.subdirectories", "true")
.config("spark.driver.maxResultSize", "5g")
//Enable Hive support
.enableHiveSupport()
.getOrCreate()
var startDay = "2019-03-08 00:00:00"
var endDay = "2019-03-10 23:59:59"
var srcIp = "10.28.137.84"
//Query directly with Spark SQL; the result is returned as a DataFrame
var resultDf = spark.sql("select * from http_origin where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '" + startDay + "'" +
  " and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '" + endDay + "' and http_origin.srcip = '" + srcIp + "'")
Linking Hive to HBase can also serve as a way of querying HBase
Create the corresponding Hive mapping statement
CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev(
  id String, srcIp String, srcPort Int, distIp String, distPort Int,
  requestURL String, requestMethod String, requestUserAgent String, requestCookie String,
  responseServer String, responseCode Int, requestHeader String, requestContType String,
  responseCharset String, httpVersion String, requestHost String, requestBodyString String,
  requestParameterString String, responseContentType String, responseHeader String,
  responseBodyReference String, ML_rule_juge String, ML_rule_juge_id String, ML_type String,
  ML_juge_mal String, ML_juge_type String, DLCNN_rule_juge String, DLCNN_type String,
  DLCNN_juge_mal String, DLCNN_juge_type String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES(
  'serialization.format'='t',
  'hbase.columns.mapping'=':key,0:srcIp, 0:srcPort, 0:distIp, 0:distPort, 0:requestURL, 0:requestMethod, 0:requestUserAgent, 0:requestCookie, 0:responseServer, 0:responseCode, 0:requestHeader, 0:requestContType, 0:responseCharset, 0:httpVersion, 0:requestHost, 0:requestBodyString, 0:requestParameterString, 0:responseContentType, 0:responseHeader, 0:responseBodyReference, 0:ML_rule_juge, 0:ML_rule_juge_id, 0:ML_type, 0:ML_juge_mal, 0:ML_juge_type, 0:DLCNN_rule_juge, 0:DLCNN_type, 0:DLCNN_juge_mal, 0:DLCNN_juge_type',
  'field.delim'='t')
TBLPROPERTIES ('hbase.table.name'='httpsystem_dev')
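Once this external table exists in the metastore, Spark SQL can read the HBase data through it like any other Hive table. A minimal sketch, assuming the hive-hbase-handler and HBase client jars are on the Spark classpath:

// Query the HBase-backed Hive table through the same SparkSession
// (assumes hive-hbase-handler and the HBase client jars are available on the classpath)
val hbaseDf = spark.sql("select id, srcIp, requestHost, responseCode from httpsystem_dev limit 10")
hbaseDf.show()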
Save the result as CSV to HDFS
import org.apache.spark.sql.SaveMode

var url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"
resultDf.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).option("header", "false")
  .save(url)
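Since Spark 2.x ships a built-in csv data source, the com.databricks.spark.csv package is not strictly required; a sketch of the equivalent write, assuming the same url:

// Equivalent write using the built-in csv data source (Spark 2.x+)
resultDf.write
  .mode(SaveMode.Overwrite)
  .option("header", "false")
  .csv(url)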
Create a view and return a partial result
resultDf.registerTempTable("offlineResult")
// Build a sample list from the view
var samples = spark.sql("select srcip,distip,requesthost,requesturl,requestheader," +
  "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
samples.show()
var rows = samples.collect()
for (row <- rows) {
  println(row(1), row(0), row(7))
}
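registerTempTable is deprecated from Spark 2.0 onward; a minimal sketch of the same sampling with createOrReplaceTempView, assuming the resultDf from above:

// createOrReplaceTempView is the Spark 2.x replacement for registerTempTable
resultDf.createOrReplaceTempView("offlineResult")
val sampleRows = spark.sql(
  "select srcip, distip, responsecode from offlineResult limit 10").collect()
sampleRows.foreach(row => println(s"${row(0)} -> ${row(1)}: ${row(2)}"))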