Article outline
- Overview of the general fraud detection workflow
- pyspark xgboost DEMO
- References
How do you set up XGBoost with PySpark? See the earlier post:
Using WSL for a simple PySpark + XGBoost classification and feature-importance exercise
Banks have to deal with a steadily rising number of fraud cases. As new technologies appear, instances of fraud multiply, and it becomes impractical for a bank to inspect every transaction and identify fraud patterns by hand. RPA uses an "if-then" approach to spot potential fraud and flag it to the relevant department. For example, if many transactions are made within a short period, RPA identifies the account and flags it as a potential threat, which helps the bank scrutinize the account and investigate the suspected fraud.
Overview of the general fraud detection workflow
Flow description: as shown above, the input is data about individual insurance claims from the financial data (claim features, customer features, and insurance features).
After some preprocessing and the addition of new features, the data is used to train an XGBoost classifier. Once the classifier is trained, it can be used to decide whether a new record should be accepted (not fraudulent) or rejected (fraudulent). The process is described in more detail below.
After talking things through with the client, we need to understand each field; the client provides a data dictionary table:
Input: Our input consists of a dataset with one line per claim. The claims contain data about the customers, the types of claims, claim amounts, and other relevant features.
Data Preparation (Preprocessing, Generation of Code Features, and Generation of Customer Segmentation Features)
We first do some initial preprocessing to convert the data fields to a suitable format. Then, based on the input, we generate features which characterize the customer based on factors like number of previous claims, number of previous occurrences of fraud, total amount claimed, etc. These customer segmentation features are added to the existing dataset along with features that detail the presence (or lack thereof) of Warning Codes, Diagnosis Codes, etc.
Preprocessing consists of the following (a short code sketch follows this list):
- Log Transformation on high magnitude numerical features
- One-hot encoding of categorical features
- Timestamp transforms on date-time features
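To make these steps concrete, here is a minimal sketch; the column names (CLAIM_AMT, CLAIM_DATE, CLAIM_TYPE) are placeholders rather than the real schema, and the full demo below does the same thing with transform_log_fields, transform_ts_fields, and pd.get_dummies.
# Minimal preprocessing sketch. CLAIM_AMT, CLAIM_DATE and CLAIM_TYPE are
# placeholder column names used only for illustration.
import pandas as pd
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(12000.0, "2021-03-01 10:15:00", "OUTPATIENT"),
     (350.0, "2021-03-02 22:40:00", "DENTAL")],
    ["CLAIM_AMT", "CLAIM_DATE", "CLAIM_TYPE"])

# 1. Log transformation on a high-magnitude numeric feature
df = df.withColumn("CLAIM_AMT_log10", F.log(10.0, F.col("CLAIM_AMT")))

# 2. Timestamp transforms: day-of-week and hour-of-day
df = (df.withColumn("CLAIM_DATE", F.col("CLAIM_DATE").cast("timestamp"))
        .withColumn("CLAIM_DATE_dow", F.dayofweek("CLAIM_DATE"))
        .withColumn("CLAIM_DATE_hod", F.hour("CLAIM_DATE")))

# 3. One-hot encoding of a categorical feature (done on the pandas side,
#    as in the demo below)
pdf = pd.get_dummies(df.toPandas(), columns=["CLAIM_TYPE"], prefix_sep="~~")
print(pdf.columns.tolist())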
Customer Segmentation Features in the data include the following (a windowing sketch follows the list):
- Summation of ORG_PRES_AMT_VALUE for a particular customer until a particular timestamp
- Summation of TOTAL_RECEIPT_AMT for a particular customer until a particular timestamp
- Summation of APP_AMT
- Summation of CL_SOCIAL_PAY_AMT for a particular customer until a particular timestamp
- Summation of CL_OWNER_PAY_AMT for a particular customer until a particular timestamp
- Summation of REJECTED_AMT
- Average of COPAY_PCT for a particular customer until a particular timestamp
- Max value of NO_OF_YEAR for a particular customer until a particular timestamp
- Number of CL_LINE_STATUS_RJ lines
- Number of CL_LINE_STATUS_AC lines
- Ratio of CL_LINE_STATUS_RJ lines to CL_LINE_STATUS_AC lines
- Number of BEN_SPEND_Applicant lines
- Number of BEN_SPEND_Spouse lines
- Number of BEN_SPEND_Child lines
- Number of BEN_SPEND_Parents lines
- Number of lines
- Ratio of Total Rejected Amount to Total ORG_PRES_AMT_VALUE
- Ratio of CL_LINE_STATUS_RJ lines to all lines
- Ratio of CL_LINE_STATUS_AC lines to all lines
- Ratio of BEN_SPEND_Applicant lines to all lines
- Ratio of BEN_SPEND_Spouse lines to all lines
- Ratio of BEN_SPEND_Child lines to all lines
- Ratio of BEN_SPEND_Parents lines to all lines
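Most of these are "per customer, up to the current claim" aggregates, which map naturally onto a Spark window. Below is a minimal sketch of one such feature; MBR_NO and ORG_PRES_AMT_VALUE come from the feature list above, while TS_UNIX is a placeholder ordering column (the demo below generalizes this pattern with sumVal, maxVal, and meanVal).
# Window sketch for "sum of ORG_PRES_AMT_VALUE for a customer until the
# current claim". The -1 upper bound excludes the current line, so the
# feature only uses past history.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
claims = spark.createDataFrame(
    [("M001", 1, 100.0), ("M001", 2, 250.0), ("M001", 3, 80.0)],
    ["MBR_NO", "TS_UNIX", "ORG_PRES_AMT_VALUE"])

windowval = (Window.partitionBy("MBR_NO")
             .orderBy("TS_UNIX")
             .rangeBetween(Window.unboundedPreceding, -1))

claims = claims.withColumn(
    "TOT_ORG_PRES_AMT_VALUE",
    F.sum("ORG_PRES_AMT_VALUE").over(windowval))
claims.show()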
Code Features are generated by selecting a particular code field, determining the top 20 most frequently occurring items in that field, and then creating a feature for each of those 20 items indicating whether it is present in the record or not. The code fields for which we generate Code Features in the data are (see the sketch after this list):
- Warning Code
- DIAG_CODE
- PROV_CODE
- KIND_CODE
- CLSH_HOSP_CODE
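As a rough sketch of the idea (the codeExtract and addCodes functions in the demo below do this on the real data; the field name WARNING_CODE and the code values here are made up for illustration):
# Sketch of code-feature generation: find the most frequent codes in a
# comma-separated code field and add one 0/1 indicator column per top code.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("W01,W07",), ("W07",), ("W02,W01",), (None,)], ["WARNING_CODE"])

# Count individual codes by exploding the comma-separated field
counts = (df.withColumn("code", F.explode(F.split("WARNING_CODE", ",")))
            .groupBy("code").count().sort(F.desc("count")))
top_codes = [r["code"] for r in counts.head(20)]

# One indicator column per top code
for code in top_codes:
    df = df.withColumn(
        code, F.when(F.col("WARNING_CODE").like("%" + code + "%"), 1).otherwise(0))
df.show()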
Classification Model:
XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. The implementation of the algorithm was engineered for efficiency of compute time and memory resources. A design goal was to make the best use of available resources to train the model. We are using an XGBoost classifier to determine whether a claim is fraudulent or not.
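Fraud data is usually highly imbalanced, which is why the demo below sets scale_pos_weight to the ratio of negative to positive examples. A minimal, self-contained sketch of that pattern on synthetic data (not the claims dataset):
# Minimal XGBoost sketch on synthetic, imbalanced data; scale_pos_weight is
# set to (#negatives / #positives), as in the demo below.
import numpy as np
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split

rng = np.random.RandomState(0)
X = rng.rand(2000, 10)
y = (rng.rand(2000) < 0.05).astype(int)   # ~5% "fraud"

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
pos_weight = (y_train == 0).sum() / max((y_train == 1).sum(), 1)

clf = XGBClassifier(scale_pos_weight=pos_weight)
clf.fit(X_train, y_train)
proba = clf.predict_proba(X_test)[:, 1]   # fraud probabilities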
Output: The model classifies each claim as Rejected or Accepted, i.e., Fraudulent or Not Fraudulent.
pyspark xgboost DEMO
# Imports and Initialization
from xgboost import XGBClassifier, plot_importance
from sklearn.model_selection import RandomizedSearchCV
from pyspark.sql import functions as F
from pyspark.sql.types import *
from sklearn import preprocessing
import sys
import os
import numpy as np
import pandas as pd
from sklearn.metrics import precision_recall_curve
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score, confusion_matrix, accuracy_score
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
import json
import math
import numbers
from pyspark.sql import SQLContext
from pyspark.sql import Window
import matplotlib.pyplot as plt
import itertools
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
# UDF returning the day of week as a string (StringType expects a string,
# so the integer from weekday() is converted)
day_of_week_udf = F.udf(
    lambda ts: str(ts.weekday()) if ts is not None else None,
    StringType())
def getSummary(df):
    # Per-customer summary used at scoring time. Note: the original snippet
    # only showed two aggregations; the TOT_* aggregations below are
    # reconstructed from the columns referenced further down.
    summarydf = (df
        .withColumn('is_BEN_TYPE_Applicant', F.when(F.col("BEN_TYPE") == "Applicant", F.lit(1)).otherwise(F.lit(0)))
        .withColumn('is_BEN_TYPE_Spouse', F.when(F.col("BEN_TYPE") == "Spouse", F.lit(1)).otherwise(F.lit(0)))
        .withColumn('is_BEN_TYPE_Child', F.when(F.col("BEN_TYPE") == "Child", F.lit(1)).otherwise(F.lit(0)))
        .withColumn('is_BEN_TYPE_Parent', F.when(F.col("BEN_TYPE") == "Parent", F.lit(1)).otherwise(F.lit(0)))
        .groupby("MBR_NO")
        .agg(
            F.max("NO_OF_YR").alias("MAX_NO_OF_YR"),
            F.sum(F.lit(1)).alias("NUM_LINES"),
            F.sum("REJECTED_AMT").alias("TOT_REJECTED_AMT"),
            F.sum("ORG_PRES_AMT_VALUE").alias("TOT_ORG_PRES_AMT_VALUE"),
            F.sum("is_BEN_TYPE_Applicant").alias("TOT_is_BEN_TYPE_Applicant"),
            F.sum("is_BEN_TYPE_Spouse").alias("TOT_is_BEN_TYPE_Spouse"),
            F.sum("is_BEN_TYPE_Child").alias("TOT_is_BEN_TYPE_Child"),
            F.sum("is_BEN_TYPE_Parent").alias("TOT_is_BEN_TYPE_Parent"),
        )
        .withColumn("FRAC_REJECTED_AMT", F.col("TOT_REJECTED_AMT")/F.col("TOT_ORG_PRES_AMT_VALUE"))
        .withColumn("FRAC_BEN_TYPE_Applicant", F.col("TOT_is_BEN_TYPE_Applicant")/F.col("NUM_LINES"))
        .withColumn("FRAC_BEN_TYPE_Spouse", F.col("TOT_is_BEN_TYPE_Spouse")/F.col("NUM_LINES"))
        .withColumn("FRAC_BEN_TYPE_Child", F.col("TOT_is_BEN_TYPE_Child")/F.col("NUM_LINES"))
        .withColumn("FRAC_BEN_TYPE_Parent", F.col("TOT_is_BEN_TYPE_Parent")/F.col("NUM_LINES"))
        .persist()
    )
    return summarydf
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
print(cm)
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
def checkContain(baseFeatures, allFeatures, transformsList):
"""
Description : Used to indicate that we want to use the transformed features
and not the original features if a transform has been done
Input : baseFeatures - Features included in the dataset before any
transforms are applied
allFeatures - All the features present in the dataframe after all
transforms and prep have completed
transformsList - The features from the original dataset that should
have transforms applied to them
Output : List of the features we're going to use for the ML model
"""
resList = []
for baseFeat in baseFeatures:
if baseFeat not in transformsList:
resList.append(baseFeat)
else:
for feat in allFeatures:
if baseFeat in feat:
if "~~" in feat or "log10" in feat:
resList.append(feat)
return resList
def transform_ts_fields(df, ts_cols):
"""
Description : Produces a timestamp in the standard dow-hod format for the
supplied field
Input : df - dataframe
ts_cols - timestamp features that need to be formatted correctly
Output : dataframe with appropriately formatted timestamp features
"""
col_list = df.columns
for col in ts_cols:
if(col in col_list):
            df = (df
                  .withColumn(col, F.col(col).cast("timestamp"))
                  .withColumn("{}_dow".format(col), day_of_week_udf(F.col(col)))
                  .withColumn("{}_hod".format(col), F.hour(F.col(col))))
return df
def transform_numeric_fields(df, num_cols):
"""
Description : Converts all numeric fields into float type
Input : df - dataframe
num_cols - numeric features that need to be converted to float type
Output : dataframe with appropriately numerical features converted to float
type
"""
col_list = df.columns
for col in num_cols:
if(col in col_list):
df = (df
.withColumn(col, F.col(col).cast("float"))
)
return df
def transform_log_fields(df, num_cols):
"""
Description : Produces the log_10 of the fields passed to it
Input : df - dataframe
num_cols - numeric features whose log values need to be calculated
Output : dataframe with added log values for the required numerical
features
"""
col_list = df.columns
for col in num_cols:
if(col in col_list):
df = (df
                  .withColumn(col + "_log10", F.log(10.0, F.col(col)))
)
return df
def with_transform(df, param_dict):
"""
Description : Applies transforms on relevant data fields in the data
Input : df - dataframe
param_dict - parameter dictionary
Output : dataframe with all appropriate transforms
"""
df = transform_ts_fields(df, param_dict['BASE_FEATURES_TIMESTAMP'])
df = transform_numeric_fields(df, param_dict['BASE_FEATURES_NUMERIC'])
df = transform_log_fields(df, param_dict['LOG_TRANSFORM_FEATURES'])
    df = (df
          .withColumn("INCUR_PERIOD_SECS",
                      F.col("INCUR_DATE_TO").cast("long") -
                      F.col("INCUR_DATE_FROM").cast("long")))
return df
def run_xgboost(data,feats, scale_pos_weight=1.0, old_model = None):
"""
Description : Generates an xgboost model based on training data
Input : X_train_pd - Pandas Dataframe, training data input
y_train - training data output/labels
param_dict - parameter dictionary
max_depth_list - list of max depths of trees
n_estimators_list - list of number of trees
scoring_metric - scoring metric used
grid_scoring - grid scoring metric
scale_pos_weight - weight applied to positive vals
num_cv = cross-validation splitting strategy
Output : Trained XGBoost Classifier
"""
X_train, X_test, y_train, y_test = train_test_split(data[feats], data['label'], test_size=0.33)
unique, counts = np.unique(y_train, return_counts=True)
cdict = dict(zip(unique, counts))
temp_pos_weight = cdict[0]/cdict[1]
xgb_class = XGBClassifier(scale_pos_weight=temp_pos_weight)
xgb_class.fit(X=X_train, y=y_train, xgb_model = old_model)
y_pred_proba = xgb_class.predict_proba(X_test)
threshs = np.arange(0.01,1,0.01)
acc = 0
prsum = 0
abdist = 1
bestthresh = 0
for thresh in threshs:
y_pred_temp = (y_pred_proba[:,1] >= thresh).astype(int)
        '''
        precision, recall, thresholds = precision_recall_curve(y_test, y_pred_temp)
        average_precision = average_precision_score(y_test, y_pred_temp)
        if ((precision[1] + recall[1]) > prsum) and (recall[1] > precision[1]):
            prsum = precision[1] + recall[1]
            bestthresh = thresh
        '''
        '''
        temp_acc = accuracy_score(np.array(y_test), np.array(y_pred_temp))
        if temp_acc > acc:
            acc = temp_acc
            bestthresh = thresh
        '''
cnf_matrix_temp = confusion_matrix(y_test, y_pred_temp)
cm = cnf_matrix_temp.astype('float') / cnf_matrix_temp.sum(axis=1)[:, np.newaxis]
fp = cm[0][1] * 1.0
fn = cm[1][0] * 1.0
dist = abs((fn/fp)-1)
if dist<abdist:
abdist = dist
bestthresh = thresh
y_pred = (y_pred_proba[:,1] >= bestthresh).astype(int)
precision, recall, thresholds = precision_recall_curve(y_test, y_pred)
average_precision = average_precision_score(y_test, y_pred)
# Compute confusion matrix
cnf_matrix = confusion_matrix(y_test, y_pred)
np.set_printoptions(precision=2)
# Plot non-normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=[0,1],
title='Confusion matrix, without normalization')
# Plot normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=[0,1], normalize=True,
title='Normalized confusion matrix')
plt.show()
plt.step(recall, precision, color='b', alpha=0.2,
where='post')
plt.fill_between(recall, precision, step='post', alpha=0.2,
color='b')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.ylim([0.0, 1.05])
plt.xlim([0.0, 1.0])
plt.title('2-class Precision-Recall curve: AP={0:0.5f}'.format(
average_precision))
plt.show()
auc = roc_auc_score(y_test, y_pred_proba[:,1])
print('AUC: %.3f' % auc)
# calculate roc curve
fpr, tpr, thresholds = roc_curve(y_test, y_pred_proba[:,1])
# plot no skill
plt.plot([0, 1], [0, 1], linestyle='--')
# plot the roc curve for the model
plt.plot(fpr, tpr, marker='.')
# show the plot
plt.show()
unique, counts = np.unique(data['label'], return_counts=True)
cdict = dict(zip(unique, counts))
pos_weight = cdict[0]/cdict[1]
full_model = XGBClassifier(scale_pos_weight= pos_weight)
full_model.fit(data[feats], data['label'])
return full_model, bestthresh
def setup_spark_session(param_dict):
"""
Description : Used to setup spark session
Input : param_dict - parameter dictionary
Output : Spark Session, Spark Context, and SQL Context
"""
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"
try:
spark.stop()
print("Stopped a SparkSession")
except Exception as e:
print("No existing SparkSession")
SPARK_DRIVER_MEMORY = param_dict["SPARK_DRIVER_MEMORY"] # "10G"
SPARK_DRIVER_CORE = param_dict["SPARK_DRIVER_CORE"] # "5"
SPARK_EXECUTOR_MEMORY = param_dict["SPARK_EXECUTOR_MEMORY"] # "3G"
SPARK_EXECUTOR_CORE = param_dict["SPARK_EXECUTOR_CORE"] # "1"
AWS_ACCESS_KEY = param_dict["AWS_ACCESS_KEY"]
AWS_SECRET_KEY = param_dict["AWS_SECRET_KEY"]
AWS_S3_ENDPOINT = param_dict["AWS_S3_ENDPOINT"]
    conf = (SparkConf()
            .setAppName(param_dict["APP_NAME"])
            .setMaster('yarn-client')
            .set('spark.executor.cores', SPARK_EXECUTOR_CORE)
            .set('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
            .set('spark.driver.cores', SPARK_DRIVER_CORE)
            .set('spark.driver.memory', SPARK_DRIVER_MEMORY)
            .set('spark.driver.maxResultSize', '0'))
    spark = (SparkSession.builder
             .config(conf=conf)
             .getOrCreate())
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sqlContext = SQLContext(sc)
return spark, sc, sqlContext
def loadDataset(vw_cl_lines_df, datefield, param_dict):
"""
Description : Runs data through appropriate transforms to convert it
to a suitable format
Input : vw_cl_lines_df - input dataframe
datefield - field used to establish a window for the addFeats
function
param_dict - parameter dictionary
Output : Properly formatted dataframe
"""
vw_cl_lines_df = (with_transform(vw_cl_lines_df, param_dict))
    vw_cl_lines_df = vw_cl_lines_df.withColumn(
        datefield + "_unix",
        F.unix_timestamp(
            F.col(datefield),
            format='yyyy-MM-dd HH:mm:ss.000'))
return vw_cl_lines_df
def addOneHotsTest(df, oneHots):
    # Recreate, on the test set, the one-hot columns produced during training
    # by pd.get_dummies(..., prefix_sep="~~"); each column is named
    # "FIELD~~value"
    for item in oneHots:
        field, value = item.split("~~", 1)
        df[item] = np.where(df[field].astype(str) == value, 1, 0)
    return df
def addCodes(res_df, codeField, topCodes, optUDF, knownPref):
# Need to get same codes for testing
for code in topCodes:
        likeCode = "%" + code + "%"
res_df = res_df.withColumn(
code,
F.when(
res_df[codeField].like(likeCode),
1).otherwise(0))
    def checkOtherCodes(x):
        # Flag records that contain any code outside the top-codes list
        if not x:
            return 0
        codes = set(c.strip() for c in x.split(","))
        if codes.issubset(set(topCodes)):
            return 0
        else:
            return 1
otherCodesUDF = F.udf(checkOtherCodes, IntegerType())
if knownPref is not None:
        otherlabel = knownPref + "_" + codeField
else:
otherlabel = codeField
res_df = res_df.withColumn(
"OTHER_" otherlabel,
otherCodesUDF(
res_df[codeField]))
codesAdded = topCodes
return res_df, codesAdded
def codeExtract(df, codeField, topCount, optUDF=None, knownPref=None):
"""
Description : Function to extract code features
Input : df - input dataframe
codeField - field used to establish a window for the addFeats
function
topCount - number of code features to be added
optUDF - optional udf to apply to the field
knownPref - prefix characterizing a field, if any
Output : dataframe with code features added
"""
codeEx_df = df
if optUDF is not None:
codeEx_df = codeEx_df.withColumn(
codeField, optUDF(codeEx_df[codeField]))
codeEx_df = codeEx_df.withColumn(
codeField, F.explode(
F.split(
codeEx_df[codeField], ",")))
code_counts = codeEx_df.groupBy(codeField).count().sort(F.desc("count"))
    if knownPref is not None:
        code_counts = code_counts.filter(
            code_counts[codeField].like(
                "%" + knownPref + "%"))
# code_counts.show(10)
xy = code_counts.toPandas()
    # Generate a list of the topCount most frequently occurring codes in this field
topCodes = xy[codeField].head(topCount).tolist()
topCodes = [x.strip() for x in topCodes]
res_df = df
return addCodes(res_df, codeField, topCodes, optUDF, knownPref)
# checks for presence of values in a field
def isVal(df, field, value):
return df.withColumn(
"is_" field "_" value,
F.when(
F.col(field) == value,
F.lit(1)).otherwise(
F.lit(0)))
# sums values of a field within a specified window
def sumVal(df, field, windowval):
    return df.withColumn("TOT_" + field, F.sum(field).over(windowval))
# finds the maximum value of a field within a specified window
def maxVal(df, field, windowval):
    return df.withColumn("MAX_" + field, F.max(field).over(windowval))
# finds the average value of a field within a specified window
def meanVal(df, field, windowval):
    return df.withColumn("MEAN_" + field, F.mean(field).over(windowval))
# finds the ratio between two fields of a record
def fracVal(df, numfield, denomfield, fracName):
return df.withColumn(fracName, F.col(numfield) / F.col(denomfield))
# adds required fields to the dataframe
def addFeatsTrain(vw_cl_lines_df, param_dict):
orig = vw_cl_lines_df
    windowval = (Window.partitionBy(param_dict["groupField"]).orderBy(
        param_dict["windowField"] + "_unix").rangeBetween(
        Window.unboundedPreceding, -1))
codes_df = orig.withColumn("NUM_LINES", F.sum(F.lit(1)).over(windowval))
for field in param_dict["isFields"]:
codes_df = isVal(codes_df, field[0], field[1])
for field in param_dict["sumFields"]:
codes_df = sumVal(codes_df, field, windowval)
for field in param_dict["maxFields"]:
codes_df = maxVal(codes_df, field, windowval)
for field in param_dict["meanFields"]:
codes_df = meanVal(codes_df, field, windowval)
for fracTuple in param_dict["fracTuples"]:
codes_df = fracVal(codes_df, fracTuple[0], fracTuple[1], fracTuple[2])
def remPref(x):
if x is None:
return ""
x = x.split(",")
y = []
for item in x:
if (('T' not in item) & ('M' not in item)):
y.append(item.strip())
y = ','.join(y)
return y
remPrefUDF = F.udf(remPref, StringType())
allCodes = {}
for code in param_dict["codeFields"]:
if len(code) == 1:
codes_df, toAdd = codeExtract(codes_df, code[0], 20)
if code[0] in allCodes:
                allCodes[code[0]] = allCodes[code[0]] + toAdd
else:
allCodes[code[0]] = toAdd
else:
codes_df, toAdd = codeExtract(
codes_df,
code[0],
20,
optUDF=remPrefUDF,
knownPref=code[1])
if code[0] in allCodes:
                allCodes[code[0]] = allCodes[code[0]] + toAdd
else:
allCodes[code[0]] = toAdd
addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
return codes_df, addedCols, allCodes
def addFeatsTest(vw_cl_lines_df, param_dict, summary_df):
orig = vw_cl_lines_df
joinfields = [param_dict['groupField'], "NUM_LINES"]
    for field in param_dict["sumFields"]:
        joinfields.append("TOT_" + field)
    for field in param_dict["maxFields"]:
        joinfields.append("MAX_" + field)
    for field in param_dict["meanFields"]:
        joinfields.append("MEAN_" + field)
for fracTuple in param_dict["fracTuples"]:
joinfields.append(fracTuple[2])
codes_df = orig.join(summary_df[joinfields], param_dict['groupField'],how='left')
for field in param_dict["isFields"]:
codes_df = isVal(codes_df, field[0], field[1])
def remPref(x):
if x is None:
return ""
x = x.split(",")
y = []
for item in x:
if (('T' not in item) & ('M' not in item)):
y.append(item.strip())
y = ','.join(y)
return y
remPrefUDF = F.udf(remPref, StringType())
allCodes = {}
for code in param_dict["codeFields"]:
presentInTrain = param_dict["allCodes"][code[0]]
if len(code) == 1:
codes_df, added = addCodes(codes_df, code[0], presentInTrain, None,
None)
else:
codes_df, added = addCodes(codes_df, code[0], presentInTrain,
optUDF=remPrefUDF, knownPref=code[1])
addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
return codes_df, addedCols
# prepares the data for use in a training or inference by adding features
# and appropriate labels
def prepTrainData(df, baseFeatures, param_dict):
trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
negCount = trainData.filter(trainData[param_dict["labelField"]] ==
param_dict["negativeLabel"]).count()
posCount = trainData.filter(trainData[param_dict["labelField"]] ==
param_dict["positiveLabel"]).count()
pos_weight = negCount/posCount
trainData, extraCols, param_dict["allCodes"] = addFeatsTrain(trainData,
param_dict)
vw_cl_lines_pd = trainData.toPandas()
prep_labelled_data_pd = pd.get_dummies(
vw_cl_lines_pd,
columns=param_dict["BASE_FEATURES_CATEGORICAL"],
drop_first=False,
prefix_sep="~~")
    featureCols = extraCols + checkContain(
        baseFeatures,
        prep_labelled_data_pd.columns.tolist(),
        param_dict["LOG_TRANSFORM_FEATURES"] +
        param_dict["BASE_FEATURES_CATEGORICAL"])
param_dict["oneHots"] = [x for x in prep_labelled_data_pd.columns.tolist()
if "~~" in x]
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in
                    param_dict["isFields"] if x[0] == param_dict["labelField"]]
featureCols = [x for x in featureCols if x not in leakageFeats]
return prep_labelled_data_pd, featureCols, pos_weight, param_dict
def prepTestData(df, summary, baseFeatures, param_dict):
trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
trainData, extraCols = addFeatsTest(trainData, param_dict , summary)
vw_cl_lines_pd = trainData.toPandas()
prep_labelled_data_pd = addOneHotsTest(vw_cl_lines_pd,
param_dict["oneHots"])
    featureCols = extraCols + checkContain(
        baseFeatures,
        prep_labelled_data_pd.columns.tolist(),
        param_dict["LOG_TRANSFORM_FEATURES"] +
        param_dict["BASE_FEATURES_CATEGORICAL"])
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in
                    param_dict["isFields"] if x[0] == param_dict["labelField"]]
featureCols = [x for x in featureCols if x not in leakageFeats]
return prep_labelled_data_pd, featureCols
# trains and returns an XGBoost Classifier
def trainXGBModel(df, param_dict): # ,onlyWarn = False):
pdf, feats, ratio, param_dict = prepTrainData(df, param_dict["baseFeatures"], param_dict)
for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
pdf[col] = pd.to_datetime(pdf[col], errors='coerce')
adf = pdf.replace([np.inf,-np.inf], 0)
cols = pdf[feats].columns
label = np.where(adf[param_dict["labelField"]] ==
param_dict["positiveLabel"], 1, 0)
x = adf[feats].values #returns a numpy array
standard_scaler = preprocessing.StandardScaler()
x_scaled = standard_scaler.fit_transform(x)
adf = pd.DataFrame(x_scaled, columns=adf[feats].columns)
adf['label'] = label
#X_train, y_train = adf[feats], adf['label']
    xgb_model, bestThresh = run_xgboost(adf[feats + ['label']], feats, scale_pos_weight=ratio)
param_dict["trainedCols"] = list(feats)
return xgb_model, feats, param_dict, bestThresh
def updateXGBModel(df, summary, param_dict, model):
    # Incrementally update an existing XGBoost model with new data.
    # Note: prepTestData needs the per-customer summary (see getSummary);
    # the class weight is recomputed here from the new labels.
    pandas_df, featureCols = prepTestData(
        df, summary, param_dict["baseFeatures"], param_dict)
    pandas_df['label'] = np.where(pandas_df[param_dict["labelField"]] ==
                                  param_dict["positiveLabel"], 1, 0)
    pandas_df = pandas_df.fillna(0)
    y_train = pandas_df['label'].values
    X_train_pd = pandas_df[featureCols]
    unique, counts = np.unique(y_train, return_counts=True)
    cdict = dict(zip(unique, counts))
    pos_weight = cdict[0] / cdict[1]
    xgb_model = model
    # Fit in chunks of at most 100000 rows, warm-starting from the previous
    # model each time
    n_chunks = max(1, math.ceil(len(X_train_pd) / 100000))
    X = np.array_split(X_train_pd, n_chunks)
    y = np.array_split(y_train, n_chunks)
    for i in range(len(X)):
        xgb_class = XGBClassifier(scale_pos_weight=pos_weight)
        xgb_model = xgb_class.fit(X[i], y[i], xgb_model=xgb_model)
    return xgb_model, featureCols, param_dict
# uses a model to predict values
def modelPredict(model, test_df, summary, param_dict, posThresh):
test_pdf, feats1 = prepTestData(test_df, summary, param_dict["baseFeatures"], param_dict)
for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
test_pdf[col] = pd.to_datetime(test_pdf[col], errors='coerce')
test_adf = test_pdf.replace([np.inf,-np.inf], 0)
x = test_adf[feats1].values #returns a numpy array
standard_scaler = preprocessing.StandardScaler()
x_scaled = standard_scaler.fit_transform(x)
test_adf = pd.DataFrame(x_scaled, columns=test_adf[feats1].columns)
X_test = test_adf[param_dict["trainedCols"]]
result_proba = model.predict_proba(X_test)
result = []
result = (result_proba[:,1] >= posThresh).astype(int)
#result = model.predict(X_test)
return result, result_proba
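The snippet above does not show how the pieces are wired together, so here is a rough usage sketch; the config file name and the param_dict keys TRAIN_DATA_PATH and TEST_DATA_PATH are placeholders that depend on your own setup.
# Rough end-to-end usage sketch (paths, param_dict contents and field names
# are placeholders; adjust to your own data dictionary).
param_dict = json.load(open("fraud_params.json"))   # hypothetical config file

spark, sc, sqlContext = setup_spark_session(param_dict)
claims_df = spark.read.parquet(param_dict["TRAIN_DATA_PATH"])

# Train: engineer features, pick a threshold, fit on the full data
xgb_model, feats, param_dict, best_thresh = trainXGBModel(claims_df, param_dict)

# Score new claims using the per-customer summary of historical lines
summary_df = getSummary(claims_df)
new_claims_df = spark.read.parquet(param_dict["TEST_DATA_PATH"])
preds, pred_proba = modelPredict(
    xgb_model, new_claims_df, summary_df, param_dict, best_thresh)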
References
https://wenku.baidu.com/view/529fc3a4a45177232e60a287.html
https://zhuanlan.zhihu.com/p/345828553