flink rocketmq消费和写入数据

2021-11-24 15:28:25 浏览数 (1)

首先我们先来看一下rocketmq的环境搭建,下面我们采用docker来进行

首先我们创建如下目录

代码语言:javascript复制
mkdir -p rmp/conf
mkdir -p rmp/logs
mkdir -p rmp/store

下面我们来看一下docker-compose.yaml文件内容

代码语言:javascript复制
version: '2'
services:
  namesrv:
    image: rocketmqinc/rocketmq
    container_name: rmqnamesrv
    restart: always    
    ports:
      - 9876:9876
    volumes:
      - ./logs:/home/rocketmq/logs
      - ./store:/home/rocketmq/store
    command: sh mqnamesrv
  broker:
    image: rocketmqinc/rocketmq
    container_name: rmqbroker
    restart: always    
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    volumes:
      - ./logs:/home/rocketmq/logs
      - ./store:/home/rocketmq/store
      - ./conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
    #command: sh mqbroker -n namesrv:9876
    command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
    depends_on:
      - namesrv
    environment:
      - JAVA_HOME=/usr/lib/jvm/jre
  console:
    image: styletang/rocketmq-console-ng
    container_name: rocketmq-console-ng
    restart: always    
    ports:
      - 8076:8080
    depends_on:
      - namesrv
    environment:
      - JAVA_OPTS= -Dlogging.level.root=info   -Drocketmq.namesrv.addr=rmqnamesrv:9876 
      - Dcom.rocketmq.sendMessageWithVIPChannel=false

通过yaml文件可知总共有三个组件:分别为nameserver, broker, web console

我们再来看一下rmq/conf下的broker.conf文件

代码语言:javascript复制
brokerName = broker-a  
brokerId = 0  
deleteWhen = 04  
fileReservedTime = 48  
brokerRole = ASYNC_MASTER  
flushDiskType = ASYNC_FLUSH  
#这里需要设置成宿主机的IP
brokerIP1=xxxxx

一切准备就绪后我们启动docker: docker-compose up -d

访问consolehttp://localhost:8076/#/message

下面我们分别跑一段原生的producer和consumer程序,最后在用flink进行consumer并sink入mq

producer

代码语言:javascript复制
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

    private static final int MESSAGE_NUM = 10000;

    private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";
    private static final String KEY_PREFIX = "KEY";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQProducer producer =
                new DefaultMQProducer(PRODUCER_GROUP, true, null);
        producer.setNamesrvAddr("127.0.0.1:9876");

        // When using aliyun products, you need to set up channels
        //producer.setAccessChannel(AccessChannel.CLOUD);

        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < MESSAGE_NUM; i  ) {
            String content = "Test Message "   i;
            Message msg = new Message(TOPIC, TAGS, KEY_PREFIX   i, content.getBytes());
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                System.out.printf(
                        "send result: %s %sn",
                        sendResult.getMsgId(), sendResult.getMessageQueue().toString());
                Thread.sleep(50);
            } catch (Exception e) {
                LOGGER.info("send message failed. {}", e.toString());
            }
        }
    }
}

consumer

代码语言:javascript复制
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    private static final String GROUP = "GID_SIMPLE_CONSUMER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer(
                        GROUP);
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // When using aliyun products, you need to set up channels
        //consumer.setAccessChannel(AccessChannel.CLOUD);

        try {
            consumer.subscribe(TOPIC, TAGS);
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(
                (MessageListenerConcurrently)
                        (msgs, context) -> {
                            for (MessageExt msg : msgs) {
                                System.out.printf(
                                        "%s %s %d %sn",
                                        msg.getMsgId(),
                                        msg.getBrokerName(),
                                        msg.getQueueId(),
                                        new String(msg.getBody()));
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        });

        try {
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.info("send message failed. {}", e.toString());
        }
    }
}

flink从mq消费并写入mq

从github下来flink-rocketmq-connector

git clone https://github.com/apache/rocketmq-flink.git

进行编译和安装之后在pom.xml中引入

代码语言:javascript复制
<dependency> 
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-flink</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
代码语言:javascript复制
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSink;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;

public class RocketMQFlinkExample {

    /**
     * Source Config
     *
     * @return properties
     */
    private static Properties getConsumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(
                RocketMQConfig.NAME_SERVER_ADDR,
                "127.0.0.1:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink_consumer_test");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "SOURCE_TOPIC");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
        return consumerProps;
    }

    /**
     * Sink Config
     *
     * @return properties
     */
    private static Properties getProducerProps() {
        Properties producerProps = new Properties();
        producerProps.setProperty(
                RocketMQConfig.NAME_SERVER_ADDR,
                "127.0.0.1:9876");
        producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink_produce_test");
        return producerProps;
    }

    public static void main(String[] args) throws Exception {

        //final ParameterTool params = ParameterTool.fromArgs(args);

        // for local
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // for cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start a checkpoint every 10s
        env.enableCheckpointing(10000);
        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = getConsumerProps();
        Properties producerProps = getProducerProps();

        SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();

        DataStreamSource<Tuple2<String, String>> source =
                env.addSource(new RocketMQSourceFunction<>(schema, consumerProps))
                        .setParallelism(2);

        source.print();
        source.process(new SourceMapFunction())
                .process(new SinkMapFunction("SINK_TOPIC", "*"))
                .addSink(
                        new RocketMQSink(producerProps)
                                .withBatchFlushOnCheckpoint(true)
                                .withBatchSize(32)
                                .withAsync(true))
                .setParallelism(2);

        env.execute("rocketmq-connect-flink");
    }
}

0 人点赞