用 Kafka、Spark、Airflow 和 Docker 构建数据流管道指南

2023-11-27 17:17:11 浏览数 (1)

在本指南中,我们将深入探讨构建强大的数据管道,用 Kafka 进行数据流处理、Spark 进行处理、Airflow 进行编排、Docker 进行容器化、S3 进行存储,Python 作为主要脚本语言。

为了说明这个过程,我们将使用 Random Name API,这是一个多功能工具,每次触发都会生成新的随机数据。它提供了许多企业日常处理实时数据的实用表示。我们第一步涉及一个 Python 脚本,该脚本经过精心设计,用于从该 API 获取数据。为了模拟数据的流式传输性质,我们将定期执行此脚本。这个脚本还将充当我们与 Kafka 的桥梁,将获取的数据直接写入 Kafka 主题。

随着我们的深入,Airflow 的有向无环图 (DAG) 发挥着关键作用。Airflow DAG 脚本编排我们的流程,确保我们的 Python 脚本像时钟一样运行,持续流式传输数据并将其输入到我们的管道中。一旦我们的数据到达 Kafka producer,Spark Structured Streaming 就会接过接力棒。使用这些数据,对其进行处理,然后将修改后的数据无缝写入 S3,确保其为后续分析过程做好准备。

项目的一个重要方面是其模块化架构。得益于 Docker 容器,每个服务,无论是 Kafka、Spark 还是 Airflow,都在隔离的环境中运行。不仅确保了平滑的互操作性,还简化了可扩展性和调试。

入门:先决条件和设置

对于这个项目,我们利用GitHub存储库来托管我们的整个设置,使任何人都可以轻松开始。

A、Docker:Docker 将成为我们编排和运行各种服务的主要工具。

  • 安装:访问 Docker 官方网站,下载并安装适合您操作系统的 Docker Desktop。
  • 验证:打开终端或命令提示符并执行 docker --version 以确保安装成功。

B、S3:AWS S3 是我们数据存储的首选。

  • 设置:登录 AWS 管理控制台,导航到 S3 服务,然后建立一个新存储桶,确保根据您的数据存储首选项对其进行配置。

C、设置项目:

  • 克隆存储库:首先,您需要使用以下命令从 GitHub 存储库克隆项目:
代码语言:javascript复制
git clone <https://github.com/simardeep1792/Data-Engineering-Streaming-Project.git>
  • 导航到项目目录:
代码语言:javascript复制
cd Data-Engineering-Streaming-Project
代码语言:javascript复制
使用以下方式部署服务docker-compose:在项目目录中,您将找到一个

docker-compose.yml文件。该文件描述了所有服务。

docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
该命令协调 Docker 容器中所有必要服务的启动,例如 Kafka、Spark、Airflow 等。

分解项目文件

1、docker-compose.yml

代码语言:javascript复制
version: '3.7'

services:
  # Airflow PostgreSQL Database
  airflow_db:
    image: postgres:16.0
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"

  # Apache Airflow Webserver
  airflow_webserver:
    command: bash -c "airflow db init && airflow webserver && airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin"
    image: apache/airflow:latest
    restart: always
    depends_on:
      - airflow_db
    environment:
      - LOAD_EX=${LOAD_EX}
      - EXECUTOR=${EXECUTOR}
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@airflow_db:5432/${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  # Zookeeper for Kafka
  kafka_zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=${ZOOKEEPER_CLIENT_PORT}
      - ZOOKEEPER_SERVER_ID=${ZOOKEEPER_SERVER_ID}
      - ZOOKEEPER_SERVERS=kafka_zookeeper:2888:3888
    networks:
      - kafka_network
      - default

  # Kafka Broker Instances
  kafka_broker_1:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092

  kafka_broker_2:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093

  kafka_broker_3:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094

  kafka_base:
    image: confluentinc/cp-kafka:latest
    environment:
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - KAFKA_ZOOKEEPER_CONNECT=kafka_zookeeper:2181
      - KAFKA_LOG4J_LOGGERS=${KAFKA_LOG4J_LOGGERS}
      - KAFKA_AUTHORIZER_CLASS_NAME=${KAFKA_AUTHORIZER_CLASS_NAME}
      - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=${KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND}
    networks:
      - kafka_network
      - default

  # Kafka Connect
  kafka_connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=${CONNECT_BOOTSTRAP_SERVERS}
      - CONNECT_REST_PORT=${CONNECT_REST_PORT}
      - CONNECT_GROUP_ID=${CONNECT_GROUP_ID}
      - CONNECT_CONFIG_STORAGE_TOPIC=${CONNECT_CONFIG_STORAGE_TOPIC}
      - CONNECT_OFFSET_STORAGE_TOPIC=${CONNECT_OFFSET_STORAGE_TOPIC}
      - CONNECT_STATUS_STORAGE_TOPIC=${CONNECT_STATUS_STORAGE_TOPIC}
      - CONNECT_KEY_CONVERTER=${CONNECT_KEY_CONVERTER}
      - CONNECT_VALUE_CONVERTER=${CONNECT_VALUE_CONVERTER}
      - CONNECT_INTERNAL_KEY_CONVERTER=${CONNECT_INTERNAL_KEY_CONVERTER}
      - CONNECT_INTERNAL_VALUE_CONVERTER=${CONNECT_INTERNAL_VALUE_CONVERTER}
      - CONNECT_REST_ADVERTISED_HOST_NAME=${CONNECT_REST_ADVERTISED_HOST_NAME}
      - CONNECT_LOG4J_ROOT_LOGLEVEL=${CONNECT_LOG4J_ROOT_LOGLEVEL}
      - CONNECT_LOG4J_LOGGERS=${CONNECT_LOG4J_LOGGERS}
      - CONNECT_PLUGIN_PATH=${CONNECT_PLUGIN_PATH}
    networks:
      - kafka_network
      - default

  # Kafka Schema Registry
  kafka_schema_registry:
    image: confluentinc/cp-schema-registry:latest
    ports:
      - "8081:8081"
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=${SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS}
      - SCHEMA_REGISTRY_HOST_NAME=${SCHEMA_REGISTRY_HOST_NAME}
      - SCHEMA_REGISTRY_LISTENERS=${SCHEMA_REGISTRY_LISTENERS}
    networks:
      - kafka_network
      - default

  # Kafka User Interface
  kafka_ui:
    container_name: kafka-ui-1
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8888:8080
    depends_on:
      - kafka_broker_1
      - kafka_broker_2
      - kafka_broker_3
      - kafka_schema_registry
      - kafka_connect
    environment:
      - KAFKA_CLUSTERS_0_NAME=${KAFKA_CLUSTERS_0_NAME}
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS}
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=${KAFKA_CLUSTERS_0_SCHEMAREGISTRY}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS}
      - DYNAMIC_CONFIG_ENABLED=${DYNAMIC_CONFIG_ENABLED}

    networks:
      - kafka_network
      - default

  # Apache Spark Master Node
  spark_master:
    image: bitnami/spark:3
    container_name: spark_master
    ports:
      - 8085:8080
    environment:
      - SPARK_UI_PORT=${SPARK_UI_PORT}
      - SPARK_MODE=${SPARK_MODE}
      - SPARK_RPC_AUTHENTICATION_ENABLED=${SPARK_RPC_AUTHENTICATION_ENABLED}
      - SPARK_RPC_ENCRYPTION_ENABLED=${SPARK_RPC_ENCRYPTION_ENABLED}
    volumes:
      - ./:/home
      - spark_data:/opt/bitnami/spark/data
    networks:
      - default
      - kafka_network

#volumes for data
volumes:
  spark_data:

#network for Kafka
networks:
  kafka_network:
    driver: bridge
  default:
    external:
      name: docker_streaming

项目设置的核心在于文件 docker-compose.yml 。它协调我们的服务,确保顺畅的通信和初始化。这是一个细分:

1)版本

使用 Docker Compose 文件格式版本“3.7”,确保与服务兼容。

2)服务

项目包含多项服务:

  • Airflow:
  • 数据库 ( airflow_db):使用 PostgreSQL 1。
  • Web 服务器 ( airflow_webserver):启动数据库并设置管理员用户。
  • Kafka:
  • Zookeeper ( kafka_zookeeper):管理 broker 元数据。
  • Brokers:三个实例(kafka_broker_1、2 和 3)。
  • 基本配置 ( kafka_base):Broker的常见设置。
  • Kafka Connect(kafka_connect):促进流处理。
  • 架构注册表 ( kafka_schema_registry):管理 Kafka 架构。
  • 用户界面 ( kafka_ui):Kafka 的可视化界面。
  • spark:
  • 主节点 ( spark_master):Apache Spark 的中央控制节点。

3)卷

利用持久卷spark_data来确保 Spark 的数据一致性。

4)网络

服务有两个网络:

  • Kafka Network ( kafka_network):专用于 Kafka。
  • 默认网络 ( default):外部命名为docker_streaming。

2、kafka_stream_dag.py

代码语言:javascript复制
# Importing required modules
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from kafka_streaming_service import initiate_stream  
# Configuration for the DAG's start date
DAG_START_DATE = datetime(2018, 12, 21, 12, 12)

# Default arguments for the DAG
DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': DAG_START_DATE,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

# Creating the DAG with its configuration
with DAG(
    'name_stream_dag',  # Renamed for uniqueness
    default_args=DAG_DEFAULT_ARGS,
    schedule_interval='0 1 * * *',
    catchup=False,
    description='Stream random names to Kafka topic',
    max_active_runs=1
) as dag:

    # Defining the data streaming task using PythonOperator
    kafka_stream_task = PythonOperator(
        task_id='stream_to_kafka_task', 
        python_callable=initiate_stream,
        dag=dag
    )

    kafka_stream_task

该文件主要定义了一个Airflow Directed Acyclic Graph(DAG),用于处理数据流到Kafka主题。

1)进口

导入基本模块和函数,特别是 Airflow DAG 和 PythonOperator,以及initiate_stream来自kafka_streaming_service.

2)配置

  • DAG 开始日期 ( DAG_START_DATE):设置 DAG 开始执行的时间。
  • 默认参数 ( DAG_DEFAULT_ARGS):配置 DAG 的基本参数,例如所有者、开始日期和重试设置。

3)DAG定义

将创建一个名为 的新 DAG name_stream_dag,配置为每天凌晨 1 点运行。它的设计目的是不运行任何错过的间隔(带有catchup=False),并且一次只允许一次活动运行。

4)任务

单个任务 kafka_stream_task 是使用 PythonOperator 定义的。此任务调用该initiate_stream函数,在 DAG 运行时有效地将数据流式传输到 Kafka。

3、kafka_streaming_service.py

代码语言:javascript复制
# Importing necessary libraries and modules
import requests
import json
import time
import hashlib
from confluent_kafka import Producer

# Constants and configuration
API_ENDPOINT = "https://randomuser.me/api/?results=1"
KAFKA_BOOTSTRAP_SERVERS = ['kafka_broker_1:19092','kafka_broker_2:19093','kafka_broker_3:19094']
KAFKA_TOPIC = "names_topic"  
PAUSE_INTERVAL = 10  
STREAMING_DURATION = 120

def retrieve_user_data(url=API_ENDPOINT) -> dict:
    """Fetches random user data from the provided API endpoint."""
    response = requests.get(url)
    return response.json()["results"][0]

def transform_user_data(data: dict) -> dict:
    """Formats the fetched user data for Kafka streaming."""
    return {
        "name": f"{data['name']['title']}. {data['name']['first']} {data['name']['last']}",
        "gender": data["gender"],
        "address": f"{data['location']['street']['number']}, {data['location']['street']['name']}",  
        "city": data['location']['city'],
        "nation": data['location']['country'],  
        "zip": encrypt_zip(data['location']['postcode']),  
        "latitude": float(data['location']['coordinates']['latitude']),
        "longitude": float(data['location']['coordinates']['longitude']),
        "email": data["email"]
    }

def encrypt_zip(zip_code):  
    """Hashes the zip code using MD5 and returns its integer representation."""
    zip_str = str(zip_code)
    return int(hashlib.md5(zip_str.encode()).hexdigest(), 16)

def configure_kafka(servers=KAFKA_BOOTSTRAP_SERVERS):
    """Creates and returns a Kafka producer instance."""
    settings = {
        'bootstrap.servers': ','.join(servers),
        'client.id': 'producer_instance'  
    }
    return Producer(settings)

def publish_to_kafka(producer, topic, data):
    """Sends data to a Kafka topic."""
    producer.produce(topic, value=json.dumps(data).encode('utf-8'), callback=delivery_status)
    producer.flush()

def delivery_status(err, msg):
    """Reports the delivery status of the message to Kafka."""
    if err is not None:
        print('Message delivery failed:', err)
    else:
        print('Message delivered to', msg.topic(), '[Partition: {}]'.format(msg.partition()))

def initiate_stream():
    """Initiates the process to stream user data to Kafka."""
    kafka_producer = configure_kafka()
    for _ in range(STREAMING_DURATION // PAUSE_INTERVAL):
        raw_data = retrieve_user_data()
        kafka_formatted_data = transform_user_data(raw_data)
        publish_to_kafka(kafka_producer, KAFKA_TOPIC, kafka_formatted_data)
        time.sleep(PAUSE_INTERVAL)

if __name__ == "__main__":
    initiate_stream()

1)导入和配置

导入基本库并设置常量,例如 API 端点、Kafka 引导服务器、主题名称和流间隔详细信息。

2)用户数据检索

该retrieve_user_data函数从指定的 API 端点获取随机用户详细信息。

3)数据转换

该 transform_user_data 函数格式化用于 Kafka 流的原始用户数据,同时 encrypt_zip 对邮政编码进行哈希处理以维护用户隐私。

4)Kafka 配置与发布

  • configure_kafka 设置 Kafka 生产者。
  • publish_to_kafka 将转换后的用户数据发送到 Kafka 主题。
  • delivery_status 提供有关数据是否成功发送到 Kafka 的反馈。

5)主要流功能

initiate_stream 协调整个流程,定期检索、转换用户数据并将其发布到 Kafka。

6)执行

当直接运行脚本时,initiate_stream 将执行该函数,并在指定的持续时间内流式传输数据 STREAMING_DURATION。

4、spark_processing.py

代码语言:javascript复制
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


# Initialize logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def initialize_spark_session(app_name, access_key, secret_key):
    """
    Initialize the Spark Session with provided configurations.

    :param app_name: Name of the spark application.
    :param access_key: Access key for S3.
    :param secret_key: Secret key for S3.
    :return: Spark session object or None if there's an error.
    """
    try:
        spark = SparkSession 
                .builder 
                .appName(app_name) 
                .config("spark.hadoop.fs.s3a.access.key", access_key) 
                .config("spark.hadoop.fs.s3a.secret.key", secret_key) 
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") 
                .getOrCreate()

        spark.sparkContext.setLogLevel("ERROR")
        logger.info('Spark session initialized successfully')
        return spark

    except Exception as e:
        logger.error(f"Spark session initialization failed. Error: {e}")
        return None


def get_streaming_dataframe(spark, brokers, topic):
    """
    Get a streaming dataframe from Kafka.

    :param spark: Initialized Spark session.
    :param brokers: Comma-separated list of Kafka brokers.
    :param topic: Kafka topic to subscribe to.
    :return: Dataframe object or None if there's an error.
    """
    try:
        df = spark 
            .readStream 
            .format("kafka") 
            .option("kafka.bootstrap.servers", brokers) 
            .option("subscribe", topic) 
            .option("delimiter", ",") 
            .option("startingOffsets", "earliest") 
            .load()
        logger.info("Streaming dataframe fetched successfully")
        return df

    except Exception as e:
        logger.warning(f"Failed to fetch streaming dataframe. Error: {e}")
        return None


def transform_streaming_data(df):
    """
    Transform the initial dataframe to get the final structure.

    :param df: Initial dataframe with raw data.
    :return: Transformed dataframe.
    """
    schema = StructType([
        StructField("full_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("location", StringType(), False),
        StructField("city", StringType(), False),
        StructField("country", StringType(), False),
        StructField("postcode", IntegerType(), False),
        StructField("latitude", FloatType(), False),
        StructField("longitude", FloatType(), False),
        StructField("email", StringType(), False)
    ])

    transformed_df = df.selectExpr("CAST(value AS STRING)") 
        .select(from_json(col("value"), schema).alias("data")) 
        .select("data.*")
    return transformed_df


def initiate_streaming_to_bucket(df, path, checkpoint_location):
    """
    Start streaming the transformed data to the specified S3 bucket in parquet format.

    :param df: Transformed dataframe.
    :param path: S3 bucket path.
    :param checkpoint_location: Checkpoint location for streaming.
    :return: None
    """
    logger.info("Initiating streaming process...")
    stream_query = (df.writeStream
                    .format("parquet")
                    .outputMode("append")
                    .option("path", path)
                    .option("checkpointLocation", checkpoint_location)
                    .start())
    stream_query.awaitTermination()


def main():
    app_name = "SparkStructuredStreamingToS3"
    access_key = "ENTER_YOUR_ACCESS_KEY"
    secret_key = "ENTER_YOUR_SECRET_KEY"
    brokers = "kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094"
    topic = "names_topic"
    path = "BUCKET_PATH"
    checkpoint_location = "CHECKPOINT_LOCATION"

    spark = initialize_spark_session(app_name, access_key, secret_key)
    if spark:
        df = get_streaming_dataframe(spark, brokers, topic)
        if df:
            transformed_df = transform_streaming_data(df)
            initiate_streaming_to_bucket(transformed_df, path, checkpoint_location)


# Execute the main function if this script is run as the main module
if __name__ == '__main__':
    main()
代码语言:javascript复制
1. 导入和日志初始化

导入必要的库,并创建日志记录设置以更好地调试和监控。

2. Spark会话初始化

initialize_spark_session:此函数使用从 S3 访问数据所需的配置来设置 Spark 会话。

3. 数据检索与转换

  • get_streaming_dataframe:从 Kafka 获取具有指定代理和主题详细信息的流数据帧。
  • transform_streaming_data:将原始 Kafka 数据转换为所需的结构化格式。

4. 流式传输到 S3

initiate_streaming_to_bucket:此函数将转换后的数据以 parquet 格式流式传输到 S3 存储桶。它使用检查点机制来确保流式传输期间数据的完整性。

5. 主执行

该 main 函数协调整个过程:初始化 Spark 会话、从 Kafka 获取数据、转换数据并将其流式传输到 S3。

6. 脚本执行

如果脚本是正在运行的主模块,它将执行该 main 函数,启动整个流处理过程。

构建数据管道:逐步

1. 设置Kafka集群

使用以下命令启动 Kafka 集群:

代码语言:javascript复制
docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
代码语言:javascript复制
2. 为 Kafka 创建主题(http://localhost:8888/)
  • 通过http://localhost:8888/访问 Kafka UI 。
  • 观察活动集群。
  • 导航至“主题”。
  • 创建一个名为“names_topic”的新主题。
  • 将复制因子设置为 3。

3. 配置 Airflow 用户

创建具有管理员权限的 Airflow 用户:

代码语言:javascript复制
docker-compose run airflow_webserver airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

4. 访问 Airflow Bash 并安装依赖项

我们应该将脚本移动kafka_stream_dag.py到文件夹下以便能够运行 DAG 使用提供的脚本访问 Airflow bash 并安装所需的软件包:kafka_streaming_service.py dags

代码语言:javascript复制
./airflow.sh bash
pip install -r ./requirements.txt
代码语言:javascript复制

5. 验证 DAG

确保您的 DAG 没有错误:

代码语言:javascript复制
airflow dags list

6. 启动 Airflow 调度程序

要启动 DAG,请运行调度程序:

代码语言:javascript复制
airflow scheduler

7. 验证数据是否上传到 Kafka 集群

  • 访问 Kafka UI:http://localhost:8888/并验证该主题的数据是否已上传

8. 传输 Spark 脚本

将 Spark 脚本复制到 Docker 容器中:

代码语言:javascript复制
docker cp spark_processing.py spark_master:/opt/bitnami/spark/

9.启动 Spark Master 并下载 JAR

访问 Spark bash,导航到jars目录并下载必要的 JAR 文件。下载后,提交Spark作业:

代码语言:javascript复制
docker exec -it spark_master /bin/bash
cd jars

curl -O <https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.3.0/spark-sql-kafka-0-10_2.13-3.3.0.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar>
curl -O <https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.375/aws-java-sdk-s3-1.11.375.jar>
curl -O <https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar>

cd ..


spark-submit \
--master local[2] \
--jars /opt/bitnami/spark/jars/kafka-clients-2.8.1.jar,\
/opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.3.0.jar,\
/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar,\
/opt/bitnami/spark/jars/aws-java-sdk-s3-1.11.375.jar,\
/opt/bitnami/spark/jars/commons-pool2-2.8.0.jar \
spark_processing.py

10. 验证S3上的数据

执行这些步骤后,检查您的 S3 存储桶以确保数据已上传

挑战和故障排除

  • 配置挑战:确保docker-compose.yaml 正确设置环境变量和配置(如文件中的)可能很棘手。不正确的设置可能会阻止服务启动或通信。
  • 服务依赖性:像 Kafka 或 Airflow 这样的服务依赖于其他服务(例如,Kafka 的 Zookeeper)。确保服务初始化的正确顺序至关重要。
  • Airflow DAG 错误:DAG 文件 ( kafka_stream_dag.py) 中的语法或逻辑错误可能会阻止 Airflow 正确识别或执行 DAG。
  • 数据转换问题:Python 脚本中的数据转换逻辑可能并不总是产生预期的结果,特别是在处理来自随机名称 API 的各种数据输入时。
  • Spark 依赖项:确保所有必需的 JAR 可用且兼容对于 Spark 的流作业至关重要。JAR 丢失或不兼容可能会导致作业失败。
  • Kafka 主题管理:使用正确的配置(如复制因子)创建主题对于数据持久性和容错能力至关重要。
  • 网络挑战:在 docker-compose.yaml 中设置的 Docker 网络必须正确地促进服务之间的通信,特别是对于 Kafka 代理和 Zookeeper。
  • S3 存储桶权限:写入 S3 时确保正确的权限至关重要。权限配置错误可能会阻止 Spark 将数据保存到存储桶。
  • 弃用警告:提供的日志显示弃用警告,表明所使用的某些方法或配置在未来版本中可能会过时。

结论:

在整个旅程中,我们深入研究了现实世界数据工程的复杂性,从原始的未经处理的数据发展到可操作的见解。从收集随机用户数据开始,我们利用 Kafka、Spark 和 Airflow 的功能来管理、处理和自动化这些数据的流式传输。Docker 简化了部署,确保了环境的一致性,而 S3 和 Python 等其他工具发挥了关键作用。

这项努力不仅仅是建造一条管道,而是理解工具之间的协同作用。我鼓励大家进一步尝试、调整和增强此流程,以满足独特的需求并发现更深刻的见解。潜心、探索、创新!

原文作者:Simardeep Singh

0 人点赞