实现收到IBMMQ的数据之后存储到数据库(微服务系列第五天)
前言
今天的目标是: 实现收到MQ队列消息,实现收到IBMMQ的数据之后存储到数据库。
注意!!! 这是个学习使用mybatis的示例工程,正式项目则不应该如此做,耦合很严重.正确的做法是把消息发送到topic上面,由存储服务去topic上面取数据再做保存,这样就可以解耦了.
修改mqserver项目
pom.xml
添加依赖
此处用的是阿里的连接池 druid
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.4</version>
</dependency>
JDBC 配置
这个是从官网例子上拿来的配置数据
# 只有下面三个是必填项(使用内嵌数据库的话这三个也可以不用填,会使用默认配置),其他配置不是必须的
spring.datasource.url=jdbc:mysql://172.25.0.2:3306/db1
spring.datasource.username=root
spring.datasource.password=123456
# driver-class-name 非必填可根据url推断
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Druid 数据源配置,继承spring.datasource.* 配置,相同则覆盖
spring.datasource.initial-size=2
spring.datasource.max-active=30
spring.datasource.min-idle=2
spring.datasource.max-wait=1234
spring.datasource.pool-prepared-statements=true
spring.datasource.max-pool-prepared-statement-per-connection-size=5
# spring.datasource.max-open-prepared-statements= #等价于上面的max-pool-prepared-statement-per-connection-size
spring.datasource.validation-query=select 1
spring.datasource.validation-query-timeout=1
spring.datasource.test-on-borrow=true
spring.datasource.test-on-return=true
spring.datasource.test-while-idle=true
spring.datasource.time-between-eviction-runs-millis=10000
spring.datasource.min-evictable-idle-time-millis=30001
spring.datasource.async-close-connection-enable=true
# spring.datasource.aop-patterns=com.alibaba.spring.boot.demo.service.*
# 自定义StatFilter 配置 其他 Filter 不再演示
spring.datasource.filter.stat.db-type=h2
spring.datasource.filter.stat.log-slow-sql=true
spring.datasource.filter.stat.slow-sql-millis=2000
# JPA
spring.jpa.show-sql= true
spring.jpa.hibernate.ddl-auto=create-drop
# 配置下面参数用于启动监控页面,考虑安全问题,默认是关闭的,按需开启
spring.datasource.stat-view-servlet.enabled=true
spring.datasource.filter.stat.enabled=true
spring.datasource.web-stat-filter.enabled=true
# 更多配置属性见 DruidDataSource 内成员变量(只要有set方法便支持),或者根据IDE提示,或者查看官方文档
Druid类
src/main/java/com/example/dbserver/DruidConfig.java
@Configuration
public class DruidConfig {
/**
* 数据源
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druid() {
return new DruidDataSource();
}
}
测试代码
@Test
public void testDataSourceExists() throws Exception {
assertThat(dataSource).isNotNull();
}
@Test
public void testDataSourcePropertiesOverridden() throws Exception {
assertThat(dataSource.getUrl()).isEqualTo("jdbc:mysql://172.25.0.2:3306/db1");
assertThat(dataSource.getInitialSize()).isEqualTo(2);
assertThat(dataSource.getMaxActive()).isEqualTo(30);
assertThat(dataSource.getMinIdle()).isEqualTo(2);
assertThat(dataSource.getMaxWait()).isEqualTo(1234);
assertThat(dataSource.isPoolPreparedStatements()).isTrue();
//assertThat(dataSource.getMaxOpenPreparedStatements()).isEqualTo(5); //Duplicated with following
assertThat(dataSource.getMaxPoolPreparedStatementPerConnectionSize()).isEqualTo(5);
assertThat(dataSource.getValidationQuery()).isEqualTo("select 1");
assertThat(dataSource.getValidationQueryTimeout()).isEqualTo(1);
assertThat(dataSource.isTestWhileIdle()).isTrue();
assertThat(dataSource.isTestOnBorrow()).isTrue();
assertThat(dataSource.isTestOnReturn()).isTrue();
assertThat(dataSource.getTimeBetweenEvictionRunsMillis()).isEqualTo(10000);
assertThat(dataSource.getMinEvictableIdleTimeMillis()).isEqualTo(30001);
assertThat(dataSource.isAsyncCloseConnectionEnable()).isEqualTo(true);
}
创建消息实体
这个写法,有点把类当结构体用的感觉 src/main/java/com/example/tarsmqserver/domain/MessageModel.java
正常写法
public class MessageModel {
/**
* 消息ID
*/
private long msgid;
/**
* 消息内容
*/
private String content;
/**
* 消息保存到数据库的时间
*/
private long created;
public void setContent(String content) {
this.content = content;
}
public void setCreated(long created) {
this.created = created;
}
public void setMsgid(long msgid) {
this.msgid = msgid;
}
public String getContent() {
return content;
}
public long getCreated() {
return created;
}
public long getMsgid() {
return msgid;
}
}
@data 注解写法
@Data
public class MessageData {
/**
* 消息ID
*/
private long msgid;
/**
* 消息内容
*/
private String content;
/**
* 消息保存到数据库的时间
*/
private long created;
}
两种方法都能通过测试
创建 mapper
src/main/java/com/example/tarsmqserver/dao/MessageMapper.java
@Repository
public interface MessageMapper {
/**
* 从nsq的topic获取到数据保存到db中存储
* @param msg
*/
@Insert("insert into message (content,created) values (#{content},#{created})")
void save(Message msg);
/**
* 从nsq的topic获取到数据保存到db中存储 测试@data写法数据的提交
* @param msg
*/
@Insert("insert into message (content,created) values (#{content},#{created})")
void saveData(MessageData msg);
}
创建服务
服务接口
src/main/java/com/example/tarsmqserver/service/MessageService.java
public interface MessageService {
/**
* 从nsq的topic获取到数据保存到db中存储
*/
void save(Message msg);
void saveData(MessageData msg);
}
接口实现
src/main/java/com/example/tarsmqserver/service/MessageServiceImpl.java
@Component
public class MessageServiceImpl implements MessageService {
@Autowired
MessageMapper messageMapper;
@Override
public void save(Message msg) {
// TODO Auto-generated method stub
messageMapper.save(msg);
}
@Override
public void saveData(MessageData msg) {
// TODO Auto-generated method stub
messageMapper.saveData(msg);
}
}
修改 main
主要是添加注解 @MapperScan("com.example.dbserver.dao")
意思是让 mybatis 怎么去找到这个文件,然后产生绑定关系
@SpringBootApplication
@EnableTarsServer
@MapperScan("com.example.tarsmqserver.dao")
public class TarsMqServerApplication {
public static void main(String[] args) {
// 关闭 spring boot 自带的web服务 目前场景只用到了rpc服务
SpringApplication app = new SpringApplication(TarsMqServerApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
app.run(args);
}
}
测试用例
@Autowired
MessageServiceImpl imp;
@Test
public void saveMessage() {
Message message = new Message();
message.setContent("abcdefg");
message.setCreated(System.currentTimeMillis());
imp.save(message);
}
修改 mqserver
src/main/java/com/example/tarsmqserver/service/mqserver/ConsumerListener.java
@Autowired
private NsqConfig nsqConfig;
/**
* 使用JmsListener配置消费者监听的队列
*
* @param receivedMsg 接收到的消息
*/
@JmsListener(destination = "#{@conf.recvQueueName}")
public void receiveQueue(String receivedMsg) {
JMS_LISTENER.info("RECEIVE: {}", receivedMsg);
MessageModel messagObject = new MessageModel();
messagObject.setContent(receivedMsg);
messagObject.setCreated(System.currentTimeMillis());
try {
imp.save(messagObject);
} catch (Exception e) {
e.printStackTrace();
JMS_LISTENER.error(e.toString());
//TODO: handle exception
}
}
打包/上传/测试/观察
$ make build-upload
随意发了一个消息,返回如下
2021-02-01 19:38:57.002 INFO 3376 --- [ageObjAdapter-1] MQSERVER : send: DEV.QUEUE.1 -> qqqqqqqqqq
2021-02-01 19:38:57.148 INFO 3376 --- [enerContainer-1] JMS_LISTENER : RECEIVE: qqqqqqqqqq
2021-02-01 19:38:57.465 INFO 3376 --- [ nsq-batch-0] com.sproutsocial.nsq.Connection : connected 172.25.0.213:4150 msgTimeout:60000 heartbeatInterval:30000 maxRdyCount:2500
2021-02-01 19:38:57.467 INFO 3376 --- [ nsq-batch-0] com.sproutsocial.nsq.Publisher : publisher connected:172.25.0.213:4150
服务正常收到消息,查看数据库,也能看到数据正常被添加上,数据库表自行创建即可.
源代码
实现收到IBMMQ的数据之后存储到数据库(微服务系列第五天)
结语
spring boot 2.x 对于快餐使用,还是很友好的,封装了很多东西,很多东西拿来就可以直接用.但对于真正去掌控,估计会很复杂.这个只有等技术深入后,再慢慢探究. 不过Java是真的吃内存,可能是用JVM的原因吧.这个时候我就比较想念GO了,相同的功能,对资源占用少得多.