基于腾讯云tdmq消息队列封装SpringBootStarter(一)
一、环境准备
1.1 注册腾讯云TDMQ
创建tdmq集群
创建完成后记录下集群ID(clusterId
);
1.2 创建命名空间
创建好集群后,在命名空间中新建命名空间,命名空间名称可以根据实际业务场景进行区分,比如这里创建可以根据测试环境、预发布环境、生产环境等进行区分创建。
新建命名空间
1.3、创建好命名空间后,新建个`topic`主题。
创建topic
以上信息创建好后,我们在集群中可以看到集群的访问地址,如下:
查看接入地址
在创建tdmq集群时我们需要申请外网访问,这个需要找腾讯的客服开通。
至此,我们开发的基础环境已经准备完成。
二、编写生产者、消费者代码
2.1、创建工程
在idea中新建个工程,工程名称为spring-boot-starter-tdmq
创建工程
工程名称和包路径可以根据实际情况进行自定义。
2.2、引入相关依赖包
代码语言:javascript复制<!-- spring-boot-autoconfigure -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<!--tdmq 核心依赖 2.9.1 -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
这里使用的是腾讯云tdmq-pulsar版,这里需要引入pulsar-client
.
2.3、创建生产者
首先我们在项目中创建一个config
的包路径,新建一个tdmq的配置类TdmqProperties
。
@Data
@ConfigurationProperties(prefix = "tdmq")
public class TdmqProperties {
/**
* 启用
*/
private boolean enable = false;
/**
* 服务地址
*/
private String serviceUrl;
/**
* token
*/
private String token;
/**
* 集群ID
*/
private String clusterId;
/**
* 命名空间ID
*/
private String environmentId;
}
这里主要配置下tdmq集群相关的信息。我们先将配置信息写在配置类中,后续优化我们在使用外部配置进行初始化操作。
创建PulsarClient
对象,生产者消费者都是基于PulsarClient
对象进行创建。
这里我们通过SpringBoot的自动装配功能来装配PulsarClient
。
这里我们创建一个自动化配置类TdmqAutoConfiguration
。内容如下:
@Data
@EnableConfigurationProperties({TdmqProperties.class})
public class TdmqAutoConfiguration {
/**
* Pulsar 客户端
* 推荐一个进程一个实例
*
* @return {@link TdmqAutoConfiguration}
*/
@Bean
@ConditionalOnMissingBean(PulsarClient.class)
@ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(mqProperties.getServiceUrl())
.authentication(AuthenticationFactory.token(mqProperties.getToken()))
.build();
}
}
这里增加了两个条件注解@ConditionalOnProperty
和@ConditionalOnMissingBean(PulsarClient.class)
这里分别表示如果没有找到PulsarClient
对象和配置文件中启用了tdmq功能,我们才实例化PulsarClient
对象。
有了PulsarClient
对象,我们可以继续编写生产者代码啦。
在工程中创建producer
包路径,并在该路径下创建TdmqProucer.class
,内容如下:
@Service
public class TdmqProucer {
@Autowired
private PulsarClient pulsarClient;
/**
* 发送消息
*
* @param message 消息内容
* @return 消息ID
* @throws PulsarClientException
*/
public MessageId sendMsg(String message) throws PulsarClientException {
Producer<byte[]> producer = pulsarClient.newProducer().topic("clusterId/environmentId/test").create();
MessageId messageId = producer.newMessage().value(message.getBytes()).send();
producer.close();
return messageId;
}
}
2.4、创建消息消费者
在工程中创建consumer
包路径,并在该路径下创建TdmqConsumer
类,内容如下:
/**
* @Author julyWhj
* @Description 消费者$
* @Date 2022/1/2 10:13 上午
**/
@Slf4j
@Service
public class TdmqConsumer {
@Autowired
private PulsarClient pulsarClient;
private Consumer<byte[]> consumer;
@PostConstruct
public void initConsumer() throws PulsarClientException {
log.info("MessageLoggingListener is start");
consumer = pulsarClient.newConsumer().topic("clusterId/environmentId/自己的topic").subscriptionName("自己的subscriptionName").subscriptionType(SubscriptionType.Exclusive)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Executors.newSingleThreadExecutor().submit(this::consumer);
}
/**
* 销毁容器
*
* @throws PulsarClientException
*/
@PreDestroy
public void destroy() throws PulsarClientException {
consumer.close();
pulsarClient.close();
}
public void consumer() {
while (true) {
Message<byte[]> message = null;
try {
//使用while(true)无问题,该处会发生阻塞释放CPU资源
message = consumer.receive();
String json = new String(message.getData());
if (StringUtils.isNotEmpty(json)) {
log.info("获取消息数据内容:{}", json);
}
consumer.acknowledge(message);
} catch (PulsarClientException e) {
log.error("数据消费失败", e);
}
}
}
}
2.5、测试生产者和消费者
编写单元测试类
代码语言:javascript复制@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
@Autowired
private TdmqProucer proucer;
@Test
public void producer() throws PulsarClientException {
MessageId messageId = proucer.sendMsg("/clusterId/environmentId/自己的topic", "发送消息测试");
log.info("send msg is success Id = {}", messageId);
}
}
我们看下测试结果:
测试结果
最简单的生产消费已经完成了。接下来我们需要对代码进一步封装,这里我们参考amqp的方式,分别封装proucerTemplate和TdmqConsumer
注解。实现消费者功能达到一下情况:
@TdmqConsumer(topic = TopicConstant.MESSAGE_LOGGING_TOPIC, clazz = CreateMsgBean.class, subscriptionName = "subscriptionName")
void consume(JddMessage<String> msg) {
log.info("------------{}", JSONUtil.toJsonStr(msg));
}
OK,基本使用先到这里,我们开始后续内容的封装优化。