Spark笔记16-DStream基础及操作

2021-03-02 15:40:32 浏览数 (2)

DStream

无状态转换操作
  • map:每个元素采用操作,返回的列表形式
  • flatmap:操作之后拍平,变成单个元素
  • filter:过滤元素
  • repartition:通过改变分区的多少,来改变DStream的并行度
  • reduce:对函数的每个进行操作,返回的是一个包含单元素RDDDStream
  • count:统计总数
  • union:合并两个DStream
  • reduceByKey:通过key分组再通过func进行聚合
  • join:K相同,V进行合并同时以元组形式表示
有状态转换操作

在有状态转换操作而言,本批次的词频统计,会在之前的词频统计的结果上进行不断的累加,最终得到的结果是所有批次的单词的总的统计结果。

滑动窗口转换操作
  1. 主要是两个参数(windowLength, slideInterval)
  • 滑动窗口的长度
  • 滑动窗口间隔
  1. 两个重要的函数 第二个函数中增加逆向函数的作用是减小计算量

代码语言:javascript复制
# 数据源终端
# 连续输入多个Hadoop和spark
cd /usr/local/spark/mycode/streaming/socket/
nc -lk 9999

# 流计算终端
# 动态显示词频统计结果
cd /usr/local/spark/mycode/streaming/socket/
/usr/local/spark/bin/spark-submit WindowedNetworkWordCount.py localhost 9999
upateaStateByKey
代码语言:javascript复制
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: NetworkWordCountStateful.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官

 ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stat   eful")
 initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])
 def updateFunc(new_vlaues, last_sum):
    return sum(new_values)   (last_sum or 0)

  lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # 定义套接字类型的输入源
  running_counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).updateStateByKey(updateFunc,initialRDD=initialStateRDD)

  running_counts.pprint()
  ssc.start()
  ssc.awaitTermination()

DStream输出操作

输出到文本
代码语言:javascript复制
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: NetworkWordCountStateful.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官

 ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stat   eful")
 initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])
 def updateFunc(new_vlaues, last_sum):
    return sum(new_values)   (last_sum or 0)

  lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # 定义套接字类型的输入源
  running_counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).updateStateByKey(updateFunc,initialRDD=initialStateRDD)

                             running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/stateful/output")  # 保存到该路径下
  running_counts.pprint()
  ssc.start()
  ssc.awaitTermination()
DStream写入到mysql
代码语言:javascript复制
# 启动mysql
service mysql start
mysql -uroot -p

# 创建表
use spark
create table wordcount(word count(20), count int(4));

# 安装pymysql
sudo apt-get update
sudo apt-get install python3-pip
pip3-v
sudo pip3 install PyMySQL
代码语言:javascript复制
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:  # 第一个参数默认是self
        print("Usage: NetworkWordCountStateful.py<hostname><port>", file=sys.stderr)
        exit(-1)  # 参数长度不够,自动退出
    sc = SparkContext(appName="pythonStreamingStateNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 流计算的指挥官
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stat   eful")
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'word', 1)])


    def updateFunc(new_values, last_sum):
        return sum(new_values)   (last_sum or 0)


    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # 定义套接字类型的输入源
    running_counts = lines.flatMap(lambda line: line.split(" ").map(lambda word: (word, 1))
                                   .updateStateByKey(updateFunc,initialRDD=initialStateRDD)
    running_counts.pprint()


def dbfunc(records):
    db = pymysql.connect("localhost", "root", "123456", "spark")
    cursor = db.cursor()
    def doinsert(p):
        sql = "insert into wordcount(word, count) values('%s', '%s')" % (str(p[0]), str(p[1]))
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback

        for item in records:
            doinsert(item)

def func(rdd):
    repartitionRDD = rdd.repartition(3)
    repartitionRDD.foreachPartition(dbfunc)
    running_counts.foreachRDD(func)
    ssc.start()
    ssc.awaitTermination()

0 人点赞