前些时候和后台对接,需要用pyspark获取MongoDB、MySQL数据,本文将介绍如何使用PySpark与MongoDB、MySQL进行数据交互。MongoDB是一个基于分布式文件存储的数据库,由C 语言编写。它旨在为Web应用提供可扩展的高性能数据存储解决方案。
1. 准备
- 安装Python 3.x
- 安装PySpark:使用
pip install pyspark
命令安装 - 安装MongoDB:按照MongoDB官方文档进行安装和配置
- 准备MongoDB数据库和集合:创建一个数据库和集合,并插入一些测试数据
- 安装MySQL:按照MySQL官方文档进行安装和配置
- 准备MySQL数据库和表:创建一个数据库和表,并插入一些测试数据
2. 代码
2.1 MongoDB
下面是一个简单的PySpark脚本,用于从MongoDB中读取数据:
代码语言:javascript复制#!/usr/bin/python3
# coding=utf-8
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession
.builder
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://username:password@host1:port,host2:port/dbName.collectionName?authSource=admin")
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.4")
.enableHiveSupport()
.getOrCreate()
# 读取mongodb中的数据
df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.load()
# 打印数据
df.show()
spark.stop()
在这个脚本中需要注意根据实际情况修改URI中的用户名、密码、主机、端口、数据库名和集合名。最后使用spark.read.format().load()
方法从MongoDB中读取数据,并将其存储在DataFrame中。
2.2 MySQL
代码语言:javascript复制#!/usr/bin/python3
# coding=utf-8
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession
.builder
.appName("PySparkMySQLConnectorIntro")
.config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar")
.getOrCreate()
# 读取MySQL中的数据
df = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://hostname:port/dbname")
.option("dbtable", "tablename")
.option("user", "username")
.option("password", "password")
.load()
# 打印数据
df.show()
spark.stop()
MySQL与MongoDB类似,故不赘述。
3. 注意事项(踩坑必看)
在使用此脚本时,需要注意以下几点:
- 在配置Spark参数时,确保添加了
spark.jars.packages
设置,指定MongoDB Spark Connector的版本。注意,最后的2.11
是Scala版本,通常不需要更改;2.4.4
是Spark版本,需要根据实际使用的Spark版本进行修改。 - 如果在连接MongoDB时遇到“Exception authenticating MongoCredential...”错误,这可能是由于权限问题导致的。在这种情况下,需要修改URI,添加
authSource=admin
参数。具体示例请参见2.1代码中的第12行。
(MongoDB常用的查询语句可以参考):
MongoDB常用28条查询语句(转)_Lucky小黄人的博客-CSDN博客
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!