PySpark与MongoDB、MySQL进行数据交互

2023-11-06 19:46:11 浏览数 (3)

前些时候和后台对接,需要用pyspark获取MongoDB、MySQL数据,本文将介绍如何使用PySpark与MongoDB、MySQL进行数据交互。MongoDB是一个基于分布式文件存储的数据库,由C 语言编写。它旨在为Web应用提供可扩展的高性能数据存储解决方案。

1. 准备

  • 安装Python 3.x
  • 安装PySpark:使用pip install pyspark命令安装
  • 安装MongoDB:按照MongoDB官方文档进行安装和配置
  • 准备MongoDB数据库和集合:创建一个数据库和集合,并插入一些测试数据
  • 安装MySQL:按照MySQL官方文档进行安装和配置
  • 准备MySQL数据库和表:创建一个数据库和表,并插入一些测试数据

2. 代码

2.1 MongoDB

下面是一个简单的PySpark脚本,用于从MongoDB中读取数据:

代码语言:javascript复制
#!/usr/bin/python3
# coding=utf-8

from pyspark.sql import SparkSession

if __name__ == '__main__':

    spark = SparkSession 
            .builder 
            .appName("MongoSparkConnectorIntro") 
            .config("spark.mongodb.input.uri", "mongodb://username:password@host1:port,host2:port/dbName.collectionName?authSource=admin") 
            .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.4") 
            .enableHiveSupport() 
            .getOrCreate()

    # 读取mongodb中的数据
    df = spark.read 
            .format("com.mongodb.spark.sql.DefaultSource") 
            .load()

    # 打印数据
    df.show()
    spark.stop()

在这个脚本中需要注意根据实际情况修改URI中的用户名、密码、主机、端口、数据库名和集合名。最后使用spark.read.format().load()方法从MongoDB中读取数据,并将其存储在DataFrame中。

2.2 MySQL

代码语言:javascript复制
#!/usr/bin/python3
# coding=utf-8

from pyspark.sql import SparkSession

if __name__ == '__main__':

    spark = SparkSession 
            .builder 
            .appName("PySparkMySQLConnectorIntro") 
            .config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar") 
            .getOrCreate()

    # 读取MySQL中的数据
    df = spark.read 
            .format("jdbc") 
            .option("url", "jdbc:mysql://hostname:port/dbname") 
            .option("dbtable", "tablename") 
            .option("user", "username") 
            .option("password", "password") 
            .load()

    # 打印数据
    df.show()
    spark.stop()

MySQL与MongoDB类似,故不赘述。

3. 注意事项(踩坑必看)

在使用此脚本时,需要注意以下几点:

  • 在配置Spark参数时,确保添加了spark.jars.packages设置,指定MongoDB Spark Connector的版本。注意,最后的2.11是Scala版本,通常不需要更改;2.4.4是Spark版本,需要根据实际使用的Spark版本进行修改。
  • 如果在连接MongoDB时遇到“Exception authenticating MongoCredential...”错误,这可能是由于权限问题导致的。在这种情况下,需要修改URI,添加authSource=admin参数。具体示例请参见2.1代码中的第12行。

(MongoDB常用的查询语句可以参考):

MongoDB常用28条查询语句(转)_Lucky小黄人的博客-CSDN博客

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞