背景:
现如今互联网无时无刻不在面临着高并发问题,比如早年的小米手机出新产品时,大量的买家使用各种终端设备进行疯抢。再比如春运火车票开始发售时,微信群里发红包时。互联网的开发包括java后台、Nosql、数据库、限流、CDN、负载均衡等内容。
高并发系统的分析和设计
任何系统都不是独立于业务开发的,都应该先分析业务需求和实际的场景。对于业务分析,首先是有效需求和无效需求,有效需求就是指真正的需求,这里主要讨论无效的需求。
无效需求分为很多种类,比如通过脚本连续刷新网站首页,使得网站频繁的访问数据库和其他资源,又比如使用刷票软件连续请求。鉴别有效请求和无效请求是获取有效请求的高并发网站业务分析的第一步,下面分析无效请求的场景,以及应对的方法。
首先,一个账号的连续请求,对于一些懂技术的用户来说,可以使用软件对请求的服务接口连续请求,甚至在一秒内请求成百上千次,这毫无疑问是无效请求。应对的方法有很多,最常见的就是加入验证码。一般而言,首次无验证码以便减少用户录入,第二次请求开始加入验证码,可以是图形也可以是等式运算等。使用图片验证码有可能存在识别图片作弊软件的攻击,所以在一些互联网网站上,图片验证码还会被加工成东倒西歪的形式,简单的等式运算会使图片识别作弊软件更加的难以辨认。 其次,使用短信服务,把验证码发送到短信平台以规避部分作弊软件。仅仅做这一步还是不够的,,毕竟验证码还是可以有其他的作弊软件可以读取的。应该在进一步限制请求,比如限制用户在单位时间内的购买量来压制用户的请求量,注意这些判断逻辑应该在负载均衡中实现,而不是在web服务器中。负载均衡判断验证码可以使用Lua或者C语言联合Redis进行判断。
这是对一个账号连续无效请求的压制,但有时候有些用户可以申请多个帐户来迷惑服务器,使得可以避开对单个账户的验证,从而获得更多的服务器资源。一个人多个账户的场景还是比较容易应对的,可以通过提高账户的等级来压制请求,比如支付交易的网站,可以通过银行卡验证,实名制获取相关证件号码,从而使用证件号码使得多个账户归结为一人。
还有就是有组织的请求,对于一些黄牛组织,可能通过多个人的账号发送请求,统一组织伪造有效的请求,对于这样的请求我们可以使用僵尸账号排除法对可交易的账号进行排除,所谓僵尸账号,是指那些平时没有交易的账号,在某些特殊的日子交易,比如春运期间进行大批量的抢购的账号。还可以使用IP封禁,尤其是对同一网段或者是同一IP频繁请求的,但是这样可能会误伤有效请求,所以IP封禁需要慎重!
系统设计
高并发的系统往往需要分布式的系统进行分摊请求的压力,这就需要负载均衡服务了。它进行简易判断后就会分发到WEB服务器了。服务器可以按照业务分,比如购买分为产品维护、交易维护、资金维护、报表统计和用户维护模块,按照功能模块进行区分,使得它们相互隔离,就可以降低数据的复杂性。这种方法被称为水平分法。
这种方法的好处在于:首先一个服务管理一种业务,业务简单,提高了开发效率;其次,数据库的设计方便了很多。但是也会带来麻烦,由于各个系统业务之间存在关联,还要通过RPC处理关联信息,比较流行的有Dubbo、Thrift、Hessian等。其原理是,每一个服务都会暴露一些公共接口给RPC服务,这样对于任何一个服务器都能通过RPC服务获取其他服务器对应的接口气调度各个服务器的逻辑来完成功能,但是接口的互相调用会造成一定的缓慢。有水平分法就有垂直分法,垂直分法不是按照功能进行划分,而是多台服务器均摊请求,每个服务器处理自己的业务互不干扰,使得每个服务器都包含所有的业务逻辑,会造成开发上的业务困难,数据库也是如此。
对于大型网站多采用两种方式结合的方法,首先按照业务区分为多个子系统,然后在每个子系统下再分多个服务器,通过每个子系统的路由器找到对应的子系统的服务器提供服务。
数据库设计
对于数据库而言,为了高性能,可以使用分表或者分库技术。分表是指本来在一张表可以存储的数据房子多张表进行存储,比如transcation表,可以根据年份分为transcation_2016/transcation_2017等。分库则不一样,他把表数据放在不同的数据库,分库数据库需要一个路由算法确定数据在哪个数据库上。一些会员网站还区分活跃会员和非活跃会员。活跃会员通过数据迁移的手段,也就是先记录在某个时间段会员的活跃度,然后通过数据迁移,将活跃会员平均分摊到各个数据库中,从而达到数据库的负载均衡。
做完这些还可以考虑优化sql,建立索引等优化,提高数据库的性能。对于优化sql,在开发网站使用更新语句或者复杂查询语句要时刻记住更新是表锁定还是行锁定,比如id是主键,而user_name是用户名称,也是唯一索引,更新用户的生日时,可以使用如下两条sql语句实现。
update t_user set birthday=#{birthday} where id=#{id};
update t_user set birthday=#{birthday] where user_name={userName};
,对于上面的两条sql语句优先使用主键更新,其原因是MYSQL在运行的过程中,第二条sql语句会锁表,即不仅锁定更新的数据,而且锁定其他表的数据,从而影响并发,而使用主键更新则是行锁定。
sql优化还有很多细节,比如可以使用连接查询代替子查询。查询一个没有分配角色用户的id,可能有人会用下面的sql
select u.id from t_user u
where u.id not in(select ur.user_id form t_user_role ur);
这是一个not in语句,性能低下,对于这样的not in 或者 not exists语句,应该全部修改为表连接去执行。如下
select u.id from t_user u left join t_user_role ur
on u.id=ur.user_id
where ur.user_id is null;
这样就大大提高了sql的执行性能。
此外还可以通过读/写分离技术,一台主机主要负责写业务,多台备机负责读业务,有助于性能的提高。
对于分布式数据库而言,还有一个麻烦就是事务的一致性。这个比较麻烦。
动静分离技术
对于互联网来说大部分的数据都是静态数据,动态数据非常少,如果像视频、图片之类的静态数据都从动态服务器(Tocmat、WildFly、WebLogic等)进行获取,那么动态服务器的带宽压力就会很大,这时候可以采用动静分离技术。有条件的企业可以考虑使用CDN技术,它允许企业将自己的静态数据存储在网络CDN的节点中,就近取出缓存节点的数据,速度会很快。。一些企业也会是用自己的静态HTTP服务器,将静态数据存储在静态服务器上,尽量使用Cookie等技术,让客户端缓存可以缓存数据,避免多次请求,降低服务器压力。
锁和高并发
动态数据是高并发网站关注的重点,而动态数据的请求最后会落在一台WEB服务器上,高并发系统存在的一个麻烦就是并发数据不一致的问题。比如抢红包,会出现多个线程同时享有大红包的场景,在高并发的场景中,由于每一个线程的顺序不一样,所有会导致数据不一致的问题。并且在一瞬间产生很高的并发,还要考虑性能的问题。加锁会影响并发,而不加锁难以保证数据的一致性,这就是锁和高并发的矛盾。
为解决这一矛盾,很多企业提出了悲观锁和乐观锁的概念。
开发设计
搭建抢红包开发环境和超发现象
搭建Service层和DAO层
数据库使用mysql数据库,工具使用navicat for mysql.
遇到的第一个问题
2003 can't connect to mysql server 10038
原因
是因为把服务禁用的缘故
解决方法(3)
第一种方法:
第一步:
先看报错窗口
2003 can't connect to MySQL server on '127.0.0.1'(10038).
第二步:
原因是:远程3306端口未对外开放操作。
第三步:
首先远程连接服务器,点击"开始"--> "管理工具"-->"高级安全Windows防火墙"。
第四步:
在打开的窗口中,左边选中"入站规则",右边点击"新建规则"来建立一个入站规则。
第五步:
在"规则类型"中选择"端口",然后下一步。
第六步:
选择"特定本地连接",输入3306。
第七步:
选择"允许连接",然后点击下一步。
第八步:
默认都选中就行,然后下一步。
第九步:
给规则起一个名字,什么都可以,只要自己认识就行。
第十步:
重新打开MySql就可以了。
第二种方法:
第一步:
打开服务,看看MySql是否启动。
第二步:
启动MySql服务。
第三种方法:
第一步:
找到"开始"菜单,打开cmd。
第二步:
输入net start mysql。
开发设计
搭建抢红包开发环境和超发现象
首先要在数据库建表,一个是红包表,另一个是用户抢红包表。这里的红包表是指一个大红包的信息,分为若干个小红包,为了业务简单,假设每个红包是等额的。下面给出这两个表的sql和数据。如下:
CREATE TABLE T_RED_PACKET
(
id int(12) not null auto_increment,
user_id int(12) not NULL,
amount decimal(16,2) not null,
send_date timestamp not null,
total int(12) not NULL,
unit_amount decimal(12) not null,
stock int(12) not null,
version int(12) default 0 not null,
note varchar(256) null,
primary key clustered(id)
);
create table T_USER_RED_PACKET
(
id int(12) not null auto_increment,
red_packet_id int(12) not null,
user_id int(12) not null,
amount decimal(16,2) not null,
grab_time timestamp not null,
note varchar(256) null,
primary key clustered (id)
);
insert into t_red_packet(user_id,amount,send_date,total,unit_amount,stock,note)
values(1,200000.00,now(),20000,10.00,20000,'20万元金额,2万个小红包,每个10元');
有了这两个表,我们就可以为这两个表建pojo了
首先创建web项目,如果不会请参考-eclipse创建java web项目
接下来创建包和pojo
//RedPacket.java
package com.ssm.wdz.pojo;
import java.io.Serializable;
import java.sql.Timestamp;
public class RedPacket implements Serializable{
private static final long serialVersionUID = -5921274777655497099L;
private Long id;
private Long userId;
private Double amount;
private Timestamp sendDate;
private Integer total;
private Double unitAmount;
private Integer stock;
private Integer version;
private String note;
...
}
//UserRedPacket.java
package com.ssm.wdz.pojo;
import java.io.Serializable;
import java.sql.Timestamp;
public class UserRedPacket implements Serializable{
private static final long serialVersionUID = 1229009027725607718L;
private Long id;
private Long redPacketId;
private Long userId;
private Double amount;
private Timestamp grabTime;
private String note;
...
}
这两个pojo,一个是红包信息,一个是抢红包信息,使用Mybatis开发它们,先来完成大红包的查询,定义DAO对象。
//RedPacketDao.java
package com.ssm.wdz.dao;
import com.ssm.wdz.pojo.RedPacket;
@Repository
public interface RedPacketDao {
public RedPacket getRedPacket(Long id);
public int decreaseRedPacket(Long id);
}
其中的两个方法,其中的两个方法,一个是查询红包,另一个是扣减红包库存。下面使用xml映射,如下:
//RedPacket.xml
< ? xml version="1.0" encoding="UTF-8"?>
< ! DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
< mapper namespace="com.ssm.wdz.dao.RedPacketDao">
< !-- 查询红包具体信息 -->
< select id="getRedPacket" parameterType="long" resultType="com.ssm.wdz.pojo.RedPacket">
select id user_id as userId,amount,send_date as sendDate,total,unit_amount as unitAmount,stock,version,note
from T_RED_PACKET where id=#{id}
< /select>
< !-- 扣减抢红包库存 -->
< update id="decreaseRedPacket">
update T_RED_PACKET set stock=stock-1 where id=#{id}
< /update>
< /mapper>
这里没有加锁之类的操作,目的是为了演示超发红发的情况,接下来就是抢红包的设计了,下面是插入抢红包的DAO。
开发设计
搭建抢红包开发环境和超发现象
紧接上次的设计,抢红包的设计,先来定义抢红包的DAO,如下:
UserRedPacketDao.java
package com.ssm.wdz.dao;
import org.springframework.stereotype.Repository;
import com.ssm.wdz.pojo.UserRedPacket;
@Repository
public interface UserRedPacketDao {
public int grapRedPacket(UserRedPacket userRedPacket);
}
同样的,也要映射xml,如下:
//UserRedPacket.xml
< ?xml version="1.0" encoding="UTF-8"?>
< !DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
< mapper namespace="com.ssm.wdz.dao.UserRedPacketDao">
< !-- 插入抢红包信息 -->
< insert id="grapRedPacket" useGeneratedKeys="true" keyProperty="id" parameterType="com.ssm.wdz.pojo.UserRedPacket">
insert into t_user_red_packet(red_packed_id,user_id,amount,grab_time,note)
values(#{redPacketId},#{userId},#{amount},now(),#{note})
< /insert>
< /mapper>
在这里使用了useGeneratedKeys和KeyProPerty,这就意味着会返回数据库生成的主键信息,进行主键回填,关于DAO层在这里就基本完成了。
接下来定义两个Service接口,分别是UserRedPacketService和RedPacketService,如下:
//RedPacketService.java
package com.ssm.wdz.service;
import com.ssm.wdz.pojo.RedPacket;
public interface RedPacketService {
public int decreaseRedPacket(Long id);
public RedPacket getRedPacket(Long id);
}
//UserRedPacketService.java
package com.ssm.wdz.service;
public interface UserRedPacketService {
public int grapRedPacket(Long redPacketId,Long userId);
}
它的两个实现类,如下:
//RedPacketServiceImpl.java
package com.ssm.wdz.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.ssm.wdz.dao.RedPacketDao;
import com.ssm.wdz.pojo.RedPacket;
import com.ssm.wdz.service.RedPacketService;
@Service
public class RedPacketServiceImpl implements RedPacketService{
@Autowired
private RedPacketDao redPacketDao=null;
@Override
@Transactional(isolation=Isolation.READ_COMMITTED,propagation=Propagation.REQUIRED)
public RedPacket getRedPacket(Long id) {
return redPacketDao.getRedPacket(id);
}
@Override
@Transactional(isolation=Isolation.READ_COMMITTED,propagation=Propagation.REQUIRED)
public int decreaseRedPacket(Long id) {
return redPacketDao.decreaseRedPacket(id);
}
}
配置了事务注解@Transactional,让程序能够在事务中运行,保证数据的一致性,这里采用的是读/写提交的隔离级别,对于传播行为采用的Propagation.REQUIRED,这样调用这个 方法的时候,如果没有事务就会创建,如果有则沿用当前的事务。
//UserRedPacketServiceImpl.java
package com.ssm.wdz.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.ssm.wdz.dao.RedPacketDao;
import com.ssm.wdz.dao.UserRedPacketDao;
import com.ssm.wdz.pojo.RedPacket;
import com.ssm.wdz.pojo.UserRedPacket;
import com.ssm.wdz.service.UserRedPacketService;
@Service
public class UserRedPacketServiceImpl implements UserRedPacketService{
@Autowired
private UserRedPacketDao userRedPacketDao=null;
@Autowired
private RedPacketDao redPacketDao=null;
//失败
private static final int FAILED=0;
@Override
@Transactional(isolation=Isolation.READ_COMMITTED,propagation=Propagation.REQUIRED)
public int grapRedPacket(Long redPacketId,Long userId) {
//获取红包信息
RedPacket redPacket=redPacketDao.getRedPacket(redPacketId);
//当前小红包库存大于0
if(redPacket.getStock()>0) {
redPacketDao.decreaseRedPacket(redPacketId);
//生成抢红包信息
UserRedPacket userRedPacket=new UserRedPacket();
userRedPacket.setRedPacketId(redPacketId);
userRedPacket.setUserId(userId);
userRedPacket.setAmount(redPacket.getUnitAmount());
userRedPacket.setNote("抢红包"+redPacketId);
//插入强红包信息
int result=userRedPacketDao.grapRedPacket(userRedPacket);
return result;
}
//失败返回
return FAILED;
}
}
grapRedPacket方法的逻辑是先获取红包信息,如果发现红包库存大于0,则抢红包并生成抢红包的信息将其保存到数据库中。因为有注解Transactional的存在,所有的操作都在一个事务中完成。在高并发中就会发生超发现象。
使用全注解搭建SSM开发环境
//WebAppInitializer.java
package com.ssm.wdz.config;
import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletRegistration.Dynamic;
import org.springframework.web.servlet.support.AbstractAnnotationConfigDispatcherServletInitializer;
public class WebAppInitializer extends AbstractAnnotationConfigDispatcherServletInitializer{
//Sring Ioc环境配置
@Override
protected Class< ?>[] getRootConfigClasses(){
//配置Spring Ioc资源
return new Class< ?>[] {RootConfig.class};
}
//DispactchServlet环境配置
@Override
protected Class< ?>[] getServletConfigClasses() {
//加载java配置类
return new Class< ?>[] {WebConfig.class};
}
//DispactchServlet拦截请求配置
@Override
protected String[] getServletMappings() {
return new String[] {"*.do"};
}
@Override
protected void customizeRegistration(Dynamic dynamic) {
//配置上传文件路径
String filepath="e:/mvc/uploads";
//5MB
Long singleMax=(long) (5*Math.pow(2, 20));
//10MB
Long totalMax=(long) (10*Math.pow(2, 20));
//设置上传文件配置
dynamic.setMultipartConfig(new MultipartConfigElement(filepath,singleMax,totalMax,0));
}
}
这个类继承了AbstractAnnotationConfigDispatcherServletInitializer,它实现了3个抽象方法,并且覆盖了父类的customizeRedistration方法,作为上传文件的配置。3个方法为:
getRootConfigClasses是一个配置Spring Ioc容器的上下文配置,此配置在代码中将由RootConfig完成。
getServletConfigClasses配置DispatcherServlet上下文配置,将会由WebConfig完成。
getServletMappings配置DispatcherServlet拦截内容,拦截所有的以.do结尾的请求。
通过这三个方法就可以配置Web工程中的Spring Ioc资源和DispatcherServlet的内容,首先是配置Spring Ioc的内容,配置类RootConfig,如下:
//RootConfig.java
package com.ssm.wdz.config;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.tomcat.dbcp.dbcp.BasicDataSourceFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.mapper.MapperScannerConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Repository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.TransactionManagementConfigurer;
@Configuration
//定义Spring扫描的包
@ComponentScan(value="com.*",includeFilters= {@Filter(type=FilterType.ANNOTATION,value= {Service.class})})
//使用事务驱动管理器
@EnableTransactionManagement
//实现接口TransactionManagementConfigurer,这样就可以配置注解驱动事务
public class RootConfig implements TransactionManagementConfigurer{
private DataSource dataSource=null;
@Bean(name="dataSource")
public DataSource initDataSource() {
if(dataSource!=null) {
return dataSource;
}
Properties props=new Properties();
props.setProperty("driverClassName", "com.mysql.jdbc.Driver");
props.setProperty("url", "jdbc:localhost:3306/redpacket");
props.setProperty("username", "root");
props.setProperty("password", "123456");
props.setProperty("maxActive", "200");
props.setProperty("maxIdle", "20");
props.setProperty("maxWait", "30000");
try {
dataSource=BasicDataSourceFactory.createDataSource(props);
}catch(Exception e) {
e.printStackTrace();
}
return dataSource;
}
@Bean(name="sqlSessionFactory")
public SqlSessionFactoryBean initSqlSessionFactory() {
SqlSessionFactoryBean sqlSessionFactory=new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(initDataSource());
//配置Mybatis配置文件
Resource resource=new ClassPathResource("mybatis/mybatis-config.xml");
sqlSessionFactory.setConfigLocation(resource);
return sqlSessionFactory;
}
@Bean
public MapperScannerConfigurer initMapperScannerConfigurer() {
MapperScannerConfigurer msc=new MapperScannerConfigurer();
msc.setBasePackage("com.*");
msc.setSqlSessionFactoryBeanName("sqlSessionFactory");
msc.setAnnotationClass(Repository.class);
return msc;
}
@Override
@Bean(name="annotationDrivenTransactionManager")
public PlatformTransactionManager annotationDrivenTransactionManager() {
DataSourceTransactionManager transactionManager=new DataSourceTransactionManager();
transactionManager.setDataSource(initDataSource());
return transactionManager;
}
}
这个类标注了注解@EnableTransactionManagement,实现了接口TransactionManagementConfigurer,这样配置是为了实现注解式的事务,将来可以通过注解@Transactional配置数据库事务。它有一个方法定义,这个方法是annotationDrivenTransactionManager,这需要将一个事务管理器返回给它就行了。除了配置数据库事务外,还配置了数据源SqlSessionFactoryBean和Mybatis的扫描类,并把Mybatis的扫描类通过注解@Repository和包名限定。这样Mu把提示就会通过Spring的机制找到对应的接口和配置,Spring会自动把对应的接口装配到Ioc容器中。
有了Spring Ioc容器后,还需要配置DispatcherServlet上下文,完成这个任务的是WenConfig配置类,
开发设计
昨天完成了Spring Ioc容器的配置,还需要配置DispatcherServlet上下文,完成这个任务的类是WebConfig,如下:
//WebConfig.java
package com.ssm.wdz.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.stereotype.Controller;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.servlet.HandlerAdapter;
import org.springframework.web.servlet.ViewResolver;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import org.springframework.web.servlet.view.InternalResourceViewResolver;
@Configuration
//定义Spring Mvc的扫描包
@ComponentScan(value="com.*",includeFilters= {@Filter(type=FilterType.ANNOTATION,value=Controller.class)})
//启动Spring Mvc配置
@EnableWebMvc
public class WebConfig {
@Bean(name="internalResourceViewResolver")
public ViewResolver initViewResolver() {
InternalResourceViewResolver viewResolver=new InternalResourceViewResolver();
viewResolver.setPrefix("/WEB-INF/jsp/");
viewResolver.setSuffix(".jsp");
return viewResolver;
}
@Bean(name="requestMappingHandlerAdapter")
public HandlerAdapter initRequestMappingHandlerAdapter() {
//创建RequestMappingHandlerAdapter适配器
RequestMappingHandlerAdapter rmhd=new RequestMappingHandlerAdapter();
//HTTP JSON转换器
MappingJackson2HttpMessageConverter jsonConverter=new MappingJackson2HttpMessageConverter();
//MappingJackson2HttpMessageConverter接收JSON类型消息的转换
MediaType mediaType=MediaType.APPLICATION_JSON;
List mediaTypes=new ArrayList();
mediaTypes.add(mediaType);
//加入转化器支持类型
jsonConverter.setSupportedMediaTypes(mediaTypes);
//往适配器加入json转换器
rmhd.getMessageConverters().add(jsonConverter);
return rmhd;
}
}
这里配置了一个视图解析器,通过他找到对应的jsp文件,然后使用数据模型进行渲染,采用自定义创建RequestMappingHandlerAdapter,为了让它能够支持JSON格式(@ResponseBody)的转换,所以需要创建一个关于对象和JSON的转换消息类,那就是MappingJackSon2HttpMessageConverter类对象。创建它之后,把它注册给RequestMappingHandlerAdapter对象,这样当控制器遇到注解@ResponseBody的时候就知道采用JSON消息类型进行应答,那么在控制器完成逻辑之后,由处理器将其和消息转换类型做匹配,找到MappingJackSon2HttpMessageConverter类对象,从而转变为JSON数据。
通过之前的的三个类就搭建好了Spring Mvc和Spring的开发环境,但是没有完成对MyBatis配置文件,在RootConfig中可以看出,使用文件/mybatis/mybatis-config.xml进行配置,代码如下:
//mybatis-config.xml
< ?xml version="1.0" encoding="UTF-8"?>
< !DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
< configutation>
< mappers>
< mapper resource="com/ssm/wdz/mapper/UserRedPacket.xml"/>
< mapper resource="com/ssm/wdz/mapper/RedPacket.xml"/>
< /mappers>
< /configutation>
这样关于后台的逻辑已经完成,接下来就要开发控制器。
乐观锁重入机制
使用乐观锁造成大量的更新失败的问题,使用时间戳执行乐观锁重入,是一种提高成功率的方法,比如考虑在100毫秒内允许重入,把UserRedPacketServiceImpl中的方法grapRedPacketForVersion修改为如下代码:
/ *
* 乐观锁重入机制,时间戳限制
* /
long startTime = System.currentTimeMillis();
//记录开始时间
long start=startTime;
//无限循环,等待 成功或者满100毫秒退出
while(true) {
//获取循环当前时间
long end=System.currentTimeMillis();
if(end-start>100) {
return FAILED;
}
//获取红包信息,注意version值
RedPacket redPacket=redPacketDao.getRedPacket(redPacketId);
//当前小红包库存大于0
if(redPacket.getStock()>0) {
//再次传入线程保存的version旧值给sql判断,是否有其他线程更改过数据
int update=redPacketDao.decreaseRedPacketForVersion(redPacketId, redPacket.getVersion());
//如果没有数据更新,则说明其他线程已经修改过数据,本次抢红包失败
if(update==0) {
continue;
}
//生成抢红包信息
UserRedPacket userRedPacket=new UserRedPacket();
userRedPacket.setRedPacketId(redPacketId);
userRedPacket.setUserId(userId);
userRedPacket.setAmount(redPacket.getUnitAmount());
userRedPacket.setNote("redPacketId:"+redPacketId);
//插入抢红包信息
int result=userRedPacketDao.grapRedPacket(userRedPacket);
return result;
}else {
long endTime = System.currentTimeMillis();
//输出程序运行时间
System.out.println("抢红包程序运行时间:" + (endTime - startTime) + "ms");
//失败返回
return FAILED;
}
}
当因为版本号原因更新失败的时候,会重新尝试抢红包,但是会实现判断时间戳,如果时间戳在100毫秒之内,就继续,否则就不再重新尝试,这样可以避免过多sql执行,维持系统稳定,但是有的时候时间戳也不是那么稳定,也会随着系统的空闲或者繁忙导致重试次数不一,有时候我们会考虑重试次数,比如3次,下面在改写上一个方法,如下:
/*
* 乐观锁重入机制,次数限制
* /
long startTime = System.currentTimeMillis();
for(int i=0;i<3;i++) {
//获取红包信息,注意version值
RedPacket redPacket=redPacketDao.getRedPacket(redPacketId);
//当前小红包库存大于0
if(redPacket.getStock()>0) {
//再次传入线程保存的version旧值给sql判断,是否有其他线程更改过数据
int update=redPacketDao.decreaseRedPacketForVersion(redPacketId, redPacket.getVersion());
//如果没有数据更新,则说明其他线程已经修改过数据,本次抢红包失败
if(update==0) {
continue;
}
//生成抢红包信息
UserRedPacket userRedPacket=new UserRedPacket();
userRedPacket.setRedPacketId(redPacketId);
userRedPacket.setUserId(userId);
userRedPacket.setAmount(redPacket.getUnitAmount());
userRedPacket.setNote("redPacketId:"+redPacketId);
//插入抢红包信息
int result=userRedPacketDao.grapRedPacket(userRedPacket);
return result;
}else {
long endTime = System.currentTimeMillis();
//输出程序运行时间
System.out.println("抢红包程序运行时间:" + (endTime - startTime) + "ms");
//失败返回
return FAILED;
}
}
return FAILED;
通过for循环限定重试3次,3次过后无论失败与否都会判定为失败而退出,这样就能避免过多的重试导致过多的sql被执行的问题,从而保证数据库的性能。进行了上述两种方法的测试,结果如下:
时间戳限制:抢红包程序运行时间:909231ms
次数限制:抢红包程序运行时间:311098ms
显然,使用次数限制的重入乐观锁取得了很好的效果,没有超发但是会有有抢红包失败!
还是没怎么解决高概率失败现象。因机器而异吧。
但是现在是使用数据库的情况,有时候并不想使用数据库作为抢红包的数据保存载体,而是选择性能优于数据库的redis。明天将使用redis处理高并发的请求。
开发设计
使用Redis实现抢红包
数据库最终会将数据保存到磁盘中,而Redis使用的是内存,内存的速度要比磁盘的速度快的多,所以这里将谈论使用Redis实现抢红包。
对于使用Redis实现抢红包,首先需要知道的是Redis功能不如数据库强大,事务也不完整,因此要保证数据的正确性,数据的正确性可以通过严格地验证进行保证。而Redis的Lua语言是原子性的,且功能更为强大,所以优先选择使用Lua语言来实现抢红包。但是无论如何对于数据而言,在Redis当中存储,始终都不是长久之计,因为Redis并非一个长久存储数据的地方,它存储的数据是非严格和安全的环境,更多的时候只是为了提供更多的缓存,所以当红包金额为0或者红包超时的时候(超时操作可以使用定时机制实现),会将红包数据保存到数据库中,这样才能保证数据的安全性和严格性。
使用注解方式配置Redis
想要使用
接下来进行编码,
首先在RootConfig上创建一个RedisTemplate对象,并将其装载到Spring Ioc容器中,如下:
@Bean(name="redisTemplate")
public RedisTemplate initRedisTemplate() {
JedisPoolConfig poolConfig=new JedisPoolConfig();
//最大空闲数
poolConfig.setMaxIdle(50);
//最大连接数
poolConfig.setMaxTotal(100);
//最大等待毫秒数
poolConfig.setMaxWaitMillis(20000);
//创建Jedis链接工厂
JedisConnectionFactory connectionFactory=new JedisConnectionFactory(poolConfig);
connectionFactory.setHostName("localhost");
connectionFactory.setPort(6379);
//调用初始化方法,没有它将抛出异常
connectionFactory.afterPropertiesSet();
//自定义Redis序列化器
RedisSerializer jdkSerializationRedisSerializer=new JdkSerializationRedisSerializer();
RedisSerializer stringRedisSerializer=new StringRedisSerializer();
//定义RedisTemplate,并设置连接工厂
RedisTemplate redisTemplate=new RedisTemplate();
redisTemplate.setConnectionFactory(connectionFactory);
//设置序列化器
redisTemplate.setDefaultSerializer(stringRedisSerializer);
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(stringRedisSerializer);
return redisTemplate;
}
这样RedisTemplate就可以在Spring上下文中使用了。注意,JedisConnectionFactory对象在最后的时候需要自行调用afterPropertiesSet方法,它实现了InitializingBean接口,如果其配置在Spring Ioc容器中,Spring会自动调用它,但是这里我们是自行创建的,因此需要自行调用,否则在运用的时候会抛出异常,从而出现错误。
数据存储设计
Redis并不是一个严格的事务,而且事务功能也是有限的。加上Redis本身的命令也比较有限,功能性不强,为了增强功能性,还可以使用Lua语言。Redis中的Lua语言是一种原子性的操作,可以保证数据的一致性。依据这个原理可以避免超发现象,完成抢红包的功能,而且对性能而言,Redis要比数据库快的多。
第一次运行Lua脚本的时候,现在Redis中编译和缓存脚本,这样就可以得到一个SHAl字符串,之后通过SHAl字符串和参数就能调用Lua脚本了。先来编写Lua脚本,代码如下:
local listKey="'red_packet_list_'..KEYS[1] "
+"local redPacket='red_packet_'..KEYS[1] "
+"local stock=tonumber(redis.call('hget',redPacket,'stock')) "
+"if stock<=0 then return 0 end "
+"stock=stock-1 "
+"redis.call('hset',redPacket,'stock',toString(stock)) "
+"redis.call('rpush',listKey,ARGV[1]) "
+"if stock ==0 then return 2 end "
+"return 1"
这里可以看到这样一个流程:
判断是否存在可以抢夺的红包,对于红包的库存,如果已经没有可以抢夺的红包,则返回为0,结束流程。
有可抢夺的红包,对于红包的库存减1,然后重新设置库存。
将抢红包数据保存到Redis的链表当中,链表的key为red_packet_list_{id}。
如果当前库存为0,那么返回2,这说明可以触发数据库对Redis链表数据的保存。
链表的key为red_packet_list_{id}.它将保存抢红包的用户名和抢得时间。
如果当前库存不为0,那么返回1,这说明抢红包信息保存成功。
当返回为2的时候(现实中如果抢不完红包,可以使用超时机制触发,比较复杂),说明红包已经没有库存,会触发数据库对链表的数据的保存,这是一个大数据量的保存。为了不影响最后一次抢红包的响应,在实际的操作中往往会考虑使用JMS消息发送到别的服务器进行操作,这样会比较复杂,这里只是创建一条新的线程去运行保存Redis链表数据到数据库,为此我们需要一个新的服务类,如下
//RedisRedPacketService.java
package com.ssm.wdz.service;
public interface RedisRedPacketService {
/ **
* 保存redis抢红包列表
* @param redPacketId 抢红包编号
* @param unitAmount 红包金额
* /
public void saveUserRedPacketByRedis(Long redPacketId,Double unitAmount);
}
//RedisRedPacketServiceImpl.java
package com.ssm.wdz.service.impl;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.ssm.wdz.pojo.UserRedPacket;
import com.ssm.wdz.service.RedisRedPacketService;
@Service
public class RedisRedPacketServiceImpl implements RedisRedPacketService{
private static final String PREFIX="red_packet_list_";
//每次取出1000条,避免一次取出消耗太多内存
private static final int TIME_SIZE=1000;
@Autowired
//RedisTemplate
private RedisTemplate redisTemplate=null;
@Autowired
//数据源
DataSource dataSource=null;
@Override
//开启新线程运行
@Async
public void saveUserRedPacketByRedis(Long redPacketId,Double unitAmount) {
System.err.println("开始存数据");
Long start =System.currentTimeMillis();
//获取列表操作对象
BoundListOperations ops=redisTemplate.boundListOps(PREFIX+redPacketId);
Long size=ops.size();
Long times=size%TIME_SIZE==0?size/TIME_SIZE:size/TIME_SIZE+1;
int count=0;
List userRedPacketList=new ArrayList(TIME_SIZE);
for(int i=0;i
//获取至多TIME_SIZE个抢红包信息
List userIdList=null;
if(i==0) {
userIdList=ops.range(i*TIME_SIZE, (i+1)*TIME_SIZE);
}else {
userIdList=ops.range(i*TIME_SIZE+1,(i+1)*TIME_SIZE);
}
userRedPacketList.clear();
//保存红包信息
for(int j=0;j
String args=userIdList.get(j).toString();
String[] arr=args.split("-");
String userIdStr=arr[0];
String timeStr=arr[1];
Long userId =Long.parseLong(userIdStr);
Long time=Long.parseLong(timeStr);
//生成抢红包信息
UserRedPacket userRedPacket=new UserRedPacket();
userRedPacket.setRedPacketId(redPacketId);
userRedPacket.setUserId(userId);
userRedPacket.setAmount(unitAmount);
userRedPacket.setGrabTime(new Timestamp(time));
userRedPacket.setNote("redPakcetId:"+redPacketId);
userRedPacketList.add(userRedPacket);
}
//插入抢红包信息
count+=executeBatch(userRedPacketList);
}
//删除Redis列表
redisTemplate.delete(PREFIX+redPacketId);
Long end=System.currentTimeMillis();
System.err.println("保存数据结束,耗时"+(end-start)+"毫秒,共"+count+"条记录被保存。");
}
/ **
*使用JDBC批量处理Redis缓存数据
* @param userRedPacketList --抢红包列表
* @return 抢红包插入数量
* /
private int executeBatch(List userRedPacketList) {
Connection conn=null;
Statement stmt=null;
int []count=null;
try {
conn=dataSource.getConnection();
conn.setAutoCommit(false);
stmt=conn.createStatement();
for(UserRedPacket userRedPacket:userRedPacketList) {
String sql1="update t_red_packet set stock=stock-1 where id="+userRedPacket.getRedPacketId();
DateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql2="insert into t_user_red_packet(red_packet_id,user_id,"+"amount,grab_time,note)"
+ "values("+userRedPacket.getRedPacketId()+","
+userRedPacket.getUserId()+","
+userRedPacket.getAmount()+","
+"'"+userRedPacket.getNote()+"')";
stmt.addBatch(sql1);
stmt.addBatch(sql2);
}
//执行批量
count=stmt.executeBatch();
//提交事务
conn.commit();
}catch(SQLException e) {
/ **错误处理逻辑** /
throw new RuntimeException("抢红包批量执行程序错误");
}finally {
try {
if(conn!=null&&!conn.isClosed()) {
conn.close();
}
}catch(SQLException e) {
e.printStackTrace();
}
}
//返回插入抢红包数据记录
return count.length/2;
}
}
注意,注解@Async表示让Spring自动创建另外一条线程去运行它,这样它便不在抢最后一个红包的线程内。因为这个方法是一个较长时间的方法,如果在同一个线程内,那么对于最后一个抢红包的用户需要等待的时间太长,影响体验。这里是每次取出1000个抢红包的信息,之所以这样做是为了避免取出的数据过大,导致JVM消耗过多的内存影响系统性能。对于大批量的数据操作,这是我们在实际操作中要注意的,最后还会删除Redis保存的链表信息,这样就能帮助Redis释放内存了。对于数据库的保存,这里采用了JDBC的批量处理,每1000条批量保存一次,使用批量有助于性能的提高。
使用@Async的前提是提供一个任务池给Spring环境,这个时候要在原有的基础上改写配置类WebConfig,如下:
@EnableAsync
public class WebConfig extends AsyncConfigurerSupport{
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(200);
taskExecutor.initialize();
return taskExecutor;
}
使用@EnableAsync表明支持异步调用,接着重写了抽象类AsyncConfigurerSupport的getAsyncExecutor方法,它是获取一个任务池,当在Spring环境中遇到注解@Async就会启动这个任务池的一条线程去运行对应的方法,这样便能够异步执行了。
使用Redsi实现抢红包
首先应该编写Lua语言,使用对应的链接发送给Redis服务器,那么Redis会返回一个SHAl字符串,我们保存它,之后的发送可以只发送这个字符和对应的参数。下面在UserRedPacketService中加入一个新的方法:
//UserRedPacketService.java
/ **
* 通过Redis实现抢红包
* @param redPacketId 红包编号
* @param userId 用户编号
* @return
* 0-没有库存,失败
* 1-成功,且不是最后一个红包
* 2-成功,且是最后一个红包
* /
public Long grapRedPacketByRedis(Long redPacketId,Long userId);
它的实现类UserRedPacketServiceImpl也要加入实现方法,如下:
//UserRedpacketServiceImpl.java
@Autowired
private RedisTemplate redisTemplate=null;
@Autowired
private RedisRedPacketService redisRedPacketService=null;
//Lua脚本
String script="local listKey='red_packet_list_'..KEYS[1] \n"
+"local redPacket='red_packet_'..KEYS[1] \n"
+"local stock=tonumber(redis.call('hget',redPacket,'stock')) \n"
+"if stock<=0 then return 0 end \n"
+"stock=stock-1 \n"
+"redis.call('hset',redPacket,'stock',toString(stock)) \n"
+"redis.call('rpush',listKey,ARGV[1]) \n"
+"if stock ==0 then return 2 end \n"
+"return 1 \n";
//缓存Lua脚本后,使用该变量保存Redis返回的32位SHAl编码,使用它去执行缓存的Lua脚本
String shal=null;
@Override
public Long grapRedPacketByRedis(Long redPacketId,Long userId) {
//当前抢红包用户和日期信息
String args=userId+"-"+System.currentTimeMillis();
Long result=null;
//获取底层Redis操作对象
Jedis jedis=(Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection();
try {
//如果脚本没有加载过那么进行加载,这样就会返回一个SHAl编码
if(shal==null) {
shal=jedis.scriptLoad(script);
}
//执行脚本,返回结果
Object res=jedis.evalsha(shal,1,redPacketId+"",args);
result =(Long) res;
//返回2时为最后一个红包,此时将红包信息通过异步保存到数据库中
if(result==2) {
//获取单个小红包金额
String unitAmountStr=jedis.hget("red_packet_"+redPacketId,"unit_amount" );
//触发保存数据库操作
Double unitAmount=Double.parseDouble(unitAmountStr);
System.err.println("thread_name="+Thread.currentThread().getName());
redisRedPacketService.saveUserRedPacketByRedis(redPacketId, unitAmount);
}
}finally {
//确保jedis顺利关闭
if(jedis!=null&&jedis.isConnected()) {
jedis.close();
}
}
return result;
}
这里使用了保存脚本返回的SHAl字符串,所以只会发送一次脚本到Redis服务器,之后只传输SHAl字符串和参数到Redis就能执行脚本了,当脚本返回2的时候,表示此时所有的红包都被抢光了,那么就会触发redisRedPacketService的saveUserRedPacketByRedis方法。由于在此方法加入了注解@Async,所以Spring会创建一条新的线程去运行它,这样就不会影响最后抢红包用户的响应时间了。
此时重新在控制器UserRedPacketController中加入新的方法作为响应就可以了,如下
//UserRedPacketController.java
@RequestMapping(value="/grapRedPacketByRedis")
@ResponseBody
public Map grapRedPacketByRedis(Long redPacketId,Long userId){
Map resultMap=new HashMap();
long result=userRedPacketService.grapRedPacketByRedis(redPacketId, userId);
boolean flag=result>0;
resultMap.put("result", flag);
resultMap.put("message", flag?"抢红包成功":"抢红包失败");
return resultMap;
}
为了测试它,我们现在Redis上添加红包信息,于是执行如下指令:
初始化了一个编号为5的大红包,其中库存为2万个,每个10元。最后测试时间仅需2秒就可抢完所有红包而且也没有超发少抢现象。