介绍
随着越来越多的人呆在家里,让我们的信用卡代步,网上购物正在兴起。不幸的是,与这一趋势保持同步的是信用卡欺诈的增加。
这真的并不奇怪。据《福布斯》报道,过去几年,在线欺诈问题日益严重。而现在,随着消费者和企业适应全球大流行并在无卡 (CNP) 领域进行更多的信用卡交易,在线购物和电子商务的兴起为欺诈者开辟了更大的游乐场尝试新花样。
欺诈检测一直是金融服务和机构面临的主要问题。但人工智能在减少金融欺诈方面具有巨大潜力。人工智能应用程序具有检测和防止欺诈的巨大潜力。
因此,我们将开始一系列文章讨论这一点以及我们如何使用 Cloudera 机制来实施整个信用卡欺诈检测解决方案。但首先,让我们从实现它的简单方法开始:
把事情简单化
在这个 MVP 上,让我们首先使用 Apache NiFi 从公共 API 摄取和转换模拟数据,将该数据转换为我们的欺诈检测算法预期格式的数据,将该数据放入 Apache Kafka 主题,并使用 Apache Flink 的 SQL控制台来处理一个简单的欺诈检测算法。所有这一切都将在可扩展性方面变得更好,因此锦上添花的是将数据转换摄取流转换为带有 Kubernetes 的 Cloudera 数据流服务。
CDF(Cloudera 数据流)和 CSA Cloudera Streaming Analytics 中提供了所有注释组件:
CLOUDERA 动态数据平台
先决条件
我们将使用带有 CDF 和 CSA Data Hub的 CDP 公共云(大家在CDP Base中也一样进行):
Data Hub:7.2.14 -使用 Apache NiFi、Apache NiFi Registry 的轻型流量管理
Data Hub:7.2.14 - Streams Messaging Light Duty:Apache Kafka、Schema Registry、Streams Messaging Manager、Streams Replication Manager、Cruise Control
Data Hub:7.2.14 -使用 Apache Flink 进行轻型流分析
数据摄取
让我们开始在 NiFi 中获取我们的数据。使用 InvokeHTTP Processor,我们可以从randomuser API 收集所有数据。
对https://randomuser.me/api/?nat=br的简单调用将返回如下内容:
代码语言:javascript复制 {
"results": [
{
"gender": "female",
"name": {
"title": "Miss",
"first": "Shirlei",
"last": "Freitas"
},
"location": {
"street": {
"number": 6133,
"name": "Rua Santa Luzia "
},
"city": "Belford Roxo",
"state": "Amapá",
"country": "Brazil",
"postcode": 88042,
"coordinates": {
"latitude": "78.0376",
"longitude": "74.2175"
},
"timezone": {
"offset": " 11:00",
"description": "Magadan, Solomon Islands, New Caledonia"
}
},
"email": "shirlei.freitas@example.com",
"login": {
"uuid": "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"username": "organicfrog175",
"password": "1030",
"salt": "yhVkrYWm",
"md5": "2bf9beb695c663a0a83aa060f27629c0",
"sha1": "f4dfdef9f2d2a9d04a0622636d0851b5d000164a",
"sha256": "e0a96117182914b3fa7fef22829f6692607bd58eb012b8fee763e34b21acf043"
},
"dob": {
"date": "1991-09-06T08:31:08.082Z",
"age": 31
},
"registered": {
"date": "2009-06-26T00:02:49.893Z",
"age": 13
},
"phone": "(59) 5164-1997",
"cell": "(44) 4566-5655",
"id": {
"name": "",
"value": null
},
"picture": {
"large": "https://randomuser.me/api/portraits/women/82.jpg",
"medium": "https://randomuser.me/api/portraits/med/women/82.jpg",
"thumbnail": "https://randomuser.me/api/portraits/thumb/women/82.jpg"
},
"nat": "BR"
}
],
"info": {
"seed": "fad8d9259d3f2b0b",
"results": 1,
"page": 1,
"version": "1.3"
}
}
使用 JoltTransformJSON 处理器,我们可以轻松地将之前的 Json 转换为我们的 JSON 结构:
我们将使用JOLT转换来清理和调整我们的数据:
代码语言:javascript复制[
{
"operation": "shift",
"spec": {
"results": {
"*": {
"login": { "username": "customer_id", "uuid": "account_number" },
"name": { "first": "name", "last": "lastname" },
"email": "email",
"gender": "gender",
"location": {
"street": { "number": "charge_amount" },
"country": "country",
"state": "state",
"city": "city",
"coordinates": {
"latitude": "lat",
"longitude": "lon"
}
},
"picture": { "large": "image" }
}
}
}
},
{
"operation": "default",
"spec": {
"center_inferred_lat": -5.0000,
"center_inferred_lon": -5.0000,
"max_inferred_distance": 0.0,
"max_inferred_amount": 0.0
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"lat": "=toDouble",
"lon": "=toDouble"
}
}
]
我们的输出转换数据将是:
代码语言:javascript复制Result:
{
"customer_id" : "organicfrog175",
"account_number" : "d73f9a11-d61c-424d-8309-51d6d8e83a73",
"name" : "Shirlei",
"lastname" : "Freitas",
"email" : "shirlei.freitas@example.com",
"gender" : "female",
"charge_amount" : 6133,
"country" : "Brazil",
"state" : "Amapá",
"city" : "Belford Roxo",
"lat" : 78.0376,
"lon" : 74.2175,
"image" : "https://randomuser.me/api/portraits/women/82.jpg",
"max_inferred_distance" : 0.0,
"center_inferred_lat" : -5.0,
"center_inferred_lon" : -5.0,
"max_inferred_amount" : 0.0
}
现在,我们可以使用UpdateRecord 处理器来改进它并在某些字段中获取一些随机数,因此,使用PublishKafka2RecordCDP处理器将我们的 JSON 数据放入 Kafka。
更新记录处理器
PublishKafka2RecordCDP处理器
(重要的是要注意必须根据 Kafka 集群端点填充的 Kafka 代理变量。)
最后,我们的 NiFi 流程将是这样的:
数据缓冲
在 Kafka 集群上,我们只需点击 SMM(流消息管理器)组件中的“添加新”按钮即可创建一个新的 Kafka 主题:我已经创建了 skilltransactions 作为示例。
一旦我们已经创建了 NiFi 流和 Kafka 主题,就可以打开您的流并查看我们的数据进入我们的 Kafka 主题。 您还可以查看数据资源管理器图标
查看到目前为止所有摄取的数据。
流式 SQL 分析
Apache Flink是由Apache 软件基金会开发的开源、统一的流处理和批处理框架。Flink 提供了一个高吞吐量、低延迟的流媒体引擎,并支持事件时间处理和状态管理。
Flink 的 Table API 是一种用于关系流和批处理的类 SQL 表达式语言,可以嵌入到 Flink 的 Java 和 Scala DataSet 和 DataStream API 中。表 API 和 SQL 接口对关系表抽象进行操作。可以从外部数据源或现有数据流和数据集中创建表。
Cloudera 开发了一个名为 Cloudera SQL Stream Builder 的应用程序,它可以映射我们的 Kafka Topic,并通过 Flink 的 Table API 将所有数据查询为一个表。
我们将在 SSB 上的表连接器上轻松创建我们的“虚拟表”映射:
创建这个“虚拟表”后,我们可以使用 SQL 对使用 power、sin 和 radians SQL 函数进行的交易进行了多远的数学计算:
代码语言:javascript复制select account_number, charge_amount,
2 * 3961 * asin(sqrt(
power(
power((sin(radians((lat - center_inferred_lat) / 2))) , 2)
cos(radians(center_inferred_lat)) * cos(radians(lat))
* (sin(radians((lon - center_inferred_lon) / 2)))
, 2))) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE
2 * 3961 * asin(sqrt(
power(
power((sin(radians((lat - center_inferred_lat) / 2))) , 2)
cos(radians(center_inferred_lat)) * cos(radians(lat))
* (sin(radians((lon - center_inferred_lon) / 2)))
, 2))) > max_inferred_distance
要查看有关此查询的更多详细信息,请访问我们 Cloudera 社区上 @sunile_manjee撰写的这篇精彩文章。
我们还可以创建我们的函数,然后调用它或查询。
例如,让我们创建一个 DISTANCE_BETWEEN函数并在我们的最终查询中使用它。
最终查询
代码语言:javascript复制select account_number, charge_amount, DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) as distance, max_inferred_distance, max_inferred_amount
from `skilltransactions`
WHERE DISTANCE_BETWEEN(lat, lon, center_inferred_lat, center_inferred_lon) > max_inferred_distance
OR charge_amount > max_inferred_amount
此时我们的查询应该可以实时检测到可疑交易,可以报警了。