数据输入源
Spark Streaming
中的数据来源主要是
- 系统文件源
- 套接字流
RDD
对列流- 高级数据源
Kafka
文件流
- 交互式环境下执行
# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir logfile
cd logfile # 对这个子目录进行数据监控
代码语言:javascript复制from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10) # 每10秒监听;交互式环境下自带sc实例对象
lines = ssc.textFileStream(".../logfile") # 创建文件流,监控目录的全称地址
words = lines.flatMap(lambda line:line.split(' ')) # 通过flatMap操作将数据进行lambda操作,再进行拍平
wordCounts = words.map(lambda x:(x,1)).reduceByKey(lambda a,b: a b)
wordCounts.pprint() # 在交互式环境下查看
ssc.start() # 启动流计算
ssc.awaitTermination() # 等待流计算结束
套接字流
创建客户端和服务端
tcp编程包含客户端和服务端,通信过程:
- 服务端先进行端口的绑定,再进入监听和阻塞状态,等待来自客户端的连接
- 客户端发送请求,连接到指定的端口号,服务端收到请求,完成通信过程
SparkStreaming扮演的是客户端的角色,不断的发送数据。
代码语言:javascript复制# 创建文件存放的目录
cd /usr/loca/spark/mycode
mkdir streaming
cd streaming
mkdir socket
cd socket
代码语言:javascript复制# vim NetworkWordCount.py:扮演的是客户端角色
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3: # 第一个参数默认是self
print("Usage: NetworkWordCount.py<hostname><port>", file=sys.stderr)
exit(-1) # 参数长度不够,自动退出
sc = SparkContext(appName="pythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1) # 流计算的指挥官
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) # 定义套接字类型的输入源
counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
# 服务端的角色
# 在linux中:nc -lk 9999
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
代码语言:javascript复制# 使用socket编程实现自定义数据源
# DataSourceSocket.py
import socket
server = socket.socket() # 生成对象
server.bind("localhose", 9999) # 设置监听的机器和端口号
server.listen(1)
while 1:
conn,addr = server.accept() # 使用两个值进行接受
print("connect success! connection is from %s" �dr[0])
print("sending data....")
conn.send("I love hadoop I love spark hadoop is good spark is fast".encode()) # 打印正在传输的数据
conn.close()
print("connection is broken.")
如何启动
代码语言:javascript复制cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit DataSourceSocket.py
# 启动客户端
cd /usr/local/spark/mycode/streaming/socket
/usr/local/spark/bin/spark-submit NetworkWordCount.py localhost 9999
RDD队列流
代码语言:javascript复制cd /usr/local/spark/mycode/streaming/rddqueue
/usr/local/spark/bin/spark-submit RDDQueueStream.py
代码语言:javascript复制# RDDQueueStream.py
import time
from pyspark import SparkContext
from pyspark.Streaming import StreamingContext
if __name__ == "__main__":
sc = sparkContext(appName="pythonStreamingQueueStream")
ssc = StreamingContext(sc, 2) # 数据流指挥官的生成
rddQueue = []
for i in range(5):
rddQueue = [ssc.sparkContext.parallelize[j for j in range(1,1001)], 10] #. 创建RDD队列流
time.sleep(1)
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x:(x , 1))
reduceStream = mappedStream.reduceByKey(lambda a,b: a b)
reduceStream.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
Kafka(Apache)
功能
不同类型的分布式系统(关系数据库、NoSQL数据库、流处理系统等)可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实现高效交换
信息传递的枢纽,主要功能是:
- 高吞吐量的分布式发布订阅消息系统
- 同时满足在线实时处理和批量离线处理
组件
- Broker:一个或者多个服务器
- Topic:每条消息发布到
Kafka
集群的消息都有一个类别,这个类别就是Topic
。- 不同的topic消息分开存储
- 用户不必关心数据存放位置,只需要指定消息的topic即可产生或者消费数据
- partition:每个topic分布在一个或者多个分区上
- Producer:生产者,负责发布消息
- Consumer:向Broker读取消息额客户端
- Consumer Group:所属组
Kafka的运行是依赖于Zookeeper
启动Kafka
spark 配置
先下载jar
包:
# 将下载解压后的jar包拷贝到spark的jars目录下
cd /usr/local/spark/jars
mkdir kafka
cd ~
cp ./spark-streaming-kafka-0.8_2.11-2.4.0.jar /usr/local/spark/jars/kafka
# 将Kafka安装目录下的libs目录下的所有文件复制到spark的jars目录下
cd /usr/local/kafka/libs
cp ./* /usr/local/spark/jars/kafka # 进入libs目录后,将当权目录下的所有文件进行拷贝
修改spark配置文件
代码语言:javascript复制cd /usr/local/spark/conf
vim spark-env.sh
kafka数据源
代码语言:javascript复制# kafkaWordCount.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3: # 第一个参数默认是self
print("Usage: kafkaWordCount.py<hostname><port>", file=sys.stderr)
exit(-1) # 参数长度不够,自动退出
sc = SparkContext(appName="pythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1) # 流计算的指挥官
zkQuorum,topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1}) # 建立数据源
lines = kvs.map(lambda x:x[1])
counts = lines.flatMap(lambda line:line.split(" ").map(lambda word:(word,1)).reduceByKey(lambda a,b: a b) # 第二个 map 函数的作用是形成键值对,因为 reduceByKeyd 的参数必须是键值对
counts.pprint()
ssc.start()
ssc.awaitTermination()
执行过程
代码语言:javascript复制cd /usr/local/spark/mycode/streaming/kafka
/usr/local/spark/bin/spark-submit ./kafkaWordCount.py locaohost:2181 wordsendertest # 2181是ZK服务器的地址,wordsendertest是topic的名称