互联网时代,电子支付方式日益便捷,而信用卡欺诈手段也在不断向高科技、专业化、规模化发展,案件实施过程隐蔽,更易造成巨大损失。诈骗者通常会先入侵安全级别较低的系统来盗窃卡号,用盗得的信用卡进行小额消费测试,如果测试成功,则会用此信用卡进行大笔消费,购买倒卖财物,进而达到诈骗敛财的目的。
目前,国内信用卡市场面临的风险形式严峻,大部分银行对此都专门设立了针对信用卡诈骗的反欺诈检测系统,通过对诈骗模式进行识别,及时通知用户或者直接冻结账户,避免进一步损失。
本文将介绍通过少于100行代码来修改实例程序,实现基于Oracle上账户表变更的实时欺诈检测。(文末附下载链接)
1
传统实时欺诈检测方案优缺点分析
Flink官网上的欺诈检测示例程序会检测每一笔交易,若发现一个帐户在1分钟内,先出现了一笔小交易(小于1),后面又出现了一笔大交易(大于500),则认为出现了欺诈交易,立即输出警告。具体的代码解析可以阅读Flink官方文档中的《基于 DataStream API 实现欺诈检测》。
但是,示例程序中的数据来源TransactionSource的数据来源是一个静态数组private static Listdata = Arrays.asList(new Transaction(1L, 0L, 188.23D)...的迭代器。一般情况下,客户的余额是存储在Oracle的账户表中的。
怎么将客户余额的变化输出到Flink中,来实现实时的欺诈检测列?能想到的方案列举如下:
- 方案1:轮询从Oracle账户表查询余额变更 应用程序按照固定时间间隔去轮询Oracle账户表的数据,检查到某个客户的账户余额发生了变化后,通知Flink进行欺诈检测。这种方案需要不断轮询Oracle数据库,对有数据库性能影响,并且就算轮询的间隔足够短,还是有可能漏掉了一些账户变更信息,不可取。
- 方案2:业务代码修改Oracle账户表时通知Flink 修改交易程序,在它去更新Oracle账户表时,通知Flink进行欺诈检测。这种方案的优势在于不会丢掉任何账户变更的事件;但是需要修改交易程序,会导致业务程序耦合度提升。实现上如果采用同步模式,可能会由于Flink失败而导致交易失败,也会大大提高交易持续时间;而采用异步方式,需要考虑通知Flink和写入账户表的原子性,有可能成功通知了Flink但是写入账户表失败了,也有可能写入账户表成功了,却没有通知到Flink。
- 方案3:利用logminer抽取账户表变更 Oracle提供logminer来将数据库日志反解析成变更SQL,这样就可以将Oracle账户表更新的信息抽取出来,通知Flink进行欺诈检测。这种方案的优点在于直接基于Oracle数据表的修改来做增量的同步(oracle日志中记录账户表修改并提交了,说明客户修改账户是成功的,不用担心Flink通知了,账户表反而写失败了),降低了业务的耦合度,也不会担心丢失了账户变更事件;但是logminer每次只能挖掘一整个日志的变化,没法断点续传,并且挖掘的数据也只能写入alert.log,会污染错误日志。缺陷较大。
上述三个方案都有一定的缺陷和问题,要么可能会漏掉部分变更数据,要么可能影响oracle性能。logminer相对来说是避免漏数据,对数据库性能影响最小的方案,是否有一个类似于logminer而且支持断点续传,对Flink又比较友好的方案?
- 方案4 利用OGG抽取账户表变更 Oracle公司的OGG和国内部分厂商基本能避免logminer的缺点,但是需要在Oracle服务器上安装客户端,有侵入,并且配置和使用比较复杂,价格上也不是很友好。
最后我们找到了一个轻量、免费日志解析工具QDecoder来替代OGG,实现将oracle账户表的变更通知到Flink,实现欺诈检测的方法。
QDecoder是沃趣科技自主研发,基于Oracle redo日志进行二进制解析的订阅同步工具,易集成、零侵入、高性能、全免费。目前,QDecoder已经在多家证券和银行上线使用,稳定运行,得到诸多客户的肯定与认可。
2
Oracle源端增量输出
期待实现的实时欺诈检测的架构:
如图:
- QDecoder实时获取Oracle的redo log,将账户表数据变更解析出来,写入kafka的指定topic中。
- Flink程序(FraudDetection)使用flink-connector-kafka从kafka获取交易数据,进行流式计算,识别出可能的欺诈交易,并输出警告。
2.1 安装QDecoder
为了从Oracle的日志中挖掘出account表的变更数据,我们需要安装QDecoder。QDecoder的安装也非常简单,并且它跟其他的数据库同步软件不一样,它默认是不需要到Oracle服务器上去部署客户端的,只需要给定ASM账号就可以通过网络连接到Oracle上取日志。
一键安装命令如下:
代码语言:javascript复制docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder
根据提示配置QDecoder,更多信息可参考
https://hub.docker.com/r/woqutech/qdecoder
QDecoder安装视频详解
以下配置需要特别注意:
- 配置项1.1:列出的sql,请以dba权限在oracle中执行,这将配置QDecoder查询系统表需要的权限。
- 配置项2.1: 输入将要检测的表:qdecoder.account。简单起见,我们这里把account示例表新建在qdecoder账户下了。
conn qdecoder/qdecoder;
create table account(accountid int primary key, balance number);
- 配置项3.1: 选择输出到kafka, bootstrap.servers可以不输入,直接在容器中启动kafka,qdecoder会将变更数据写入“defaultapp.qdecoder.binlog.qdecoder”的topic。生产环境下如果有kafka集群,请输入集群连接的地址和账号。
配置示例:
2.2 更新账户表观察QDecoder的输出
等QDecoder成功启动后,可以按照提示运行binlogdumpK,从kafka读取binlog并打印出来。由于account表并没有更新,此时没有变更数据输出。
更新account表:
代码语言:javascript复制insert into account values(1,10000);
insert into account values(2,20000);
insert into account values(3,30000);
commit;
binlogdumpK输出的binlog如下图:
上图中schemaName: "qdecoder",tableName: "account", eventType: INSERT表示这里是对qdecoder.account表的INSERT操作。其实它对应的sql就是insert into qdecoder.account(accountid,balance) values(1,10000)。
- Oracle的日志解析出来是遵循阿里巴巴canal的protobuf格式,这个是关系型数据库增量输出的标准格式
- binlogdumpK只是为了观察一下QDecoder的输出,你可以随时关掉它,这并不影响QDecoder和Flink程序的运行。
至此,我们已经利用QDecoder从Oracle的日志解析出账户表的数据变更,那么,怎么将这些输出作为Flink现有欺诈检测的输入源列呢?
3
实现Kafka Consumer对接Oracle源增量数据
Flink示例程序是利用addSource(new TransactionSource())来将静态数组作为源加入流处理的
代码语言:javascript复制 DataStreamtransactions = env
.addSource(new TransactionSource())
.name("transactions");
我们要修改为从kafka中取日志,可以利用DataStream Connectors配置一个Kafka Consumer。按照Flink自带的示例 中的DataStreamstream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));添加数据源。
- "topic"应该修改为"defaultapp.qdecoder.binlog.qdecoder",这是QDecoder插入kafka的topic名称
- properties修改为你连接的kafka的地址,我们这里使用的是QDecoder自带的kafka,修改为127.0.0.1:9092
- SimpleStringSchema()是直接以string的方式输出出来,QDecoder的输出是protobuf的,不能直接用string的方式输出,而需要解析出来转换成欺诈检测认识的Transaction
3.1 binlog转换成Transaction实现
为了能完全重用Flink示例程序中的代码,我们这里需要
- 使用com.alibaba.otter.canal.protocol反序列化QDecoder插入到kafka的binlog日志
- 将binlog日志中的账户表变更转换成Transaction对象。
我们实现一个BinlogTransactionSchema类反序列化binlog, 计算balance的变化,生成org.apache.flink.walkthrough.common.entity.Transaction对象。
主要代码如下:
代码语言:javascript复制// 反序列化Entry
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(binlog);
// 获取表名
entry.getHeader().getTableName();
// 获取Entry type: ROWDATA|TRANSACTIONBEGIN|TRANSACTIONEND|...
entry.getEntryType();
// 获取row change
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 获取Event type: INSERT|UPDATE|DELTE|...
rowChange.getEventType();
// 获取执行时间
long executeTimeMs = entry.getHeader().getExecuteTime();
// 获取row data
CanalEntry.RowData rowData = rowChange.getRowDatas(0);
// 获取旧值
for (CanalEntry.Column col : rowData.getBeforeColumnsList()) {
if (col.getName().equalsIgnoreCase("accountid")) {
oldRow.accountId = Long.parseLong(col.getValue());
} else if (col.getName().equalsIgnoreCase("balance")) {
oldRow.balance = Double.parseDouble(col.getValue());
}
}
// 获取新值
for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
if (col.getName().equalsIgnoreCase("accountid")) {
newRow.accountId = Long.parseLong(col.getValue());
} else if (col.getName().equalsIgnoreCase("balance")) {
newRow.balance = Double.parseDouble(col.getValue());
}
}
// 创建transaction
new Transaction(newRow.accountId, executeTimeMs, Math.abs(newRow.balance-oldRow.balance));
3.2 将数据源加入Flink
QDecoder输出的增量数据转换后,可以直接作为数据源形成DataStream,修改后代码如下:
代码语言:javascript复制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建FlinkKafkaConsumer,用BinlogTransactionSchema反序列化
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "flink.test");
// 指定"defaultapp.qdecoder.binlog.qdecoder"的topic,使用BinlogTransactionSchema来将QDecoder账户表的更新转换成Transaction
FlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);
kafkaSource.setStartFromEarliest();
DataStream<Transaction> transactions = env
.addSource(kafkaSource)
.name("transactions");
...
}
3.3 验证运行
修改好的应用程序我们已经上传到github上供下载,测试运行步骤如下:
-程序下载
代码语言:javascript复制git clone https://github.com/woqutech/qdecoder.git
cd qdecoder/FlinkSample/frauddetection
-程序运行
frauddetection是一个maven创建的项目,有pom.xml项目文件,可以导入各种IDE,进行调试和运行。这里只介绍大家常用的intellij IDEA运行验证方法
- 打开项目 开始界面:open or import -> 选择frauddetection目录 或者 菜单:file/open -> 选择frauddetection目录
- 运行程序 菜单: run -> run 'FraudDetectionJob'
注意:如果报告slf4j重复,且有大量的log输出,请在module/dependencies中删除ch.qos.logback:logback-classic和ch.qos.logback:logback-core。
-欺诈检测验证
QDecoder和frauddetection正常运行以后,我们就可以更新account.balance,模拟交易,来观察frauddetection程序是否能进行欺诈检测了。
- 在Oracle上执行以下SQL
update account set balance = balance - 0.1 where accountid = 1;
commit;
update account set balance = balance - 0.2 where accountid = 1;
commit;
update account set balance = balance 100 where accountid = 2;
commit;
update account set balance = balance - 501 where accountid = 1;
commit;
update account set balance = balance - 200 where accountid = 2;
commit;
这里我们模拟了两个账号account=1/2的交易。其中account=2先入账100元,然后扣了200元是正常交易,不满足欺诈检测条件。account=1的账户余额先扣了0.1元,然后再扣了0.2元,最后直接扣了501元,满足欺诈检测的条件:在一分钟内(private static final long ONE_MINUTE = 60 * 1000;)先做小于1元(private static final double SMALL_AMOUNT = 1.00)的小额交易,然后再做500以上的大额交易(private static final double LARGE_AMOUNT = 500.00;)。
- 检查欺诈交易是否被正确识别
执行完上述SQL,frauddetection程序会立即输出:
代码语言:javascript复制21:11:20,107 INFO org.apache.flink.walkthrough.common.sink.AlertSink [] - Alert{id=1}
表示accountid=1的帐号检测到欺诈交易。
至此,我们的frauddetection欺诈检测程序就修改完成了。
4
欺诈检测程序总结
- 利用QDecoder实现了Oracle数据库的日志订阅增量数据导出
- 利用alibaba canal的protobuf来解析Oracle增量变化
- 实现了不到100行代码的BinlogTransactionSchema类,用于将解析数据转换为欺诈检测识别的数据
- 利用Flink的DataStream Connectors从kafka取出增量变化数据
- 利用了Flink实现增量流数据的有状态计算分布式处理,实现欺诈检测
我们利用QDecoder和Flink写了少于100行的代码,实现了一个简化版的银行信用卡欺诈检测程序。整体来说,QDecoder轻量、简单,对oracle侵入少,同步延迟低,与Flink结合很容易将oracle的增量变化同步给大数据、机器学习等分析运营系统中。
同时,相关技术可以应用于银行、保险等金融行业和电力能源、智慧城市等应用场景,这些行业与场景存在大量Oracle的数据库系统,将其中的业务数据同步出来,盘活数据资产,能够进一步提升业务价值,改善用户体验,助力数字经济社会新发展。