采用 TTL(延遲隊列)+DLX(死信隊列)實作延遲消費
- 1、pom檔案引入jar包
- 2、yml 配置
- 3、聲明交換機、隊列及綁定
- 4、定義常量類
- 5、發送消息
- 6、消費消息
1、pom檔案引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、yml 配置
spring
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: admin
3、聲明交換機、隊列及綁定
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayConfig {
/**
* 延時交換機 --- 交換機用于重新配置設定隊列(接收死信隊列中的過期消息,将其轉發到需要延遲消息的子產品隊列)
* @return
*/
@Bean
public DirectExchange exchange() {
return new DirectExchange(DelayQueueContent.DELAY_EXCHANGE);
}
/**
* 實際消費隊列
* 用于延時消費的隊列
*/
@Bean
public Queue repeatTradeQueue() {
Queue queue = new Queue(DelayQueueContent.DELAYMSG_RECEIVE_QUEUE_NAME,
true,false,false);
return queue;
}
/**
* 綁定交換機并指定routing key(死信隊列綁定延遲交換機和實際消費隊列綁定延遲交換機的路由鍵一緻)
* @return
*/
@Bean
public Binding repeatTradeBinding() {
return BindingBuilder.bind(repeatTradeQueue())
.to(exchange())
.with(DelayQueueContent.DELAY_KEY);
}
//死信隊列
@Bean
public Queue deadLetterQueue() {
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl", DelayQueueContent.EXPERI_TIME);
args.put("x-dead-letter-exchange", DelayQueueContent.DELAY_EXCHANGE);
args.put("x-dead-letter-routing-key", DelayQueueContent.DELAY_KEY);
return new Queue(DelayQueueContent.DELAY_QUEUE_NAME, true, false, false, args);
}
}
4、定義常量類
public class DelayQueueContent {
/**
* ttl(延時)交換機名稱
*/
public static final String DELAY_EXCHANGE="message.ttl.exchange";
/**
* ttl(延時)隊列名稱
*/
public static final String DELAY_QUEUE_NAME ="message.ttl.queue";
/**
* dlx(死信)隊列名稱
*/
public static final String DELAYMSG_RECEIVE_QUEUE_NAME="message.dlx.queue";
/**
* 綁定鍵
*/
public static final String DELAY_KEY = "message.dlx.routing";
/**
* TTL 有效時間 3小時
*/
public static final int EXPERI_TIME = 3*60*60*1000; // 3*60*60*1000;
}
5、發送消息
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@RestController
@RequestMapping("/sendMqMessage")
public class SendMqMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發送延遲消息
* @return
*/
@GetMapping("/send")
public String sendDelayMsg(){
rabbitTemplate.convertAndSend(DelayQueueContent.DELAY_QUEUE_NAME,
"holle,this is message!");
log.info("發送時間:"+ LocalDateTime.now());
return "success";
}
}
6、消費消息
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import java.time.LocalDateTime;
import java.util.Map;
/**
* 隊列延時消費
*/
@Slf4j
@Component
public class DelayListener {
/**
* 接收延遲消息
* @param channel
* @param json
* @param message
* @param map
*/
@RabbitHandler
@RabbitListener(queues = DelayQueueContent.DELAYMSG_RECEIVE_QUEUE_NAME)
public void receiveDelayMsg(Channel channel, String json,
Message message, @Headers Map<String,Object> map){
try {
log.info("接收到的消息: {}", json);
log.info("接收時間:{}" ,LocalDateTime.now());
// 在這裡實作具體的邏輯
// todo……
//代碼為在消費者中開啟消息接收确認的手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消息消費成功!",json);
} catch (Exception e) {
log.error("消息消費失敗!",json);
e.printStackTrace();
}
}
}
.