在本指南中,我们将深入探讨构建强大的数据管道,用 Kafka 进行数据流处理、Spark 进行处理、Airflow 进行编排、Docker 进行容器化、S3 进行存储,Python 作为主要脚本语言。
为了说明这个过程,我们将使用 Random Name API,这是一个多功能工具,每次触发都会生成新的随机数据。它提供了许多企业日常处理实时数据的实用表示。我们第一步涉及一个 Python 脚本,该脚本经过精心设计,用于从该 API 获取数据。为了模拟数据的流式传输性质,我们将定期执行此脚本。这个脚本还将充当我们与 Kafka 的桥梁,将获取的数据直接写入 Kafka 主题。
随着我们的深入,Airflow 的有向无环图 (DAG) 发挥着关键作用。Airflow DAG 脚本编排我们的流程,确保我们的 Python 脚本像时钟一样运行,持续流式传输数据并将其输入到我们的管道中。一旦我们的数据到达 Kafka producer,Spark Structured Streaming 就会接过接力棒。使用这些数据,对其进行处理,然后将修改后的数据无缝写入 S3,确保其为后续分析过程做好准备。
项目的一个重要方面是其模块化架构。得益于 Docker 容器,每个服务,无论是 Kafka、Spark 还是 Airflow,都在隔离的环境中运行。不仅确保了平滑的互操作性,还简化了可扩展性和调试。
入门:先决条件和设置
对于这个项目,我们利用GitHub存储库来托管我们的整个设置,使任何人都可以轻松开始。
A、Docker:Docker 将成为我们编排和运行各种服务的主要工具。
- 安装:访问 Docker 官方网站,下载并安装适合您操作系统的 Docker Desktop。
- 验证:打开终端或命令提示符并执行 docker --version 以确保安装成功。
B、S3:AWS S3 是我们数据存储的首选。
- 设置:登录 AWS 管理控制台,导航到 S3 服务,然后建立一个新存储桶,确保根据您的数据存储首选项对其进行配置。
C、设置项目:
- 克隆存储库:首先,您需要使用以下命令从 GitHub 存储库克隆项目:
git clone <https://github.com/simardeep1792/Data-Engineering-Streaming-Project.git>
- 导航到项目目录:
cd Data-Engineering-Streaming-Project
代码语言:javascript复制使用以下方式部署服务docker-compose:在项目目录中,您将找到一个
docker-compose.yml文件。该文件描述了所有服务。
docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
该命令协调 Docker 容器中所有必要服务的启动,例如 Kafka、Spark、Airflow 等。
分解项目文件
1、docker-compose.yml
代码语言:javascript复制version: '3.7'
services:
# Airflow PostgreSQL Database
airflow_db:
image: postgres:16.0
environment:
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
logging:
options:
max-size: 10m
max-file: "3"
# Apache Airflow Webserver
airflow_webserver:
command: bash -c "airflow db init && airflow webserver && airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin"
image: apache/airflow:latest
restart: always
depends_on:
- airflow_db
environment:
- LOAD_EX=${LOAD_EX}
- EXECUTOR=${EXECUTOR}
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@airflow_db:5432/${POSTGRES_DB}
logging:
options:
max-size: 10m
max-file: "3"
volumes:
- ./dags:/opt/airflow/dags
- ./requirements.txt:/opt/airflow/requirements.txt
ports:
- "8080:8080"
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
# Zookeeper for Kafka
kafka_zookeeper:
image: confluentinc/cp-zookeeper:latest
ports:
- "2181:2181"
environment:
- ZOOKEEPER_CLIENT_PORT=${ZOOKEEPER_CLIENT_PORT}
- ZOOKEEPER_SERVER_ID=${ZOOKEEPER_SERVER_ID}
- ZOOKEEPER_SERVERS=kafka_zookeeper:2888:3888
networks:
- kafka_network
- default
# Kafka Broker Instances
kafka_broker_1:
extends:
service: kafka_base
environment:
- KAFKA_BROKER_ID=1
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
kafka_broker_2:
extends:
service: kafka_base
environment:
- KAFKA_BROKER_ID=2
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
kafka_broker_3:
extends:
service: kafka_base
environment:
- KAFKA_BROKER_ID=3
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
kafka_base:
image: confluentinc/cp-kafka:latest
environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}
- KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
- KAFKA_ZOOKEEPER_CONNECT=kafka_zookeeper:2181
- KAFKA_LOG4J_LOGGERS=${KAFKA_LOG4J_LOGGERS}
- KAFKA_AUTHORIZER_CLASS_NAME=${KAFKA_AUTHORIZER_CLASS_NAME}
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=${KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND}
networks:
- kafka_network
- default
# Kafka Connect
kafka_connect:
image: confluentinc/cp-kafka-connect:latest
ports:
- "8083:8083"
environment:
- CONNECT_BOOTSTRAP_SERVERS=${CONNECT_BOOTSTRAP_SERVERS}
- CONNECT_REST_PORT=${CONNECT_REST_PORT}
- CONNECT_GROUP_ID=${CONNECT_GROUP_ID}
- CONNECT_CONFIG_STORAGE_TOPIC=${CONNECT_CONFIG_STORAGE_TOPIC}
- CONNECT_OFFSET_STORAGE_TOPIC=${CONNECT_OFFSET_STORAGE_TOPIC}
- CONNECT_STATUS_STORAGE_TOPIC=${CONNECT_STATUS_STORAGE_TOPIC}
- CONNECT_KEY_CONVERTER=${CONNECT_KEY_CONVERTER}
- CONNECT_VALUE_CONVERTER=${CONNECT_VALUE_CONVERTER}
- CONNECT_INTERNAL_KEY_CONVERTER=${CONNECT_INTERNAL_KEY_CONVERTER}
- CONNECT_INTERNAL_VALUE_CONVERTER=${CONNECT_INTERNAL_VALUE_CONVERTER}
- CONNECT_REST_ADVERTISED_HOST_NAME=${CONNECT_REST_ADVERTISED_HOST_NAME}
- CONNECT_LOG4J_ROOT_LOGLEVEL=${CONNECT_LOG4J_ROOT_LOGLEVEL}
- CONNECT_LOG4J_LOGGERS=${CONNECT_LOG4J_LOGGERS}
- CONNECT_PLUGIN_PATH=${CONNECT_PLUGIN_PATH}
networks:
- kafka_network
- default
# Kafka Schema Registry
kafka_schema_registry:
image: confluentinc/cp-schema-registry:latest
ports:
- "8081:8081"
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=${SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS}
- SCHEMA_REGISTRY_HOST_NAME=${SCHEMA_REGISTRY_HOST_NAME}
- SCHEMA_REGISTRY_LISTENERS=${SCHEMA_REGISTRY_LISTENERS}
networks:
- kafka_network
- default
# Kafka User Interface
kafka_ui:
container_name: kafka-ui-1
image: provectuslabs/kafka-ui:latest
ports:
- 8888:8080
depends_on:
- kafka_broker_1
- kafka_broker_2
- kafka_broker_3
- kafka_schema_registry
- kafka_connect
environment:
- KAFKA_CLUSTERS_0_NAME=${KAFKA_CLUSTERS_0_NAME}
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS}
- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=${KAFKA_CLUSTERS_0_SCHEMAREGISTRY}
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME}
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS}
- DYNAMIC_CONFIG_ENABLED=${DYNAMIC_CONFIG_ENABLED}
networks:
- kafka_network
- default
# Apache Spark Master Node
spark_master:
image: bitnami/spark:3
container_name: spark_master
ports:
- 8085:8080
environment:
- SPARK_UI_PORT=${SPARK_UI_PORT}
- SPARK_MODE=${SPARK_MODE}
- SPARK_RPC_AUTHENTICATION_ENABLED=${SPARK_RPC_AUTHENTICATION_ENABLED}
- SPARK_RPC_ENCRYPTION_ENABLED=${SPARK_RPC_ENCRYPTION_ENABLED}
volumes:
- ./:/home
- spark_data:/opt/bitnami/spark/data
networks:
- default
- kafka_network
#volumes for data
volumes:
spark_data:
#network for Kafka
networks:
kafka_network:
driver: bridge
default:
external:
name: docker_streaming
项目设置的核心在于文件 docker-compose.yml 。它协调我们的服务,确保顺畅的通信和初始化。这是一个细分:
1)版本
使用 Docker Compose 文件格式版本“3.7”,确保与服务兼容。
2)服务
项目包含多项服务:
- Airflow:
- 数据库 ( airflow_db):使用 PostgreSQL 1。
- Web 服务器 ( airflow_webserver):启动数据库并设置管理员用户。
- Kafka:
- Zookeeper ( kafka_zookeeper):管理 broker 元数据。
- Brokers:三个实例(kafka_broker_1、2 和 3)。
- 基本配置 ( kafka_base):Broker的常见设置。
- Kafka Connect(kafka_connect):促进流处理。
- 架构注册表 ( kafka_schema_registry):管理 Kafka 架构。
- 用户界面 ( kafka_ui):Kafka 的可视化界面。
- spark:
- 主节点 ( spark_master):Apache Spark 的中央控制节点。
3)卷
利用持久卷spark_data来确保 Spark 的数据一致性。
4)网络
服务有两个网络:
- Kafka Network ( kafka_network):专用于 Kafka。
- 默认网络 ( default):外部命名为docker_streaming。
2、kafka_stream_dag.py
代码语言:javascript复制# Importing required modules
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from kafka_streaming_service import initiate_stream
# Configuration for the DAG's start date
DAG_START_DATE = datetime(2018, 12, 21, 12, 12)
# Default arguments for the DAG
DAG_DEFAULT_ARGS = {
'owner': 'airflow',
'start_date': DAG_START_DATE,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
# Creating the DAG with its configuration
with DAG(
'name_stream_dag', # Renamed for uniqueness
default_args=DAG_DEFAULT_ARGS,
schedule_interval='0 1 * * *',
catchup=False,
description='Stream random names to Kafka topic',
max_active_runs=1
) as dag:
# Defining the data streaming task using PythonOperator
kafka_stream_task = PythonOperator(
task_id='stream_to_kafka_task',
python_callable=initiate_stream,
dag=dag
)
kafka_stream_task
该文件主要定义了一个Airflow Directed Acyclic Graph(DAG),用于处理数据流到Kafka主题。
1)进口
导入基本模块和函数,特别是 Airflow DAG 和 PythonOperator,以及initiate_stream来自kafka_streaming_service.
2)配置
- DAG 开始日期 ( DAG_START_DATE):设置 DAG 开始执行的时间。
- 默认参数 ( DAG_DEFAULT_ARGS):配置 DAG 的基本参数,例如所有者、开始日期和重试设置。
3)DAG定义
将创建一个名为 的新 DAG name_stream_dag,配置为每天凌晨 1 点运行。它的设计目的是不运行任何错过的间隔(带有catchup=False),并且一次只允许一次活动运行。
4)任务
单个任务 kafka_stream_task 是使用 PythonOperator 定义的。此任务调用该initiate_stream函数,在 DAG 运行时有效地将数据流式传输到 Kafka。
3、kafka_streaming_service.py
代码语言:javascript复制# Importing necessary libraries and modules
import requests
import json
import time
import hashlib
from confluent_kafka import Producer
# Constants and configuration
API_ENDPOINT = "https://randomuser.me/api/?results=1"
KAFKA_BOOTSTRAP_SERVERS = ['kafka_broker_1:19092','kafka_broker_2:19093','kafka_broker_3:19094']
KAFKA_TOPIC = "names_topic"
PAUSE_INTERVAL = 10
STREAMING_DURATION = 120
def retrieve_user_data(url=API_ENDPOINT) -> dict:
"""Fetches random user data from the provided API endpoint."""
response = requests.get(url)
return response.json()["results"][0]
def transform_user_data(data: dict) -> dict:
"""Formats the fetched user data for Kafka streaming."""
return {
"name": f"{data['name']['title']}. {data['name']['first']} {data['name']['last']}",
"gender": data["gender"],
"address": f"{data['location']['street']['number']}, {data['location']['street']['name']}",
"city": data['location']['city'],
"nation": data['location']['country'],
"zip": encrypt_zip(data['location']['postcode']),
"latitude": float(data['location']['coordinates']['latitude']),
"longitude": float(data['location']['coordinates']['longitude']),
"email": data["email"]
}
def encrypt_zip(zip_code):
"""Hashes the zip code using MD5 and returns its integer representation."""
zip_str = str(zip_code)
return int(hashlib.md5(zip_str.encode()).hexdigest(), 16)
def configure_kafka(servers=KAFKA_BOOTSTRAP_SERVERS):
"""Creates and returns a Kafka producer instance."""
settings = {
'bootstrap.servers': ','.join(servers),
'client.id': 'producer_instance'
}
return Producer(settings)
def publish_to_kafka(producer, topic, data):
"""Sends data to a Kafka topic."""
producer.produce(topic, value=json.dumps(data).encode('utf-8'), callback=delivery_status)
producer.flush()
def delivery_status(err, msg):
"""Reports the delivery status of the message to Kafka."""
if err is not None:
print('Message delivery failed:', err)
else:
print('Message delivered to', msg.topic(), '[Partition: {}]'.format(msg.partition()))
def initiate_stream():
"""Initiates the process to stream user data to Kafka."""
kafka_producer = configure_kafka()
for _ in range(STREAMING_DURATION // PAUSE_INTERVAL):
raw_data = retrieve_user_data()
kafka_formatted_data = transform_user_data(raw_data)
publish_to_kafka(kafka_producer, KAFKA_TOPIC, kafka_formatted_data)
time.sleep(PAUSE_INTERVAL)
if __name__ == "__main__":
initiate_stream()
1)导入和配置
导入基本库并设置常量,例如 API 端点、Kafka 引导服务器、主题名称和流间隔详细信息。
2)用户数据检索
该retrieve_user_data函数从指定的 API 端点获取随机用户详细信息。
3)数据转换
该 transform_user_data 函数格式化用于 Kafka 流的原始用户数据,同时 encrypt_zip 对邮政编码进行哈希处理以维护用户隐私。
4)Kafka 配置与发布
- configure_kafka 设置 Kafka 生产者。
- publish_to_kafka 将转换后的用户数据发送到 Kafka 主题。
- delivery_status 提供有关数据是否成功发送到 Kafka 的反馈。
5)主要流功能
initiate_stream 协调整个流程,定期检索、转换用户数据并将其发布到 Kafka。
6)执行
当直接运行脚本时,initiate_stream 将执行该函数,并在指定的持续时间内流式传输数据 STREAMING_DURATION。
4、spark_processing.py
代码语言:javascript复制import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# Initialize logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")
def initialize_spark_session(app_name, access_key, secret_key):
"""
Initialize the Spark Session with provided configurations.
:param app_name: Name of the spark application.
:param access_key: Access key for S3.
:param secret_key: Secret key for S3.
:return: Spark session object or None if there's an error.
"""
try:
spark = SparkSession
.builder
.appName(app_name)
.config("spark.hadoop.fs.s3a.access.key", access_key)
.config("spark.hadoop.fs.s3a.secret.key", secret_key)
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
logger.info('Spark session initialized successfully')
return spark
except Exception as e:
logger.error(f"Spark session initialization failed. Error: {e}")
return None
def get_streaming_dataframe(spark, brokers, topic):
"""
Get a streaming dataframe from Kafka.
:param spark: Initialized Spark session.
:param brokers: Comma-separated list of Kafka brokers.
:param topic: Kafka topic to subscribe to.
:return: Dataframe object or None if there's an error.
"""
try:
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("delimiter", ",")
.option("startingOffsets", "earliest")
.load()
logger.info("Streaming dataframe fetched successfully")
return df
except Exception as e:
logger.warning(f"Failed to fetch streaming dataframe. Error: {e}")
return None
def transform_streaming_data(df):
"""
Transform the initial dataframe to get the final structure.
:param df: Initial dataframe with raw data.
:return: Transformed dataframe.
"""
schema = StructType([
StructField("full_name", StringType(), False),
StructField("gender", StringType(), False),
StructField("location", StringType(), False),
StructField("city", StringType(), False),
StructField("country", StringType(), False),
StructField("postcode", IntegerType(), False),
StructField("latitude", FloatType(), False),
StructField("longitude", FloatType(), False),
StructField("email", StringType(), False)
])
transformed_df = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).alias("data"))
.select("data.*")
return transformed_df
def initiate_streaming_to_bucket(df, path, checkpoint_location):
"""
Start streaming the transformed data to the specified S3 bucket in parquet format.
:param df: Transformed dataframe.
:param path: S3 bucket path.
:param checkpoint_location: Checkpoint location for streaming.
:return: None
"""
logger.info("Initiating streaming process...")
stream_query = (df.writeStream
.format("parquet")
.outputMode("append")
.option("path", path)
.option("checkpointLocation", checkpoint_location)
.start())
stream_query.awaitTermination()
def main():
app_name = "SparkStructuredStreamingToS3"
access_key = "ENTER_YOUR_ACCESS_KEY"
secret_key = "ENTER_YOUR_SECRET_KEY"
brokers = "kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094"
topic = "names_topic"
path = "BUCKET_PATH"
checkpoint_location = "CHECKPOINT_LOCATION"
spark = initialize_spark_session(app_name, access_key, secret_key)
if spark:
df = get_streaming_dataframe(spark, brokers, topic)
if df:
transformed_df = transform_streaming_data(df)
initiate_streaming_to_bucket(transformed_df, path, checkpoint_location)
# Execute the main function if this script is run as the main module
if __name__ == '__main__':
main()
代码语言:javascript复制1. 导入和日志初始化
导入必要的库,并创建日志记录设置以更好地调试和监控。
2. Spark会话初始化
initialize_spark_session:此函数使用从 S3 访问数据所需的配置来设置 Spark 会话。
3. 数据检索与转换
- get_streaming_dataframe:从 Kafka 获取具有指定代理和主题详细信息的流数据帧。
- transform_streaming_data:将原始 Kafka 数据转换为所需的结构化格式。
4. 流式传输到 S3
initiate_streaming_to_bucket:此函数将转换后的数据以 parquet 格式流式传输到 S3 存储桶。它使用检查点机制来确保流式传输期间数据的完整性。
5. 主执行
该 main 函数协调整个过程:初始化 Spark 会话、从 Kafka 获取数据、转换数据并将其流式传输到 S3。
6. 脚本执行
如果脚本是正在运行的主模块,它将执行该 main 函数,启动整个流处理过程。
构建数据管道:逐步
1. 设置Kafka集群
使用以下命令启动 Kafka 集群:
代码语言:javascript复制docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
代码语言:javascript复制2. 为 Kafka 创建主题(http://localhost:8888/)
- 通过http://localhost:8888/访问 Kafka UI 。
- 观察活动集群。
- 导航至“主题”。
- 创建一个名为“names_topic”的新主题。
- 将复制因子设置为 3。
3. 配置 Airflow 用户
创建具有管理员权限的 Airflow 用户:
代码语言:javascript复制docker-compose run airflow_webserver airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
4. 访问 Airflow Bash 并安装依赖项
我们应该将脚本移动kafka_stream_dag.py到文件夹下以便能够运行 DAG 使用提供的脚本访问 Airflow bash 并安装所需的软件包:kafka_streaming_service.py dags
代码语言:javascript复制./airflow.sh bash
pip install -r ./requirements.txt
代码语言:javascript复制
5. 验证 DAG
确保您的 DAG 没有错误:
代码语言:javascript复制airflow dags list
6. 启动 Airflow 调度程序
要启动 DAG,请运行调度程序:
代码语言:javascript复制airflow scheduler
7. 验证数据是否上传到 Kafka 集群
- 访问 Kafka UI:http://localhost:8888/并验证该主题的数据是否已上传
8. 传输 Spark 脚本
将 Spark 脚本复制到 Docker 容器中:
代码语言:javascript复制docker cp spark_processing.py spark_master:/opt/bitnami/spark/
9.启动 Spark Master 并下载 JAR
访问 Spark bash,导航到jars目录并下载必要的 JAR 文件。下载后,提交Spark作业:
代码语言:javascript复制docker exec -it spark_master /bin/bash
cd jars
curl -O <https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.3.0/spark-sql-kafka-0-10_2.13-3.3.0.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar>
curl -O <https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.375/aws-java-sdk-s3-1.11.375.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar>
cd ..
spark-submit \
--master local[2] \
--jars /opt/bitnami/spark/jars/kafka-clients-2.8.1.jar,\
/opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.3.0.jar,\
/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar,\
/opt/bitnami/spark/jars/aws-java-sdk-s3-1.11.375.jar,\
/opt/bitnami/spark/jars/commons-pool2-2.8.0.jar \
spark_processing.py
10. 验证S3上的数据
执行这些步骤后,检查您的 S3 存储桶以确保数据已上传
挑战和故障排除
- 配置挑战:确保docker-compose.yaml 正确设置环境变量和配置(如文件中的)可能很棘手。不正确的设置可能会阻止服务启动或通信。
- 服务依赖性:像 Kafka 或 Airflow 这样的服务依赖于其他服务(例如,Kafka 的 Zookeeper)。确保服务初始化的正确顺序至关重要。
- Airflow DAG 错误:DAG 文件 ( kafka_stream_dag.py) 中的语法或逻辑错误可能会阻止 Airflow 正确识别或执行 DAG。
- 数据转换问题:Python 脚本中的数据转换逻辑可能并不总是产生预期的结果,特别是在处理来自随机名称 API 的各种数据输入时。
- Spark 依赖项:确保所有必需的 JAR 可用且兼容对于 Spark 的流作业至关重要。JAR 丢失或不兼容可能会导致作业失败。
- Kafka 主题管理:使用正确的配置(如复制因子)创建主题对于数据持久性和容错能力至关重要。
- 网络挑战:在 docker-compose.yaml 中设置的 Docker 网络必须正确地促进服务之间的通信,特别是对于 Kafka 代理和 Zookeeper。
- S3 存储桶权限:写入 S3 时确保正确的权限至关重要。权限配置错误可能会阻止 Spark 将数据保存到存储桶。
- 弃用警告:提供的日志显示弃用警告,表明所使用的某些方法或配置在未来版本中可能会过时。
结论:
在整个旅程中,我们深入研究了现实世界数据工程的复杂性,从原始的未经处理的数据发展到可操作的见解。从收集随机用户数据开始,我们利用 Kafka、Spark 和 Airflow 的功能来管理、处理和自动化这些数据的流式传输。Docker 简化了部署,确保了环境的一致性,而 S3 和 Python 等其他工具发挥了关键作用。
这项努力不仅仅是建造一条管道,而是理解工具之间的协同作用。我鼓励大家进一步尝试、调整和增强此流程,以满足独特的需求并发现更深刻的见解。潜心、探索、创新!
原文作者:Simardeep Singh