利用随机森林算法实现Bank风险预测
源码分享及数据集分享:https://github.com/luo948521848/BigDatas
在机器学习中,随机森林是一个包含多个决策树的分类器, 并且其输出的类别是由个别树输出的类别的众数而定。 Leo Breiman和Adele Cutler发展出推论出随机森林的算法。 而 “Random Forests” 是他们的商标。 这个术语是1995年由贝尔实验室的Tin Kam Ho所提出的随机决策森林(random decision forests)而来的。这个方法则是结合 Breimans 的 “Bootstrap aggregating” 想法和 Ho 的"random subspace method"以建造决策树的集合。
1.分裂:在决策树的训练过程中,需要一次次的将训练数据集分裂成两个子数据集,这个过程就叫做分裂。 2.特征:在分类问题中,输入到分类器中的数据叫做特征。以上面的股票涨跌预测问题为例,特征就是前一天的交易量和收盘价。 3.待选特征:在决策树的构建过程中,需要按照一定的次序从全部的特征中选取特征。待选特征就是在目前的步骤之前还没有被选择的特征的集合。例如,全部的特征是 ABCDE,第一步的时候,待选特征就是ABCDE,第一步选择了C,那么第二步的时候,待选特征就是ABDE。 4.分裂特征:接待选特征的定义,每一次选取的特征就是分裂特征,例如,在上面的例子中,第一步的分裂特征就是C。因为选出的这些特征将数据集分成了一个个不相交的部分,所以叫它们分裂特征。
数据源类型: A为用户信誉状况:1正常 0坏信用 其他数据是用户的基本信息
代码部分:
代码语言:javascript复制//关于银行风险预测
object Credit {
//用一个scala的case 定义case类的属性
case class Credit(
creditability: Double,
balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
)
//用一个函数 解析一行 将值存入Credit类中
def parseCredit(line: Array[Double]): Credit = {
Credit(
line(0),
line(1) - 1, line(2), line(3), line(4) , line(5),
line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
)
}
// 将字符串的RDD转换成Double类的RDD
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).map(_.map(_.toDouble))
}
//定义main函数
def main(args: Array[String]): Unit = {
//定义ScalacAPP
val conf = new SparkConf().setAppName("SparkDFebay")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//读取CVS文件
import sqlContext.implicits._
//第一个map 将字符串RDD转换成 Double RDD 第二个map 将Double 注入到Credit的类当中
//toDF 将RDD转换成Credit类的DataFrame(一种表格结构)
val creditDF = parseRDD(sc.textFile("germancredit.csv")).map(parseCredit).toDF().cache()
creditDF.registerTempTable("credit")
//creditDF.printSchema 打印结果
// creditDF.show
// 可以利用SQLContext 对数据做进一步SQL操作
//为了更好地给机器使用对一些特征进行变化 变成一种维度
val featureCols = Array("balance", "duration", "history", "purpose", "amount",
"savings", "employment", "instPercent", "sexMarried", "guarantors",
"residenceDuration", "assets", "age", "concCredit", "apartment",
"credits", "occupation", "dependents", "hasPhone", "foreign")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(creditDF)
//在2的基础上增加信用度,这个标签
//creditability 设置为指标值
val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
//对3的数据进行分割
//5043random的种子,基本可忽略。
val splitSeed = 5043
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
//第一种方法利用随机森林分类器
/* *MaxDepth 最大深度 越大效果越好,但越延长训练时间 * NumTrees用于设置数的数量 越大精度越高(考虑维度) * maxBins 最大分桶数 决定节点分裂 * impurity 计算信息增益的指标 * auto 节点分裂时选择参加的特征数 * seed 随机生成的种子 */
val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
val model = classifier.fit(trainingData) //进行训练
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label") //来设置label值
val predictions = model.transform(testData) //进行预测
model.toDebugString
//保存模型
model.save("BankModel001")
//计算预测的准确率
val accuracy = evaluator.evaluate(predictions)
println("accuracy before pipeline fitting" accuracy*100 "%")
/* *第二种方法利用管道模式来训练模型(网络搜索法) * *将不同参数进行组合来预测结果 * * */
//利用ParamGridBuilder 工具来构建参数网络
//classifier是已关联好
val paramGrid = new ParamGridBuilder()
.addGrid(classifier.maxBins, Array(25, 31))
.addGrid(classifier.maxDepth, Array(5, 10))
.addGrid(classifier.numTrees, Array(20, 60))
.addGrid(classifier.impurity, Array("entropy", "gini"))
.build()
//创建管道 由一系列stage 每个stage相当于一个 Estimator测评者或者Transformer转换者
val steps: Array[PipelineStage] = Array(classifier)
val pipeline = new Pipeline().setStages(steps)
//用CrossValidator类完成模型筛选 不能过高
// 这类让管道在网上爬行
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(10)
//管道在参数网络上爬行不断被优化
val pipelineFittedModel = cv.fit(trainingData)
pipelineFittedModel.save("BankPipelineMode")
//测试数据
val predictions2 = pipelineFittedModel.transform(testData)
val accuracy2 = evaluator.evaluate(predictions2)
println("accuracy after pipeline fitting" accuracy2*100 "%")
}
}
将项目打包成jar 上传到spark集群中进行计算,获得模型准确率: