DStream
Stateless transformation operations
- map: applies a function to every element and returns a new DStream of the results
- flatMap: applies a function to every element and then flattens the results into individual elements
- filter: keeps only the elements that satisfy a predicate
- repartition: changes the parallelism of the DStream by changing its number of partitions
- reduce: aggregates the elements of each RDD with a function, returning a DStream of single-element RDDs
- count: counts the number of elements in each RDD
- union: merges two DStreams
- reduceByKey: groups by key, then aggregates the values of each key with func
- join: for pairs whose key K matches, the values V are combined and returned as a tuple
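A minimal sketch (not part of the original notes) that strings several of these stateless transformations together on a socket text stream; the host, port, batch interval, and variable names are illustrative assumptions:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="StatelessTransformationsDemo")
ssc = StreamingContext(sc, 5)                           # 5-second batches (assumed)
lines = ssc.socketTextStream("localhost", 9999)         # assumed host/port

words = lines.flatMap(lambda line: line.split(" "))     # flatMap: split each line and flatten into words
pairs = words.map(lambda word: (word, 1))               # map: turn each word into a (word, 1) pair
long_words = words.filter(lambda word: len(word) > 3)   # filter: keep words longer than 3 characters
counts = pairs.reduceByKey(lambda a, b: a + b)          # reduceByKey: per-batch word count

counts.pprint()
long_words.count().pprint()                             # count: number of qualifying words per batch
ssc.start()
ssc.awaitTermination()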
Stateful transformation operations
With stateful transformations, the word counts of the current batch are accumulated on top of the results of previous batches, so the final result is the cumulative word count across all batches.
Sliding window transformation operations
- Two main parameters: (windowLength, slideInterval)
- windowLength: the length of the sliding window
- slideInterval: the interval at which the window slides
- Two important window reduce functions; in the second, an inverse reduce function is added in order to cut down the amount of computation (see the sketch below)
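The two functions are not named in the notes; a minimal sketch, assuming they refer to Spark Streaming's reduceByKeyAndWindow, where pairs is a (word, 1) pair DStream like the one built above and the 30-second window / 10-second slide are assumed values:
# Without an inverse function: the whole window is re-reduced on every slide.
win_counts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 30, 10)
# With an inverse function: only the newly arrived slide is added and the slide
# that left the window is subtracted, which reduces the amount of computation.
# Checkpointing must be enabled for this variant.
win_counts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)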
# data source terminal
# keep typing words such as Hadoop and Spark
cd /usr/local/spark/mycode/streaming/socket/
nc -lk 9999
# stream computation terminal
# dynamically displays the word count results
cd /usr/local/spark/mycode/streaming/socket/
/usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999
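WindowedNetworkWordCount.py itself is not listed in these notes; a minimal sketch of what it could contain, assuming the standard windowed word count with a 30-second window sliding every 10 seconds (these values and the checkpoint path are assumptions):
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingWindowedNetworkWordCount")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/socket/checkpoint")  # required by the inverse-function variant
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()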
updateStateByKey
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the "commander" of the stream computation
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")  # checkpointing is required by updateStateByKey
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # define a socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
                          .map(lambda word: (word, 1)) \
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()
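To run this stateful example, a setup analogous to the windowed one above can be used; this is a sketch, assuming the script is saved as NetworkWordCountStateful.py (the name in its usage message) under the stateful directory used for checkpointing:
# data source terminal
nc -lk 9999
# stream computation terminal
cd /usr/local/spark/mycode/streaming/stateful/
/usr/local/spark/bin/spark-submit NetworkWordCountStateful.py localhost 9999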
DStream output operations
Writing the output to text files
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the "commander" of the stream computation
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # define a socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
                          .map(lambda word: (word, 1)) \
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/stateful/output")  # save under this path
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()
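Note that saveAsTextFiles treats the given path as a prefix: each batch interval produces its own output directory whose name is generated from that prefix plus the batch timestamp (prefix-TIME_IN_MS), rather than a single output file.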
Writing a DStream to MySQL
# start MySQL
service mysql start
mysql -uroot -p
# create the table
use spark;
create table wordcount(word char(20), count int(4));
# install pymysql
sudo apt-get update
sudo apt-get install python3-pip
pip3 -V
sudo pip3 install PyMySQL
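Before wiring PyMySQL into the streaming job, the connection can be checked on its own; a minimal sketch, reusing the same credentials and database as the code below:
import pymysql

db = pymysql.connect(host="localhost", user="root", password="123456", database="spark")
cursor = db.cursor()
cursor.execute("select version()")
print(cursor.fetchone())  # one-element tuple with the MySQL version string
db.close()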
from __future__ import print_function
import sys
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # sys.argv[0] is the script name itself
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)  # not enough arguments, exit
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # the "commander" of the stream computation
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # define a socket input source
    running_counts = lines.flatMap(lambda line: line.split(" ")) \
                          .map(lambda word: (word, 1)) \
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.pprint()

    def dbfunc(records):
        # one connection per partition of records
        db = pymysql.connect(host="localhost", user="root", password="123456", database="spark")
        cursor = db.cursor()
        def doinsert(p):
            sql = "insert into wordcount(word, count) values('%s', '%s')" % (str(p[0]), str(p[1]))
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()
        for item in records:
            doinsert(item)
        db.close()

    def func(rdd):
        repartitionRDD = rdd.repartition(3)
        repartitionRDD.foreachPartition(dbfunc)

    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()
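Note the structure of the output step: foreachRDD applies func to the RDD of every batch, repartition(3) caps the number of partitions (and therefore simultaneous MySQL connections) at three, and foreachPartition lets dbfunc open one connection per partition instead of one per record.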