天天看點

BoneCP源碼——BoneCP中使用的多線程

1、asyncExecutor 可緩存線程池,用于異步的建立一個Connection對象,傳回Future類型對象

/** Executor service for obtaining a connection in an asynchronous fashion. */
	private ExecutorService asyncExecutor;
	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
		......
		//在構造函數中初始化
		this.asyncExecutor = Executors.newCachedThreadPool();
		......
        }

	/** Obtain a connection asynchronously by queueing a request to obtain a connection in a separate thread. 
	 * 
	 *  Use as follows:<p>
	 *      Future&lt;Connection&gt; result = pool.getAsyncConnection();<p>
	 *       ... do something else in your application here ...<p>
	 *      Connection connection = result.get(); // get the connection<p>
	 *      
	 * @return A Future task returning a connection. 
	 */
	public Future<Connection> getAsyncConnection(){

		return this.asyncExecutor.submit(new Callable<Connection>() {

			public Connection call() throws Exception {
				return getConnection();
			}});
	}
           

2、releaseHelper 用于關閉Connection對象的線程池,該池程池中的線程為守護線程(Daemon Thread)

/** pointer to the thread containing the release helper threads. */
	private ExecutorService releaseHelper;

	/**
	 * Constructor.
	 * @param config Configuration for pool
	 * @throws SQLException on error
	 */
	public BoneCP(BoneCPConfig config) throws SQLException {
                ......
                //Gets number of release-connection helper threads to create per partition
    		int helperThreads = config.getReleaseHelperThreads();
		this.releaseHelperThreadsConfigured = helperThreads > 0;
                //If set to true, config has specified the use of statement release helper threads.		
		this.statementReleaseHelperThreadsConfigured = config.getStatementReleaseHelperThreads() > 0;
		this.config = config;
		String suffix = "";
		if (config.getPoolName()!=null) {
			suffix="-"+config.getPoolName();
		}			
		if (this.releaseHelperThreadsConfigured){
			this.releaseHelper = Executors.newFixedThreadPool(helperThreads*config.getPartitionCount(), new CustomThreadFactory("BoneCP-release-thread-helper-thread"+suffix, true));
		}
                ......
          }
           

 如果使用者設定了以獨立線程來關閉Connection對象,才建立該線程池。

ThreadFactory用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字。

public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}
           

 CustomThreadFactory為BoneCP裡對ThreadFactory的一個實作,并可設定線程是否為守護線程(Daemon Thread):

package com.jolbox.bonecp;
......
public class CustomThreadFactory
        implements ThreadFactory, UncaughtExceptionHandler {
    public CustomThreadFactory(String threadName, boolean daemon){
        this.threadName = threadName;
        this.daemon = daemon;
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, this.threadName);
        t.setDaemon(this.daemon);
        t.setUncaughtExceptionHandler(this);
        return t;
    }
    ......
}
           

  3、keepAliveScheduler 該線程池用于定期地測試connection的活性,即用它發送一條簡單的SQL,并關閉故障的Connection,每個分區一個線程,此線程池中的線程也為守護線程。

/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService keepAliveScheduler;
        this.keepAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-keep-alive-scheduler"+suffix, true));
           

 newScheduledThreadPool建立一個大小無限的線程池,此線程池支援定時以及周期性執行任務的需求,相當于Timer。

if (config.getIdleConnectionTestPeriodInMinutes() > 0 || config.getIdleMaxAgeInMinutes() > 0){
				
				final Runnable connectionTester = new ConnectionTesterThread(connectionPartition, this.keepAliveScheduler, this, config.getIdleMaxAge(TimeUnit.MILLISECONDS), config.getIdleConnectionTestPeriod(TimeUnit.MILLISECONDS), queueLIFO);
				long delayInMinutes = config.getIdleConnectionTestPeriodInMinutes();
				if (delayInMinutes == 0L){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				if (config.getIdleMaxAgeInMinutes() != 0 && config.getIdleConnectionTestPeriodInMinutes() != 0 && config.getIdleMaxAgeInMinutes() < delayInMinutes){
					delayInMinutes = config.getIdleMaxAgeInMinutes();
				}
				this.keepAliveScheduler.schedule(connectionTester, delayInMinutes, TimeUnit.MINUTES);
			}
           

 如果使用者沒有設定了下面兩個屬性小于1就啟動該線程池的任務:

/** Connections older than this are sent a keep-alive statement. */
	private long idleConnectionTestPeriodInSeconds = 240*60; 
	/** Maximum age of an unused connection before it is closed off. */ 
	private long idleMaxAgeInSeconds =  60*60; 
           

 執行個體化一個ConnectionTesterThread類型的Runnable對象,該對象中也持有此線程池的引用,用于在run方法中啟動下次任務,此對象的run方法負責對異常的Connection對象和超出閑置時間的對象進行close并定期給Connection對象發送簡單SQL語句:

// send a keep-alive, close off connection if we fail.
if (!this.pool.isConnectionHandleAlive(connection)){
    closeConnection(connection);
    continue; 
}
           

4、maxAliveScheduler  該線程池用于給每個分區建立一個線程定期的檢查Connection對象是否過期,此線程池中的線程也是守護線程

/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
	 * activity on the connection.
	 */
	private ScheduledExecutorService maxAliveScheduler;
        this.maxAliveScheduler =  Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-max-alive-scheduler"+suffix, true));
           

 如果使用者設定了下面屬性大于0則使用該線程池:

/** A connection older than maxConnectionAge will be destroyed and purged from the pool. */
	private long maxConnectionAgeInSeconds = 0;
           
if (config.getMaxConnectionAgeInSeconds() > 0){
				final Runnable connectionMaxAgeTester = new ConnectionMaxAgeThread(connectionPartition, this.maxAliveScheduler, this, config.getMaxConnectionAge(TimeUnit.MILLISECONDS), queueLIFO);
				this.maxAliveScheduler.schedule(connectionMaxAgeTester, config.getMaxConnectionAgeInSeconds(), TimeUnit.SECONDS);
			}
           

 先執行個體化一個ConnectionMaxAgeThread類型的Runnable對象,該對象定期的對超過maxConnectionAge類型的對象進行關閉:

if (connection.isExpired(currentTime)){
    // kill off this connection
    closeConnection(connection);
    continue;
}
           

 ConnectionMaxAgeThread對象中也對該線程池持有引用來啟動下次全任務。

5、connectionsScheduler  該線程池用于觀察每個分區,根據需要動态的建立新的Connection對象或者清理過剩的,也為守護線程

/** Executor for threads watching each partition to dynamically create new threads/kill off excess ones.
	 */
	private ExecutorService connectionsScheduler;
	this.connectionsScheduler =  Executors.newFixedThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-pool-watch-thread"+suffix, true));

        // watch this partition for low no of threads
	this.connectionsScheduler.execute(new PoolWatchThread(connectionPartition, this));
           

 6、closeConnectionExecutor  該線程池用于監控那些失敗的close操作

/** Threads monitoring for bad connection requests. */
	private ExecutorService closeConnectionExecutor;
        this.closeConnectionWatch = config.isCloseConnectionWatch();
	if (this.closeConnectionWatch){
		logger.warn(THREAD_CLOSE_CONNECTION_WARNING);
		this.closeConnectionExecutor =  Executors.newCachedThreadPool(new CustomThreadFactory("BoneCP-connection-watch-thread"+suffix, true));
	}
           

 在getConection()方法中如果使用者設定了下面屬性則啟用該線程:

/** If set to true, create a new thread that monitors a connection and displays warnings if application failed to 
	 * close the connection.
	 */
	protected boolean closeConnectionWatch = false;
           
if (this.closeConnectionWatch){ // a debugging tool
			watchConnection(result);
		}
           
/** Starts off a new thread to monitor this connection attempt.
	 * @param connectionHandle to monitor 
	 */
	private void watchConnection(ConnectionHandle connectionHandle) {
		String message = captureStackTrace(UNCLOSED_EXCEPTION_MESSAGE);
		this.closeConnectionExecutor.submit(new CloseThreadMonitor(Thread.currentThread(), connectionHandle, message, this.closeConnectionWatchTimeoutInMs));
	}
           
//	@Override
	public void run() {
		try {
			this.connectionHandle.setThreadWatch(Thread.currentThread());
			// wait for the thread we're monitoring to die off.
			this.threadToMonitor.join(this.closeConnectionWatchTimeout);
			if (!this.connectionHandle.isClosed() 
					&& this.threadToMonitor.equals(this.connectionHandle.getThreadUsingConnection())
				){
				logger.error(this.stackTrace);
			}
		} catch (Exception e) {
			// just kill off this thread
			if (this.connectionHandle != null){ // safety
				this.connectionHandle.setThreadWatch(null);
			}
		} 
	}
           

繼續閱讀