最近要使用alibaba的rocket mq(我們公司對其進行了封裝,使其運作在dotNet平台上,Java還是和原生的差不多,涉及公司的内容本文不會提及),其中 在生産者組這一塊,建議是用單例模式的。但是其中又建議一個組(group)使用一個執行個體,這樣僅僅單例模式就不行了,是以要進行改動,我們的目标就是“一個group使用一個單例”。
其實簡單點,多封裝幾個不同的單例類就行了,一個組用一個類。但是這顯然不是一個好主意,于是我們來考慮用另一種方式。
首先要将 group 這個概念抽出來,它是變量,接下來封裝不變的代碼。
我們先看看代碼是什麼樣的:
/**
* TurboMQ 消息生産者管理器
*/
public class MqProducer {
private DefaultMQProducer currentMQProducer;
private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3);
private static final Object lock = new Object();
private MqProducer(String group) throws MQClientException {
if (!Validator.isNotNullAndVisible(group)) {
throw new NullPointerException("Group名稱不能為空!");
}
currentMQProducer = new DefaultMQProducer(group);
currentMQProducer.setNamesrvAddr(“1.1.1.1”);
currentMQProducer.start();
}
public static MqProducer instance(String group) throws MQClientException {
if (!Validator.isNotNullAndVisible(group)) {
throw new NullPointerException("Group名稱不能為空!");
}
if (producerMap.get(group) == null) {
synchronized (lock) {
if (producerMap.get(group) == null) {
producerMap.put(group, new MqProducer(group));
}
}
}
return producerMap.get(group);
}
public SendResult send(String topic, String tag, String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
if (!Validator.isNotNullAndVisible(topic, tag, body)) {
throw new NullPointerException("請檢查參數是否為空,topic,tag,body");
}
Message message = new Message(topic, tag, body.getBytes("UTF-8"));
return currentMQProducer.send(message);
}
public static void shutdownAll() {
producerMap.forEach((key, value) -> {
value.shutdown();
});
}
public void shutdown() {
currentMQProducer.shutdown();
}
}
我們的解決思路,就是使用 Map 讓 group 和執行個體一一對應起來。
這些代碼中你可能需要注意的點是:
1 線程安全的 ConcurrentHashMap 以及要設定初始容量
private static Map<String, MqProducer> producerMap = new ConcurrentHashMap<>(3);
2 instance方法中的兩層 if 判斷
在 synchronized(lock)鎖住之前可能有多個線程了解到目前組是null,都去請求鎖,當第一個線程new了新生産者之後,下一個程序進來就不會再new一個新的生産者了。
public static MqProducer instance(String group) throws MQClientException {
if (producerMap.get(group) == null) {
synchronized (lock) {
if (producerMap.get(group) == null) {
producerMap.put(group, new MqProducer(group));
}
}
}
return producerMap.get(group);
}
題外話:
為什麼要抛異常?
因為此處是通用代碼,通用代碼不應處理業務邏輯,而且不該隐蔽錯誤的發生,要讓業務邏輯去確定參數沒問題。