1、asyncExecutor 可緩存線程池,用于異步的建立一個Connection對象,傳回Future類型對象
/** Executor service for obtaining a connection in an asynchronous fashion. */
private ExecutorService asyncExecutor;
/**
* Constructor.
* @param config Configuration for pool
* @throws SQLException on error
*/
public BoneCP(BoneCPConfig config) throws SQLException {
......
//在構造函數中初始化
this.asyncExecutor = Executors.newCachedThreadPool();
......
}
/** Obtain a connection asynchronously by queueing a request to obtain a connection in a separate thread.
*
* Use as follows:<p>
* Future<Connection> result = pool.getAsyncConnection();<p>
* ... do something else in your application here ...<p>
* Connection connection = result.get(); // get the connection<p>
*
* @return A Future task returning a connection.
*/
public Future<Connection> getAsyncConnection(){
return this.asyncExecutor.submit(new Callable<Connection>() {
public Connection call() throws Exception {
return getConnection();
}});
}
2、releaseHelper 用于關閉Connection對象的線程池,該池程池中的線程為守護線程(Daemon Thread)
/** pointer to the thread containing the release helper threads. */
private ExecutorService releaseHelper;
/**
* Constructor.
* @param config Configuration for pool
* @throws SQLException on error
*/
public BoneCP(BoneCPConfig config) throws SQLException {
......
//Gets number of release-connection helper threads to create per partition
int helperThreads = config.getReleaseHelperThreads();
this.releaseHelperThreadsConfigured = helperThreads > 0;
//If set to true, config has specified the use of statement release helper threads.
this.statementReleaseHelperThreadsConfigured = config.getStatementReleaseHelperThreads() > 0;
this.config = config;
String suffix = "";
if (config.getPoolName()!=null) {
suffix="-"+config.getPoolName();
}
if (this.releaseHelperThreadsConfigured){
this.releaseHelper = Executors.newFixedThreadPool(helperThreads*config.getPartitionCount(), new CustomThreadFactory("BoneCP-release-thread-helper-thread"+suffix, true));
}
......
}
如果使用者設定了以獨立線程來關閉Connection對象,才建立該線程池。
ThreadFactory用于設定建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
CustomThreadFactory為BoneCP裡對ThreadFactory的一個實作,并可設定線程是否為守護線程(Daemon Thread):
package com.jolbox.bonecp;
......
public class CustomThreadFactory
implements ThreadFactory, UncaughtExceptionHandler {
public CustomThreadFactory(String threadName, boolean daemon){
this.threadName = threadName;
this.daemon = daemon;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(r, this.threadName);
t.setDaemon(this.daemon);
t.setUncaughtExceptionHandler(this);
return t;
}
......
}
3、keepAliveScheduler 該線程池用于定期地測試connection的活性,即用它發送一條簡單的SQL,并關閉故障的Connection,每個分區一個線程,此線程池中的線程也為守護線程。
/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
* activity on the connection.
*/
private ScheduledExecutorService keepAliveScheduler;
this.keepAliveScheduler = Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-keep-alive-scheduler"+suffix, true));
newScheduledThreadPool建立一個大小無限的線程池,此線程池支援定時以及周期性執行任務的需求,相當于Timer。
if (config.getIdleConnectionTestPeriodInMinutes() > 0 || config.getIdleMaxAgeInMinutes() > 0){
final Runnable connectionTester = new ConnectionTesterThread(connectionPartition, this.keepAliveScheduler, this, config.getIdleMaxAge(TimeUnit.MILLISECONDS), config.getIdleConnectionTestPeriod(TimeUnit.MILLISECONDS), queueLIFO);
long delayInMinutes = config.getIdleConnectionTestPeriodInMinutes();
if (delayInMinutes == 0L){
delayInMinutes = config.getIdleMaxAgeInMinutes();
}
if (config.getIdleMaxAgeInMinutes() != 0 && config.getIdleConnectionTestPeriodInMinutes() != 0 && config.getIdleMaxAgeInMinutes() < delayInMinutes){
delayInMinutes = config.getIdleMaxAgeInMinutes();
}
this.keepAliveScheduler.schedule(connectionTester, delayInMinutes, TimeUnit.MINUTES);
}
如果使用者沒有設定了下面兩個屬性小于1就啟動該線程池的任務:
/** Connections older than this are sent a keep-alive statement. */
private long idleConnectionTestPeriodInSeconds = 240*60;
/** Maximum age of an unused connection before it is closed off. */
private long idleMaxAgeInSeconds = 60*60;
執行個體化一個ConnectionTesterThread類型的Runnable對象,該對象中也持有此線程池的引用,用于在run方法中啟動下次任務,此對象的run方法負責對異常的Connection對象和超出閑置時間的對象進行close并定期給Connection對象發送簡單SQL語句:
// send a keep-alive, close off connection if we fail.
if (!this.pool.isConnectionHandleAlive(connection)){
closeConnection(connection);
continue;
}
4、maxAliveScheduler 該線程池用于給每個分區建立一個線程定期的檢查Connection對象是否過期,此線程池中的線程也是守護線程
/** Handle to factory that creates 1 thread per partition that periodically wakes up and performs some
* activity on the connection.
*/
private ScheduledExecutorService maxAliveScheduler;
this.maxAliveScheduler = Executors.newScheduledThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-max-alive-scheduler"+suffix, true));
如果使用者設定了下面屬性大于0則使用該線程池:
/** A connection older than maxConnectionAge will be destroyed and purged from the pool. */
private long maxConnectionAgeInSeconds = 0;
if (config.getMaxConnectionAgeInSeconds() > 0){
final Runnable connectionMaxAgeTester = new ConnectionMaxAgeThread(connectionPartition, this.maxAliveScheduler, this, config.getMaxConnectionAge(TimeUnit.MILLISECONDS), queueLIFO);
this.maxAliveScheduler.schedule(connectionMaxAgeTester, config.getMaxConnectionAgeInSeconds(), TimeUnit.SECONDS);
}
先執行個體化一個ConnectionMaxAgeThread類型的Runnable對象,該對象定期的對超過maxConnectionAge類型的對象進行關閉:
if (connection.isExpired(currentTime)){
// kill off this connection
closeConnection(connection);
continue;
}
ConnectionMaxAgeThread對象中也對該線程池持有引用來啟動下次全任務。
5、connectionsScheduler 該線程池用于觀察每個分區,根據需要動态的建立新的Connection對象或者清理過剩的,也為守護線程
/** Executor for threads watching each partition to dynamically create new threads/kill off excess ones.
*/
private ExecutorService connectionsScheduler;
this.connectionsScheduler = Executors.newFixedThreadPool(config.getPartitionCount(), new CustomThreadFactory("BoneCP-pool-watch-thread"+suffix, true));
// watch this partition for low no of threads
this.connectionsScheduler.execute(new PoolWatchThread(connectionPartition, this));
6、closeConnectionExecutor 該線程池用于監控那些失敗的close操作
/** Threads monitoring for bad connection requests. */
private ExecutorService closeConnectionExecutor;
this.closeConnectionWatch = config.isCloseConnectionWatch();
if (this.closeConnectionWatch){
logger.warn(THREAD_CLOSE_CONNECTION_WARNING);
this.closeConnectionExecutor = Executors.newCachedThreadPool(new CustomThreadFactory("BoneCP-connection-watch-thread"+suffix, true));
}
在getConection()方法中如果使用者設定了下面屬性則啟用該線程:
/** If set to true, create a new thread that monitors a connection and displays warnings if application failed to
* close the connection.
*/
protected boolean closeConnectionWatch = false;
if (this.closeConnectionWatch){ // a debugging tool
watchConnection(result);
}
/** Starts off a new thread to monitor this connection attempt.
* @param connectionHandle to monitor
*/
private void watchConnection(ConnectionHandle connectionHandle) {
String message = captureStackTrace(UNCLOSED_EXCEPTION_MESSAGE);
this.closeConnectionExecutor.submit(new CloseThreadMonitor(Thread.currentThread(), connectionHandle, message, this.closeConnectionWatchTimeoutInMs));
}
// @Override
public void run() {
try {
this.connectionHandle.setThreadWatch(Thread.currentThread());
// wait for the thread we're monitoring to die off.
this.threadToMonitor.join(this.closeConnectionWatchTimeout);
if (!this.connectionHandle.isClosed()
&& this.threadToMonitor.equals(this.connectionHandle.getThreadUsingConnection())
){
logger.error(this.stackTrace);
}
} catch (Exception e) {
// just kill off this thread
if (this.connectionHandle != null){ // safety
this.connectionHandle.setThreadWatch(null);
}
}
}