Reading Hive with Spark

2020-08-04 21:39:16

Importing the dependencies

Pull in the key dependency packages:

    compile("org.scala-lang:scala-library:$scalaVersion")
    compile("org.scala-lang:scala-reflect:$scalaVersion")
    compile("org.scala-lang:scala-compiler:$scalaVersion")

    compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
    compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")

Enabling Hive support

    // Imports needed by this snippet
    import java.io.File
    import org.apache.spark.sql.SparkSession

    val warehouseLocation = new File("spark-warehouse").getAbsolutePath
    // Configure Spark
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
      .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
      .config("hive.input.dir.recursive", "true")
      .config("hive.mapred.supports.subdirectories", "true")
      .config("hive.supports.subdirectories", "true")
      .config("spark.driver.maxResultSize", "5g")
      // Enable Hive support
      .enableHiveSupport()
      .getOrCreate()

    val startDay = "2019-03-08 00:00:00"
    val endDay = "2019-03-10 23:59:59"
    val srcIp = "10.28.137.84"
    // Query directly with Spark SQL; the result comes back as a DataFrame
    val resultDf = spark.sql(
      "select * from http_origin" +
      " where date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') >= '" + startDay + "'" +
      " and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss') <= '" + endDay + "'" +
      " and http_origin.srcip = '" + srcIp + "'")
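Building the SQL string by concatenation is easy to get wrong (quoting, spacing). An equivalent way to express the same filter is the DataFrame API; the following is a minimal sketch under the same assumptions as above (the http_origin table with its day and srcip columns), where resultDfAlt is just an illustrative name:

    // Sketch: the same time-range + source-IP filter written with the DataFrame API
    import org.apache.spark.sql.functions.date_format
    import spark.implicits._

    val resultDfAlt = spark.table("http_origin")
      .where(date_format($"day", "yyyy-MM-dd HH:mm:ss") >= startDay)
      .where(date_format($"day", "yyyy-MM-dd HH:mm:ss") <= endDay)
      .where($"srcip" === srcIp)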

Mapping Hive to HBase, as one way to query HBase

Create the Hive table that maps onto the HBase table:

    CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev(
      id String, srcIp String, srcPort Int, distIp String, distPort Int,
      requestURL String, requestMethod String, requestUserAgent String, requestCookie String,
      responseServer String, responseCode Int, requestHeader String, requestContType String,
      responseCharset String, httpVersion String, requestHost String, requestBodyString String,
      requestParameterString String, responseContentType String, responseHeader String,
      responseBodyReference String, ML_rule_juge String, ML_rule_juge_id String, ML_type String,
      ML_juge_mal String, ML_juge_type String, DLCNN_rule_juge String, DLCNN_type String,
      DLCNN_juge_mal String, DLCNN_juge_type String)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES(
      'serialization.format'='t',
      'hbase.columns.mapping'=':key,0:srcIp, 0:srcPort, 0:distIp, 0:distPort, 0:requestURL, 0:requestMethod, 0:requestUserAgent, 0:requestCookie, 0:responseServer, 0:responseCode, 0:requestHeader, 0:requestContType, 0:responseCharset, 0:httpVersion, 0:requestHost, 0:requestBodyString, 0:requestParameterString, 0:responseContentType, 0:responseHeader, 0:responseBodyReference, 0:ML_rule_juge, 0:ML_rule_juge_id, 0:ML_type, 0:ML_juge_mal, 0:ML_juge_type, 0:DLCNN_rule_juge, 0:DLCNN_type, 0:DLCNN_juge_mal, 0:DLCNN_juge_type',
      'field.delim'='t')
    TBLPROPERTIES ('hbase.table.name'='httpsystem_dev')
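Once this mapping table exists, the HBase data can be read through the same Hive-enabled SparkSession like any other Hive table. A minimal sketch (column and table names taken from the DDL above; httpDf is just an illustrative name):

    // Sketch: query the HBase-backed Hive table through Spark SQL
    val httpDf = spark.sql("select id, srcIp, requestURL from httpsystem_dev limit 10")
    httpDf.show()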

Saving the result to HDFS as CSV

    import org.apache.spark.sql.SaveMode

    val url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"
    resultDf.write.format("com.databricks.spark.csv")
      .mode(SaveMode.Overwrite)
      .option("header", "false")
      .save(url)
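On Spark 2.x the external com.databricks.spark.csv package is not strictly required, because a CSV data source is built in. A minimal sketch of the equivalent write, assuming the same resultDf and url as above:

    // Sketch: same CSV export using Spark's built-in csv data source (Spark 2.x+)
    resultDf.write
      .mode(SaveMode.Overwrite)
      .option("header", "false")
      .csv(url)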

Creating a temporary view to return a partial result

    // Register the result as a temp view so it can be queried with SQL
    // (createOrReplaceTempView supersedes the deprecated registerTempTable)
    resultDf.createOrReplaceTempView("offlineResult")
    val samples = spark.sql(
      "select srcip,distip,requesthost,requesturl,requestheader," +
      "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
    samples.show()
    val rows = samples.collect()
    for (row <- rows) {
      println((row(1), row(0), row(7)))
    }
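Accessing Row fields by position (row(1), row(0), row(7)) breaks silently if the select list changes; fields can also be fetched by column name. A minimal sketch over the same samples DataFrame, assuming these columns are string-typed in http_origin:

    // Sketch: read Row fields by column name instead of by position
    for (row <- samples.collect()) {
      val src  = row.getAs[String]("srcip")
      val dist = row.getAs[String]("distip")
      val host = row.getAs[String]("requesthost")
      println((src, dist, host))
    }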
