java定时任务框架quartz和activemq有什么关系呢?实际上我们使用activemq进行解耦用的,可以看一下简易的设计图
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICN1QjNwYTM1AjNxUDM2EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
调度中心:上面记录各种定时任务的信息,比如我们有一个每天计算利息的跑批,那么我们可以用quartz定时向某个queue或者topic发送消息
子系统监听:比如我们的贷款系统,监听一个queue,我们监听代码里面就写着计算利息的业务逻辑
由此我们可以知道调度中心只是通过定时发送mq的方式来触发我们业务系统的业务逻辑,真正执行业务逻辑的还是业务系统本身和调度中心没关系
这里我给出一份简要版的代码,主要讲解一下实现的思路
首先我们编写调度中心
创建一个maven工程,结构如下
关于spring和mq的整合可以参考我的这篇文章,这里不再重复叙述了
activemq使用系列: 使用JmsGatewaySupport构建出通用的消息收发代码
首先创建我们的消息发送类
/**
*
*/
package com.pcx.amqproducer;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.support.JmsGatewaySupport;
/**
*
* @author scarletbullet
* @version $Id: MySpringProducer2.java,
*/
public class MySpringProducer2 extends JmsGatewaySupport {
public void send(final String destination){
if (destination.startsWith("topic")) {
this.getJmsTemplate().setPubSubDomain(true);
} else {
this.getJmsTemplate().setPubSubDomain(false);
}
this.getJmsTemplate().send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message=session.createMapMessage();
message.setString("userId", "4545454");
message.setString("userName", "jmstemplateGatewag");
message.setInt("age",25);
return message;
}
});
System.out.println("成功发送了一条JMS消息");
}
}
接下来创建我们的定时任务job
/**
*
*/
package com.pcx.amqproducer;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
*
* @author scarletbullet
* @version $Id: MessageJob.java,
*/
public class MessageJob implements Job{
/**
* @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
*/
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
MySpringProducer2 mySpringProducer2= (MySpringProducer2)jobExecutionContext.getJobDetail().getJobDataMap().get("mySpringProducer");
mySpringProducer2.send(jobExecutionContext.getJobDetail().getName());
System.out.println(jobExecutionContext.getJobDetail().getName() +"发送消息");
}
}
最后我们编写调度的测试代码
/**
*
*/
package com.pcx.amqproducer;
import java.text.ParseException;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
*
* @author scarletbullet
* @version $Id: SchedulerMQSender.java
*/
public class SchedulerMQSender {
/**
*
* @param args
*/
private static Scheduler scheduler;
public static void main(String[] args) {
ApplicationContext context;
context= new ClassPathXmlApplicationContext("classpath:spring/spring-context.xml");
MySpringProducer2 mySpringProducer = (MySpringProducer2) context.getBean("mySpringProducer");
SchedulerFactory sf = new StdSchedulerFactory();
try {
String schedulerName="que12";//就是我们发消息的队列或者topic名称
//创建一个调度对象
scheduler = sf.getScheduler();
scheduler.start();
//设置job详情
JobDetail jobDetail = new JobDetail();
jobDetail.setJobClass(MessageJob.class);
jobDetail.setName(schedulerName);
jobDetail.setGroup(schedulerName);
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("mySpringProducer", mySpringProducer);
jobDetail.setJobDataMap(jobDataMap);
CronTrigger cronTrigger = null;
cronTrigger = new CronTrigger(schedulerName, schedulerName, "*/5 * * * * ?");
//5秒调用一次
scheduler.scheduleJob(jobDetail, cronTrigger);
scheduler.start();
} catch (SchedulerException e) {
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
给出spring-context.xml配置文件的代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd"
default-autowire="byName" default-lazy-init="false">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- ActiveMQ服务地址 -->
<property name="brokerURL" value="tcp://192.168.1.10:61616" />
</bean>
<!-- <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="user.queue"/>
</bean> -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- <property name="defaultDestinationName" value="user.queue"/> -->
</bean>
<bean id="mySpringProducer" class="com.pcx.amqproducer.MySpringProducer2">
<property name="jmsTemplate" ref="jmsTemplate"/>
</bean>
<bean id="mySpringProducer3" class="com.pcx.amqproducer.MySpringProducer3">
<property name="jmsTemplate" ref="jmsTemplate"/>
</bean>
</beans>
从以上代码我们得出,我的意图是每5秒向queue名称为"que12"的队列发送消息
接下来我们编写业务实现端
新建一个maven工程,结构如下
applicationContext.xml代码如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.1.10:61616" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="myListener" class="com.pcx.amqweb.MyListener" />
<bean id="generalListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" abstract="true">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="listenerone" parent="generalListener">
<property name="destinationName" value="que12"/>
<property name="messageListener" ref="myListener" />
</bean>
</beans>
创建一个消息监听器
/**
*
*/
package com.pcx.amqweb;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
*
* @author scarletbullet
* @version $Id: MyListener.java,
*/
public class MyListener implements MessageListener {
/**
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
MapMessage mapMessage=(MapMessage)message;
try {
System.out.println("处理自己的业务逻辑");
System.out.println(mapMessage.getString("userName"));
} catch (Exception e) {
}
}
}
当然这里是不用传任何对象的,因为我们只是触发它而已,接下来就自己写自己的业务逻辑就可以了
对象上面配置不熟悉的可以参考我的文章
activemq使用系列: 使用JmsGatewaySupport构建出通用的消息收发代码
简单测试一下,我们运行调度中心的测试类
可以看到消息每5秒发送一次,我们看下amq控制台
我们运行子系统可以看见
我们以每5秒一次在执行业务逻辑
这样一个简易的调度中心完成了,实际上我们可以把我们的这些要调度的任务信息全部存放到数据库,然后把调度中心写成一个web项目,再加上前台界面,可以更加人性化的管理,比如删除任务,手动触发任务,暂停任务等等功能
有兴趣的可以自己完善一下