1、环境
1、黑名单校验系统
2、50万黑名单号码
3、-Xms512m -Xmx2048m
4、springboot:2.7.18
5、单机压测结果

2、代码
2.1、gav
<dependency>
<groupId>io.github.wushusong</groupId>
<artifactId>wss-utils-core</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.github.wushusong</groupId>
<artifactId>wss-utils-redis</artifactId>
<version>1.0.3</version>
</dependency>
<!-- MyBatis-Plus Starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<!-- HikariCP (Spring Boot 默认自带,无需额外引入) -->
<!-- RabbitMQ (Spring AMQP) Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2、配置
2.2.1、yml配置
server:
  port: 8080
wss:
  redis:
    defaultRedis: v1
    configs:
      v1:
        host: 192.168.2.240
        port: 6379
        password: 123456
        database: 1
        beanName: redisTemplate
        stringBeanName: stringRedisTemplate
        serializerType: json
spring:
  datasource:
    url: jdbc:mysql://192.168.2.240:3306/wss_test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&characterEncoding=utf8
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    # HikariCP 配置 (Spring Boot 2.x 默认使用 HikariCP)
    hikari:
      minimum-idle: 10
      maximum-pool-size: 30
      idle-timeout: 600000
      connection-timeout: 30000
      pool-name: HikariPool
      auto-commit: true
      leak-detection-threshold: 60000
  rabbitmq:
    host: 192.168.2.240
    port: 5672
    username: guest
    password: 123456
    listener:
      simple:
        acknowledge-mode: manual # 全局手动确认
# MyBatis-Plus 配置
mybatis-plus:
  mapper-locations: classpath*:/mapper/**/*.xml
  type-aliases-package: com.example.entity
  global-config:
    db-config:
      id-type: auto # 主键自增
      logic-delete-field: deleted # 逻辑删除字段
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: true # 驼峰映射
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开发环境打印SQL
2.2.2、RabbitMQ配置
package com.wss.wssdemo.v2.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Registers the RabbitMQ listener container factories (batch and single-message, both with
 * manual acknowledgement) and the {@link RabbitAdmin} bean.
 */
@Configuration
public class RabbitMQBeanConfig {

    // Optional: custom JSON message converter. (Original note: with this bean configured,
    // the sending side does not need to serialize the payload again.)
    // @Bean
    // public MessageConverter messageConverter() {
    //     return new Jackson2JsonMessageConverter();
    // }

    /**
     * Container factory for batch listeners: batch listener mode and consumer-side batching
     * are enabled, with a batch size of 10 and a receive timeout of 5000 ms.
     */
    @Bean
    public RabbitListenerContainerFactory<?> batchContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory batchFactory = manualAckFactory(connectionFactory, true);
        batchFactory.setConsumerBatchEnabled(true);
        batchFactory.setBatchSize(10);
        batchFactory.setReceiveTimeout(5000L);
        return batchFactory;
    }

    /** Container factory for ordinary listeners that handle one message per invocation. */
    @Bean
    public RabbitListenerContainerFactory<?> normalContainerFactory(ConnectionFactory connectionFactory) {
        return manualAckFactory(connectionFactory, false);
    }

    /** Exposes a {@link RabbitAdmin} bound to the given connection factory. */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /** Builds a factory with the shared settings: connection, batch-listener flag, manual ack. */
    private static SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory connectionFactory,
                                                                         boolean batchListener) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setBatchListener(batchListener);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }

    // Alternative kept for reference: declaring the topology as beans.
    //
    // // ========== sub_balance (Topic) ==========
    // @Bean
    // public TopicExchange subBalanceExchange() {
    //     return ExchangeBuilder.topicExchange(MqEnums.sub_balance.getExchange())
    //             .durable(true)
    //             .build();
    // }
    //
    // @Bean
    // public Queue subBalanceQueue() {
    //     return QueueBuilder.durable(MqEnums.sub_balance.getQueue())
    //             .build();
    // }
    //
    // @Bean
    // public Binding subBalanceBinding() {
    //     return BindingBuilder.bind(subBalanceQueue())
    //             .to(subBalanceExchange())
    //             .with(MqEnums.sub_balance.getRoutingKey());
    // }
    //
    // // ========== flush_all (Fanout) ==========
    // @Bean
    // public FanoutExchange flushAllExchange() {
    //     return ExchangeBuilder.fanoutExchange(MqEnums.flush_all.getExchange())
    //             .durable(true)
    //             .build();
    // }
    //
    // @Bean
    // public Queue flushAllQueue() {
    //     return QueueBuilder.durable(MqEnums.flush_all.getQueue())
    //             .build();
    // }
    //
    // @Bean
    // public Binding flushAllBinding() {
    //     // In fanout mode the routing key is ignored, so bind directly.
    //     return BindingBuilder.bind(flushAllQueue())
    //             .to(flushAllExchange());
    // }
}
package com.wss.wssdemo.v2.config;
import com.wss.common.core.other.RandomUtils;
import com.wss.wssdemo.v2.constant.MqEnums;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * Declares exchanges, queues and bindings imperatively through {@link RabbitAdmin} right
 * after this configuration bean has been constructed.
 */
@Configuration
public class RabbitMQManualAdminConfig {

    @Resource
    private RabbitAdmin rabbitAdmin;

    /** Declares the topology for sub_balance_batch (topic) and flush_all (fanout). */
    @PostConstruct
    public void manualDeclare() {
        buildTopicQueue(MqEnums.sub_balance_batch);
        buildFanoutQueue(MqEnums.flush_all);
    }

    /**
     * Declares a durable topic exchange, a durable queue and the binding between them,
     * using the names and routing key carried by the enum entry.
     *
     * @param mqEnums entry providing exchange, queue and routing key
     */
    public void buildTopicQueue(MqEnums mqEnums) {
        Exchange exchange = ExchangeBuilder.topicExchange(mqEnums.getExchange())
                .durable(true)
                .build();
        Queue queue = QueueBuilder.durable(mqEnums.getQueue()).build();
        declare(exchange, queue, mqEnums.getRoutingKey());
    }

    /**
     * Declares a durable fanout exchange and a per-instance queue bound to it.
     *
     * <p>With fanout, every consumer needs its own queue for the exchange to broadcast to
     * all of them; consumers sharing one queue would each message be handled by only one of
     * them. The queue name is therefore suffixed with 8 random characters and written back
     * into the enum entry via {@code setQueue}.
     *
     * @param mqEnums entry providing the exchange and the base queue name; its queue name
     *                is replaced by the generated one
     */
    public void buildFanoutQueue(MqEnums mqEnums) {
        String uniqueQueueName = mqEnums.getQueue() + "_" + RandomUtils.randomString(8);
        mqEnums.setQueue(uniqueQueueName);

        Exchange exchange = ExchangeBuilder.fanoutExchange(mqEnums.getExchange())
                .durable(true)
                .build();
        Queue queue = QueueBuilder.durable(uniqueQueueName).autoDelete().build();
        // A fanout exchange ignores the routing key, so an empty one is used.
        declare(exchange, queue, "");
    }

    /** Binds the queue to the exchange and declares exchange, queue and binding in that order. */
    private void declare(Exchange exchange, Queue queue, String routingKey) {
        Binding binding = BindingBuilder.bind(queue)
                .to(exchange)
                .with(routingKey)
                .noargs();
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
    }
}
监听者
package com.wss.wssdemo.v2.listens;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.rabbitmq.client.Channel;
import com.sun.xml.internal.bind.v2.TODO;
import com.wss.common.core.json.JsonUtils;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.domain.dto.ListenAddDto;
import com.wss.wssdemo.v2.domain.dto.ListenSubBalanceDto;
import com.wss.wssdemo.v2.entity.SysUserEntity;
import com.wss.wssdemo.v2.mapper.SysUserMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@AllArgsConstructor
public class Mqlisten {
private final SysUserMapper sysUserMapper;
/**
* 批量监听 sub_balance_batch (topic-batch)
* 注意:containerFactory 需要指定为支持批量的工厂
* 方法参数可以是 List<Message> 或 List<String>(如果消息体是 String)
*/
@RabbitListener(queues = "#{T(com.wss.wssdemo.v2.constant.MqEnums).sub_balance_batch.getQueue()}", containerFactory = "batchContainerFactory")
public void handleSubBalanceBatch(List<Message> messages, Channel channel) {
try {
log.info("[sub_balance_batch] 接收到 " + messages.size() + " 条消息");
Map<Long,Long> userBalance = new HashMap<>();
for (Message msgObj : messages) {
String msg = new String(msgObj.getBody(), StandardCharsets.UTF_8);
log.info(" -> " + msg);
ListenSubBalanceDto listenSubBalanceDto = JsonUtils.toObject(msg, ListenSubBalanceDto.class);
userBalance.merge(listenSubBalanceDto.getUserId(), listenSubBalanceDto.getNum(), Long::sum);
}
for (Map.Entry<Long, Long> userBalanceEntry : userBalance.entrySet()) {
sysUserMapper.update(Wrappers.<SysUserEntity>lambdaUpdate()
.eq(SysUserEntity::getUserId, userBalanceEntry.getKey())
.setSql( "version = version - " + userBalanceEntry.getValue())
);
}
// 批量确认:最后一个消息的 deliveryTag,multipe=true 表示确认该 tag 之前的所有消息
long lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();
// 批量确认,multipe=true 表示确认该 tag 及之前的所有未确认消息
channel.basicAck(lastDeliveryTag, true);
} catch (Exception e) {
e.printStackTrace();
// 批量拒绝并重新入队(具体策略可根据业务调整)
try {
long lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();
channel.basicNack(lastDeliveryTag, true, true);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
String msg = JsonUtils.toString(new ListenSubBalanceDto(1L, 1L));
ListenSubBalanceDto listenSubBalanceDto = JsonUtils.toObject(msg, ListenSubBalanceDto.class);
System.out.println(listenSubBalanceDto);
}
@RabbitListener(queues = "#{T(com.wss.wssdemo.v2.constant.MqEnums).flush_all.getQueue()}", containerFactory = "normalContainerFactory")
public void handleFlushAll(String message, Channel channel, Message amqpMessage) {
try {
log.info("[flush_all 2] " + message);
listenFlushAll();
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
try { channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception ex) {}
}
}
/**
* 广播通知
* 号码增量监听
* 这里可以采用MQ,批量监听
*/
public void listenAdd(List<String> jsonList){
//1:新增,2:删除
for (String json : jsonList) {
ListenAddDto listenAddDto = JsonUtils.toObject(json, ListenAddDto.class);
if("1".equals(listenAddDto.getType())){
LocalCache.addNum2LocalCache(listenAddDto.getNum());
}else if("2".equals(listenAddDto.getType())){
LocalCache.removeNum2LocalCache(listenAddDto.getNum());
}
}
}
/**
* 广播通知
* 号码全量更新
*/
public void listenFlushAll(){
LocalCache.loadAllNum2LocalCache(RedisBaseUtils.getRedisTemplate());
}
}
生产者
package com.wss.wssdemo.v2.service;
import com.wss.common.core.collection.Dict;
import com.wss.common.core.json.JsonUtils;
import com.wss.common.core.other.IdUtils;
import com.wss.wssdemo.v2.constant.MqEnums;
import com.wss.wssdemo.v2.domain.dto.ListenSubBalanceDto;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
 * Message producer: publishes balance-deduction records and the "flush all" broadcast.
 */
@Slf4j
@Service
@AllArgsConstructor
public class MqService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Publishes one deduction record, serialized as JSON, to the sub_balance_batch exchange.
     *
     * @param userId user whose balance is to be reduced
     * @param count  amount to deduct
     */
    public void sendSubBalance(Long userId, Long count) {
        String payload = JsonUtils.toString(new ListenSubBalanceDto(userId, count));
        publish(MqEnums.sub_balance_batch, payload);
    }

    /** Publishes the flush_all notification; the body is the constant string "1". */
    public void sendFlushAll() {
        publish(MqEnums.flush_all, "1");
    }

    /** Sends the payload to the entry's exchange/routing key with a fresh correlation id. */
    private void publish(MqEnums target, String payload) {
        CorrelationData correlation = new CorrelationData(IdUtils.simpleUUID());
        rabbitTemplate.convertAndSend(
                target.getExchange(),
                target.getRoutingKey(),
                payload,
                correlation
        );
    }
}
2.2.3、本地缓存
package com.wss.wssdemo.v2.config;
import com.wss.wssdemo.v2.constant.NumConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.SetOperations;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-process blacklist cache: a set of numbers (stored as {@code Long}) that is fully
 * reloaded from a Redis set and incrementally updated through add/remove.
 *
 * <p>Lookups are lock-free. A full reload builds a complete new set first and then
 * publishes it by swapping a volatile reference, so a concurrent {@link #contains(String)}
 * sees either the old snapshot or the new one, never an empty or half-filled set.
 */
@Slf4j
public class LocalCache {

    /** Serializes writers: incremental add/remove and the reference swap of a reload. */
    private static final Object lock = new Object();

    /** Current snapshot; replaced as a whole on reload, hence volatile. */
    private static volatile Set<Long> blacklist = ConcurrentHashMap.newKeySet();

    /**
     * Reloads the whole blacklist from Redis (delegates to the SCAN-based loader).
     *
     * @param redisTemplate template used to read the blacklist set
     */
    public static void loadAllNum2LocalCache(RedisTemplate<String, Object> redisTemplate) {
        loadAllNum2LocalCacheScan(redisTemplate);
    }

    /**
     * Reloads the whole blacklist by fetching all set members in one call.
     *
     * @param redisTemplate template used to read the blacklist set
     */
    public static void loadAllNum2LocalCacheAll(RedisTemplate<String, Object> redisTemplate) {
        log.info("全量加载黑名单:开始");
        SetOperations<String, Object> setOps = redisTemplate.opsForSet();
        Set<Object> members = setOps.members(NumConstants.REDIS_KEY_NUM_SET);
        // Build the replacement off to the side; readers keep using the current snapshot.
        Set<Long> newSet = ConcurrentHashMap.newKeySet();
        if (null != members) {
            for (Object num : members) {
                newSet.add(Long.parseLong(num.toString()));
            }
        }
        log.info("全量加载黑名单:原子替换-开始");
        replaceAll(newSet);
        log.info("全量加载黑名单:原子替换-结束");
        log.info("全量加载黑名单:完成,数量: {}", LocalCache.blacklist.size());
    }

    /**
     * Reloads the whole blacklist by iterating the Redis set with SSCAN (count hint 10000).
     *
     * @param redisTemplate template used to scan the blacklist set
     * @throws RuntimeException if scanning or parsing fails; the current snapshot is left untouched
     */
    public static void loadAllNum2LocalCacheScan(RedisTemplate<String, Object> redisTemplate) {
        log.info("开始分批加载黑名单...");
        long start = System.currentTimeMillis();
        Set<Long> newSet = ConcurrentHashMap.newKeySet();
        ScanOptions options = ScanOptions.scanOptions().count(10000).build();
        try (Cursor<Object> cursor = redisTemplate.opsForSet().scan(NumConstants.REDIS_KEY_NUM_SET, options)) {
            while (cursor.hasNext()) {
                newSet.add(Long.parseLong(cursor.next().toString()));
            }
        } catch (Exception e) {
            log.error("分批加载黑名单失败", e);
            throw new RuntimeException("加载黑名单失败", e);
        }
        log.info("全量加载黑名单:原子替换-开始");
        replaceAll(newSet);
        log.info("全量加载黑名单:原子替换-结束");
        log.info("分批加载黑名单完成,数量: {},耗时: {} ms", blacklist.size(), System.currentTimeMillis() - start);
    }

    /**
     * Adds one number to the current snapshot.
     *
     * @param num decimal number string
     * @throws NumberFormatException if {@code num} is not a parsable long
     */
    public static void addNum2LocalCache(String num) {
        long parsed = Long.parseLong(num);
        // Held so the update cannot land on a snapshot that is being replaced right now.
        synchronized (lock) {
            blacklist.add(parsed);
        }
    }

    /**
     * Removes one number from the current snapshot.
     *
     * @param num decimal number string
     * @throws NumberFormatException if {@code num} is not a parsable long
     */
    public static void removeNum2LocalCache(String num) {
        long parsed = Long.parseLong(num);
        synchronized (lock) {
            blacklist.remove(parsed);
        }
    }

    /**
     * Tells whether the number is in the blacklist. Lock-free.
     *
     * @param num decimal number string
     * @return {@code true} if the number is blacklisted
     * @throws NumberFormatException if {@code num} is not a parsable long
     */
    public static boolean contains(String num) {
        return blacklist.contains(Long.parseLong(num));
    }

    /** Publishes a fully built set as the new snapshot with a single reference write. */
    private static void replaceAll(Set<Long> newSet) {
        synchronized (lock) {
            blacklist = newSet;
        }
    }
}
package com.wss.wssdemo.v2.config;
import com.wss.common.redis.config.WssRedisConfig;
import com.wss.common.redis.utils.RedisBaseUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
 * Startup hook that fills the local blacklist cache once this bean has been constructed.
 */
@Slf4j
@Configuration
@AllArgsConstructor
public class NumConfig {
/**
 * Redis configuration, injected explicitly here.
 * NOTE(review): the field is not referenced in this class; presumably it is injected so
 * that the Redis setup is initialized before init() runs — confirm.
 */
private final WssRedisConfig wssRedisConfig;
/**
 * Loads the whole blacklist into LocalCache using the template obtained from RedisBaseUtils.
 */
@PostConstruct
public void init() {
LocalCache.loadAllNum2LocalCache(RedisBaseUtils.getRedisTemplate());
}
}
2.2.4、线程配置
package com.wss.wssdemo.v2.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Provides the async executor ({@code masterAsyncExecutor}). Pool sizes, queue capacity and
 * shutdown wait time can be overridden through {@code thread.pool.*} properties; when a
 * property is absent or empty the defaults below apply.
 */
@Configuration
public class TaskExecutorConfiguration implements AsyncConfigurer {

    /**
     * Number of processors reported by the runtime. Not necessarily accurate; tune the pool
     * for the actual workload (CPU-bound vs. IO-bound).
     */
    public static final int cpuNum = Runtime.getRuntime().availableProcessors();

    @Value("${thread.pool.corePoolSize:}")
    private Optional<Integer> corePoolSize;

    @Value("${thread.pool.maxPoolSize:}")
    private Optional<Integer> maxPoolSize;

    @Value("${thread.pool.queueCapacity:}")
    private Optional<Integer> queueCapacity;

    @Value("${thread.pool.awaitTerminationSeconds:}")
    private Optional<Integer> awaitTerminationSeconds;

    /**
     * Builds and initializes the thread pool.
     *
     * <p>Defaults: core size = CPU * 2, max size = CPU * 4, queue capacity = 500,
     * await-termination = 60 seconds. Rejected tasks run on the caller's thread, and
     * shutdown waits for queued tasks to complete.
     *
     * @return the initialized executor
     */
    @Override
    @Bean(value = "masterAsyncExecutor")
    public Executor getAsyncExecutor() {
        int coreSize = corePoolSize.orElse(cpuNum * 2);
        int maxSize = maxPoolSize.orElse(cpuNum * 4);
        int capacity = queueCapacity.orElse(500);
        int awaitSeconds = awaitTerminationSeconds.orElse(60);

        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(coreSize);
        pool.setMaxPoolSize(maxSize);
        pool.setQueueCapacity(capacity);
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(awaitSeconds);
        pool.setThreadNamePrefix("WSS-CLOUD-Thread-");
        pool.initialize();
        return pool;
    }
}
2.3、业务代码
package com.wss.wssdemo.v2.controller;
import com.wss.common.core.collection.Dict;
import com.wss.common.core.result.R;
import com.wss.common.core.string.StringUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.domain.dto.ListenAddDto;
import com.wss.wssdemo.v2.entity.NumEntity;
import com.wss.wssdemo.v2.mapper.NumMapper;
import com.wss.wssdemo.v2.service.BizService;
import com.wss.wssdemo.v2.service.UserBalanceService;
import com.wss.wssdemo.v2.utils.BatchDbUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
 * REST endpoints under {@code /api}: cache/balance refresh, test-data generation and the
 * blacklist check itself.
 */
@RestController
@AllArgsConstructor
@RequestMapping(value = "/api")
public class ApiController {

    private final BizService bizService;
    private final UserBalanceService userBalanceService;
    private final NumMapper numMapper;

    /**
     * Refreshes user balances into Redis.
     *
     * @param userId optional; passed through to the service as-is (may be {@code null})
     */
    @PostMapping(value = "/flushUserBalance")
    public R<String> flushUserBalance(@RequestParam(value = "userId", required = false) Long userId) {
        userBalanceService.flushUserBalance(userId);
        return R.ok(null, "success");
    }

    /** Rebuilds the Redis blacklist and triggers the local-cache refresh broadcast. */
    @PostMapping(value = "/flushCache")
    public R<String> flushCache() {
        bizService.flushRedisCache();
        return R.ok(null, "success");
    }

    /** Generates 500000 sample numbers and batch-inserts them. */
    @PostMapping(value = "/doNum")
    public R<String> doNum() {
        List<NumEntity> generated = new ArrayList<>();
        for (int index = 0; index < 500000; index++) {
            NumEntity entity = new NumEntity();
            entity.setNum(buildNum(index));
            generated.add(entity);
        }
        BatchDbUtils.insertBatch(numMapper, generated, "");
        return R.ok(null, "success");
    }

    /**
     * Checks a number against the blacklist.
     *
     * @param num number to check
     */
    @GetMapping(value = "/checkNum")
    public R<Dict> checkNum(@RequestParam(value = "num") String num) {
        return R.ok(bizService.check(num));
    }

    /** Returns "1" followed by the index left-padded with zeros to 11 digits. */
    private static String buildNum(int index) {
        String digits = String.valueOf(index);
        StringBuilder num = new StringBuilder("1");
        for (int pad = digits.length(); pad < 11; pad++) {
            num.append("0");
        }
        return num.append(digits).toString();
    }
}
package com.wss.wssdemo.v2.service;
import com.wss.common.core.collection.Dict;
import com.wss.common.core.exception.BusinessException;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.constant.NumConstants;
import com.wss.wssdemo.v2.entity.NumEntity;
import com.wss.wssdemo.v2.mapper.NumMapper;
import com.wss.wssdemo.v2.utils.BatchDbUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
/**
 * Business operations: rebuilding the Redis blacklist and performing a blacklist check.
 */
@Slf4j
@Service
@AllArgsConstructor
public class BizService {

    private final NumMapper numMapper;
    private final MqService mqService;
    private final UserBalanceService userBalanceService;

    /**
     * Rebuilds the Redis blacklist set from the database and then broadcasts a flush
     * notification so that every instance reloads its local cache.
     */
    public void flushRedisCache() {
        // Step 1: drop the existing set, then page through the table and add each page to it.
        RedisBaseUtils.del(NumConstants.REDIS_KEY_NUM_SET);
        BatchDbUtils.pageListByPri(null, NumEntity::getNum, "num", numMapper, page -> {
            Object[] nums = page.stream().map(NumEntity::getNum).toArray();
            RedisBaseUtils.sSet(NumConstants.REDIS_KEY_NUM_SET, nums);
            return 0;
        });
        // An expiry could be set here, with a scheduled job refreshing the cache:
        // RedisBaseUtils.expire(NumConstants.REDIS_KEY_NUM_SET, NumConstants.REDIS_KEY_NUM_SET_EXPIRE);

        // Step 2: broadcast over MQ so each service refreshes its local cache.
        mqService.sendFlushAll();
    }

    /**
     * Checks whether the number is blacklisted and records one usage for the user.
     * The user id is fixed to 1 here.
     *
     * @param num number to look up
     * @return a dict with key {@code "contains"} holding the lookup result
     * @throws BusinessException if the user has no balance left
     */
    public Dict check(String num) {
        Long userId = 1L;
        if (!userBalanceService.hasBalance(userId)) {
            throw new BusinessException("余额不足");
        }
        Dict result = Dict.create();
        result.set("contains", LocalCache.contains(num));
        // Record the usage.
        userBalanceService.sub(userId);
        return result;
    }
}
package com.wss.wssdemo.v2.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.wss.common.core.date.LocalDateTimeUtils;
import com.wss.common.redis.config.WssFastJson2JsonRedisSerializer;
import com.wss.common.redis.config.WssRedisConfigProperties;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.constant.NumConstants;
import com.wss.wssdemo.v2.entity.SysUserEntity;
import com.wss.wssdemo.v2.mapper.SysUserMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Tracks per-user API balance: balances are loaded into Redis under a generation key,
 * usages are counted in memory and flushed once per second to MQ (database) and Redis.
 */
@Slf4j
@Service
@AllArgsConstructor
public class UserBalanceService {

    private final MqService mqService;
    private final SysUserMapper sysUserMapper;
    private final WssRedisConfigProperties wssRedisConfigProperties;

    /** Usages counted locally since the last flush, keyed by user id. */
    private final ConcurrentHashMap<Long, Long> pendingCounts = new ConcurrentHashMap<>();

    /**
     * Reads the user's current generation key from Redis.
     *
     * @param userId user id
     * @return the value stored under the user's "current" key, as returned by RedisBaseUtils
     */
    public String getHdKey(Long userId) {
        return RedisBaseUtils.get(String.format(NumConstants.REDIS_KEY_USER_API_COUNT_CURRENT, userId));
    }

    /**
     * Loads balances from the database into Redis under a fresh timestamp-based generation
     * key and points each user's "current" key at it. (Original note: should run once a
     * day in the early morning; the cache expires after two days.)
     *
     * @param userId restricts the refresh to one user; {@code null} refreshes all users
     */
    public void flushUserBalance(Long userId) {
        log.info("刷新用户余额:开始");
        List<SysUserEntity> userList = sysUserMapper.selectList(Wrappers.<SysUserEntity>lambdaQuery()
                .select(SysUserEntity::getUserId, SysUserEntity::getVersion)
                .eq(null != userId, SysUserEntity::getUserId, userId)
        );
        String hdKey = LocalDateTimeUtils.format(LocalDateTime.now(), "yyyyMMddHHmmssSSS");
        // The generation key is written with the template's value serializer so that it can
        // be read back through RedisBaseUtils.get (see getHdKey).
        RedisSerializer<Object> valueSerializer = (RedisSerializer<Object>) RedisBaseUtils.getRedisTemplate().getValueSerializer();
        byte[] hdKeyBytes = valueSerializer.serialize(hdKey);
        RedisBaseUtils.getRedisTemplate().executePipelined((RedisCallback<Object>) connection -> {
            for (SysUserEntity user : userList) {
                // Explicit charset so the key bytes do not depend on the platform default.
                byte[] key = String.format(NumConstants.REDIS_KEY_USER_API_COUNT_, user.getUserId(), hdKey)
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                connection.incrBy(key, user.getVersion());
                connection.expire(key, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);
                byte[] key2 = String.format(NumConstants.REDIS_KEY_USER_API_COUNT_CURRENT, user.getUserId())
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                connection.set(key2, hdKeyBytes);
                connection.expire(key2, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);
            }
            return null;
        });
        log.info("刷新用户余额:结束");
    }

    /**
     * Records one usage for the user in the local counter; called on every request.
     * The counter is pushed out by {@link #flushToRedis()}. (Alternative noted in the
     * original: send the MQ deduction directly from here instead.)
     *
     * @param userId user id
     */
    public void sub(Long userId) {
        pendingCounts.merge(userId, 1L, Long::sum);
    }

    /**
     * Tells whether the user's balance in Redis is greater than zero.
     *
     * @param userId user id
     * @return {@code false} when no balance is stored or it is not positive
     */
    public boolean hasBalance(Long userId) {
        String hdKey = getHdKey(userId);
        String key = String.format(NumConstants.REDIS_KEY_USER_API_COUNT_, userId, hdKey);
        // Read as Number: the counter is written with raw INCRBY, and which boxed type the
        // value deserializer hands back (Integer vs. Long) is not visible from here.
        Number balance = RedisBaseUtils.get(key);
        return balance != null && balance.longValue() > 0;
    }

    /**
     * Every second (fixed delay), drains the local counters, sends one MQ deduction per
     * user (database side) and decrements the Redis balances in one pipeline.
     */
    @Scheduled(fixedDelay = 1000)
    public void flushToRedis() {
        if (pendingCounts.isEmpty()) {
            return;
        }
        Map<Long, Long> userSubMap = drainPendingCounts();
        if (userSubMap.isEmpty()) {
            return;
        }
        // MQ first (database deduction); this could also be moved into sub().
        for (Map.Entry<Long, Long> userSubEntry : userSubMap.entrySet()) {
            mqService.sendSubBalance(userSubEntry.getKey(), userSubEntry.getValue());
        }
        RedisBaseUtils.getRedisTemplate().executePipelined((RedisCallback<Object>) connection -> {
            for (Map.Entry<Long, Long> userSubEntry : userSubMap.entrySet()) {
                // getHdKey performs its own Redis read through RedisBaseUtils.
                String hdKey = getHdKey(userSubEntry.getKey());
                byte[] key = String.format(NumConstants.REDIS_KEY_USER_API_COUNT_, userSubEntry.getKey(), hdKey)
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                connection.decrBy(key, userSubEntry.getValue());
                connection.expire(key, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);
            }
            return null;
        });
        log.debug("批量写Redis,共{}个手机号", userSubMap.size());
    }

    /**
     * Moves the pending counters into a private map, one key at a time.
     *
     * <p>Each {@code remove} is atomic with respect to the {@code merge} in {@link #sub(Long)}:
     * a concurrent increment is either included in the removed value or starts a new entry
     * that the next flush picks up. Copying the map and then clearing it would drop
     * increments that arrive between the two steps.
     */
    private Map<Long, Long> drainPendingCounts() {
        Map<Long, Long> drained = new HashMap<>();
        for (Long userId : pendingCounts.keySet()) {
            Long count = pendingCounts.remove(userId);
            if (count != null) {
                drained.put(userId, count);
            }
        }
        return drained;
    }
}