activeMQ的某個應用場景,消費者和伺服器隻需建立一個長連接配接,而生産者的情況集中在伺服器,需要對伺服器端的生産者連接配接進行優化。
首先maven引入jar包依賴
[java] view plain copy
- <dependency>
- <groupId>org.activemq</groupId>
- <artifactId>activemq-all</artifactId>
- <version>5.9.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-pool</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- </exclusion>
- </exclusions>
下面是實作代碼
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.pool.PooledConnection;
- import org.apache.activemq.pool.PooledConnectionFactory;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import javax.jms.*;
- public class MQProductHelper {
- public static final Logger LOG = LoggerFactory.getLogger(MQProductHelper.class);
- private static PooledConnectionFactory poolFactory;
- /**
- * 擷取單例的PooledConnectionFactory
- * @return
- */
- private static synchronized PooledConnectionFactory getPooledConnectionFactory() {
- LOG.info("getPooledConnectionFactory");
- if (poolFactory != null) return poolFactory;
- LOG.info("getPooledConnectionFactory create new");
- IConfigService configService = ServiceManager.getService(IConfigService.class);
- String userName = configService.getConfig("MQ_USER_NAME", ShopConstant.BC_SHOP_ID);
- String password = configService.getConfig("MQ_USER_PASS", ShopConstant.BC_SHOP_ID);
- String url = configService.getConfig("MQ_BROKER_URL", ShopConstant.BC_SHOP_ID);
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
- poolFactory = new PooledConnectionFactory(factory);
- // 池中借出的對象的最大數目
- poolFactory.setMaxConnections(100);
- poolFactory.setMaximumActiveSessionPerConnection(50);
- //背景對象清理時,休眠時間超過了3000毫秒的對象為過期
- poolFactory.setTimeBetweenExpirationCheckMillis(3000);
- LOG.info("getPooledConnectionFactory create success");
- return poolFactory;
- }
- * 1.對象池管理connection和session,包括建立和關閉等
- * 2.PooledConnectionFactory預設設定MaxIdle為1,
- * 官方解釋Set max idle (not max active) since our connections always idle in the pool. *
- * @return * @throws JMSException
- public static Session createSession() throws JMSException {
- PooledConnectionFactory poolFactory = getPooledConnectionFactory();
- PooledConnection pooledConnection = (PooledConnection) poolFactory.createConnection();
- //false 參數表示 為非事務型消息,後面的參數表示消息的确認類型(見4.消息發出去後的确認模式)
- return pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- public static void produce(String subject, String msg) {
- LOG.info("producer send msg: {} ", msg);
- if (StringUtil.isEmpty(msg)) {
- LOG.warn("發送消息不能為空。");
- return;
- }
- try {
- Session session = createSession();
- LOG.info("create session");
- TextMessage textMessage = session.createTextMessage(msg);
- Destination destination = session.createQueue(subject);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(textMessage);
- LOG.info("create session success");
- } catch (JMSException e) {
- LOG.error(e.getMessage(), e);
- public static void main(String[] args) {
- MQProductHelper.produce("test.subject", "hello");
- }