- Introduction to the general fraud-detection workflow
- pyspark xgboost DEMO
- References
For how to set up XGBoost and PySpark, see my earlier blog post:
Using WSL for PySpark XGBoost Classification and Feature Importance: A Simple Practice
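Banks face a steadily rising number of fraud cases. As new technologies appear, instances of fraud multiply, and it becomes very hard for a bank to inspect every transaction and identify fraud patterns by hand. RPA (robotic process automation) uses an "if-then" approach to identify potential fraud and flag it to the relevant department. For example, if several transactions are made within a short time, RPA identifies the account and flags it as a potential threat. This helps the bank scrutinize the account and investigate the fraud.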
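A rule of the kind described above can be sketched in a few lines of Python. This is a minimal illustration, not any RPA product's API: the function name `flag_rapid_transactions` and the thresholds (more than 3 transactions within 10 minutes) are hypothetical choices.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def flag_rapid_transactions(transactions, max_count=3, window=timedelta(minutes=10)):
    """Return the account IDs with more than max_count transactions inside
    any sliding time window of length `window`.

    `transactions` is an iterable of (account_id, timestamp) pairs.
    The default thresholds are illustrative assumptions.
    """
    recent = defaultdict(deque)  # account -> timestamps in the current window
    flagged = set()
    for account, ts in sorted(transactions, key=lambda t: t[1]):
        q = recent[account]
        q.append(ts)
        # drop timestamps that fell out of the window ending at ts
        while ts - q[0] > window:
            q.popleft()
        # if-then rule: too many transactions in the window -> flag the account
        if len(q) > max_count:
            flagged.add(account)
    return flagged
```

Apart from the initial sort, each timestamp is appended and removed at most once, so the check stays linear in the number of transactions.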
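Flowchart description
As the flowchart above shows, we receive our input: data on individual insurance claims drawn from financial records (claim features, customer features, and insurance features).
After some preprocessing and the addition of new features, we use the data to train an XGBoost classifier. Once trained, the classifier can decide whether a new record is accepted (not fraudulent) or rejected (fraudulent). Each step of the process is described in more detail below.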
Input
Our input is a dataset containing the lines of each claim. The claims contain data about the customers, the claim types, the claim amounts, and other relevant features.
Data Preparation (Preprocessing, Generation of Code Features, and Generation of Customer Segmentation Features)
We first do some initial preprocessing to convert the data fields to a suitable format. Then we generate features that characterize each customer, based on factors such as the number of previous claims, the number of previous occurrences of fraud, and the total amount claimed. These customer segmentation features are added to the existing dataset, along with features that indicate the presence (or absence) of Warning Codes, Diagnosis Codes, etc.
Preprocessing consists of:
- Log transformation of high-magnitude numerical features
- One-hot encoding of categorical features
- Timestamp transformations of date-time features
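The three preprocessing steps can be illustrated on a single record in plain Python. This is a sketch, not the Spark code used later: the helper names and the sample record are made up, and the `FIELD~~value` naming for one-hot columns is an assumption based on the `~~` separator the demo code searches for. The timestamp step derives day of week and hour of day ("dow-hod", as the demo's `transform_ts_fields` docstring calls it).

```python
import math
from datetime import datetime


def log10_transform(value):
    """log10 of a magnitude; None for missing or non-positive values."""
    if value is None or value <= 0:
        return None
    return math.log10(value)


def one_hot(field, value, categories, sep="~~"):
    """One 0/1 indicator per known category, named FIELD<sep>category."""
    return {f"{field}{sep}{c}": int(value == c) for c in categories}


def timestamp_features(ts):
    """Day of week (Monday=0 ... Sunday=6) and hour of day of a datetime."""
    return {"dow": ts.weekday(), "hod": ts.hour}


# hypothetical sample record
record = {"ORG_PRES_AMT_VALUE": 1000.0, "BEN_TYPE": "Spouse",
          "INCUR_DATE_TO": datetime(2021, 3, 5, 14, 30)}

features = {"ORG_PRES_AMT_VALUE_log10": log10_transform(record["ORG_PRES_AMT_VALUE"])}
features.update(one_hot("BEN_TYPE", record["BEN_TYPE"],
                        ["Applicant", "Spouse", "Child", "Parent"]))
features.update(timestamp_features(record["INCUR_DATE_TO"]))
print(features)
```

The log transform compresses amounts spanning several orders of magnitude, and the one-hot columns let the model treat each category independently.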
Customer segmentation features in the data include:
- Sum of ORG_PRES_AMT_VALUE for a particular customer up to a particular timestamp
- Sum of TOTAL_RECEIPT_AMT for a particular customer up to a particular timestamp
- Sum of APP_AMT
- Sum of CL_SOCIAL_PAY_AMT for a particular customer up to a particular timestamp
- Sum of CL_OWNER_PAY_AMT for a particular customer up to a particular timestamp
- Sum of REJECTED_AMT
- Average of COPAY_PCT for a particular customer up to a particular timestamp
- Maximum of NO_OF_YEAR for a particular customer up to a particular timestamp
- Number of CL_LINE_STATUS_RJ lines
- Number of CL_LINE_STATUS_AC lines
- Ratio of CL_LINE_STATUS_RJ lines to CL_LINE_STATUS_AC lines
- Number of BEN_SPEND_Applicant lines
- Number of BEN_SPEND_Spouse lines
- Number of BEN_SPEND_Child lines
- Number of BEN_SPEND_Parents lines
- Number of lines
- Ratio of total rejected amount to total ORG_PRES_AMT_VALUE
- Ratio of CL_LINE_STATUS_RJ lines to all lines
- Ratio of CL_LINE_STATUS_AC lines to all lines
- Ratio of BEN_SPEND_Applicant lines to all lines
- Ratio of BEN_SPEND_Spouse lines to all lines
- Ratio of BEN_SPEND_Child lines to all lines
- Ratio of BEN_SPEND_Parents lines to all lines
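The key property of these features is that each line only sees the same customer's earlier lines, so no future information leaks into training. The sketch below shows this point-in-time aggregation in plain Python for a few of the listed features; the field names (`customer`, `ts`, `amount`, `status`) are simplified stand-ins, not the dataset's real columns. The Spark demo expresses the same idea with a window ordered by timestamp and bounded by `rangeBetween(Window.unboundedPreceding, -1)`.

```python
from collections import defaultdict


def customer_history_features(lines):
    """Aggregate each customer's strictly earlier lines for every line.

    `lines` is a list of dicts with keys "customer", "ts", "amount", "status"
    ("AC" = accepted, "RJ" = rejected). Returns one feature dict per input
    line, in input order.
    """
    by_customer = defaultdict(list)
    for idx, line in enumerate(lines):
        by_customer[line["customer"]].append(idx)

    features = [None] * len(lines)
    for idxs in by_customer.values():
        for i in idxs:
            # only lines of the same customer with an earlier timestamp
            past = [lines[j] for j in idxs if lines[j]["ts"] < lines[i]["ts"]]
            amounts = [p["amount"] for p in past]
            n = len(past)
            n_rj = sum(1 for p in past if p["status"] == "RJ")
            n_ac = sum(1 for p in past if p["status"] == "AC")
            features[i] = {
                "NUM_LINES": n,
                "TOT_amount": sum(amounts),
                "MAX_amount": max(amounts) if amounts else None,
                "MEAN_amount": sum(amounts) / n if n else None,
                "NUM_RJ": n_rj,
                "FRAC_RJ": n_rj / n if n else None,
                "RJ_TO_AC": n_rj / n_ac if n_ac else None,
            }
    return features
```

The nested loop is quadratic per customer, which is fine for illustration; Spark's window functions do the same job at scale.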
Code features are generated by selecting a particular code field, determining the 20 most frequently occurring codes in that field, and then creating one feature per code that indicates whether the code is present in the record. The code fields for which we generate code features are:
- Warning Code
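The top-N selection and indicator columns can be sketched without Spark as follows. Assumptions: codes arrive as comma-separated strings, the `WARNING_CODE` field name and the `<field>_<code>` column naming are hypothetical, and codes are matched exactly (the demo's `addCodes` uses a substring `LIKE '%code%'` match instead). Like the demo, the sketch also adds an `OTHER_<field>` flag for codes outside the top list.

```python
from collections import Counter


def top_codes(records, top_n=20):
    """The top_n most frequent codes across comma-separated code strings."""
    counts = Counter(
        code.strip()
        for rec in records if rec
        for code in rec.split(",") if code.strip())
    return [code for code, _ in counts.most_common(top_n)]


def code_features(record, codes, field="WARNING_CODE"):
    """One 0/1 indicator per top code, plus OTHER_<field> = 1 when the
    record contains any code outside the top list."""
    present = {c.strip() for c in record.split(",") if c.strip()} if record else set()
    feats = {f"{field}_{code}": int(code in present) for code in codes}
    feats["OTHER_" + field] = int(not present.issubset(codes))
    return feats
```

The top-code list must be computed once on the training data and reused unchanged at prediction time, which is what `addFeatsTest` does with `param_dict["allCodes"]`.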
Classification Model:
XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. The implementation of the algorithm was engineered for efficiency of compute time and memory resources. A design goal was to make the best use of available resources to train the model. We are using an XGBoost classifier to determine whether a claim is fraudulent or not.
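Fraud labels are typically heavily imbalanced. The demo code compensates by passing `scale_pos_weight` = (number of negative examples) / (number of positive examples) to `XGBClassifier`, which is the starting value the XGBoost parameter documentation suggests. A stdlib-only sketch of that computation (the helper name is hypothetical):

```python
from collections import Counter


def scale_pos_weight(labels):
    """Ratio of negative (0) to positive (1) labels."""
    counts = Counter(labels)
    if counts[1] == 0:
        raise ValueError("no positive (fraud) examples")
    return counts[0] / counts[1]
```

For example, 95 genuine and 5 fraudulent training claims give a weight of 19, so each fraud example counts 19 times as much in the loss.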
Output
The model classifies each claim as Rejected or Accepted, i.e., Fraudulent or Not Fraudulent.
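The classifier itself outputs a fraud probability; a claim becomes Rejected when that probability reaches a decision threshold. `run_xgboost` in the demo scans thresholds from 0.01 to 0.99 and combines several criteria. The sketch below isolates just one of them: it picks the threshold at which the false-negative rate is closest to the false-positive rate (smallest |FNR/FPR - 1|). The function names and the 0.5 fallback are assumptions for illustration.

```python
def rates_at(y_true, scores, thresh):
    """(FPR, FNR) when claims with score >= thresh are flagged as fraud.
    Assumes y_true contains both classes (0 and 1)."""
    pos = sum(y_true)
    neg = len(y_true) - pos
    fp = sum(1 for y, s in zip(y_true, scores) if y == 0 and s >= thresh)
    fn = sum(1 for y, s in zip(y_true, scores) if y == 1 and s < thresh)
    return fp / neg, fn / pos


def balanced_threshold(y_true, scores):
    """Threshold in 0.01..0.99 minimizing |FNR/FPR - 1| (0.5 if none qualifies)."""
    best, best_dist = 0.5, float("inf")
    for k in range(1, 100):
        t = k / 100
        fpr, fnr = rates_at(y_true, scores, t)
        if fpr > 0:
            dist = abs(fnr / fpr - 1)
            if dist < best_dist:
                best, best_dist = t, dist
    return best


def label_claims(scores, thresh):
    """Map fraud probabilities to the Accepted/Rejected output."""
    return ["Rejected" if s >= thresh else "Accepted" for s in scores]
```

Balancing the two error rates is one option among several; a bank that weighs a missed fraud more heavily than a false alarm would shift the threshold down.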
pyspark xgboost DEMO
```python
# Imports and Initialization
import itertools
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType
from sklearn import preprocessing
from sklearn.metrics import (accuracy_score, average_precision_score,
                             confusion_matrix, precision_recall_curve,
                             roc_auc_score, roc_curve)
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

# UDF returning the day of the week (Monday=0 ... Sunday=6) of a timestamp
day_of_week_udf = F.udf(
    lambda ts: ts.weekday() if ts is not None else None,
    IntegerType())
```
```python
def getSummary(df):
    """Adds per-line BEN_TYPE flags and the fraction of lines per BEN_TYPE.

    Assumes df already contains NUM_LINES and the TOT_is_BEN_TYPE_* window
    sums (e.g. as produced by addFeatsTrain).
    """
    ben_types = ["Applicant", "Spouse", "Child", "Parent"]
    summarydf = df
    for ben_type in ben_types:
        summarydf = summarydf.withColumn(
            "is_BEN_TYPE_" + ben_type,
            F.when(F.col("BEN_TYPE") == ben_type, F.lit(1)).otherwise(F.lit(0)))
    for ben_type in ben_types:
        summarydf = summarydf.withColumn(
            "FRAC_BEN_TYPE_" + ben_type,
            F.col("TOT_is_BEN_TYPE_" + ben_type) / F.col("NUM_LINES"))
    return summarydf
```
```python
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')
    print(cm)

    plt.figure()
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # write each cell value, in white on dark cells and black on light ones
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.show()
```
```python
def checkContain(baseFeatures, allFeatures, transformsList):
    """
    Description : Used to indicate that we want to use the transformed features
                  and not the original features if a transform has been done
    Input : baseFeatures - Features included in the dataset before any
                           transforms are applied
            allFeatures - All the features present in the dataframe after all
                          transforms and prep have completed
            transformsList - The features from the original dataset that should
                             have transforms applied to them
    Output : List of the features we're going to use for the ML model
    """
    resList = []
    for baseFeat in baseFeatures:
        if baseFeat not in transformsList:
            for feat in allFeatures:
                # keep the one-hot ("~~") or log10 versions of the base feature
                if baseFeat in feat and ("~~" in feat or "log10" in feat):
                    resList.append(feat)
    return resList
```
```python
def transform_ts_fields(df, ts_cols):
    """
    Description : Produces a timestamp in the standard dow-hod format for the
                  supplied field
    Input : df - dataframe
            ts_cols - timestamp features that need to be formatted correctly
    Output : dataframe with appropriately formatted timestamp features
    """
    col_list = df.columns
    for col in ts_cols:
        if col in col_list:
            df = (df
                  .withColumn(col, F.col(col).cast("timestamp"))
                  # day-of-week / hour-of-day columns; the "_dow" and "_hod"
                  # suffixes are assumed names
                  .withColumn(col + "_dow", day_of_week_udf(F.col(col)))
                  .withColumn(col + "_hod", F.hour(F.col(col))))
    return df


def transform_numeric_fields(df, num_cols):
    """
    Description : Converts all numeric fields into float type
    Input : df - dataframe
            num_cols - numeric features that need to be converted to float type
    Output : dataframe with the numerical features converted to float
    """
    col_list = df.columns
    for col in num_cols:
        if col in col_list:
            df = df.withColumn(col, F.col(col).cast("float"))
    return df


def transform_log_fields(df, num_cols):
    """
    Description : Produces the log_10 of the fields passed to it
    Input : df - dataframe
            num_cols - numeric features whose log values need to be calculated
    Output : dataframe with added log values for the required numerical features
    """
    col_list = df.columns
    for col in num_cols:
        if col in col_list:
            # F.log(base, col): base-10 logarithm stored in <col>_log10
            df = df.withColumn(col + "_log10", F.log(10.0, F.col(col)))
    return df


def with_transform(df, param_dict):
    """
    Description : Applies transforms on relevant data fields in the data
    Input : df - dataframe
            param_dict - parameter dictionary
    Output : dataframe with all appropriate transforms
    """
    df = transform_ts_fields(df, param_dict['BASE_FEATURES_TIMESTAMP'])
    df = transform_numeric_fields(df, param_dict['BASE_FEATURES_NUMERIC'])
    df = transform_log_fields(df, param_dict['LOG_TRANSFORM_FEATURES'])
    # length of the incurred period in seconds; INCUR_DURATION and
    # INCUR_DATE_FROM are assumed column names
    df = df.withColumn(
        "INCUR_DURATION",
        F.col("INCUR_DATE_TO").cast("long") - F.col("INCUR_DATE_FROM").cast("long"))
    return df
```
```python
def run_xgboost(data, feats, scale_pos_weight=1.0, old_model=None):
    """
    Description : Trains an XGBoost classifier on a hold-out split, picks a
                  decision threshold, plots evaluation curves, then refits on
                  all of the data
    Input : data - Pandas DataFrame with the feature columns and a 'label' column
            feats - list of feature column names
            scale_pos_weight - not used; the weight is recomputed from the
                               class counts of the training split
            old_model - optional model/booster to continue training from
    Output : (model trained on all of data, chosen probability threshold)
    """
    from sklearn.metrics import precision_score, recall_score

    X_train, X_test, y_train, y_test = train_test_split(
        data[feats], data['label'], test_size=0.33)
    unique, counts = np.unique(y_train, return_counts=True)
    cdict = dict(zip(unique, counts))
    xgb_class = XGBClassifier(scale_pos_weight=cdict[0] / cdict[1])
    xgb_class.fit(X_train, y_train, xgb_model=old_model)
    y_pred_proba = xgb_class.predict_proba(X_test)

    # Scan candidate thresholds. Each criterion below overwrites bestthresh
    # when it improves, so the final value is the last threshold at which
    # any of them improved.
    acc, prsum, abdist, bestthresh = 0, 0, 1, 0
    for thresh in np.arange(0.01, 1, 0.01):
        y_pred_temp = (y_pred_proba[:, 1] >= thresh).astype(int)
        prec = precision_score(y_test, y_pred_temp, zero_division=0)
        rec = recall_score(y_test, y_pred_temp, zero_division=0)
        # 1) largest precision + recall while recall exceeds precision
        if (prec + rec) > prsum and rec > prec:
            prsum = prec + rec
            bestthresh = thresh
        # 2) highest accuracy
        temp_acc = accuracy_score(y_test, y_pred_temp)
        if temp_acc > acc:
            acc = temp_acc
            bestthresh = thresh
        # 3) row-normalized FN rate closest to FP rate
        cm = confusion_matrix(y_test, y_pred_temp, labels=[0, 1]).astype(float)
        cm = cm / cm.sum(axis=1)[:, np.newaxis]
        fp, fn = cm[0][1], cm[1][0]
        if fp > 0 and abs(fn / fp - 1) < abdist:
            abdist = abs(fn / fp - 1)
            bestthresh = thresh

    y_pred = (y_pred_proba[:, 1] >= bestthresh).astype(int)
    # confusion matrix at the chosen threshold, raw and normalized
    cnf_matrix = confusion_matrix(y_test, y_pred)
    plot_confusion_matrix(cnf_matrix, classes=[0, 1],
                          title='Confusion matrix, without normalization')
    plot_confusion_matrix(cnf_matrix, classes=[0, 1], normalize=True,
                          title='Normalized confusion matrix')

    # precision-recall curve over the predicted probabilities
    precision, recall, _ = precision_recall_curve(y_test, y_pred_proba[:, 1])
    average_precision = average_precision_score(y_test, y_pred_proba[:, 1])
    plt.figure()
    plt.step(recall, precision, color='b', alpha=0.2, where='post')
    plt.fill_between(recall, precision, step='post', alpha=0.2, color='b')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.ylim([0.0, 1.05])
    plt.xlim([0.0, 1.0])
    plt.title('2-class Precision-Recall curve: AP={0:0.5f}'.format(average_precision))
    plt.show()

    auc = roc_auc_score(y_test, y_pred_proba[:, 1])
    print('AUC: %.3f' % auc)
    # ROC curve of the model against the no-skill diagonal
    fpr, tpr, _ = roc_curve(y_test, y_pred_proba[:, 1])
    plt.figure()
    plt.plot([0, 1], [0, 1], linestyle='--')
    plt.plot(fpr, tpr, marker='.')
    plt.show()

    # refit on the full dataset with the overall class ratio
    unique, counts = np.unique(data['label'], return_counts=True)
    cdict = dict(zip(unique, counts))
    full_model = XGBClassifier(scale_pos_weight=cdict[0] / cdict[1])
    full_model.fit(data[feats], data['label'])
    return full_model, bestthresh
```
```python
def setup_spark_session(param_dict):
    """
    Description : Used to setup spark session
    Input : param_dict - parameter dictionary
    Output : Spark Session, Spark Context, and SQL Context
    """
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    os.environ["PYSPARK_PYTHON"] = "/home/hadoop/anaconda/envs/playground_py36/bin/python"
    try:
        # stop a session left over from an earlier run; getActiveSession
        # needs PySpark >= 3.0 and returns None when there is no session
        SparkSession.getActiveSession().stop()
        print("Stopped a SparkSession")
    except Exception:
        print("No existing SparkSession")

    # SPARK_* and AWS_* settings are assumed to be defined at module level
    conf = (SparkConf()
            .set('spark.executor.cores', SPARK_EXECUTOR_CORE)
            .set('spark.executor.memory', SPARK_EXECUTOR_MEMORY)
            .set('spark.driver.cores', SPARK_DRIVER_CORE)
            .set('spark.driver.memory', SPARK_DRIVER_MEMORY)
            .set('spark.driver.maxResultSize', '0'))  # '0' means unlimited
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext

    # S3 access through the s3a connector
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
    hadoop_conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    hadoop_conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    sqlContext = SQLContext(sc)
    return spark, sc, sqlContext
```
```python
def loadDataset(vw_cl_lines_df, datefield, param_dict):
    """
    Description : Runs data through appropriate transforms to convert it
                  to a suitable format
    Input : vw_cl_lines_df - input dataframe
            datefield - field used to establish a window for the addFeats
            param_dict - parameter dictionary
    Output : Properly formatted dataframe
    """
    vw_cl_lines_df = with_transform(vw_cl_lines_df, param_dict)
    # epoch seconds of the date field, used to order the customer windows
    vw_cl_lines_df = vw_cl_lines_df.withColumn(
        datefield + "_unix",
        F.unix_timestamp(F.col(datefield), format='yyyy-MM-dd HH:mm:ss.000'))
    return vw_cl_lines_df


def addOneHotsTest(df, oneHots):
    """Recreates the training-time one-hot columns (named FIELD~~value) on a
    pandas test dataframe."""
    for item in oneHots:
        field, value = item.split('~~', 1)
        df[item] = np.where(df[field].astype(str) == value, 1, 0)
    return df
```
```python
def addCodes(res_df, codeField, topCodes, optUDF, knownPref):
    """Adds one 0/1 column per code in topCodes plus an OTHER_ flag.
    The same topCodes list is reused at test time so the columns match."""
    src = optUDF(F.col(codeField)) if optUDF is not None else F.col(codeField)
    for code in topCodes:
        likeCode = "%" + code + "%"
        # substring (LIKE) match; the column naming is an assumption
        res_df = res_df.withColumn(
            codeField + "_" + code,
            F.when(src.like(likeCode), 1).otherwise(0))

    topSet = set(topCodes)

    def checkOtherCodes(x):
        # 1 if the record carries any code outside topCodes, else 0
        if not x:
            return 0
        x = {c.strip() for c in x if c.strip()}
        return 0 if x.issubset(topSet) else 1

    otherCodesUDF = F.udf(checkOtherCodes, IntegerType())
    if knownPref is not None:
        otherlabel = knownPref + "_" + codeField
    else:
        otherlabel = codeField
    res_df = res_df.withColumn(
        "OTHER_" + otherlabel,
        otherCodesUDF(F.split(src, ",")))
    codesAdded = topCodes
    return res_df, codesAdded


def codeExtract(df, codeField, topCount, optUDF=None, knownPref=None):
    """
    Description : Function to extract code features
    Input : df - input dataframe
            codeField - field holding comma-separated codes
            topCount - number of code features to be added
            optUDF - optional udf to apply to the field
            knownPref - prefix characterizing a field, if any
    Output : dataframe with code features added, list of codes added
    """
    codeEx_df = df
    if optUDF is not None:
        codeEx_df = codeEx_df.withColumn(codeField, optUDF(codeEx_df[codeField]))
    # one row per individual code
    codeEx_df = codeEx_df.withColumn(
        codeField, F.explode(F.split(codeEx_df[codeField], ",")))
    code_counts = codeEx_df.groupBy(codeField).count().sort(F.desc("count"))
    if knownPref is not None:
        code_counts = code_counts.filter(
            F.col(codeField).like("%" + knownPref + "%"))
    xy = code_counts.toPandas()
    # the topCount (e.g. 20) most frequently occurring codes
    topCodes = xy[codeField].head(topCount).tolist()
    topCodes = [x.strip() for x in topCodes]
    return addCodes(df, codeField, topCodes, optUDF, knownPref)
```
```python
# checks for presence of a value in a field (1/0 flag)
def isVal(df, field, value):
    return df.withColumn(
        "is_" + str(field) + "_" + str(value),
        F.when(F.col(field) == value, F.lit(1)).otherwise(F.lit(0)))


# sums values of a field within a specified window
def sumVal(df, field, windowval):
    return df.withColumn("TOT_" + field, F.sum(field).over(windowval))


# finds the maximum value of a field within a specified window
def maxVal(df, field, windowval):
    return df.withColumn("MAX_" + field, F.max(field).over(windowval))


# finds the average value of a field within a specified window
def meanVal(df, field, windowval):
    return df.withColumn("MEAN_" + field, F.mean(field).over(windowval))


# finds the ratio between two fields of a record
def fracVal(df, numfield, denomfield, fracName):
    return df.withColumn(fracName, F.col(numfield) / F.col(denomfield))


# adds required fields to the dataframe
def addFeatsTrain(vw_cl_lines_df, param_dict):
    orig = vw_cl_lines_df
    # all lines of the same customer with an earlier timestamp
    # (range ends 1 second before the current line)
    windowval = (Window.partitionBy(param_dict["groupField"])
                 .orderBy(param_dict["windowField"] + "_unix")
                 .rangeBetween(Window.unboundedPreceding, -1))
    codes_df = orig.withColumn("NUM_LINES", F.sum(F.lit(1)).over(windowval))
    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])
    for field in param_dict["sumFields"]:
        codes_df = sumVal(codes_df, field, windowval)
    for field in param_dict["maxFields"]:
        codes_df = maxVal(codes_df, field, windowval)
    for field in param_dict["meanFields"]:
        codes_df = meanVal(codes_df, field, windowval)
    for fracTuple in param_dict["fracTuples"]:
        codes_df = fracVal(codes_df, fracTuple[0], fracTuple[1], fracTuple[2])

    def remPref(x):
        # keep only the codes that contain neither 'T' nor 'M'
        if x is None:
            return ""
        y = [item for item in x.split(",") if 'T' not in item and 'M' not in item]
        return ','.join(y)

    remPrefUDF = F.udf(remPref, StringType())
    allCodes = {}
    for code in param_dict["codeFields"]:
        if len(code) == 1:
            codes_df, toAdd = codeExtract(codes_df, code[0], 20)
        else:
            # code = (field, prefix)
            codes_df, toAdd = codeExtract(codes_df, code[0], 20,
                                          optUDF=remPrefUDF, knownPref=code[1])
        allCodes[code[0]] = allCodes.get(code[0], []) + toAdd
    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
    return codes_df, addedCols, allCodes
```
```python
def addFeatsTest(vw_cl_lines_df, param_dict, summary_df):
    """Adds the same features to test data, taking the customer aggregates
    from summary_df and the code lists learned during training."""
    orig = vw_cl_lines_df
    joinfields = [param_dict['groupField'], "NUM_LINES"]
    for field in param_dict["sumFields"]:
        joinfields.append("TOT_" + field)
    for field in param_dict["maxFields"]:
        joinfields.append("MAX_" + field)
    for field in param_dict["meanFields"]:
        joinfields.append("MEAN_" + field)
    for fracTuple in param_dict["fracTuples"]:
        joinfields.append(fracTuple[2])
    codes_df = orig.join(summary_df.select(joinfields),
                         param_dict['groupField'], how='left')
    for field in param_dict["isFields"]:
        codes_df = isVal(codes_df, field[0], field[1])

    def remPref(x):
        # keep only the codes that contain neither 'T' nor 'M'
        if x is None:
            return ""
        y = [item for item in x.split(",") if 'T' not in item and 'M' not in item]
        return ','.join(y)

    remPrefUDF = F.udf(remPref, StringType())
    for code in param_dict["codeFields"]:
        presentInTrain = param_dict["allCodes"][code[0]]
        if len(code) == 1:
            codes_df, _ = addCodes(codes_df, code[0], presentInTrain, None, None)
        else:
            codes_df, _ = addCodes(codes_df, code[0], presentInTrain,
                                   optUDF=remPrefUDF, knownPref=code[1])
    addedCols = list(set(codes_df.columns) - set(vw_cl_lines_df.columns))
    return codes_df, addedCols
```
```python
# prepares the data for use in training or inference by adding features
# and appropriate labels
def prepTrainData(df, baseFeatures, param_dict):
    trainData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    labelCol = trainData[param_dict["labelField"]]
    posCount = trainData.filter(labelCol == param_dict["positiveLabel"]).count()
    negCount = trainData.filter(labelCol != param_dict["positiveLabel"]).count()
    pos_weight = negCount / posCount
    trainData, extraCols, param_dict["allCodes"] = addFeatsTrain(trainData, param_dict)
    vw_cl_lines_pd = trainData.toPandas()
    # one-hot columns are named FIELD~~value; BASE_FEATURES_CATEGORICAL is
    # an assumed key for the categorical columns
    prep_labelled_data_pd = pd.get_dummies(
        vw_cl_lines_pd, columns=param_dict["BASE_FEATURES_CATEGORICAL"],
        prefix_sep="~~", dtype=int)
    # assumed: the timestamp features are the ones checkContain skips
    featureCols = extraCols + checkContain(
        baseFeatures, prep_labelled_data_pd.columns.tolist(),
        param_dict["BASE_FEATURES_TIMESTAMP"])
    param_dict["oneHots"] = [x for x in prep_labelled_data_pd.columns.tolist()
                             if "~~" in x]
    # drop flags derived from the label itself (target leakage) and duplicates
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in
                    param_dict["isFields"] if x[0] == param_dict["labelField"]]
    featureCols = list(dict.fromkeys(x for x in featureCols if x not in leakageFeats))
    return prep_labelled_data_pd, featureCols, pos_weight, param_dict


def prepTestData(df, summary, baseFeatures, param_dict):
    testData = loadDataset(df, param_dict["custSegOrder"], param_dict)
    testData, extraCols = addFeatsTest(testData, param_dict, summary)
    vw_cl_lines_pd = testData.toPandas()
    prep_labelled_data_pd = addOneHotsTest(vw_cl_lines_pd, param_dict["oneHots"])
    featureCols = extraCols + checkContain(
        baseFeatures, prep_labelled_data_pd.columns.tolist(),
        param_dict["BASE_FEATURES_TIMESTAMP"])
    leakageFeats = ["is_" + str(x[0]) + "_" + str(x[1]) for x in
                    param_dict["isFields"] if x[0] == param_dict["labelField"]]
    featureCols = list(dict.fromkeys(x for x in featureCols if x not in leakageFeats))
    return prep_labelled_data_pd, featureCols
```
```python
# trains and returns an XGBoost Classifier
def trainXGBModel(df, param_dict):
    pdf, feats, ratio, param_dict = prepTrainData(df, param_dict["baseFeatures"], param_dict)
    for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
        pdf[col] = pd.to_datetime(pdf[col], errors='coerce')
    adf = pdf.replace([np.inf, -np.inf], 0)
    label = np.where(adf[param_dict["labelField"]] ==
                     param_dict["positiveLabel"], 1, 0)
    # standardize the features; the fitted scaler is stored under the added
    # key "scaler" so that prediction reuses the training statistics
    standard_scaler = preprocessing.StandardScaler()
    x_scaled = standard_scaler.fit_transform(adf[feats].values)
    adf = pd.DataFrame(x_scaled, columns=feats)
    adf['label'] = label
    xgb_model, bestThresh = run_xgboost(adf[feats + ['label']], feats, scale_pos_weight=ratio)
    param_dict["trainedCols"] = list(feats)
    param_dict["scaler"] = standard_scaler
    return xgb_model, feats, param_dict, bestThresh


def updateXGBModel(df, param_dict, model, chunk_size=100000):
    """Continues training `model` on new labelled data, chunk_size rows at a time.

    Features are rebuilt on a copy of param_dict (so the stored code lists and
    one-hot columns are not overwritten) and aligned to the trained columns.
    """
    pandas_df, _, pos_weight, _ = prepTrainData(
        df, param_dict["baseFeatures"], dict(param_dict))
    y_train = np.where(pandas_df[param_dict["labelField"]] ==
                       param_dict["positiveLabel"], 1, 0)
    X_train_pd = (pandas_df.reindex(columns=param_dict["trainedCols"], fill_value=0)
                  .replace([np.inf, -np.inf], 0).fillna(0))
    scaler = param_dict.get("scaler")
    if scaler is not None:
        X_train_pd = pd.DataFrame(scaler.transform(X_train_pd.values),
                                  columns=param_dict["trainedCols"])
    xgb_model = model
    for start in range(0, len(X_train_pd), chunk_size):
        xgb_class = XGBClassifier(scale_pos_weight=pos_weight)
        # continue boosting from the current model's booster
        xgb_model = xgb_class.fit(X_train_pd.iloc[start:start + chunk_size],
                                  y_train[start:start + chunk_size],
                                  xgb_model=xgb_model.get_booster())
    return xgb_model, param_dict["trainedCols"], param_dict


# uses a model to predict values
def modelPredict(model, test_df, summary, param_dict, posThresh):
    test_pdf, _ = prepTestData(test_df, summary, param_dict["baseFeatures"], param_dict)
    for col in param_dict["BASE_FEATURES_TIMESTAMP"]:
        test_pdf[col] = pd.to_datetime(test_pdf[col], errors='coerce')
    # align to the training columns, then scale with the training-time scaler
    X_test = (test_pdf.replace([np.inf, -np.inf], 0)
              .reindex(columns=param_dict["trainedCols"], fill_value=0))
    scaler = param_dict.get("scaler")
    if scaler is None:
        # fall back to fitting the scaler on the test data
        scaler = preprocessing.StandardScaler().fit(X_test.values)
    X_test = pd.DataFrame(scaler.transform(X_test.values),
                          columns=param_dict["trainedCols"])
    result_proba = model.predict_proba(X_test)
    # positive (fraud) when P(fraud) >= posThresh
    result = (result_proba[:, 1] >= posThresh).astype(int)
    return result, result_proba
```
References
- https://wenku.baidu.com/view/529fc3a4a45177232e60a287.html
- https://zhuanlan.zhihu.com/p/345828553