天天看點

activeMQ性能優化--對象池管理connection

ActiveMQ 的某個應用場景:消費者和伺服器只需建立一個長連接,而生產者集中在伺服器端,因此需要對伺服器端的生產者連接進行優化(使用對象池管理 connection)。

首先maven引入jar包依賴

[java] ​​view plain​​ ​​copy​​

​​​​

  1. <dependency>
  2.        <groupId>org.apache.activemq</groupId>
  3.        <artifactId>activemq-all</artifactId>
  4.        <version>5.9.0</version>
  5.    </dependency>
  6.    <dependency>
  7.        <groupId>org.apache.activemq</groupId>
  8.        <artifactId>activemq-pool</artifactId>
  9.        <version>5.9.0</version>
  10.        <exclusions>
  11.            <exclusion>
  12.                <groupId>org.apache.geronimo.specs</groupId>
  13.                <artifactId>geronimo-jms_1.1_spec</artifactId>
  14.            </exclusion>
  15.        </exclusions>
  16.    </dependency>

下面是實作代碼

  1. import org.apache.activemq.ActiveMQConnectionFactory;  
  2. import org.apache.activemq.pool.PooledConnection;  
  3. import org.apache.activemq.pool.PooledConnectionFactory;  
  4. import org.slf4j.Logger;  
  5. import org.slf4j.LoggerFactory;  
  6. import javax.jms.*;  
  7. public class MQProductHelper {  
  8.   public static final Logger LOG = LoggerFactory.getLogger(MQProductHelper.class);  
  9.   private static PooledConnectionFactory poolFactory;  
  10.   /** 
  11.    * 擷取單例的PooledConnectionFactory 
  12.    *  @return 
  13.    */  
  14.   private static synchronized PooledConnectionFactory getPooledConnectionFactory() {  
  15.     LOG.info("getPooledConnectionFactory");  
  16.     if (poolFactory != null) return poolFactory;  
  17.     LOG.info("getPooledConnectionFactory create new");  
  18.     IConfigService configService = ServiceManager.getService(IConfigService.class);  
  19.     String userName = configService.getConfig("MQ_USER_NAME", ShopConstant.BC_SHOP_ID);  
  20.     String password = configService.getConfig("MQ_USER_PASS", ShopConstant.BC_SHOP_ID);  
  21.     String url = configService.getConfig("MQ_BROKER_URL", ShopConstant.BC_SHOP_ID);  
  22.     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);  
  23.     poolFactory = new PooledConnectionFactory(factory);        
  24.     // 池中借出的對象的最大數目  
  25.     poolFactory.setMaxConnections(100);  
  26.     poolFactory.setMaximumActiveSessionPerConnection(50);        
  27.     //背景對象清理時,休眠時間超過了3000毫秒的對象為過期  
  28.     poolFactory.setTimeBetweenExpirationCheckMillis(3000);  
  29.     LOG.info("getPooledConnectionFactory create success");  
  30.     return poolFactory;  
  31.   }  
  32.    * 1.對象池管理connection和session,包括建立和關閉等 
  33.    * 2.PooledConnectionFactory預設設定MaxIdle為1, 
  34.    *  官方解釋Set max idle (not max active) since our connections always idle in the pool.   * 
  35.    *  @return   * @throws JMSException 
  36.   public static Session createSession() throws JMSException {  
  37.     PooledConnectionFactory poolFactory = getPooledConnectionFactory();  
  38.     PooledConnection pooledConnection = (PooledConnection) poolFactory.createConnection();  
  39.     //false 參數表示 為非事務型消息,後面的參數表示消息的确認類型(見4.消息發出去後的确認模式)  
  40.     return pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  41.   public static void produce(String subject, String msg) {  
  42.     LOG.info("producer send msg: {} ", msg);  
  43.     if (StringUtil.isEmpty(msg)) {  
  44.       LOG.warn("發送消息不能為空。");  
  45.       return;  
  46.     }  
  47.     try {  
  48.       Session session = createSession();  
  49.       LOG.info("create session");  
  50.       TextMessage textMessage = session.createTextMessage(msg);  
  51.       Destination destination = session.createQueue(subject);  
  52.       MessageProducer producer = session.createProducer(destination);  
  53.       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
  54.       producer.send(textMessage);  
  55.       LOG.info("create session success");  
  56.     } catch (JMSException e) {  
  57.       LOG.error(e.getMessage(), e);  
  58.   public static void main(String[] args) {  
  59.     MQProductHelper.produce("test.subject", "hello");