laitimes

How does Message Queue ensure that messages are not consumed repeatedly?

author:opendotnet

As an important middleware, message queue (MQ) is widely used in scenarios such as decoupling services, asynchronous processing, and data buffering in distributed systems. However, while message queues bring convenience, they may also introduce the problem of repeated consumption of messages. Repeated message consumption may be caused by network failures, consumer crashes, message processing failures, and other reasons. This article will discuss in detail how to ensure that messages are not repeatedly consumed in a message queue and provide C# sample code.

1. Reasons for repeated message consumption

The root cause of repeated message consumption lies in the reliability guarantee mechanism of message queues, that is, to ensure that messages are consumed at least once. This mechanism ensures that messages are not lost due to network issues or consumer crashes, but it can also result in messages being delivered to consumers multiple times. Here are some common reasons:

  1. Network failure: During the transmission of messages from the queue to the consumer, if the network fails, the message queue may not receive the acknowledgment receipt (ACK) from the consumer, so that the message is considered unconsumed and the message is redelivered.
  2. Consumer crash: The consumer crashes during message processing and cannot send an acknowledgment receipt to the message queue, which redelivers the message because it thinks the message has not been consumed.
  3. Message processing failure: The consumer successfully receives the message and starts processing, but encounters an exception or error during processing, fails to complete the processing logic, and fails to send an acknowledgement receipt, and the message queue also redelivers the message.

2. Strategies to avoid repeated message consumption

In order to avoid repeated consumption of messages, measures need to be taken at multiple levels, such as the producer of the message, the message queue itself, and the consumer. Here are some common strategies:

1. Message deduplication identification

By adding unique identifiers (such as message IDs and sequence numbers) to messages, consumers can record the processed identifiers when processing messages to avoid repeating messages with the same identity. This strategy is simple and easy to implement, but requires maintaining a state store (such as a database, Redis, etc.) on the consumer side to record the identity of processed messages.

2. Idempotency control

Idempotency means that no matter how many operations are performed, the effect on the state of the system is the same as if it were performed once. By designing idempotent message processing logic, you can ensure that even if messages are consumed repeatedly, there are no side effects on the system state. For example, for database operations, you can use unique key constraints or idempotent SQL statements to avoid repeatedly inserting or updating data.

3. Message consumption confirmation mechanism

After the message is successfully processed, the consumer should send an acknowledgment receipt (ACK) to the message queue in a timely manner to inform the message queue that the message has been consumed, and the message queue can delete or mark the consumed message. This is one of the key mechanisms to prevent repeated consumption of messages.

4. Distributed locks

In a distributed system, distributed locks can be used to ensure that the same message will only be processed by one consumer. Distributed locks can be implemented through ZooKeeper and Redis, but you need to pay attention to the performance overhead and deadlock problems when using them.

5. Message expiration settings

Set the expiration date in the message to ensure that the message is consumed within a certain period of time, and the message that exceeds the expiration date will be discarded to prevent expired messages from being consumed repeatedly. This strategy is suitable for scenarios that require high timeliness.

3. C# sample code

Next, we will use the C# sample code to show how to avoid duplicate consumption of messages in the message queue. RabbitMQ is used as an example to demonstrate how to combine message deduplication identification and idempotency control to protect against repeated consumption of messages.

1. Introduce the RabbitMQ client library

First, you need to introduce the RabbitMQ client library into your C# project. It can be installed via the NuGet package manager

RabbitMQ.Client

Pack.

Install-Package RabbitMQ.Client
           

2. Send a message

When sending a message, a unique message ID is generated for each message and sent as one of the properties of the message.

using System;
using RabbitMQ.Client;

public class MessageSender
{
public void Send(string messageBody)
 {
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateChannel())
 {
var exchangeName = "test_exchange";
var routingKey = "test_routing_key";
var messageId = Guid.NewGuid().ToString(); // 生成唯一消息ID

var properties = channel.CreateBasicProperties();
 properties.MessageId = messageId; // 设置消息ID

 channel.BasicPublish(exchangeName, routingKey, properties, System.Text.Encoding.UTF8.GetBytes(messageBody));
 Console.WriteLine($"Sent message with ID: {messageId}");
 }
 }
}
           

3. Consume messages and avoid duplicate consumption

When consuming a message, first obtain the message ID from the message properties, and then query a state store (such as Redis) to check whether the message has been processed. If it has not been processed, it is processed and the processing status is recorded; If it has already been processed, the message is simply ignored.

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using StackExchange.Redis;

public class MessageConsumer
{
private readonly ConnectionMultiplexer _redisMultiplexer;
private readonly IDatabase _redisDb;

public MessageConsumer()
 {
 _redisMultiplexer = ConnectionMultiplexer.Connect("localhost");
 _redisDb = _redisMultiplexer.GetDatabase();
 }

public void Consume()
 {
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateChannel())
 {
var queueName = "test_queue";
var exchangeName = "test_exchange";
var routingKey = "test_routing_key";

 channel.QueueDeclare(queueName, true, false, false, );
 channel.QueueBind(queueName, exchangeName, routingKey, );

var consumer = new EventingBasicConsumer(channel);
 consumer.Received += (model, ea) =>
 {
var messageId = ea.BasicProperties.MessageId;
var messageBody = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

if (!IsMessageProcessed(messageId))
 {
 ProcessMessage(messageBody);
 SaveProcessedMessage(messageId);
 channel.BasicAck(ea.DeliveryTag, false); // 确认消息已被消费
 }
else
 {
 Console.WriteLine($"Message with ID {messageId} has already been processed.");
 channel.BasicAck(ea.DeliveryTag, false); // 即使消息已处理,也应确认消费
 }
 };

 channel.BasicConsume(queueName, true, consumer);
 Console.WriteLine("Waiting for messages...");
 }
 }

private bool IsMessageProcessed(string messageId)
 {
return _redisDb.KeyExists($"processed_message:{messageId}");
 }

private void SaveProcessedMessage(string messageId)
 {
 _redisDb.StringSet($"processed_message:{messageId}", "true");
 }

private void ProcessMessage(string messageBody)
 {
// 处理消息的逻辑
 Console.WriteLine($"Processing message: {messageBody}");
 }
}
           

In the example above, we used Redis to store the state of processed messages.

IsMessageProcessed

The method is used to check whether the corresponding message ID exists in Redis, and if it exists, it means that the message has been processed.

SaveProcessedMessage

The method is used to add the message ID to Redis to mark it as processed.

Fourth, summary

Message queues play an important role in distributed systems, but repeated message consumption is a problem that needs to be paid attention to. Multiple policies such as message deduplication identification, idempotency control, message consumption acknowledgment mechanism, distributed lock, and message expiration settings can effectively avoid repeated message consumption. In practice, appropriate policies should be selected based on specific business scenarios and system characteristics, and multiple mechanisms should be combined to ensure reliable message consumption.

This topic uses C# sample code to show how to combine message deduplication identification and idempotency control in RabbitMQ message queue to prevent repeated consumption of messages. It is hoped that these contents will help readers solve the problem of repeated consumption of messages in actual development.