天天看点

使用activemq 和 quartz构建简易版企业调度中心

java定时任务框架quartz和activemq有什么关系呢?实际上我们使用activemq进行解耦用的,可以看一下简易的设计图

使用activemq 和 quartz构建简易版企业调度中心

调度中心:上面记录各种定时任务的信息,比如我们有一个每天计算利息的跑批,那么我们可以用quartz定时向某个queue或者topic发送消息

子系统监听:比如我们的贷款系统,监听一个queue,我们监听代码里面就写着计算利息的业务逻辑

由此我们可以知道调度中心只是通过定时发送mq的方式来触发我们业务系统的业务逻辑,真正执行业务逻辑的还是业务系统本身和调度中心没关系

这里我给出一份简要版的代码,主要讲解一下实现的思路

首先我们编写调度中心

创建一个maven工程,结构如下

使用activemq 和 quartz构建简易版企业调度中心

关于spring和mq的整合可以参考我的这篇文章,这里不再重复叙述了

activemq使用系列: 使用JmsGatewaySupport构建出通用的消息收发代码

首先创建我们的消息发送类

/**
 * 


 */
package com.pcx.amqproducer;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.support.JmsGatewaySupport;

/**
 * 
 * @author scarletbullet
 * @version $Id: MySpringProducer2.java, 
 */
public class MySpringProducer2 extends JmsGatewaySupport {
    
    public void send(final String destination){
        if (destination.startsWith("topic")) {
            this.getJmsTemplate().setPubSubDomain(true);
        } else {
            this.getJmsTemplate().setPubSubDomain(false);
        }    
      this.getJmsTemplate().send(destination, new MessageCreator() {
        
        public Message createMessage(Session session) throws JMSException {
            MapMessage message=session.createMapMessage();  
            message.setString("userId", "4545454");  
            message.setString("userName", "jmstemplateGatewag");  
            message.setInt("age",25);  
            return message;  
        }
    });
        System.out.println("成功发送了一条JMS消息");
    }

}
           

接下来创建我们的定时任务job

/**
 * 


 */
package com.pcx.amqproducer;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * 
 * @author scarletbullet
 * @version $Id: MessageJob.java, 
 */
public class MessageJob  implements Job{

    /** 
     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
     */
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        
        MySpringProducer2 mySpringProducer2= (MySpringProducer2)jobExecutionContext.getJobDetail().getJobDataMap().get("mySpringProducer");
        mySpringProducer2.send(jobExecutionContext.getJobDetail().getName());
        System.out.println(jobExecutionContext.getJobDetail().getName() +"发送消息");
        
    }

}
           

最后我们编写调度的测试代码

/**
 * 


 */
package com.pcx.amqproducer;

import java.text.ParseException;

import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 
 * @author scarletbullet
 * @version $Id: SchedulerMQSender.java
 */
public class SchedulerMQSender {

    /**
     * 
     * @param args
     */
    private static Scheduler scheduler;
    public static void main(String[] args) {
        ApplicationContext context;
        context= new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
        MySpringProducer2 mySpringProducer = (MySpringProducer2) context.getBean("mySpringProducer");
       
        SchedulerFactory sf = new StdSchedulerFactory();
        try {
            String schedulerName="que12";//就是我们发消息的队列或者topic名称
            //创建一个调度对象
            scheduler = sf.getScheduler();
            scheduler.start();
          
            //设置job详情
            JobDetail jobDetail = new JobDetail();
            jobDetail.setJobClass(MessageJob.class);
            jobDetail.setName(schedulerName);
            jobDetail.setGroup(schedulerName);
            JobDataMap jobDataMap = new JobDataMap();
           
jobDataMap.put("mySpringProducer", mySpringProducer);
            jobDetail.setJobDataMap(jobDataMap);
          

            CronTrigger cronTrigger = null;
           
            cronTrigger = new CronTrigger(schedulerName, schedulerName, "*/5 * * * * ?");
          //5秒调用一次
            scheduler.scheduleJob(jobDetail, cronTrigger);
            scheduler.start();
         
        } catch (SchedulerException e) {
          e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

}
           

给出spring-context.xml配置文件的代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans  
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
           http://www.springframework.org/schema/aop   
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd  
           http://www.springframework.org/schema/tx  
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd  
           http://www.springframework.org/schema/context  
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
	default-autowire="byName" default-lazy-init="false">
	
	

	<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<!-- ActiveMQ服务地址 -->
        <property name="brokerURL" value="tcp://192.168.1.10:61616" />
       
	</bean>

 	<!-- <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
 		<constructor-arg value="user.queue"/>
 	</bean> -->
 	
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
	    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
	    <property name="connectionFactory" ref="connectionFactory"/>  
	  <!--   <property name="defaultDestinationName" value="user.queue"/> -->
	</bean> 
	<bean id="mySpringProducer" class="com.pcx.amqproducer.MySpringProducer2">
		 <property name="jmsTemplate" ref="jmsTemplate"/>
	</bean>
	
	<bean id="mySpringProducer3" class="com.pcx.amqproducer.MySpringProducer3">
		 <property name="jmsTemplate" ref="jmsTemplate"/>
	</bean>

</beans>
           

从以上代码我们得出,我的意图是每5秒向queue名称为"que12"的队列发送消息

接下来我们编写业务实现端

新建一个maven工程,结构如下

使用activemq 和 quartz构建简易版企业调度中心

applicationContext.xml代码如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans  
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd  
           http://www.springframework.org/schema/aop   
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd  
           http://www.springframework.org/schema/tx  
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd  
           http://www.springframework.org/schema/context  
           http://www.springframework.org/schema/context/spring-context-3.2.xsd">
	
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	
        <property name="brokerURL" value="tcp://192.168.1.10:61616" />       
	</bean>

	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  	 
	    <property name="connectionFactory" ref="connectionFactory"/>  
	</bean> 
	
	<bean id="myListener"  class="com.pcx.amqweb.MyListener" />
	
	<bean id="generalListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" abstract="true">
	    <property name="connectionFactory" ref="connectionFactory"/>	 	   	  
	</bean>
	
	<bean id="listenerone"   parent="generalListener">	
		<property name="destinationName" value="que12"/>
	    <property name="messageListener" ref="myListener" />
	</bean>
</beans>
           

创建一个消息监听器

/**
 * 


 */
package com.pcx.amqweb;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * 
 * @author scarletbullet
 * @version $Id: MyListener.java, 
 */
public class MyListener implements MessageListener {

    /** 
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message message) {
        MapMessage mapMessage=(MapMessage)message;
        try {
            System.out.println("处理自己的业务逻辑");
            System.out.println(mapMessage.getString("userName"));
        } catch (Exception e) {
           
        }

    }

}
           

当然这里是不用传任何对象的,因为我们只是触发它而已,接下来就自己写自己的业务逻辑就可以了

对象上面配置不熟悉的可以参考我的文章

activemq使用系列: 使用JmsGatewaySupport构建出通用的消息收发代码

简单测试一下,我们运行调度中心的测试类

使用activemq 和 quartz构建简易版企业调度中心

可以看到消息每5秒发送一次,我们看下amq控制台

使用activemq 和 quartz构建简易版企业调度中心

我们运行子系统可以看见

使用activemq 和 quartz构建简易版企业调度中心

我们以每5秒一次在执行业务逻辑

这样一个简易的调度中心完成了,实际上我们可以把我们的这些要调度的任务信息全部存放到数据库,然后把调度中心写成一个web项目,再加上前台界面,可以更加人性化的管理,比如删除任务,手动触发任务,暂停任务等等功能

有兴趣的可以自己完善一下

继续阅读