Spark笔记15-Spark数据源及操作

2021-03-02 15:41:16 浏览数 (1)

数据输入源

Spark Streaming中的数据来源主要是

  • 系统文件源
  • 套接字流
  • RDD对列流
  • 高级数据源Kafka
文件流
  • 交互式环境下执行
代码语言:javascript复制
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile   # 对这个子目录进行数据监控
代码语言:javascript复制
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)   # 每10秒监听;交互式环境下自带sc实例对象
lines = ssc.textFileStream(".../logfile")  # 创建文件流,监控目录的全称地址
words = lines.flatMap(lambda line:line.split(' '))  # 通过flatMap操作将数据进行lambda操作,再进行拍平
wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a b)
wordCounts.pprint()  # 在交互式环境下查看
ssc.start()   # 启动流计算
ssc.awaitTermination()  # 等待流计算结束
套接字流
创建客户端和服务端

tcp编程包含客户端和服务端,通信过程:

  • 服务端先进行端口的绑定,再进入监听和阻塞状态,等待来自客户端的连接
  • 客户端发送请求,连接到指定的端口号,服务端收到请求,完成通信过程

SparkStreaming扮演的是客户端的角色,不断的发送数据。

代码语言:javascript复制
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir socket
cd socket
代码语言:javascript复制
# vim NetworkWordCount.py:扮演的是客户端角色

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: NetworkWordCount.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingNetworkWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官
  lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  # 定义套接字类型的输入源
  counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a b)

  counts.pprint()
  ssc.start()
  ssc.awaitTermination()


# 服务端的角色
# 在linux中:nc -lk 9999
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
代码语言:javascript复制
# 使用socket编程实现自定义数据源
# DataSourceSocket.py

import socket
server = socket.socket()  #  生成对象
server.bind("localhose", 9999)   # 设置监听的机器和端口号
server.listen(1)
while 1:
  conn,addr = server.accept()  # 使用两个值进行接受
  print("connect success! connection is from %s" �dr[0])
  print("sending data....")
  conn.send("I love hadoop  I love spark hadoop is good spark is fast".encode())  # 打印正在传输的数据
  conn.close()
  print("connection is broken.")
如何启动
代码语言:javascript复制
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py

# 启动客户端
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
RDD队列流
代码语言:javascript复制
cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
代码语言:javascript复制
# RDDQueueStream.py

import time
from pyspark import SparkContext
from pyspark.Streaming import StreamingContext

if __name__ == "__main__":
  sc = sparkContext(appName="pythonStreamingQueueStream")
  ssc = StreamingContext(sc, 2)   # 数据流指挥官的生成
  rddQueue = []
  for i in range(5):
    rddQueue  = [ssc.sparkContext.parallelize[j for j in range(1,1001)], 10]   #. 创建RDD队列流
    time.sleep(1)

  inputStream = ssc.queueStream(rddQueue)
  mappedStream = inputStream.map(lambda x:(x , 1))
  reduceStream = mappedStream.reduceByKey(lambda a,b: a   b)
  reduceStream.pprint()
  ssc.start()
  ssc.stop(stopSparkContext=True, stopGraceFully=True)
Kafka(Apache)
功能

不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统等)可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实现高效交换

信息传递的枢纽,主要功能是:

  • 高吞吐量的分布式发布订阅消息系统
  • 同时满足在线实时处理和批量离线处理
组件
  • Broker:一个或者多个服务器
  • Topic:每条消息发布到Kafka集群的消息都有一个类别,这个类别就是Topic
    • 不同的topic消息分开存储
    • 用户不必关心数据存放位置,只需要指定消息的topic即可产生或者消费数据
  • partition:每个topic分布在一个或者多个分区上
  • Producer:生产者,负责发布消息
  • Consumer:向Broker读取消息额客户端
  • Consumer Group:所属组

Kafka的运行是依赖于Zookeeper

启动Kafka
spark 配置

先下载jar包:

代码语言:javascript复制
# 将下载解压后的jar包拷贝到spark的jars目录下
cd /usr/local/spark/jars
mkdir kafka
cd ~
cp ./spark-streaming-kafka-0.8_2.11-2.4.0.jar /usr/local/spark/jars/kafka

# 将Kafka安装目录下的libs目录下的所有文件复制到spark的jars目录下
cd /usr/local/kafka/libs
cp ./* /usr/local/spark/jars/kafka   # 进入libs目录后,将当权目录下的所有文件进行拷贝
修改spark配置文件
代码语言:javascript复制
cd /usr/local/spark/conf
vim spark-env.sh
kafka数据源
代码语言:javascript复制
# kafkaWordCount.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
  if len(sys.argv) != 3:   # 第一个参数默认是self
    print("Usage: kafkaWordCount.py<hostname><port>", file=sys.stderr)
    exit(-1)   # 参数长度不够,自动退出
  sc = SparkContext(appName="pythonStreamingKafkaWordCount")
  ssc = StreamingContext(sc, 1)   # 流计算的指挥官

  zkQuorum,topic = sys.argv[1:]
	kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})   # 建立数据源

  lines = kvs.map(lambda x:x[1])
  counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a b)    # 第二个 map 函数的作用是形成键值对,因为 reduceByKeyd 的参数必须是键值对

  counts.pprint()
  ssc.start()
  ssc.awaitTermination()
执行过程
代码语言:javascript复制
cd /usr/local/spark/mycode/streaming/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py locaohost:2181 wordsendertest   # 2181是ZK服务器的地址,wordsendertest是topic的名称

0 人点赞