使用Apache NiFi 2.0.0构建Python处理器

2024-04-26 10:35:04 浏览数 (1)

Apache NiFi 最新版本中内置的 Python 处理器可以简化数据处理任务,增强灵活性并加快开发速度。

译自 Apache NiFi 2.0.0: Building Python Processors,作者 Robert Kimani。

Apache NiFi 是一个专门用于数据流管理的强大平台,它提供了许多旨在提高数据处理效率和灵活性的功能。其基于 Web 的用户界面为设计、控制和监控数据流提供了无缝体验。

NiFi 支持构建自定义处理器和扩展,使用户能够根据自己的特定需求定制平台。

凭借多租户用户体验,NiFi 确保多个用户可以同时与系统交互,每个用户都有自己的一组访问权限。

Python 处理器提供了一种强大的方式来扩展 NiFi 的功能,使用户能够在数据流中利用丰富的 Python 库和工具生态系统。在这里,我们将讨论将 Python 纳入 NiFi 工作流的优势,并探讨 Python 处理器可以简化数据处理任务、增强灵活性和加速开发的实际用例。

无论您是想集成机器学习算法、执行自定义数据转换还是与外部系统交互,在 Apache NiFi 中构建 Python 处理器都可以帮助您满足这些数据集成需求。

Apache NiFi 有什么用?

NiFi 的一个突出特点是其高度可配置的特性,允许用户根据其特定要求定制数据路由、转换和系统中介逻辑。NiFi 帮助用户实现他们想要的数据处理结果,例如优先考虑容错性而不是保证交付,或者针对低延迟而不是高吞吐量进行优化。

动态优先级确定允许实时调整流中的数据优先级,而运行时修改流的能力为适应不断变化的需求增加了一层灵活性。NiFi 还结合了反压机制来调节数据流速并防止过载,确保即使在不同的工作负载下也能平稳高效地运行。

NiFi 被设计为支持垂直和水平扩展。无论是扩展以利用单台机器的全部功能,还是使用零领导者集群模型进行扩展,NiFi 都可以适应任何规模的数据处理任务。

数据来源是另一个关键特性,它允许用户跟踪数据从其开始到最终目的地的旅程。这为审计、故障排除和确保整个过程中的数据完整性提供了宝贵的见解。

安全性在 NiFi 中至关重要,它支持 SSL、SSH、HTTPS 和加密内容以及其他安全措施。可插拔的细粒度基于角色的身份验证和授权机制确保对数据流的访问受到仔细控制,允许多个团队安全地管理和共享流的特定部分。

NiFi 的设计理念受到基于流的编程和分阶段事件驱动架构等概念的启发,提供了几个引人注目的优势:

  • 直观的可视化界面,用于设计和管理数据流,提高生产力和易用性。
  • 异步处理模型,支持高吞吐量和自然缓冲,以适应波动的负载。
  • 内置并发管理,抽象了多线程编程的复杂性。
  • 强调组件的可重用性和可测试性,促进模块化和稳健的设计方法。
  • 本机支持反压和错误处理,确保数据处理管道中的稳健性和可靠性。
  • 全面了解数据流动态,实现有效的监控和故障排除。

为什么在 Apache NiFi 中使用 Python 构建?

Apache NiFi 是一个用于数据摄取、转换和路由的强大工具。NiFi 中的 Python 处理器提供了一种灵活的方式来扩展其功能,特别是对于处理非结构化数据或与外部系统(如 AI 模型或云原生向量数据库 Milvus 等向量存储)集成。

在处理 Cloudera Data Flow 等工具可提取的非结构化文件类型时,Python 处理器对于实现解析和操作数据的自定义逻辑而言至关重要。例如,你可以使用 Python 从文本文件中提取特定信息,对文本数据执行情感分析或者在进行进一步分析之前对图像进行预处理。

另一方面,结构化文件类型通常可以使用 NiFi 的内置处理器进行处理,而无需自定义 Python 代码。NiFi 提供了广泛的处理器,用于处理 CSV、JSON、Avro 等结构化数据格式,以及用于与数据库、API 和其他企业系统进行交互。

当你需要与 AI 模型或 Milvus 等其他外部系统进行交互时,Python 处理器提供了一种便捷的方式,可以将此功能集成到你的 NiFi 数据流中。对于文本到文本、文本到图像或文本到语音处理等任务,你可以编写 Python 代码与相关模型或服务进行交互,并将此处理合并到你的 NiFi 管道中。

Python:NiFi 2.0.0 中的新时代

Apache NiFi 2.0.0 对该平台进行了一些重大改进,尤其是在 Python 集成和性能增强方面。将 Python 脚本无缝集成到 NiFi 数据流中的能力为使用各种数据源和利用生成式 AI 的强大功能开辟了广泛的可能性。

在此版本之前,虽然可以在 NiFi 中使用 Python,但灵活性可能受到限制,并且执行 Python 脚本可能不像用户希望的那样精简。然而,使用最新版本,Python 集成得到了极大改善,允许在 NiFi 管道中更无缝地执行 Python 代码。

此外,对 JDK 21 的支持带来了性能改进,使 NiFi 更快、更高效,尤其是在处理多线程任务时。这可以显著提高 NiFi 数据流的可扩展性和响应能力,尤其是在处理大量数据或复杂处理任务时。

引入诸如将进程组作为无状态运行和规则引擎用于开发辅助等功能进一步增强了 NiFi 的功能和可用性,为开发人员提供了更多灵活性和工具来构建强大的数据流管道。

一个示例处理器:Watson SDK 到基础 AI 模型

此 Python 代码定义了一个名为的 NiFi 处理器,它与 IBM WatsonX AI 服务进行交互,以根据输入提示生成响应。请注意,对于 NiFi 2.0.0,Python3.10 是最低要求。

下面我们分解一下代码,并解释各个部分。

导入

代码语言:javascript复制
import json
import re
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope

以下是脚本的必要导入:

  • json 和 re 分别是 Python 的用于分别处理 JSON 数据和正则表达式的内置模块。
  • FlowFileTransform 和 FlowFileTransformResult 是与 NiFi 处理相关的自定义模块 (nifiapi.flowfiletransform) 的类。
  • PropertyDescriptor、StandardValidators 和 ExpressionLanguageScope 是用于定义处理器属性的另一个自定义模块 (nifiapi.properties) 的类。

类定义

代码语言:javascript复制
class CallWatsonXAI(FlowFileTransform):
    ...
  • 这将定义一个名为 CallWatsonXAI 的类,它扩展 了FlowFileTransform 类,该类处理 NiFi 中的数据转换。

处理器详细信息

代码语言:javascript复制
processor_details = {
    'name': 'Call WatsonX AI',
    'version': '2.0.0-M2',
    'description': 'Calls IBM WatsonX AI service to generate responses based on input prompts.',
    'tags': ['watsonx', 'ai', 'response', 'generation'],
}
  • 定义处理器的详细信息,例如版本、描述和标记。但请注意,2.0.0-M2 是当前版本。

属性描述符

代码语言:javascript复制
PROMPT_TEXT = PropertyDescriptor(
    name="Prompt Text",
    description="Specifies whether or not the text (including full prompt with 
context) to send",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    
expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBU
TES
)
  • 定义可以为该处理器设置的特性。在这种情况下,有 PROMPT_TEXT、WATSONXAI_API_KEY 和 WATSONXAI_PROJECT_ID。

构造函数

代码语言:javascript复制
def __init__(self, **kwargs):
    super().__init__()
    self.property_descriptors.append(self.PROMPT_TEXT)
    self.property_descriptors.append(self.WATSONXAI_API_KEY)
    self.property_descriptors.append(self.WATSONXAI_PROJECT_ID)
  • 初始化处理器类并将属性描述符附加到属性列表中。

getPropertyDescriptors 方法

代码语言:javascript复制
def get_property_descriptors(self):
    return self.property_descriptors
  • 该方法由 NiFi 处理器要求,用于获取属性列表。

transform 方法

代码语言:javascript复制
def transform(self, context, flowfile):
    ...
  • 该方法负责处理数据。方法接收包含关于处理器执行环境的信息的上下文对象和包含将处理的数据的流文件对象。

IBM WatsonX 集成

代码语言:javascript复制
from ibm_watson_machine_learning.foundation_models.utils.enums import 
ModelTypes
from ibm_watson_machine_learning.foundation_models import Model
  • 导入 IBM Watson 机器学习模块。
代码语言:javascript复制
prompt_text = 
context.getProperty(self.PROMPT_TEXT).evaluateAttributeExpressions(flowfil
e).getValue()
watsonx_api_key = 
context.getProperty(self.WATSONXAI_API_KEY).evaluateAttributeExpressions(
flowfile).getValue()
project_id = 
context.getProperty(self.WATSONXAI_PROJECT_ID).evaluateAttributeExpres
sions(flowfile).getValue()

通过 NiFi 处理器属性获取输入值,例如提示文本、WatsonX API 密钥和项目 ID。

代码语言:javascript复制
model_id = ModelTypes.LLAMA_2_70B_CHAT
gen_parms = None
project_id = project_id
space_id = None
verify = False

model = Model(model_id, my_credentials, gen_parms, project_id, space_id, verify)
gen_parms_override = None
generated_response = model.generate(prompt_text, gen_parms_override)
  • 配置并调用 IBM WatsonX 模块来根据提示文本生成响应。

输出处理

代码语言:javascript复制
attributes = {"mime.type": "application/json"}
output_contents = json.dumps(generated_response)
  • 定义输出属性,将生成的响应转换为 JSON 格式。

日志记录和返回

代码语言:javascript复制
self.logger.debug(f"Prompt: {prompt_text}")
  • 记录提示文本。
代码语言:javascript复制
return FlowFileTransformResult(relationship="success", 
contents=output_contents, attributes=attributes)

返回转换结果,并指示转换是否成功并提供输出数据和属性。

预打包的 Python 处理器

NiFi 2.0.0 附带了一组多样化的 Python 处理器,它们提供了广泛的功能。

  • Pinecone 的 VectorDB 接口:此处理器促进了与 Pinecone(一种矢量数据库服务)的交互,使用户能够高效地查询和存储数据。
  • ChunkDocument:此处理器将大型文档分解为较小的块,使其适合于处理和存储,尤其是在可能应用大小限制的矢量数据库中。
  • ParseDocument:此处理器似乎非常通用,能够解析各种文档格式,如 Markdown、PowerPoint、Google Docs 和 Excel,提取文本内容以供进一步处理或存储。
  • ConvertCSVtoExcel:顾名思义,此处理器将数据从 CSV 格式转换为 Excel 格式,为数据交换和处理提供了灵活性。
  • DetectObjectInImage:此处理器似乎利用深度学习技术进行 图像中的对象检测,使用户能够分析图像数据并提取有价值的见解。
  • PromptChatGPT:此处理器听起来很有趣——它与 ChatGPT 或类似的会话式 AI 模型集成,使用户能够根据提示生成响应或参与对话。
  • PutChroma 和 QueryChroma:这些处理器与 Chroma(一种针对大型语言模型 (LLM) 的开源数据库)相关。它们促进了 Chroma 数据库或类似系统中的数据存储(PutChroma)和检索/查询(QueryChroma)。

结论

在 Apache NiFi 中优先考虑 Python 集成标志着弥合数据工程师和数据科学家之间差距的一个重要里程碑,同时扩展了该平台的多功能性和适用性。

通过使 Python 爱好者能够在 Python 中无缝开发 NiFi 组件,开发周期得到简化,从而加速了数据管道和工作流的实施。

对于 NiFi 中的 Python 处理器来说,这是一个激动人心的时刻,为生态系统做出贡献可能非常有价值。开发和共享 Python 处理器可以扩展 NiFi 的功能,并解决特定用例。

要开始使用 NiFi,用户可以参考快速入门指南进行开发,并参考 NiFi 开发人员指南以获取有关如何为该项目做出贡献的更全面信息。

0 人点赞