首先我们先来看一下rocketmq的环境搭建,下面我们采用docker来进行搭建
首先我们创建如下目录
mkdir -p rmq/conf
mkdir -p rmq/logs
mkdir -p rmq/store
下面我们来看一下docker-compose.yaml文件内容
version: '2'
services:
namesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
restart: always
ports:
- 9876:9876
volumes:
- ./logs:/home/rocketmq/logs
- ./store:/home/rocketmq/store
command: sh mqnamesrv
broker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
restart: always
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./logs:/home/rocketmq/logs
- ./store:/home/rocketmq/store
- ./conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
#command: sh mqbroker -n namesrv:9876
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
environment:
- JAVA_HOME=/usr/lib/jvm/jre
console:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console-ng
restart: always
ports:
- 8076:8080
depends_on:
- namesrv
environment:
- JAVA_OPTS= -Dlogging.level.root=info -Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
通过yaml文件可知总共有三个组件:分别为nameserver, broker, web console
我们再来看一下rmq/conf下的broker.conf文件
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#这里需要设置成宿主机的IP
brokerIP1=xxxxx
一切准备就绪后我们启动docker: docker-compose up -d
访问console: http://localhost:8076/#/message
下面我们分别跑一段原生的producer和consumer程序,最后再用flink进行consume并sink入mq
producer
package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);
private static final int MESSAGE_NUM = 10000;
private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
private static final String TOPIC = "SOURCE_TOPIC";
private static final String TAGS = "*";
private static final String KEY_PREFIX = "KEY";
private static RPCHook getAclRPCHook() {
final String accessKey = "${AccessKey}";
final String secretKey = "${SecretKey}";
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
public static void main(String[] args) {
DefaultMQProducer producer =
new DefaultMQProducer(PRODUCER_GROUP, true, null);
producer.setNamesrvAddr("127.0.0.1:9876");
// When using aliyun products, you need to set up channels
//producer.setAccessChannel(AccessChannel.CLOUD);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
for (int i = 0; i < MESSAGE_NUM; i ) {
String content = "Test Message " i;
Message msg = new Message(TOPIC, TAGS, KEY_PREFIX i, content.getBytes());
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.printf(
"send result: %s %sn",
sendResult.getMsgId(), sendResult.getMessageQueue().toString());
Thread.sleep(50);
} catch (Exception e) {
LOGGER.info("send message failed. {}", e.toString());
}
}
}
}
consumer
package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Minimal RocketMQ push-consumer example: subscribes to {@code SOURCE_TOPIC}
 * and prints every received message to stdout.
 */
public class SimpleConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    private static final String GROUP = "GID_SIMPLE_CONSUMER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";

    /**
     * Builds an ACL hook carrying access credentials. Only needed when the broker
     * has ACL enabled (e.g. Aliyun cloud instances); replace the placeholders
     * with real keys before use.
     */
    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // When using aliyun products, you need to set up channels
        // consumer.setAccessChannel(AccessChannel.CLOUD);
        try {
            consumer.subscribe(TOPIC, TAGS);
        } catch (MQClientException e) {
            // Fix: abort instead of starting a consumer with no subscription.
            LOGGER.error("subscribe failed.", e);
            return;
        }
        // Start from the earliest offset the first time this group consumes.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(
                (MessageListenerConcurrently)
                        (msgs, context) -> {
                            for (MessageExt msg : msgs) {
                                // Fix: "%sn" was missing the newline escape.
                                System.out.printf(
                                        "%s %s %d %s%n",
                                        msg.getMsgId(),
                                        msg.getBrokerName(),
                                        msg.getQueueId(),
                                        new String(msg.getBody()));
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        });
        try {
            consumer.start();
        } catch (MQClientException e) {
            // Fix: original logged the copy-pasted "send message failed" here.
            LOGGER.error("consumer start failed.", e);
        }
    }
}
flink从mq消费并写入mq
从github下载rocketmq-flink connector
git clone https://github.com/apache/rocketmq-flink.git
进行编译和安装之后在pom.xml中引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-flink</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSink;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;
/**
 * Flink job that consumes tuples from RocketMQ topic SOURCE_TOPIC and
 * re-publishes them to SINK_TOPIC through the legacy RocketMQ connector.
 */
public class RocketMQFlinkExample {

    /**
     * Source (consumer) configuration: name server address, consumer group,
     * topic/tag subscription and the offset to start from when no committed
     * offset exists.
     *
     * @return properties
     */
    private static Properties getConsumerProps() {
        final Properties props = new Properties();
        props.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
        props.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink_consumer_test");
        props.setProperty(RocketMQConfig.CONSUMER_TOPIC, "SOURCE_TOPIC");
        props.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
        props.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
        return props;
    }

    /**
     * Sink (producer) configuration: name server address and producer group.
     *
     * @return properties
     */
    private static Properties getProducerProps() {
        final Properties props = new Properties();
        props.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
        props.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink_produce_test");
        return props;
    }

    public static void main(String[] args) throws Exception {
        // final ParameterTool params = ParameterTool.fromArgs(args);

        // Local environment for IDE debugging; on a cluster use
        // StreamExecutionEnvironment.getExecutionEnvironment() instead.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Checkpoint every 10 s so the consumed offsets are persisted.
        env.enableCheckpointing(10000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Exactly-once is the default mode; stated here for clarity.
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Discard any checkpoint that takes longer than one minute.
        checkpointConfig.setCheckpointTimeout(60000);
        // Guarantee at least 500 ms of progress between two checkpoints.
        checkpointConfig.setMinPauseBetweenCheckpoints(500);
        // Never run two checkpoints concurrently.
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints around after job cancellation.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties sourceProps = getConsumerProps();
        Properties sinkProps = getProducerProps();
        SimpleTupleDeserializationSchema deserializer = new SimpleTupleDeserializationSchema();

        DataStreamSource<Tuple2<String, String>> stream =
                env.addSource(new RocketMQSourceFunction<>(deserializer, sourceProps))
                        .setParallelism(2);
        stream.print();

        // Forward every consumed record to SINK_TOPIC through the RocketMQ sink,
        // batching up to 32 messages and flushing on every checkpoint.
        stream.process(new SourceMapFunction())
                .process(new SinkMapFunction("SINK_TOPIC", "*"))
                .addSink(
                        new RocketMQSink(sinkProps)
                                .withBatchFlushOnCheckpoint(true)
                                .withBatchSize(32)
                                .withAsync(true))
                .setParallelism(2);

        env.execute("rocketmq-connect-flink");
    }
}