Message Queue/消息队列/分布式消息中间件,
异步通信/解耦/冗余/扩展/过载保护/可恢复性/顺序保证/缓冲/数据流处理
Options: Kafka,ActiveMQ,RabbitMQ, WebSphere MQ*(IBM),RocketMQ(阿里系) ...
Protocal 概念和细节比较多,自动略过文字直奔脚本,或者概念参考-> https://www.cnblogs.com/sea520/p/11125174.html
AMQP/Advanced Message Queuing Protocol/先进//高级消息队列协议,应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。可靠、通用
MQTT/Message Queuing Telemetry Transport/消息队列遥测传输是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统
STOMP/Streaming Text Orientated Message Protocol/流文本定向消息协议,为MOM/Message Oriented Middleware/面向消息的中间件设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。 命令模式(非topicqueue模式)
XMPP/Extensible Messaging and Presence Protocol/可扩展消息处理现场协议,基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大
redis、kafka、zeroMq等根据自身需要未严格遵循MQ规范,而是基于TCPIP自行封装了一套协议,通过网络socket接口进行传输,实现MQ功能
Details
Broker/消息服务器/server,提供消息核心服务;
Producer/消息生产者/producer,业务的发起方产生消息 -> broker;
Consumer/消息消费者,业务的处理方负责从broker获取消息并进行业务逻辑处理;
Topic/主题,发布订阅模式下消息汇集地,不同生产者向其发送消息,由MQ服务器分发到不同订阅者,实现消息广播/broadcast;
Queue/队列,PTP Point To Point/点对点模式下特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收;
Message/消息体,根据不同通信协议定义的固定格式进行编码的数据包封装业务数据;
Kafka -> http://kafka.apache.org/quickstart
快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
高堆积:支持topic下消费者较长时间离线,消息堆积量大;
完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡;
支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,是个可行的解决方案。
代码语言:shell复制# 0. JVM prepared
# 1. download
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -xzf kafka_2.12-2.4.0.tgz && cd kafka_2.12-2.4.0
# 2. start server
bin/zookeeper-server-start.sh config/zookeeper.properties
# INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
bin/kafka-server-start.sh config/server.properties
# INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
# 3. create topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 4. send a message
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# send xxx
# 5. start a consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# echo xxx
# 6. Setting up a multi-broker cluster
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
#########################################################################################
# server started # ps -ef | grep kafka
# bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper_log.file 2>&1 &
# bin/kafka-server-start.sh config/server.properties > kafka_log.file 2>&1 &
# bin/kafka-topics.sh --create --bootstrap-server 10.170.15.54:9092 --replication-factor 1 --partitions 1 --topic hello
# bin/kafka-topics.sh --list --bootstrap-server 10.170.15.54:9092
# library installed
# pip install kafka
# pip install kafka-python
from kafka import KafkaProducer
from kafka import KafkaConsumer
KAFAKA_TOPIC = "hello"
KAFAKA_SERVERS = ['10.170.15.54:9092']
def kfk_producer_send():
producer = KafkaProducer(bootstrap_servers=KAFAKA_SERVERS)
producer.send(KAFAKA_TOPIC, bytes("学而不思则罔",encoding="utf-8"))
producer.send(KAFAKA_TOPIC, bytes("思而不学则殆",encoding="utf-8"))
print("XXX SENT TWO MESSAGE XXX")
producer.flush()
producer.close()
print("PRODUCER END")
def kfk_consumer_receive():
consumer = KafkaConsumer(bootstrap_servers=KAFAKA_SERVERS)
consumer.subscribe(KAFAKA_TOPIC)
for msg in consumer: print(msg.value)
consumer.close()
print("CONSUMER END")
if __name__ == "__main__":
kfk_producer_send()
kfk_consumer_receive()