天天看點

Quartz與Spring內建——建立排程器

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/beliefer/article/details/52094198

前言

在《

Quartz與Spring內建—— SchedulerFactoryBean的初始化分析

》一文中介紹過Spring內建Quartz時的初始化過程,其中簡單的提到了建立排程器的方法createScheduler。本文将着重介紹Quartz初始化時是如何建立排程器的。

建立排程器

這裡從createScheduler的實作(見代碼清單1)來分析,其處理步驟如下:

  1. 設定線程上下文的類加載器;
  2. 通過單例方法擷取SchedulerRepository的執行個體(見代碼清單2);
  3. 從排程倉庫執行個體SchedulerRepository中查找已經存在的排程器,這裡想說的是雖然此實作考慮到了多線程安全問題,不過這種方式效率較低。不如提前初始化,由final修飾,不使用同步語句,避免線程間的阻塞和等待;
  4. 擷取調取器(見代碼清單3),其實際上首先從排程器緩存中查找排程器,否則調用instantiate方法建立排程器;
  5. 檢查重新擷取的排程器和已經存在的排程器是否相同,如果相同則說明此排程器已經被激活了,将會報出異常。如果排程器是首次被激活,那麼将傳回此排程器。這裡的實作稍微有些拖沓,其實隻有當existingScheduler為null時,才會調用instantiate方法建立newScheduler,也隻有在這個時候newScheduler才不等于existingScheduler,并且不會抛出異常。是以我們甚至可以省去Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);這行代碼,而直接将代碼清單3中的實作進行修改——當sched為null時才調用instantiate方法建立排程器。

代碼清單1

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
			throws SchedulerException {

		// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
		Thread currentThread = Thread.currentThread();
		ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
		boolean overrideClassLoader = (this.resourceLoader != null &&
				!this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
		if (overrideClassLoader) {
			currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
		}
		try {
			SchedulerRepository repository = SchedulerRepository.getInstance();
			synchronized (repository) {
				Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
				Scheduler newScheduler = schedulerFactory.getScheduler();
				if (newScheduler == existingScheduler) {
					throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
							"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
				}
				if (!this.exposeSchedulerInRepository) {
					// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
					SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
				}
				return newScheduler;
			}
		}
		finally {
			if (overrideClassLoader) {
				// Reset original thread context ClassLoader.
				currentThread.setContextClassLoader(threadContextClassLoader);
			}
		}
	}           

代碼清單2

public static synchronized SchedulerRepository getInstance() {
        if (inst == null) {
            inst = new SchedulerRepository();
        }

        return inst;
    }           

代碼清單3

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }           

讀取排程器配置

instantiate方法中包含了很多從PropertiesParser(PropertiesParser在《

》一文中介紹過)中擷取各種屬性的語句,這裡不過多展示。重點來看其更為本質的内容。

建立遠端排程器代理

如果目前排程器實際是代理遠端RMI排程器,那麼建立RemoteScheduler,并将目前調取器與RemoteScheduler進行綁定,最後以此RemoteScheduler作為排程器,見代碼清單4。

代碼清單4

if (rmiProxy) {

            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
                    schedName, schedInstId) : rmiBindName;

            RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);

            schedRep.bind(remoteScheduler);

            return remoteScheduler;
        }           

建立遠端jmx排程器代理

如果目前排程器實際是代理遠端JMX排程器,那麼建立RemoteMBeanScheduler,并将目前排程器與RemoteMBeanScheduler進行綁定,最後以此RemoteMBeanScheduler作為排程器,見代碼清單5。

代碼清單5

if (jmxProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            if (jmxProxyClass == null) {
                throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
            }

            RemoteMBeanScheduler jmxScheduler = null;
            try {
                jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate RemoteMBeanScheduler class.", e);
            }

            if (jmxObjectName == null) {
                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
            }

            jmxScheduler.setSchedulerObjectName(jmxObjectName);

            tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
            try {
                setBeanProps(jmxScheduler, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("RemoteMBeanScheduler class '"
                        + jmxProxyClass + "' props could not be configured.", e);
                throw initException;
            }

            jmxScheduler.initialize();

            schedRep.bind(jmxScheduler);

            return jmxScheduler;
        }           

執行個體化作業工廠

如果指定了jobFactoryClass屬性,那麼執行個體化作業工廠執行個體,見代碼清單6。執行個體化的JobFactory将用于建立排程作業。

代碼清單6

JobFactory jobFactory = null;
        if(jobFactoryClass != null) {
            try {
                jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate JobFactory class: "
                                + e.getMessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
            try {
                setBeanProps(jobFactory, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobFactory class '"
                        + jobFactoryClass + "' props could not be configured.", e);
                throw initException;
            }
        }           

執行個體化執行個體ID生成器

如果指定了instanceIdGeneratorClass屬性,那麼執行個體化執行個體ID生成器,見代碼清單7。此生成器用來給排程器執行個體生成ID。

代碼清單7

InstanceIdGenerator instanceIdGenerator = null;
        if(instanceIdGeneratorClass != null) {
            try {
                instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass)
                    .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate InstanceIdGenerator class: "
                        + e.getMessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
            try {
                setBeanProps(instanceIdGenerator, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("InstanceIdGenerator class '"
                        + instanceIdGeneratorClass + "' props could not be configured.", e);
                throw initException;
            }
        }           

執行個體化線程池

org.quartz.threadPool.class屬性用于指定線程池類,如果沒有指定,則預設為org.quartz.simpl.SimpleThreadPool,見代碼清單8。此線程池将用于shell作業的執行。

代碼清單8

String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ");
            throw initException;
        }

        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            throw initException;
        }
        tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
        try {
            setBeanProps(tp, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' props could not be configured.", e);
            throw initException;
        }           

執行個體化JobStore的具體執行個體

org.quartz.jobStore.class屬性用于指定JobStore的具體類型,我顯示指定了org.springframework.scheduling.quartz.LocalDataSourceJobStore,如果沒有指定,則預設為RAMJobStore,見代碼清單9。jobStore顧名思義,就是作業的存儲,以LocalDataSourceJobStore為例,将通過它對觸發器、作業等内容進行增删改查。

代碼清單9

String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

        if (jsClass == null) {
            initException = new SchedulerException(
                    "JobStore class not specified. ");
            throw initException;
        }

        try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            throw initException;
        }           

擷取資料庫管理器并設定資料庫連接配接池

這一步驟的執行邏輯比較多,但是仔細整理後發現資料庫管理器都一樣,無非是資料連接配接池的提供者不同(見代碼清單10),一共分為三種:

方式一:連接配接池提供者由connectionProvider.class屬性指定;

方式二:連接配接池提供者由jndiURL屬性指定;

方式三:連接配接池提供者為PoolingConnectionProvider,其使用了C3P0連接配接池;

代碼清單10

String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
        for (int i = 0; i < dsNames.length; i++) {
            PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                    PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

            String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

            // custom connectionProvider...
            if(cpClass != null) {
                ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }

                try {
                    // remove the class name, so it isn't attempted to be set
                    pp.getUnderlyingProperties().remove(
                            PROP_CONNECTION_PROVIDER_CLASS);

                    if (cp instanceof PoolingConnectionProvider) {
                        populateProviderWithExtraProps((PoolingConnectionProvider)cp, pp.getUnderlyingProperties());
                    } else {
                        setBeanProps(cp, pp.getUnderlyingProperties());
                    }
                    cp.initialize();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' props could not be configured.", e);
                    throw initException;
                }

                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } else {
                String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

                if (dsJndi != null) {
                    boolean dsAlwaysLookup = pp.getBooleanProperty(
                            PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
                    String dsJndiInitial = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_INITIAL);
                    String dsJndiProvider = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PROVDER);
                    String dsJndiPrincipal = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PRINCIPAL);
                    String dsJndiCredentials = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_CREDENTIALS);
                    Properties props = null;
                    if (null != dsJndiInitial || null != dsJndiProvider
                            || null != dsJndiPrincipal || null != dsJndiCredentials) {
                        props = new Properties();
                        if (dsJndiInitial != null) {
                            props.put(PROP_DATASOURCE_JNDI_INITIAL,
                                    dsJndiInitial);
                        }
                        if (dsJndiProvider != null) {
                            props.put(PROP_DATASOURCE_JNDI_PROVDER,
                                    dsJndiProvider);
                        }
                        if (dsJndiPrincipal != null) {
                            props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                                    dsJndiPrincipal);
                        }
                        if (dsJndiCredentials != null) {
                            props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                                    dsJndiCredentials);
                        }
                    }
                    JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                            props, dsAlwaysLookup);
                    dbMgr = DBConnectionManager.getInstance();
                    dbMgr.addConnectionProvider(dsNames[i], cp);
                } else {
                    String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
                    String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

                    if (dsDriver == null) {
                        initException = new SchedulerException(
                                "Driver not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    if (dsURL == null) {
                        initException = new SchedulerException(
                                "DB URL not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    try {
                        PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);

                        // Populate the underlying C3P0 data source pool properties
                        populateProviderWithExtraProps(cp, pp.getUnderlyingProperties());
                    } catch (Exception sqle) {
                        initException = new SchedulerException(
                                "Could not initialize DataSource: " + dsNames[i],
                                sqle);
                        throw initException;
                    }
                }

            }

        }           

設定排程器插件

這一段用于設定各種排程器插件,見代碼清單11。這裡的PROP_PLUGIN_PREFIX的值為org.quartz.plugin,即可以在Quartz的屬性檔案中配置一系列以org.quartz.plugin為字首的插件,例如可以在關閉JVM時,添加鈎子做一些清理工作的插件org.quartz.plugins.management.ShutdownHookPlugin。

代碼清單11

String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
        SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
        for (int i = 0; i < pluginNames.length; i++) {
            Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                    + pluginNames[i], true);

            String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);

            if (plugInClass == null) {
                initException = new SchedulerException(
                        "SchedulerPlugin class not specified for plugin '"
                                + pluginNames[i] + "'");
                throw initException;
            }
            SchedulerPlugin plugin = null;
            try {
                plugin = (SchedulerPlugin)
                        loadHelper.loadClass(plugInClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "SchedulerPlugin class '" + plugInClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                setBeanProps(plugin, pp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobStore SchedulerPlugin '" + plugInClass
                                + "' props could not be configured.", e);
                throw initException;
            }

            plugins[i] = plugin;
        }           

設定作業監聽器

這一步用于設定作業監聽器,我覺得可以用于做一些作業監控相關的擴充,見代明清單12。這裡的常量PROP_JOB_LISTENER_PREFIX的值為org.quartz.jobListener。我們可以在Quartz屬性檔案添加以org.quartz.jobListener為字首的作業監聽器。

代明清單12

Class<?>[] strArg = new Class[] { String.class };
        String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
        JobListener[] jobListeners = new JobListener[jobListenerNames.length];
        for (int i = 0; i < jobListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                    + jobListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "JobListener class not specified for listener '"
                                + jobListenerNames[i] + "'");
                throw initException;
            }
            JobListener listener = null;
            try {
                listener = (JobListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener class '" + listenerClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try { 
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) { 
                    /* do nothing */ 
                }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener '" + listenerClass
                                + "' props could not be configured.", e);
                throw initException;
            }
            jobListeners[i] = listener;
        }           

設定觸發器監聽器

這一步設定觸發器監聽器,見代碼清單13。與作業監聽器類似,不再贅述。

代碼清單13

String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
        TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
        for (int i = 0; i < triggerListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                    + triggerListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "TriggerListener class not specified for listener '"
                                + triggerListenerNames[i] + "'");
                throw initException;
            }
            TriggerListener listener = null;
            try {
                listener = (TriggerListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener class '" + listenerClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try { 
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) { /* do nothing */ }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener '" + listenerClass
                                + "' props could not be configured.", e);
                throw initException;
            }
            triggerListeners[i] = listener;
        }           

擷取線程執行器

可以通過屬性org.quartz.threadExecutor.class設定線程執行器,如果沒有指定,預設為DefaultThreadExecutor,見代碼清單13。此線程執行器用于執行定時排程線程QuartzSchedulerThread(有關QuartzSchedulerThread的執行過程将會在單獨的博文中展開)。

String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
        if (threadExecutorClass != null) {
            tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
            try {
                threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);

                setBeanProps(threadExecutor, tProps);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
                throw initException;
            }
        } else {
            log.info("Using default implementation for ThreadExecutor");
            threadExecutor = new DefaultThreadExecutor();
        }           

建立腳本執行工廠

如果需要作業運作在事務中(可以通過屬性org.quartz.scheduler.wrapJobExecutionInUserTransaction指定),則建立JTAJobRunShellFactory,否則建立JTAAnnotationAwareJobRunShellFactory,見代碼清單14。JobRunShellFactory将用于生成作業的shell執行對象JobRunShell。

代碼清單14

JobRunShellFactory jrsf = null; // Create correct run-shell factory...
    
            if (userTXLocation != null) {
                UserTransactionHelper.setUserTxLocation(userTXLocation);
            }
    
            if (wrapJobInTx) {
                jrsf = new JTAJobRunShellFactory();
            } else {
                jrsf = new JTAAnnotationAwareJobRunShellFactory();
            }           

生成排程執行個體ID

如果需要自動生成排程執行個體ID(可以通過屬性org.quartz.scheduler.instanceId為AUTO或者SYS_PROP,其中當指定為AUTO時,則instanceIdGeneratorClass由org.quartz.scheduler.instanceIdGenerator.class屬性指定,預設為org.quartz.simpl.SimpleInstanceIdGenerator;當指定為SYS_PROP,則instanceIdGeneratorClass等于org.quartz.simpl.SystemPropertyInstanceIdGenerator),那麼排程執行個體ID為NON_CLUSTERED,當JobStore支援叢集部署,那麼排程執行個體ID将由排程執行個體ID生成器instanceIdGenerator産生,見代碼清單15。(注:當不需要自動生成排程執行個體ID時,可以通過屬性org.quartz.scheduler.instanceId指定)

代碼清單15

if (autoId) {
                try {
                  schedInstId = DEFAULT_INSTANCE_ID;
                  if (js.isClustered()) {
                      schedInstId = instanceIdGenerator.generateInstanceId();
                  }
                } catch (Exception e) {
                    getLog().error("Couldn't generate instance Id!", e);
                    throw new IllegalStateException("Cannot run without an instance id.");
                }
            }           

設定JobStore的資料庫錯誤重試的間隔及現場執行器

JobStoreSupport是JobStore的抽象實作類,隻有繼承自JobStoreSupport的具體實作類(例如org.springframework.scheduling.quartz.LocalDataSourceJobStore)才可以通過調用其setDbRetryInterval方法設定資料庫錯誤重試間隔(dbFailureRetry屬性預設為15000,也可以通過設定org.quartz.scheduler.dbFailureRetryInterval屬性進行指定),setThreadExecutor方法用于設定JobStoreSupport的線程執行器,見代碼清單16。

代碼清單16

if (js instanceof JobStoreSupport) {
                JobStoreSupport jjs = (JobStoreSupport)js;
                jjs.setDbRetryInterval(dbFailureRetry);
                if(threadsInheritInitalizersClassLoader)
                    jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                
                jjs.setThreadExecutor(threadExecutor);
            }           

構造QuartzSchedulerResources

QuartzSchedulerResources用于持有定時排程需要的各種資源,如作業運作腳本的工廠、執行QuartzSchedulerThread的線程執行器、執行具體shell作業的線程池、各種插件、監聽器等。在構造QuartzSchedulerResources的過程中(見代碼清單17),設定了很多屬性,現在列舉如下:

屬性名稱 含義 備注
name 排程名稱 可以由org.quartz.scheduler.instanceName屬性指定
threadName 排程線程名稱 可以由org.quartz.scheduler.threadName屬性指定,預設等于排程名稱加字尾_QuartzSchedulerThread産生
instanceId 排程執行個體ID 可以由org.quartz.scheduler.instanceId屬性指定,具體生成規則見文中描述
jobRunShellFactory 建立作業運作腳本工廠 可以由org.quartz.scheduler.wrapJobExecutionInUserTransaction屬性指定,具體實作有JTAJobRunShellFactory和JTAAnnotationAwareJobRunShellFactory兩種
makeSchedulerThreadDaemon 排程線程是否是背景線程 可以由org.quartz.scheduler.makeSchedulerThreadDaemon屬性指定
threadsInheritInitalizersClassLoader 線程是否繼承初始化的類加載器 可以由org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer屬性指定
runUpdateCheck 運作時是否檢查Quartz的可用更新版本 可以由org.quartz.scheduler.skipUpdateCheck屬性指定,runUpdateCheck與指定值相反
batchTimeWindow 在時間視窗前批量觸發 可以由org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow屬性指定
maxBatchSize 最大批量執行的作業數 可以由org.quartz.scheduler.batchTriggerAcquisitionMaxCount屬性指定
interruptJobsOnShutdown 當關閉作業時,中斷作業線程 可以由org.quartz.scheduler.interruptJobsOnShutdown屬性指定
interruptJobsOnShutdownWithWait 當關閉作業時,等待中斷作業線程 可以由org.quartz.scheduler.interruptJobsOnShutdownWithWait屬性指定
threadExecutor 線程執行器 可以由org.quartz.threadExecutor.class屬性指定,預設為DefaultThreadExecutor
threadPool 線程池 可以由org.quartz.threadPool.class屬性指定,預設為SimpleThreadPool
jobStore 作業存儲 可以由org.quartz.jobStore.class屬性指定,預設為RAMJobStore

代碼清單17

QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
            rsrcs.setName(schedName);
            rsrcs.setThreadName(threadName);
            rsrcs.setInstanceId(schedInstId);
            rsrcs.setJobRunShellFactory(jrsf);
            rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
            rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
            rsrcs.setRunUpdateCheck(!skipUpdateCheck);
            rsrcs.setBatchTimeWindow(batchTimeWindow);
            rsrcs.setMaxBatchSize(maxBatchSize);
            rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
            rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
            rsrcs.setJMXExport(jmxExport);
            rsrcs.setJMXObjectName(jmxObjectName);

            if (managementRESTServiceEnabled) {
                ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
            }
    
            if (rmiExport) {
                rsrcs.setRMIRegistryHost(rmiHost);
                rsrcs.setRMIRegistryPort(rmiPort);
                rsrcs.setRMIServerPort(rmiServerPort);
                rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                rsrcs.setRMIBindName(rmiBindName);
            }
    
            SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);

            rsrcs.setThreadExecutor(threadExecutor);
            threadExecutor.initialize();

            rsrcs.setThreadPool(tp);
            if(tp instanceof SimpleThreadPool) {
                if(threadsInheritInitalizersClassLoader)
                    ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
            }
            tp.initialize();
            tpInited = true;
    
            rsrcs.setJobStore(js);
    
            // add plugins
            for (int i = 0; i < plugins.length; i++) {
                rsrcs.addSchedulerPlugin(plugins[i]);
            }           

構造QuartzScheduler

構造QuartzScheduler的代碼如下:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
            qsInited = true;           

QuartzScheduler的構造器實作見代碼清單18,其處理步驟如下:

  1. 設定QuartzSchedulerResources;
  2. QuartzSchedulerResources設定的JobStore如果實作了JobListener接口,那麼将其作為作業監聽器添加到監聽器清單;
  3. 構造線程QuartzSchedulerThread執行個體;
  4. 從QuartzSchedulerResources中擷取設定的線程執行器;
  5. 啟動QuartzSchedulerThread;
  6. 建立執行作業管理器ExecutingJobsManager,由于其實作了JobListener,是以加入了内置的作業監聽器中;
  7. 建立錯誤日志元件ErrorLogger,由于繼承了SchedulerListenerSupport,是以加入了内置的排程監聽器中;
  8. 構造SchedulerSignalerImpl,此元件的作業包括:向QuartzScheduler中注冊的觸發器監聽器發送觸發器失常或者觸發器再也不會被觸發的信号、修改觸發器下次觸發的時間、向QuartzScheduler中注冊的排程監聽器發送作業被删除或者排程異常的信号;
  9. 當shouldRunUpdateCheck為true是則調用scheduleUpdateCheck方法(見代明清單19),實際是利用定時器定時執行UpdateChecker任務,此任務用于檢查Quartz的可用的更新版本;為了提高性能,可以将屬性org.quartz.scheduler.skipUpdateCheck設定為true;

代碼清單18

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        if(shouldRunUpdateCheck()) 
            updateTimer = scheduleUpdateCheck();
        else
            updateTimer = null;
        
        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }           

   代碼清單19

private Timer scheduleUpdateCheck() {
        Timer rval = new Timer(true);
        rval.scheduleAtFixedRate(new UpdateChecker(), 1000, 7 * 24 * 60 * 60 * 1000L);
        return rval;
    }           

構造QuartzSchedulerThread

這裡再詳細分析下QuartzSchedulerThread的構造過程,其構造器見代碼清單20。

代碼清單20

QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
    }           

QuartzSchedulerThread的構造器又代理了另一個構造器,見代碼清單21。

代碼清單21

QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
        super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
        this.qs = qs;
        this.qsRsrcs = qsRsrcs;
        this.setDaemon(setDaemon);
        if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
            log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
            this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }

        this.setPriority(threadPrio);

        // start the underlying thread, but put this object into the 'paused'
        // state
        // so processing doesn't start yet...
        paused = true;
        halted = new AtomicBoolean(false);
    }           

代碼清單21比較簡單,QuartzScheduler的getSchedulerThreadGroup方法用于建立線程組,QuartzSchedulerResources的isThreadsInheritInitializersClassLoadContext方法實際擷取QuartzSchedulerResources的屬性threadsInheritInitializersClassLoadContext,此屬性如果為真,則設定QuartzSchedulerThread的線程上下文類加載器為目前線程的類加載器,設定paused标志為true,以便于QuartzSchedulerThread線程不能開始處理。halted可以解釋為叫停目前線程的執行。

阻止QuartzSchedulerThread的執行

由于在構造QuartzScheduler的過程中已經啟動了QuartzSchedulerThread,那麼勢必導緻此線程的執行,其run方法的部分代碼見代碼清單22.

代碼清單22

public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }           

我們并未叫停排程線程的執行,是以halted屬性等于false,對于paused标志而言,這裡涉及多線程安全問題,是以這裡使用了同步塊,但是實際上可以通過調整代碼将paused用volatile修飾,這樣通過記憶體可見性省去同步,能夠提高性能。由于paused标志線上程剛開始執行時為false,那麼這裡的white循環将不斷輪詢,每次循環線程wait一秒。既然QuartzSchedulerThread已經開始執行,但是卻又無法執行,豈不是自相沖突?雖然QuartzSchedulerThread線程開始啟動,但是QuartzScheduler并未準備好這一切,必須等到QuartzScheduler準備時将paused修改為false。雖說這樣實作也是可以的,但是在QuartzScheduler準備好的這段時間内,QuartzSchedulerThread線程頻繁的睡眠、被喚醒,線程上下文來回切換,耗費了一些性能。何不等到QuartzScheduler準備好時再啟動QuartzSchedulerThread線程呢?

建立排程器的代碼如下:

// Create Scheduler ref...
            Scheduler scheduler = instantiate(rsrcs, qs);           

這裡建立排程器時以,實際是用StdScheduler将之前建立的QuartzScheduler進行了封裝,代碼如下:

protected Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs) {

        Scheduler scheduler = new StdScheduler(qs);
        return scheduler;
    }           

其他處理

剩餘的工作包括:設定作業工廠,對插件初始化,給QuartzScheduler的監聽器管理器注冊作業監聽器和觸發器監聽器,設定排程器上下文屬性,觸發JobStore,觸發腳本運作工廠,将排程器注冊到SchedulerRepository等,見代碼清單23。

代碼清單23

// set job factory if specified
            if(jobFactory != null) {
                qs.setJobFactory(jobFactory);
            }
    
            // Initialize plugins now that we have a Scheduler instance.
            for (int i = 0; i < plugins.length; i++) {
                plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
            }
    
            // add listeners
            for (int i = 0; i < jobListeners.length; i++) {
                qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
            }
            for (int i = 0; i < triggerListeners.length; i++) {
                qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
            }
    
            // set scheduler context data...
            for(Object key: schedCtxtProps.keySet()) {
                String val = schedCtxtProps.getProperty((String) key);    
                scheduler.getContext().put((String)key, val);
            }
    
            // fire up job store, and runshell factory
    
            js.setInstanceId(schedInstId);
            js.setInstanceName(schedName);
            js.setThreadPoolSize(tp.getPoolSize());
            js.initialize(loadHelper, qs.getSchedulerSignaler());

            jrsf.initialize(scheduler);
            
            qs.initialize();
    
            getLog().info(
                    "Quartz scheduler '" + scheduler.getSchedulerName()
                            + "' initialized from " + propSrc);
    
            getLog().info("Quartz scheduler version: " + qs.getVersion());
    
            // prevents the repository from being garbage collected
            qs.addNoGCObject(schedRep);
            // prevents the db manager from being garbage collected
            if (dbMgr != null) {
                qs.addNoGCObject(dbMgr);
            }
    
            schedRep.bind(scheduler);
            return scheduler;           

總結

可以看到建立排程器的過程,幾乎完全是順序程式設計,步驟也十厘清楚。但是可以看到其中可以優化的地方也比較多,另外代碼組織稍微不太合理,例如instantiate方法的長度1355-579=776行。建立完排程器還應該考慮如何啟動它,請接着看《

Quartz與Spring內建——啟動排程器

》一文。

後記:個人總結整理的《深入了解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。

Quartz與Spring內建——建立排程器

京東:

http://item.jd.com/11846120.html

當當:

http://product.dangdang.com/23838168.html