上文:spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka
rabbit消息运转过程,由Produceer发送消息,然后到Broker中,消费者Consumer订阅并接收消息,然后进去消费。如下图
本图来源于:rabbitmq实战指南
java基rabbitmq发送消息
项目结构
代码语言:javascript复制│ java_rabbitmq.iml
│ pom.xml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─rabbitmq
│ │ │ Consumer.java
│ │ │ Producer.java
│ │ │
│ │ └─resources
│ └─test
│ └─java
以上面的图为主,单点发送,经过一个队列,单点消费。
spring_mq/java_rabbitmq/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>java_rabbitmq</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.11.0</version>
</dependency>
</dependencies>
</project>
com.hong.rabbitmq.Producer
代码语言:javascript复制package com.hong.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* @author: csh
* @Date: 2021/4/7 14:22
* @Description:生产者
*/
public class Producer {
private final static String QUEUE_NAME = "rabbitmq-hong";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//mq地址
factory.setHost("localhost");
//获取连接
try (Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息
String message = "Hello hong!";
//发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" message "'");
}catch (Exception e){
e.printStackTrace();
}
}
}
com.hong.rabbitmq.Consumer
代码语言:javascript复制package com.hong.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author: csh
* @Date: 2021/4/7 14:26
* @Description:消费者
*/
public class Consumer {
private final static String QUEUE_NAME = "rabbitmq-hong";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" message "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
生产者结果
代码语言:javascript复制[x] Sent 'Hello hong!'
消费者结果
代码语言:javascript复制SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[*] Waiting for messages. To exit press CTRL C
[x] Received 'Hello hong!'
以上是单个消息,接下来,一个生产者多个消费者。
改造如下:
com.hong.rabbitmq.Producer
代码语言:javascript复制package com.hong.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* @author: csh
* @Date: 2021/4/7 14:22
* @Description:生产者
*/
public class Producer {
private final static String QUEUE_NAME = "rabbitmq-hong";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//mq地址
factory.setHost("localhost");
//获取连接
try (Connection connection = factory.newConnection();
//创建通道
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int i = 0;
while (true){
i ;
//消息
String message = "Hello hong!" i;
//发布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" message "'");
Thread.sleep(3000);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
com.hong.rabbitmq.Consumer2
代码语言:javascript复制package com.hong.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author: csh
* @Date: 2021/4/7 14:26
* @Description:消费者
*/
public class Consumer2 {
private final static String QUEUE_NAME = "rabbitmq-hong";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2 收到消息 '" message "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
com.hong.rabbitmq.Consumer
代码语言:javascript复制package com.hong.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @author: csh
* @Date: 2021/4/7 14:26
* @Description:消费者
*/
public class Consumer {
private final static String QUEUE_NAME = "rabbitmq-hong";
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1 收到消息" message "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
结果:
消费者1
消费者2
可以看到,消费者轮询消费消息。
更多请参考官方demo:
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/java
https://www.rabbitmq.com/api-guide.html
spring 整合 rabbitmq
实现用户通过调用http接口,生产端发送mq消息给消费端进行Mybatis添加数据到库中。
相关版本信息:
spring 4.x
rabbitmq 1.7.0.RELEASE
jdk 1.8.x
spring_rabbitmq_producer 生产者
项目结构
代码语言:javascript复制│ pom.xml
│ spring_rabbitmq_producer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ │ UserController.java
│ │ │ │
│ │ │ ├─ao
│ │ │ │ UserSaveAO.java
│ │ │ │
│ │ │ └─common
│ │ │ Constants.java
│ │ │ MyAMQPConfig.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ log4j2.xml
│ │ logging.properties
│ │ rabbitmq.properties
│ │ rabbitmq.xml
│ │
│ └─test
│ └─java
│ TestRabbitMq.java
│
└─web
└─WEB-INF
web.xml
com.hong.spring.ao.UserSaveAO
代码语言:javascript复制package com.hong.spring.ao;
import lombok.Data;
import java.io.Serializable;
/**
* @author: csh
* @Date: 2021/3/16 11:21
* @Description:用户入参
*/
@Data
public class UserSaveAO implements Serializable {
private Integer id;
private String username;
private Integer age;
}
com.hong.spring.common.Constants
代码语言:javascript复制package com.hong.spring.common;
/**
* @author: csh
* @Date: 2021/4/8 11:29
* @Description:
*/
public class Constants {
public static final String MQ_BASE_USER_EXCHANGE="myExchange";
public static final String MQ_BASE_USER_MESSAGE_QUEUE="spring_user_queue";
public static final String MQ_BASE_USER_KEY="spring_user_key";
}
com.hong.spring.common.MyAMQPConfig
代码语言:javascript复制package com.hong.spring.common;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
com.hong.spring.UserController
代码语言:javascript复制package com.hong.spring;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hong.spring.ao.UserSaveAO;
import com.hong.spring.common.Constants;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:
*/
@RestController
@RequestMapping("/user/")
@Log4j2
public class UserController {
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
@RequestMapping("save")
public DataResponse<Boolean> save(UserSaveAO ao){
log.info("添加用户入参{}",JSONObject.toJSONString(ao));
if(null==ao){
return DataResponse.BuildFailResponse("参数不能为空!");
}
try {
User user =new User();
BeanUtils.copyProperties(ao,user);
String msgJson = objectMapper.writeValueAsString(user);
Message message = MessageBuilder
.withBody(msgJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitTemplate.convertAndSend(Constants.MQ_BASE_USER_KEY,message);
return DataResponse.BuildFailResponse("添加用户成功!");
}catch (Exception e){
log.error("添加出错{}",e);
return DataResponse.BuildFailResponse("添加出错请重试!");
}
}
}
application.properties
代码语言:javascript复制logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
applicationContext.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 开启注解 -->
<context:annotation-config />
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解事务-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
<bean id="objectMapper" class="com.fasterxml.jackson.databind.ObjectMapper" />
</beans>
log4j2.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
logging.properties
代码语言:javascript复制org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
rabbitmq.properties
代码语言:javascript复制spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualhost=springrabbitmq
spring.rabbitmq.addresses=amqp://admin:secret@localhost
rabbitmq.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--启用注解监听消息-->
<rabbit:annotation-driven/>
<!--配置连接-->
<rabbit:connection-factory
id="connectionFactory"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
publisher-confirms="true"
virtual-host="/" />
<!--配置RabbitAdmin-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称-->
<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除。最后一个消费者和该队列断开连接后,自动删除队列
exclusive:是否独占
durable:是否持久化
-->
<!--配置队列名-->
<rabbit:queue name="spring_user_queue" auto-declare="true" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义交换机绑定队列(路由模式) -->
<rabbit:direct-exchange name="myExchange" id="myExchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="spring_user_queue" key="spring_user_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 配置线程池 -->
<!--<bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >-->
<!--<!– 线程池维护线程的最少数量 –>-->
<!--<property name ="corePoolSize" value ="5" />-->
<!--<!– 线程池维护线程所允许的空闲时间 –>-->
<!--<property name ="keepAliveSeconds" value ="30000" />-->
<!--<!– 线程池维护线程的最大数量 –>-->
<!--<property name ="maxPoolSize" value ="1000" />-->
<!--<!– 线程池所使用的缓冲队列 –>-->
<!--<property name ="queueCapacity" value ="200" />-->
<!--</bean>-->
<!--<!– 消息对象json转换类 –>-->
<!--<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />-->
<!-- 定义模版 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" exchange="myExchange"/>
</beans>
spring_mq/spring_rabbitmq_producer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring_rabbitmq_producer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>spring-web</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>spring_mq_common_api</artifactId>
<version>1.0-SNAPSHOT</version>
<groupId>com.hong</groupId>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
WEB-INF/web.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>spring_rabbitmq_producer</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml,
classpath:rabbitmq.xml
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet-mapping>
<servlet-name>spring_rabbitmq_producer</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
tomcat配置如下:
发送测试数据:
代码语言:javascript复制username:spring_rabbitmq_hong
age:1
spring_rabbitmq_consumer 消费者
项目结构
代码语言:javascript复制│ pom.xml
│ spring_rabbitmq_consumer.iml
│
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─hong
│ │ │ └─spring
│ │ │ ├─common
│ │ │ │ MyAMQPConfig.java
│ │ │ │
│ │ │ ├─dao
│ │ │ │ UserMapper.java
│ │ │ │
│ │ │ ├─listener
│ │ │ │ UserListener.java
│ │ │ │
│ │ │ ├─mapper
│ │ │ │ UserMapper.xml
│ │ │ │
│ │ │ └─provider
│ │ │ UserServiceImpl.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ applicationContext.xml
│ │ jdbc.properties
│ │ log4j2.xml
│ │ logging.properties
│ │ mybatis.xml
│ │ rabbitmq.properties
│ │ rabbitmq.xml
│ │
│ └─test
│ └─java
└─web
└─WEB-INF
web.xml
com.hong.spring.dao.UserMapper
代码语言:javascript复制package com.hong.spring.dao;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:04
* @Description:用户dao层
*/
public interface UserMapper {
/**
*
* 功能描述:查询总条数
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:31
*/
List<User> findAllUserList();
/**
*
* 功能描述:获取总数
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:30
*/
int findAllTotal();
/**
*
* 功能描述:更新
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/18 15:30
*/
int update(User user);
/**
*
* 功能描述:添加
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/19 18:39
*/
int save(User user);
/**
*
* 功能描述:批量添加
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/21 15:46
*/
int insertBatch(@Param("list") List <User> list);
/**
*
* 功能描述:通过id查询
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/19 18:39
*/
User findById(int id);
/**
*
* 功能描述:通过分页查询
*
* @param:
* @return:
* @auther: csh
* @date: 2020/8/21 16:05
*/
List<User> findByPage(UserAO ao);
}
com.hong.spring.listener.UserListener
代码语言:javascript复制package com.hong.spring.listener;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hong.spring.api.IUserService;
import com.hong.spring.entity.User;
import com.hong.spring.utils.DataResponse;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author: csh
* @Date: 2021/3/16 11:14
* @Description:用户监听
*/
@Log4j2
public class UserListener extends MessageListenerAdapter {
@Autowired
private IUserService userService;
public void onMessage(Message message, Channel channel) throws Exception {
try {
log.info("添加用户参数{}",JSONObject.toJSONString(message));
String messageBody = new String(message.getBody());
ObjectMapper mapper=new ObjectMapper();
User user = mapper.readValue(messageBody.getBytes("UTF-8"),User.class);
log.info("获取的用户信息{}", JSONObject.toJSONString(user));
DataResponse<Boolean> save = userService.save(user);
if(save==null || save.getData()==null || !save.getData()){
log.info("添加失败,原因{}",JSONObject.toJSONString(save));
}
}catch (Exception e){
log.error("添加用户异常{}",e);
}
}
}
com.hong.spring.provider.UserServiceImpl
代码语言:javascript复制package com.hong.spring.provider;
import com.hong.spring.api.IUserService;
import com.hong.spring.dao.UserMapper;
import com.hong.spring.entity.User;
import com.hong.spring.entity.ao.UserAO;
import com.hong.spring.utils.DataResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:16
* @Description:用户实现
*/
@Service("userService")
public class UserServiceImpl implements IUserService {
@Autowired
private UserMapper userDao;
@Override
public DataResponse<List<User>> findByAll() {
List <User> allUserList = userDao.findAllUserList();
int allTotal = userDao.findAllTotal();
return DataResponse.BuildSuccessResponse(allUserList,allTotal);
}
@Override
@Transactional
public DataResponse <Boolean> save(User user) {
if(null==user){
return DataResponse.BuildFailResponse("必传参数不能为空!");
}
int save = userDao.save(user);
return DataResponse.BuildSuccessResponse(save>0?true:false);
}
@Override
public DataResponse <Boolean> insertBatch(List <User> list) {
if(null==list){
return DataResponse.BuildFailResponse("参数不能为空!");
}
int batchSave = userDao.insertBatch(list);
return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
}
@Override
@Transactional
public DataResponse <Boolean> update(User user) {
if(null==user || user.getId()==null){
return DataResponse.BuildFailResponse("必传参数不能为空!");
}
int update = userDao.update(user);
return DataResponse.BuildSuccessResponse(update>0?true:false);
}
@Override
public DataResponse <User> findById(int i) {
User byId = userDao.findById(i);
return DataResponse.BuildSuccessResponse(byId);
}
@Override
public DataResponse <List <User>> findByPage(UserAO ao) {
if(ao==null){
ao.setPage(0);
ao.setPageSize(10);
}else{
ao.setPage(ao.getPageSize() * ao.getPage());
}
int allTotal = userDao.findAllTotal();
List <User> byPage = userDao.findByPage(ao);
return DataResponse.BuildSuccessResponse(byPage,allTotal);
}
}
com.hong.spring.common.MyAMQPConfig
代码语言:javascript复制package com.hong.spring.common;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
com/hong/spring/mapper/UserMapper.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.hong.spring.dao.UserMapper">
<resultMap type="com.hong.spring.entity.User" id="user">
<id column="id" property="id" />
<result column="user_name" property="username" />
<result column="age" property="age" />
</resultMap>
<select id="findById" resultType="com.hong.spring.entity.User">
SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
</select>
<select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
select * from user where 1=1 limit #{page},#{pageSize}
</select>
<select id="findAllUserList" resultMap="user">
SELECT * FROM user
</select>
<select id="findAllTotal" resultType="int">
SELECT count(*) FROM user
</select>
<insert id="save" >
INSERT INTO user ( user_name, age)
VALUES (#{username,jdbcType=VARCHAR},
#{age,jdbcType=INTEGER})
</insert>
<insert id="insertBatch">
insert into user
( user_name, age)
values
<foreach collection="list" item="user" index="index"
separator=",">
(#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
</foreach>
</insert>
<update id="update" >
update user
<set>
<if test="username !=null">
user_name=#{username,jdbcType=VARCHAR},
</if>
<if test="age !=null">
age =#{age,jdbcType=INTEGER}
</if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
</mapper>
logging.properties
代码语言:javascript复制org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
org.apache.jasper.servlet.TldScanner.level = FINE
handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
############################################################
# Handler specific properties.
# Describes specific configuration info for Handlers.
############################################################
org.apache.juli.FileHandler.level = FINE
org.apache.juli.FileHandler.directory = ../logs
org.apache.juli.FileHandler.prefix = error-debug.
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
application.properties
代码语言:javascript复制logging.level.root=WARN
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate=ERROR
mybatis.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- settings -->
<settings>
<!-- 打开延迟加载的开关 -->
<setting name="lazyLoadingEnabled" value="true"/>
<!-- 将积极加载改为消极加载(即按需加载) -->
<setting name="aggressiveLazyLoading" value="false"/>
<!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
<setting name="cacheEnabled" value="true"/>
<!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
<setting name="mapUnderscoreToCamelCase" value="true"/>
<!-- 使用列别名代替列名 默认:true seslect name as title from table -->
<setting name="useColumnLabel" value="true"/>
<!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
<setting name="useGeneratedKeys" value="true"/>
</settings>
<!-- 别名定义 -->
<typeAliases>
<package name="com.hong.spring.entity"/>
</typeAliases>
</configuration>
rabbitmq.properties
代码语言:javascript复制spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualhost=springrabbitmq
spring.rabbitmq.addresses=amqp://admin:secret@localhost
log4j2.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<configuration status="INFO">
<appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/app.log"
filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
<PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
<SizeBasedTriggeringPolicy size="5 MB"/>
</RollingFile>
</appenders>
<loggers>
<root level="DEBUG">
<appender-ref ref="Console"/>
<appender-ref ref="RollingFile"/>
</root>
</loggers>
</configuration>
spring_mq/spring_rabbitmq_consumer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_mq</artifactId>
<groupId>com.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring_rabbitmq_consumer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>spring-web</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>spring_mq_common_api</artifactId>
<version>1.0-SNAPSHOT</version>
<groupId>com.hong</groupId>
</dependency>
</dependencies>
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
jdbc.properties
代码语言:javascript复制config.properties:
#数据库驱动
jdbc.driver=com.mysql.jdbc.Driver
#数据库连接url
jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
#数据库用户名
jdbc.user=root
#数据库密码
jdbc.password=123456
applicationContext.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 配置组件扫描 -->
<context:component-scan base-package="com.hong.spring"></context:component-scan>
<!--加载配置文件-->
<context:property-placeholder location="classpath:jdbc.properties,classpath:rabbitmq.properties"/>
<!-- 开启注解 -->
<context:annotation-config />
<!--开启注解事务-->
<tx:annotation-driven transaction-manager="transactionManager" />
<!--放行静态资源-->
<mvc:default-servlet-handler />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
id="internalResourceViewResolver">
<!-- 前缀 -->
<property name="prefix" value="/WEB-INF/pages/" />
<!-- 后缀 -->
<property name="suffix" value=".html" />
<property name="contentType" value="text/html"/>
</bean>
<!--开启mvc注解事务-->
<!-- 定义注解驱动 -->
<mvc:annotation-driven>
<mvc:message-converters>
<!-- 设置支持中文 -->
<bean class="org.springframework.http.converter.StringHttpMessageConverter">
<property name="supportedMediaTypes">
<list>
<value>text/plain;charset=UTF-8</value>
<value>text/html;charset=UTF-8</value>
</list>
</property>
</bean>
<bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
</mvc:message-converters>
</mvc:annotation-driven>
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<!-- 基础配置 -->
<property name="url" value="${jdbc.url}"></property>
<property name="driverClassName" value="${jdbc.driver}"></property>
<property name="username" value="${jdbc.user}"></property>
<property name="password" value="${jdbc.password}"></property>
<!-- 关键配置 -->
<!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
<property name="initialSize" value="3" />
<!-- 最小连接池数量 -->
<property name="minIdle" value="2" />
<!-- 最大连接池数量 -->
<property name="maxActive" value="15" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="10000" />
<!-- 性能配置 -->
<!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
<!-- 其他配置 -->
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
执行validationQuery检测连接是否有效。-->
<property name="testWhileIdle" value="true" />
<!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
<property name="testOnBorrow" value="true" />
<!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
<property name="testOnReturn" value="false" />
</bean>
<!--事务管理器-->
<!-- sqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!-- 加载 MyBatis 的配置文件 -->
<property name="configLocation" value="classpath:mybatis.xml"/>
<!-- 数据源 -->
<property name="dataSource" ref="dataSource"/>
<!-- 所有配置的mapper文件 -->
<property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
</bean>
<!-- Mapper 扫描器 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!-- 扫描 包下的组件 -->
<property name="basePackage" value="com.hong.spring.dao" />
<!-- 关联mapper扫描器 与 sqlsession管理器 -->
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>
<!--事务配置-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
rabbitmq.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--配置连接-->
<rabbit:connection-factory
id="connectionFactory"
host="${spring.rabbitmq.host}"
port="${spring.rabbitmq.port}"
username="${spring.rabbitmq.username}"
password="${spring.rabbitmq.password}"
publisher-confirms="true"
virtual-host="/" />
<!--配置RabbitAdmin-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义持久化队列,不存在则自动创建;不绑定到交换机则绑定到默认交换机
默认交换机类型为direct,名字为:"",路由键为队列的名称-->
<!--
id:bean的名称
name:queue的名称
auto-declare:自动创建
auto-delete:自动删除。最后一个消费者和该队列断开连接后,自动删除队列
exclusive:是否独占
durable:是否持久化
-->
<!--配置队列名-->
<rabbit:queue name="spring_user_queue" auto-declare="true" durable="true" />
<!-- 定义消费者 -->
<bean name="queuehandler" class="com.hong.spring.listener.UserListener" />
<!-- 定义消费者监听队列 -->
<rabbit:listener-container
connection-factory="connectionFactory">
<rabbit:listener ref="queuehandler" queues="spring_user_queue" />
</rabbit:listener-container>
</beans>
WEB-INF/web.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<servlet>
<servlet-name>spring_rabbitmq_consumer</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml,
classpath:rabbitmq.xml
</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet-mapping>
<servlet-name>spring_rabbitmq_consumer</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
</web-app>
tomcat配置如下:
运行后,先发送数据,消费者消费,结果如下:
结果
可以发现springmvc整合rabbitmq还是非常简单快捷,上手也比较快。
springboot 整合 rabbitmq
springboot_rabbitmq_api api入口 端口:8286 dubbo:20880
代码语言:javascript复制│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ └─controller
│ IndexController.java
│ UserController.java
│
└─resources
application.properties
com.hong.springboot.controller.IndexController
代码语言:javascript复制package com.hong.springboot.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: csh
* @Date: 2021/1/12 10:16
* @Description:首页
*/
@RestController
public class IndexController {
@RequestMapping("/")
public String index(){
return "成功!";
}
}
com.hong.springboot.controller.UserController
代码语言:javascript复制package com.hong.springboot.controller;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.config.annotation.Reference;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 16:11
* @Description:用户
*/
@RestController
@Slf4j
@RequestMapping("/user")
public class UserController {
@Reference
private IUserService userService;
@GetMapping("/findByAll")
public DataResponse<List<User>> findByAll(){
try {
return userService.findByAll();
} catch (Exception e){
log.error("查询出错{}",e);
}
return DataResponse.BuildFailResponse("查询出错!");
}
@PostMapping("/save")
public DataResponse<Boolean> save(User ao){
if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
return DataResponse.BuildFailResponse("参数不能为空!");
}
DataResponse <Boolean> save = userService.save(ao);
return save;
}
}
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:springboot dubbo消费端
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
application.properties
代码语言:javascript复制#dubbo configuration
#服务名称
dubbo.application.name=springboot_dubbo_consumer
#注册中心协议
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20880
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.controller
#避免端口冲突
server.port=8286
springboot_rabbitmq_producer dubbo:20881
代码语言:javascript复制│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ ├─config
│ │ Constants.java
│ │ DruidConfig.java
│ │ RabbitConfiguration.java
│ │ TopicAll.java
│ │
│ ├─dao
│ │ UserMapper.java
│ │
│ └─provider
│ UserServiceImpl.java
│
└─resources
application.properties
com.hong.springboot.dao.UserMapper
代码语言:javascript复制package com.hong.springboot.dao;
import com.hong.springboot.entity.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:04
* @Description:用户dao层
*/
public interface UserMapper {
@Select("select id,user_name,age from user")
List<User> findAllUser();
@Insert("insert into user (user_name,age) values(#{username},#{age})")
int insert(User user);
}
com.hong.springboot.config.DruidConfig
代码语言:javascript复制package com.hong.springboot.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
/**
* @author: csh
* @Date: 2021/1/8 18:08
* @Description:数据源配置
*/
@Configuration
public class DruidConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource dataSource(){
return new DruidDataSource();
}
}
com.hong.springboot.config.RabbitConfiguration
代码语言:javascript复制package com.hong.springboot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Value("${spring.rabbitmq.host}")
private String host;
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory(host);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue bindQueue() {
return new Queue("springboot_user_queue",true);
}
/**
*
* 功能描述: 交换机
*
* @param:
* @return:
* @auther: csh
* @date: 2021/4/9 11:40
*/
@Bean
DirectExchange myExchange() {
return new DirectExchange("myExchange", true, false);
}
/**
*
* 功能描述: 消息转换器
*
* @param:
* @return:
* @auther: csh
* @date: 2021/4/9 11:52
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public ObjectMapper objectMapper(){
return new ObjectMapper();
}
}
com.hong.springboot.provider.UserServiceImpl
代码语言:javascript复制package com.hong.springboot.provider;
import com.alibaba.druid.util.StringUtils;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.Constants;
import com.hong.springboot.dao.UserMapper;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
/**
* @Auther: csh
* @Date: 2020/8/18 15:16
* @Description:用户实现
*/
@Service(interfaceClass = IUserService.class,timeout = 6000)
@Slf4j
public class UserServiceImpl implements IUserService {
@Autowired
private UserMapper userDao;
@Autowired
private AmqpTemplate rabbitMQTemplate;
@Autowired
private ObjectMapper objectMapper;
@Override
public DataResponse<List<User>> findByAll() {
List <User> allUserList = userDao.findAllUser();
return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
}
@Override
public DataResponse <Boolean> save(User userAO) {
log.info("需要rabbitmq添加的用户信息{}",JSONObject.toJSONString(userAO));
try {
String msgJson = objectMapper.writeValueAsString(userAO);
Message message = MessageBuilder
.withBody(msgJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.build();
rabbitMQTemplate.convertAndSend(Constants.MQ_BASE_USER_MESSAGE_QUEUE,message);
}catch (Exception e){
e.printStackTrace();
return DataResponse.BuildSuccessResponse(false);
}
return DataResponse.BuildSuccessResponse(true);
}
@Transactional
@Override
public DataResponse <Boolean> reallySave(User user) {
log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
return DataResponse.BuildFailResponse("参数不能为空!");
}
int insert = userDao.insert(user);
return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
}
}
com.hong.springboot.config.Constants
代码语言:javascript复制package com.hong.springboot.config;
/**
* @author: csh
* @Date: 2021/4/8 11:29
* @Description:
*/
public class Constants {
public static final String MQ_BASE_USER_EXCHANGE="myExchange";
public static final String MQ_BASE_USER_MESSAGE_QUEUE="spring_user_queue";
public static final String MQ_BASE_USER_KEY="springboot_user_key";
}
com.hong.springboot.config.TopicAll
代码语言:javascript复制package com.hong.springboot.config;
/**
* @author: csh
* @Date: 2021/3/16 14:15
* @Description:存放所有的topic
*/
public class TopicAll {
//用户topic
public static final String USER_TOPIC ="springboot_user_topic";
}
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@MapperScan("com.hong.springboot.dao")
@EnableRabbit
@EnableDubbo
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
application.properties
代码语言:javascript复制rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000
#dubbo configuration
#服务名称
dubbo.application.name=springboot_rabbitmq_producer
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20881
#协议名称
dubbo.protocol.name=dubbo
#避免端口冲突
server.port=8287
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=123456
#mybatis配置
mybatis.typeAliasesPackage=com.hong.springboot.entity
#rabbitmq配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualhost=/
springboot_rabbitmq_consumer 消费者 端口:8288 dubbo:20882
代码语言:javascript复制D:.
│ pom.xml
│
└─src
└─main
├─java
│ └─com
│ └─hong
│ └─springboot
│ │ Application.java
│ │
│ ├─config
│ │ Constants.java
│ │ RabbitMqConfig.java
│ │
│ └─listener
│ UserListener.java
│
└─resources
application.properties
com.hong.springboot.Application
代码语言:javascript复制package com.hong.springboot;
import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
/**
* @author: csh
* @Date: 2020/11/21 11:37
* @Description:
*/
@SpringBootApplication(scanBasePackages = "com.hong.springboot")
@EnableDubbo
@EnableRabbit
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
com.hong.springboot.config.RabbitMqConfig
代码语言:javascript复制package com.hong.springboot.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: csh
* @Date: 2021/4/12 17:55
* @Description:
*/
@Configuration
public class RabbitMqConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory(host);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public ObjectMapper objectMapper(){
return new ObjectMapper();
}
@Bean
MessageConverter createMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
com.hong.springboot.listener.UserListener
代码语言:javascript复制package com.hong.springboot.listener;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hong.springboot.api.IUserService;
import com.hong.springboot.config.Constants;
import com.hong.springboot.entity.User;
import com.hong.springboot.utils.DataResponse;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @author: csh
* @Date: 2021/3/16 11:14
* @Description:用户监听
*/
@Service
@Log4j2
public class UserListener {
@Reference
private IUserService userService;
@Autowired
private ObjectMapper objectMapper;
@RabbitHandler
@RabbitListener(queues = Constants.MQ_BASE_USER_MESSAGE_QUEUE,containerFactory ="rabbitListenerContainerFactory")
public void onMessage(Message msg) {
log.info("需要添加的值{}",JSONObject.toJSONString(msg));
if(null==msg || msg.getBody()==null){
return;
}
try {
String messageBody = new String(msg.getBody());
User user = objectMapper.readValue(messageBody.getBytes("UTF-8"),User.class);
DataResponse<Boolean> save = userService.reallySave(user);
if(save==null || !save.getData()){
log.info("添加失败,原因{}",JSONObject.toJSONString(save));
}
}catch (Exception e){
log.error("添加出错",e);
}
}
}
com.hong.springboot.config.Constants
代码语言:javascript复制package com.hong.springboot.config;
/**
* @author: csh
* @Date: 2021/4/8 11:29
* @Description:
*/
public class Constants {
public static final String MQ_BASE_USER_EXCHANGE="myExchange";
public static final String MQ_BASE_USER_MESSAGE_QUEUE="spring_user_queue";
public static final String MQ_BASE_USER_KEY="springboot_user_key";
}
application.properties
代码语言:javascript复制rocketmq.name-server=localhost:9876
rocketmq.producer.group=hong_group
rocketmq.producer.sendMessageTimeout=300000
#避免端口冲突
server.port=8288
#dubbo configuration
#服务名称
dubbo.application.name=springboot_rocketmq_consumer
dubbo.registry.protocol=zookeeper
#注册地址
dubbo.registry.address=zookeeper://127.0.0.1:2181
#扫描注解包通过该设置将服务注册到zookeeper
dubbo.protocol.scan=com.hong.springboot.api
#注册端口
dubbo.protocol.port=20882
#协议名称
dubbo.protocol.name=dubbo
#扫包
dubbo.scan.basePackages=com.hong.springboot.listener
#rabbitmq配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtualhost=/
springboot_all/springboot_rabbitmq_consumer/pom.xml
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_all</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath/>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_rabbitmq_consumer</artifactId>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.3.6</version>
</dependency>
<dependency>
<groupId>com.hong.springboot</groupId>
<artifactId>springboot_mq_api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.4-beta</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!--<build>-->
<!--<plugins>-->
<!--<plugin>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--<configuration>-->
<!--<skip>true</skip>-->
<!--</configuration>-->
<!--</plugin>-->
<!--</plugins>-->
<!--</build>-->
<!--静态资源导出问题-->
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
</build>
</project>
然后启动服务 api 生产 消费 请求如下:
代码语言:javascript复制username:springboot_rabbitmq_hong
age:1
结果:
最后
rabbitmq可以说是继 rocketmq 、kafka后比较热门的mq中间件之一,功能优秀的一个mq,区别于其他mq这个mq有一个路由器、队列、topicId三种交换模式,并且支持所有其他mq的功能,在功能上是一大亮点。本文只是简单的完成相关的整合,待日后再继续深入。
参考资料:
https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/
https://docs.spring.io/spring-amqp/reference/html/#receiving-messages
http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html
参考书籍:《RabbitMQ实战指南》、《RabbitMQ实战-高效部署分布式消息队列》
系列文章:
spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)
spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)-续netty
spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(gRPC)
spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)
spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)
spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka