官方:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。
-
- 我个人觉得消息中间件也可以用共享单车模式形容,平台将共享单车放在固定的某些地方,然后大家有需要的自己用对应的app扫描对应的二维码就可以开锁使用。
- 固定地方就像是中间件存放消息的地方Broker,共享单车就是Message消息,平台-Producer生产者,大家-Consumer消费者,而对应的app就像是Topic(如果用美团app扫描,就是使用美团单车,如果用支付宝扫描,就是使用哈罗单车),每个单车的编号其实就是组成Topic的更小单元Queue。
大概的执行逻辑是这样滴:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiInVGcq5SZzQGMlVjMwYWM3MWL4UDZ40SNhFGNtYmZ5QWLwYzYiJmZyYTL4kDO1YTM5IzN5MjNx8CX5YjM3QTMx8CXnVGcq9CXxIDMy8CXw8CXlVXc1l3Lc12bj5yayFGbu5ibkN2Lc9CX6MHc0RHaiojIsJye.jpeg)
为什么要使用消息队列?
主要的应用场景解耦、异步、削峰
经常会有人提及到“消息中间件/消息队列”,那这到底是个啥呢?为什么要使用消息队列?主要的应用场景解耦、异步、削峰
解耦:
-
- 传统模式的系统间耦合性太强,比如系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
- 中间件模式的优点是将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
异步:
-
- 传统模式的一些非必要的业务逻辑以同步的方式运行,太耗费时间。
- 中间件模式的优点是将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:
-
- 传统模式并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
- 中间件模式系统可以慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
上代码瞅瞅,我之前项目上用过的
1、引入依赖:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.Final</version>
</dependency>
2、yml配置文件添加生产者属性:
#配置rocketmq
rocketmq:
producer:
producerId: ***** #生产者id(旧版本是生产者id,新版本是groupid),替换成自己的
msgTopic: alarmStatus #生产主题,替换成自己的
accessKey: XXX #连接通道,替换成自己的
secretKey: XXX #连接秘钥,替换成自己的
onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet #生产者ons接入域名,替换成自己的
3、 初始化生产者:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
/**
* rocketmq生产者启动初始化类
* @author lkf
* @Date 2021年8月9日
*
*/
@Component
public class RocketmqProducerInit {
@Value("${rocketmq.producer.producerId}")
private String producerId;
@Value("${rocketmq.producer.accessKey}")
private String accessKey;
@Value("${rocketmq.producer.secretKey}")
private String secretKey;
@Value("${rocketmq.producer.onsAddr}")
private String ONSAddr;
private static Producer producer;
/*
//当无法注入实例的时候可以使用此方法进行实例初始化
private static class ProducerHolder {
private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit();
}
private RocketmqProducerInit (){
}
public static final RocketmqProducerInit getInstance() {
return ProducerHolder.INSTANCE;
}*/
@PostConstruct
public void init(){
System.out.println("初始化启动生产者!");
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Producer ID
properties.setProperty(PropertyKeyConst.GROUP_ID, producerId);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
producer = ONSFactory.createProducer(properties);
// 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
producer.start();
}
/**
* 初始化生产者
* @return
*/
public Producer getProducer(){
return producer;
}
}
4、发送消息:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.sixmonth.rocketmq.common.rocketmq.init.RocketmqProducerInit;
/**
* 消息生产者,可与消费者分离
* @author lkf
* @Date 2021年8月9日
*
*/
@Service
public class RocketmqProducerService {
private Logger logger = LoggerFactory.getLogger(RocketmqProducerService.class);
@Value("${rocketmq.producer.msgTopic}")
private String msgTopic;
@Autowired
private RocketmqProducerInit rocketmqProducerInit;
public String tag = "*";//生产标签,可自定义,默认通配
/**
* 异步发送消息
* 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
* 特点:速度快;有结果反馈;数据可靠;
* 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
* @param msg
* @return
*/
public boolean sendMsgAsy(String msg) {
Long startTime = System.currentTimeMillis();
Message message = new Message(msgTopic, tag, msg.getBytes());
rocketmqProducerInit.getProducer().sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
///消息发送成功
System.out.println("send message success. topic=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
//消息发送失败
System.out.println("send message failed. execption=" + context.getException());
}
});
Long endTime = System.currentTimeMillis();
System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
return true;
}
}
5、消费者初始化:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMQ3 {
private static Logger log = LoggerFactory.getLogger(RocketMQ3.class);
/**
* 创建的Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
*/
private static Consumer consumer;
/**
* 消费警情数据
*
*/
@PostConstruct
public void init() {
log.info("初始化启动消费者");
Properties p = new Properties();
// accessKey 阿里云身份验证,在阿里云服务器管理控制台创建
p.setProperty(PropertyKeyConst.AccessKey, "****");
//您在控制台创建的 CONSUMER_ID
p.setProperty(PropertyKeyConst.ConsumerId, "****");//CID_alarmInfo_jwzt
// secretKey 阿里云身份验证,在阿里云服务器管理控制台创建//rYwdAimwMeov5xEKLkOBHq1V3I89lc
p.setProperty(PropertyKeyConst.SecretKey, "*******");//kbpnLBVkSWUjCr1RPaHLa23MaLcaMQ
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
// p.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
p.setProperty(PropertyKeyConst.NAMESRV_ADDR, "15.***.***.242:8080");
//设置发送超时时间,单位毫秒
p.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
consumer = ONSFactory.createConsumer(p);
//订阅多个 Tag alarmStatus
consumer.subscribe("alarmStatus", "*", new RocketMQListener());
consumer.start();
log.info("消费者启动成功");
}
}
6、消费者消费/处理消息:
package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.iwhalecloud.citybrain.ducha.ism.mapper.PoliceOrgMapper;
import com.iwhalecloud.citybrain.ducha.ism.service.impl.JWZTUtilsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
public class RocketMQListener implements MessageListener {
private static Logger log = LoggerFactory.getLogger(RocketMQListener.class);
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
log.info("Receive: {}", message);
String messageBodyStr = new String(message.getBody());
log.info("message消息为: {}", messageBodyStr);
System.out.println("message消息为:" + messageBodyStr);
try {
JSONObject jsonObject = JSONObject.parseObject(messageBodyStr);
//数据处理
//。。。。
//请求接收日志
} catch (Exception e) {
log.error("MQ警情事件数据录字符串格式异常Exception", e);
// 记录字符串格式不正确
return Action.CommitMessage;
}
return Action.CommitMessage;
}
}
简单的使用大概就是这样的,其他的比如
- 消息队列如何选型?
- 如何保证消息队列是高可用的?
- 如何保证消息不被重复消费?
- 如何保证消费的可靠性传输?
- 如何保证消息的顺序性?
- 生产端和消费端的集群负载均衡
想了解更多相关知识的,可以去这儿