号码校验系统-demo

吴书松
吴书松
发布于 2026-03-05 / 13 阅读
0

号码校验系统-demo

1、环境

1、黑名单校验系统

2、50万黑名单号码

3、-Xms512m -Xmx2048m

4、springboot:2.7.18

5、单机压测结果

2、代码

2.1、gav

        <!-- Author's utility libraries (core helpers and Redis helpers) -->
        <dependency>
            <groupId>io.github.wushusong</groupId>
            <artifactId>wss-utils-core</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>io.github.wushusong</groupId>
            <artifactId>wss-utils-redis</artifactId>
            <version>1.0.3</version>
        </dependency>

        <!-- MyBatis-Plus Starter -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.5</version>
        </dependency>
        <!-- MySQL Connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <!-- HikariCP (bundled with Spring Boot by default, no extra dependency needed) -->

        <!-- Spring AMQP (RabbitMQ); version is managed by the Spring Boot parent -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.2、配置

2.2.1、yml配置

# HTTP port of the demo application
server:
  port: 8080


# Redis connection settings under the "wss.redis" prefix
# (presumably consumed by wss-utils-redis; verify against that library)
wss:
  redis:
    defaultRedis: v1
    configs:
      v1:
        host: 192.168.2.240
        port: 6379
        password: 123456
        database: 1
        beanName: redisTemplate
        stringBeanName: stringRedisTemplate
        serializerType: json

spring:
  datasource:
    url: jdbc:mysql://192.168.2.240:3306/wss_test?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai&characterEncoding=utf8
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    # HikariCP settings (HikariCP is the default pool in Spring Boot 2.x)
    hikari:
      minimum-idle: 10
      maximum-pool-size: 30
      idle-timeout: 600000
      connection-timeout: 30000
      pool-name: HikariPool
      auto-commit: true
      leak-detection-threshold: 60000


  rabbitmq:
    host: 192.168.2.240
    port: 5672
    username: guest
    password: 123456
    listener:
      simple:
        acknowledge-mode: manual   # manual acknowledgement for all listeners

# MyBatis-Plus settings
mybatis-plus:
  mapper-locations: classpath*:/mapper/**/*.xml
  type-aliases-package: com.example.entity
  global-config:
    db-config:
      id-type: auto          # auto-increment primary key
      logic-delete-field: deleted  # logical-delete column
      logic-delete-value: 1
      logic-not-delete-value: 0
  configuration:
    map-underscore-to-camel-case: true   # snake_case column -> camelCase property
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl  # print SQL to stdout (development only)

2.2.2、RabbitMQ配置

package com.wss.wssdemo.v2.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ infrastructure beans: a batch listener container factory, a single-message
 * listener container factory and a {@link RabbitAdmin}.
 *
 * <p>Both container factories use manual acknowledgement, so every listener bound to them
 * must ack or nack its deliveries itself.
 */
@Configuration
public class RabbitMQBeanConfig {


//    // Optional: custom JSON message converter. If this bean is enabled, senders do not
//    // need to serialize the payload themselves.
//    @Bean
//    public MessageConverter messageConverter() {
//        return new Jackson2JsonMessageConverter();
//    }

    /**
     * Container factory for listeners that receive a {@code List} of messages per invocation.
     *
     * @param connectionFactory the broker connection factory
     * @return a factory delivering batches of up to 10 messages, with a 5 second receive timeout
     */
    @Bean
    public RabbitListenerContainerFactory<?> batchContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory batchFactory = manualAckFactory(connectionFactory, true);
        // Let the container assemble individual deliveries into one batch.
        batchFactory.setConsumerBatchEnabled(true);
        // Upper bound of messages per batch (tune as needed).
        batchFactory.setBatchSize(10);
        // Receive timeout in milliseconds.
        batchFactory.setReceiveTimeout(5000L);
        return batchFactory;
    }

    /**
     * Container factory for ordinary listeners that receive one message per invocation.
     *
     * @param connectionFactory the broker connection factory
     * @return a non-batch factory with manual acknowledgement
     */
    @Bean
    public RabbitListenerContainerFactory<?> normalContainerFactory(ConnectionFactory connectionFactory) {
        return manualAckFactory(connectionFactory, false);
    }

    /**
     * Admin used to declare exchanges, queues and bindings programmatically.
     *
     * @param connectionFactory the broker connection factory
     * @return a {@link RabbitAdmin} bound to that connection factory
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /** Creates a container factory with the settings shared by both listener flavours. */
    private static SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory connectionFactory,
                                                                         boolean batchListener) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setBatchListener(batchListener);
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }

//    // ========== sub_balance (Topic) ==========
//    @Bean
//    public TopicExchange subBalanceExchange() {
//        return ExchangeBuilder.topicExchange(MqEnums.sub_balance.getExchange())
//                .durable(true)
//                .build();
//    }
//
//    @Bean
//    public Queue subBalanceQueue() {
//        return QueueBuilder.durable(MqEnums.sub_balance.getQueue())
//                .build();
//    }
//
//    @Bean
//    public Binding subBalanceBinding() {
//        return BindingBuilder.bind(subBalanceQueue())
//                .to(subBalanceExchange())
//                .with(MqEnums.sub_balance.getRoutingKey());
//    }
//
//    // ========== flush_all (Fanout) ==========
//    @Bean
//    public FanoutExchange flushAllExchange() {
//        return ExchangeBuilder.fanoutExchange(MqEnums.flush_all.getExchange())
//                .durable(true)
//                .build();
//    }
//
//    @Bean
//    public Queue flushAllQueue() {
//        return QueueBuilder.durable(MqEnums.flush_all.getQueue())
//                .build();
//    }
//
//    @Bean
//    public Binding flushAllBinding() {
//        // The routing key is ignored for fanout exchanges, so bind directly.
//        return BindingBuilder.bind(flushAllQueue())
//                .to(flushAllExchange());
//    }
}
package com.wss.wssdemo.v2.config;

import com.wss.common.core.other.RandomUtils;
import com.wss.wssdemo.v2.constant.MqEnums;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Declares the demo's exchanges, queues and bindings on the broker at startup through
 * {@link RabbitAdmin}, instead of exposing them as Spring beans.
 */
@Configuration
public class RabbitMQManualAdminConfig {

    @Resource
    private RabbitAdmin rabbitAdmin;

    /** Declares the topic topology for {@code sub_balance_batch} and the fanout topology for {@code flush_all}. */
    @PostConstruct
    public void manualDeclare() {
        buildTopicQueue(MqEnums.sub_balance_batch);
        buildFanoutQueue(MqEnums.flush_all);
    }

    /**
     * Declares a durable topic exchange, a durable queue and the binding between them.
     *
     * @param mqEnums supplies the exchange name, queue name and routing key
     */
    public void buildTopicQueue(MqEnums mqEnums){
        Exchange exchange = ExchangeBuilder.topicExchange(mqEnums.getExchange())
                .durable(true)
                .build();
        Queue queue = QueueBuilder.durable(mqEnums.getQueue()).build();
        String routingKey = mqEnums.getRoutingKey();
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();

        declare(exchange, queue, binding);
    }

    /**
     * Declares a durable fanout exchange and binds a queue with a random name suffix to it.
     *
     * <p>A fanout exchange broadcasts to every bound queue, so each consumer instance needs
     * its own queue; consumers sharing one queue would compete and only one of them would
     * see a given message. The generated name is written back into {@code mqEnums} via
     * {@code setQueue}.
     * NOTE(review): listeners that read the queue name from the enum only see the suffixed
     * name if they are initialised after this method has run; confirm the bean ordering.
     *
     * @param mqEnums supplies the exchange name and queue name prefix; its queue name is overwritten
     */
    public void buildFanoutQueue(MqEnums mqEnums){
        String uniqueQueueName = mqEnums.getQueue() + "_" + RandomUtils.randomString(8);
        mqEnums.setQueue(uniqueQueueName);

        Exchange exchange = ExchangeBuilder.fanoutExchange(mqEnums.getExchange())
                .durable(true)
                .build();
        Queue queue = QueueBuilder.durable(uniqueQueueName)
                .autoDelete()
                .build();
        // The routing key is ignored by fanout exchanges.
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("").noargs();

        declare(exchange, queue, binding);
    }

    /** Declares the exchange, then the queue, then the binding on the broker. */
    private void declare(Exchange exchange, Queue queue, Binding binding) {
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
    }

}

监听者

package com.wss.wssdemo.v2.listens;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.rabbitmq.client.Channel;
import com.sun.xml.internal.bind.v2.TODO;
import com.wss.common.core.json.JsonUtils;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.domain.dto.ListenAddDto;
import com.wss.wssdemo.v2.domain.dto.ListenSubBalanceDto;
import com.wss.wssdemo.v2.entity.SysUserEntity;
import com.wss.wssdemo.v2.mapper.SysUserMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
@AllArgsConstructor
public class Mqlisten {

    private final SysUserMapper sysUserMapper;

    /**
     * 批量监听 sub_balance_batch (topic-batch)
     * 注意:containerFactory 需要指定为支持批量的工厂
     * 方法参数可以是 List<Message> 或 List<String>(如果消息体是 String)
     */
    @RabbitListener(queues = "#{T(com.wss.wssdemo.v2.constant.MqEnums).sub_balance_batch.getQueue()}", containerFactory = "batchContainerFactory")
    public void handleSubBalanceBatch(List<Message> messages, Channel channel) {
        try {
            log.info("[sub_balance_batch] 接收到 " + messages.size() + " 条消息");
            Map<Long,Long> userBalance = new HashMap<>();
            for (Message msgObj : messages) {
                String msg = new String(msgObj.getBody(), StandardCharsets.UTF_8);
                log.info("  -> " + msg);
                ListenSubBalanceDto listenSubBalanceDto = JsonUtils.toObject(msg, ListenSubBalanceDto.class);
                userBalance.merge(listenSubBalanceDto.getUserId(), listenSubBalanceDto.getNum(), Long::sum);
            }

            for (Map.Entry<Long, Long> userBalanceEntry : userBalance.entrySet()) {
                sysUserMapper.update(Wrappers.<SysUserEntity>lambdaUpdate()
                        .eq(SysUserEntity::getUserId, userBalanceEntry.getKey())
                        .setSql( "version = version - " + userBalanceEntry.getValue())
                );
            }

            // 批量确认:最后一个消息的 deliveryTag,multipe=true 表示确认该 tag 之前的所有消息
            long lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();
            // 批量确认,multipe=true 表示确认该 tag 及之前的所有未确认消息
            channel.basicAck(lastDeliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            // 批量拒绝并重新入队(具体策略可根据业务调整)
            try {
                long lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();
                channel.basicNack(lastDeliveryTag, true, true);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        String msg = JsonUtils.toString(new ListenSubBalanceDto(1L, 1L));
        ListenSubBalanceDto listenSubBalanceDto = JsonUtils.toObject(msg, ListenSubBalanceDto.class);
        System.out.println(listenSubBalanceDto);
    }

    @RabbitListener(queues = "#{T(com.wss.wssdemo.v2.constant.MqEnums).flush_all.getQueue()}", containerFactory = "normalContainerFactory")
    public void handleFlushAll(String message, Channel channel, Message amqpMessage) {
        try {
            log.info("[flush_all 2] " + message);
            listenFlushAll();
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            try { channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true); } catch (Exception ex) {}
        }
    }

    /**
     * 广播通知
     * 号码增量监听
     * 这里可以采用MQ,批量监听
     */
    public void listenAdd(List<String> jsonList){
        //1:新增,2:删除
        for (String json : jsonList) {
            ListenAddDto listenAddDto = JsonUtils.toObject(json, ListenAddDto.class);
            if("1".equals(listenAddDto.getType())){
                LocalCache.addNum2LocalCache(listenAddDto.getNum());
            }else if("2".equals(listenAddDto.getType())){
                LocalCache.removeNum2LocalCache(listenAddDto.getNum());
            }
        }
    }

    /**
     * 广播通知
     * 号码全量更新
     */
    public void listenFlushAll(){
        LocalCache.loadAllNum2LocalCache(RedisBaseUtils.getRedisTemplate());
    }

}

生产者

package com.wss.wssdemo.v2.service;

import com.wss.common.core.collection.Dict;
import com.wss.common.core.json.JsonUtils;
import com.wss.common.core.other.IdUtils;
import com.wss.wssdemo.v2.constant.MqEnums;
import com.wss.wssdemo.v2.domain.dto.ListenSubBalanceDto;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Message producer of the demo. Every message is published together with fresh
 * {@link CorrelationData} whose id is a random UUID.
 */
@Slf4j
@Service
@AllArgsConstructor
public class MqService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Publishes a balance deduction to the {@code sub_balance_batch} exchange.
     *
     * @param userId the user whose balance is to be reduced
     * @param count  the amount to deduct
     */
    public void sendSubBalance(Long userId, Long count) {
        String payload = JsonUtils.toString(new ListenSubBalanceDto(userId,count));
        publish(MqEnums.sub_balance_batch, payload);
    }

    /** Publishes the "reload the whole blacklist" notification to the {@code flush_all} exchange. */
    public void sendFlushAll(){
        publish(MqEnums.flush_all, "1");
    }

    /** Sends {@code payload} to the exchange and routing key described by {@code target}. */
    private void publish(MqEnums target, String payload) {
        CorrelationData correlation = new CorrelationData(IdUtils.simpleUUID());
        rabbitTemplate.convertAndSend(target.getExchange(), target.getRoutingKey(), payload, correlation);
    }

}

2.2.3、本地缓存

package com.wss.wssdemo.v2.config;

import com.wss.wssdemo.v2.constant.NumConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.SetOperations;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-local copy of the phone-number blacklist, held as a set of {@code Long} values.
 *
 * <p>A full reload builds a complete new set and then publishes it with a single volatile
 * reference write. Readers therefore always see either the previous complete set or the
 * new complete set, never an empty or half-filled one.
 * Incremental {@link #addNum2LocalCache}/{@link #removeNum2LocalCache} calls that race
 * with a reload may be applied to the outgoing set and thus be superseded by the reload.
 */
@Slf4j
public class LocalCache {

    /** Current blacklist; replaced as a whole on reload, mutated in place by add/remove. */
    private static volatile Set<Long> blacklist = ConcurrentHashMap.newKeySet();

    /**
     * Reloads the complete blacklist from Redis (delegates to the SCAN-based loader).
     *
     * @param redisTemplate template used to read the blacklist set
     */
    public static void loadAllNum2LocalCache(RedisTemplate<String, Object> redisTemplate){
        loadAllNum2LocalCacheScan(redisTemplate);
    }

    /**
     * Reloads the complete blacklist by fetching all set members in one call.
     *
     * @param redisTemplate template used to read the blacklist set
     * @throws NumberFormatException if a member is not a parseable long
     */
    public static void loadAllNum2LocalCacheAll(RedisTemplate<String, Object> redisTemplate){
        log.info("全量加载黑名单:开始");
        SetOperations<String, Object> setOps = redisTemplate.opsForSet();
        Set<Object> members = setOps.members(NumConstants.REDIS_KEY_NUM_SET);
        // Staging set; not visible to readers until it is published below.
        Set<Long> newSet = ConcurrentHashMap.newKeySet();
        if(null != members){
            for (Object num : members) {
                newSet.add(Long.parseLong(num.toString()));
            }
        }
        int loaded = publish(newSet);

        log.info("全量加载黑名单:完成,数量: {}", loaded);
    }

    /**
     * Reloads the complete blacklist by iterating the Redis set with SSCAN (count hint 10000).
     *
     * @param redisTemplate template used to read the blacklist set
     * @throws RuntimeException if scanning or parsing fails; the current blacklist is kept in that case
     */
    public static void loadAllNum2LocalCacheScan(RedisTemplate<String, Object> redisTemplate){
        log.info("开始分批加载黑名单...");
        long start = System.currentTimeMillis();
        // Staging set; not visible to readers until it is published below.
        Set<Long> newSet = ConcurrentHashMap.newKeySet();
        ScanOptions options = ScanOptions.scanOptions().count(10000).build();
        try (Cursor<Object> cursor = redisTemplate.opsForSet().scan(NumConstants.REDIS_KEY_NUM_SET, options)) {
            while (cursor.hasNext()) {
                newSet.add(Long.parseLong(cursor.next().toString()));
            }
        } catch (Exception e) {
            log.error("分批加载黑名单失败", e);
            throw new RuntimeException("加载黑名单失败", e);
        }
        int loaded = publish(newSet);
        log.info("分批加载黑名单完成,数量: {},耗时: {} ms", loaded, System.currentTimeMillis() - start);
    }

    /**
     * Adds one number to the local blacklist.
     *
     * @param num the number as a decimal string
     * @throws NumberFormatException if {@code num} is not a parseable long
     */
    public static void addNum2LocalCache(String num){
        blacklist.add(Long.parseLong(num));
    }

    /**
     * Removes one number from the local blacklist.
     *
     * @param num the number as a decimal string
     * @throws NumberFormatException if {@code num} is not a parseable long
     */
    public static void removeNum2LocalCache(String num){
        blacklist.remove(Long.parseLong(num));
    }

    /**
     * Tells whether a number is on the local blacklist.
     *
     * @param num the number as a decimal string
     * @return {@code true} if the number is blacklisted
     * @throws NumberFormatException if {@code num} is not a parseable long
     */
    public static boolean contains(String num){
        return blacklist.contains(Long.parseLong(num));
    }

    /**
     * Makes {@code fresh} the current blacklist in one atomic step and returns its size.
     * Clearing and refilling the live set instead would expose an empty blacklist to
     * concurrent {@link #contains} calls for the duration of the refill.
     */
    private static int publish(Set<Long> fresh) {
        log.info("全量加载黑名单:原子替换-开始");
        blacklist = fresh;
        log.info("全量加载黑名单:原子替换-结束");
        return fresh.size();
    }

}
package com.wss.wssdemo.v2.config;

import com.wss.common.redis.config.WssRedisConfig;
import com.wss.common.redis.utils.RedisBaseUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * Startup hook that fills the local blacklist cache from Redis once the bean is constructed.
 */
@Slf4j
@Configuration
public class NumConfig {

    /**
     * Redis configuration, injected explicitly.
     * NOTE(review): it is not used directly; presumably it is injected so that the Redis setup
     * is initialised before {@link #init()} runs. Confirm.
     */
    private final WssRedisConfig wssRedisConfig;

    /**
     * @param wssRedisConfig the Redis configuration bean
     */
    public NumConfig(WssRedisConfig wssRedisConfig) {
        this.wssRedisConfig = wssRedisConfig;
    }

    /** Loads the complete blacklist into {@link LocalCache}. */
    @PostConstruct
    public void init() {
        LocalCache.loadAllNum2LocalCache(RedisBaseUtils.getRedisTemplate());
    }

}

2.2.4、线程配置

package com.wss.wssdemo.v2.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.ReloadableResourceBundleMessageSource;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Defines the application's async executor. All sizes can be overridden with the
 * {@code thread.pool.*} properties; an unset property falls back to the default
 * computed in {@link #getAsyncExecutor()}.
 */
@Configuration
public class TaskExecutorConfiguration implements AsyncConfigurer {

	/**
	 * Number of processors reported by the JVM. It is only a starting point: size the pool
	 * according to whether the workload is CPU-bound or IO-bound.
	 */
	public static final int cpuNum = Runtime.getRuntime().availableProcessors();

	@Value("${thread.pool.corePoolSize:}")
	private Optional<Integer> corePoolSize;

	@Value("${thread.pool.maxPoolSize:}")
	private Optional<Integer> maxPoolSize;

	@Value("${thread.pool.queueCapacity:}")
	private Optional<Integer> queueCapacity;

	@Value("${thread.pool.awaitTerminationSeconds:}")
	private Optional<Integer> awaitTerminationSeconds;

	/**
	 * Builds the executor exposed as bean {@code masterAsyncExecutor}.
	 *
	 * @return an initialised {@link ThreadPoolTaskExecutor}
	 */
	@Override
	@Bean(value = "masterAsyncExecutor")
	public Executor getAsyncExecutor() {
		// Core pool size: defaults to twice the processor count.
		int coreThreads = corePoolSize.orElse(cpuNum*2);
		// Maximum pool size: defaults to four times the processor count.
		int maxThreads = maxPoolSize.orElse(cpuNum * 4);
		// Maximum number of queued tasks.
		int queuedTasks = queueCapacity.orElse(500);
		// Seconds to wait for running tasks during shutdown.
		int shutdownWaitSeconds = awaitTerminationSeconds.orElse(60);

		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(coreThreads);
		executor.setMaxPoolSize(maxThreads);
		executor.setQueueCapacity(queuedTasks);
		// When pool and queue are both full, the submitting thread runs the task itself.
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.setAwaitTerminationSeconds(shutdownWaitSeconds);
		executor.setThreadNamePrefix("WSS-CLOUD-Thread-");
		executor.initialize();
		return executor;
	}

}

2.3、业务代码

package com.wss.wssdemo.v2.controller;

import com.wss.common.core.collection.Dict;
import com.wss.common.core.result.R;
import com.wss.common.core.string.StringUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.domain.dto.ListenAddDto;
import com.wss.wssdemo.v2.entity.NumEntity;
import com.wss.wssdemo.v2.mapper.NumMapper;
import com.wss.wssdemo.v2.service.BizService;
import com.wss.wssdemo.v2.service.UserBalanceService;
import com.wss.wssdemo.v2.utils.BatchDbUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.List;

/**
 * HTTP entry points of the demo under {@code /api}: cache maintenance, test-data
 * generation and the number check itself.
 */
@RestController
@AllArgsConstructor
@RequestMapping(value = "/api")
public class ApiController {

    private final BizService bizService;
    private final UserBalanceService userBalanceService;
    private final NumMapper numMapper;

    /**
     * Reloads user balances into Redis.
     *
     * @param userId optional; when present only this user is selected
     * @return a success response
     */
    @PostMapping(value = "/flushUserBalance")
    public R<String> flushUserBalance(@RequestParam(value = "userId",required = false)Long userId){
        userBalanceService.flushUserBalance(userId);
        return R.ok(null,"success");
    }

    /**
     * Rebuilds the Redis blacklist from the database and broadcasts a reload notification.
     *
     * @return a success response
     */
    @PostMapping(value = "/flushCache")
    public R<String> flushCache(){
        bizService.flushRedisCache();
        return R.ok(null,"success");
    }

    /**
     * Generates 500000 test numbers and batch-inserts them.
     *
     * @return a success response
     */
    @PostMapping(value = "/doNum")
    public R<String> doNum(){
        List<NumEntity> generated = new ArrayList<>();
        for (int seq = 0; seq < 500000; seq++) {
            NumEntity entity = new NumEntity();
            entity.setNum(demoNumber(seq));
            generated.add(entity);
        }
        BatchDbUtils.insertBatch(numMapper, generated,"");
        return R.ok(null,"success");
    }


    /**
     * Checks whether a number is on the blacklist.
     *
     * @param num the number to check
     * @return the check result produced by {@link BizService#check(String)}
     */
    @GetMapping(value = "/checkNum")
    public R<Dict> checkNum(@RequestParam(value = "num")String num){
        return R.ok(bizService.check(num));
    }

    /** Builds a test number: the digit '1' followed by {@code seq} left-padded with zeros to 11 digits. */
    private static String demoNumber(int seq) {
        String digits = String.valueOf(seq);
        StringBuilder number = new StringBuilder("1");
        for (int width = digits.length(); width < 11; width++) {
            number.append('0');
        }
        return number.append(digits).toString();
    }

}
package com.wss.wssdemo.v2.service;

import com.wss.common.core.collection.Dict;
import com.wss.common.core.exception.BusinessException;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.config.LocalCache;
import com.wss.wssdemo.v2.constant.NumConstants;
import com.wss.wssdemo.v2.entity.NumEntity;
import com.wss.wssdemo.v2.mapper.NumMapper;
import com.wss.wssdemo.v2.utils.BatchDbUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.stream.Collectors;

/**
 * Core business operations: rebuilding the Redis blacklist and checking a number.
 */
@Slf4j
@Service
@AllArgsConstructor
public class BizService {

    private final NumMapper numMapper;
    private final MqService mqService;
    private final UserBalanceService userBalanceService;

    /**
     * Rebuilds the Redis blacklist set from the database, then broadcasts a notification
     * so that every service instance reloads its local cache.
     */
    public void flushRedisCache(){
        // Step 1: drop the old set and refill it page by page from the database.
        RedisBaseUtils.del(NumConstants.REDIS_KEY_NUM_SET);
        BatchDbUtils.pageListByPri(null, NumEntity::getNum,"num",numMapper,page -> {
            Object[] pageNums = page.stream().map(NumEntity::getNum).toArray();
            RedisBaseUtils.sSet(NumConstants.REDIS_KEY_NUM_SET, pageNums);
            return 0;
        });
        // An expiry could be set here, with a scheduled job refreshing the cache:
//        RedisBaseUtils.expire(NumConstants.REDIS_KEY_NUM_SET, NumConstants.REDIS_KEY_NUM_SET_EXPIRE);

        // Step 2: tell all instances to reload their local cache.
        mqService.sendFlushAll();
    }

    /**
     * Checks a number against the local blacklist and records one usage for the caller.
     * The user id is currently fixed to {@code 1}.
     *
     * @param num the number to check
     * @return a {@link Dict} whose {@code contains} entry tells whether the number is blacklisted
     * @throws BusinessException if the user has no remaining balance
     */
    public Dict check(String num){
        Long userId = 1L;
        // Refuse the request when no balance is left.
        if(!userBalanceService.hasBalance(userId)){
            throw new BusinessException("余额不足");
        }

        Dict result = Dict.create();
        boolean blacklisted = LocalCache.contains(num);
        result.set("contains", blacklisted);
        // Record one usage.
        userBalanceService.sub(userId);
        return result;
    }



}
package com.wss.wssdemo.v2.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.wss.common.core.date.LocalDateTimeUtils;
import com.wss.common.redis.config.WssFastJson2JsonRedisSerializer;
import com.wss.common.redis.config.WssRedisConfigProperties;
import com.wss.common.redis.utils.RedisBaseUtils;
import com.wss.wssdemo.v2.constant.NumConstants;
import com.wss.wssdemo.v2.entity.SysUserEntity;
import com.wss.wssdemo.v2.mapper.SysUserMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Tracks each user's remaining API-call balance in Redis.
 *
 * <p>Balances are loaded into Redis under a key containing a generation id (a timestamp);
 * a second per-user key points at the current generation. Usage is counted in memory by
 * {@link #sub(Long)} and flushed once per second by {@link #flushToRedis()}, which also
 * publishes the deductions over MQ.
 */
@Slf4j
@Service
@AllArgsConstructor
public class UserBalanceService {
    private final MqService mqService;
    private final SysUserMapper sysUserMapper;
    private final WssRedisConfigProperties wssRedisConfigProperties;

    /** Usage counted since the last flush, keyed by user id. */
    private final ConcurrentHashMap<Long, Long> pendingCounts = new ConcurrentHashMap<>();

    /**
     * Reads the current balance generation id of a user.
     *
     * @param userId the user id
     * @return the generation id, or {@code null} if none is stored
     */
    public String getHdKey(Long userId){
        return RedisBaseUtils.get(String.format(NumConstants.REDIS_KEY_USER_API_COUNT_CURRENT,userId));
    }

    /**
     * Loads user balances from the database into Redis under a fresh generation id.
     * Intended to run once a day shortly after midnight; the written keys expire after
     * {@code NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE}.
     *
     * @param userId when non-null only this user is loaded, otherwise all users
     */
    public void flushUserBalance(Long userId){
        log.info("刷新用户余额:开始");
        List<SysUserEntity> userList = sysUserMapper.selectList(Wrappers.<SysUserEntity>lambdaQuery()
                .select(SysUserEntity::getUserId, SysUserEntity::getVersion)
                .eq(null != userId, SysUserEntity::getUserId, userId)
        );
        String hdKey = LocalDateTimeUtils.format(LocalDateTime.now(), "yyyyMMddHHmmssSSS");

        // The generation id is written with the template's own value serializer so that
        // getHdKey(), which reads through the template, can deserialize it again.
        @SuppressWarnings("unchecked")
        RedisSerializer<Object> valueSerializer = (RedisSerializer<Object>) RedisBaseUtils.getRedisTemplate().getValueSerializer();
        byte[] hdKeyBytes = valueSerializer.serialize(hdKey);
        // One pipeline round trip for all users.
        RedisBaseUtils.getRedisTemplate().executePipelined((RedisCallback<Object>) connection -> {
            for (SysUserEntity user : userList) {
                byte[] key = balanceKey(user.getUserId(), hdKey).getBytes(StandardCharsets.UTF_8);
                connection.incrBy(key, user.getVersion());
                connection.expire(key, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);

                byte[] key2 = String.format(NumConstants.REDIS_KEY_USER_API_COUNT_CURRENT, user.getUserId())
                        .getBytes(StandardCharsets.UTF_8);
                connection.set(key2, hdKeyBytes);
                connection.expire(key2, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);
            }
            return null;
        });
        log.info("刷新用户余额:结束");
    }

    /**
     * Records one API call for the user. Only the in-memory counter is touched; the
     * deduction reaches Redis and MQ on the next {@link #flushToRedis()} run.
     *
     * @param userId the user id
     */
    public void sub(Long userId) {
        pendingCounts.merge(userId, 1L, Long::sum);
//        mqService.sendSubBalance( userId,1L);
    }

    /**
     * Tells whether the user still has balance according to Redis.
     *
     * @param userId the user id
     * @return {@code true} if a balance is stored and greater than zero
     */
    public boolean hasBalance(Long userId) {
        String hdKey = getHdKey(userId);
        if (null == hdKey) {
            // No generation loaded for this user, so there is no balance key to read.
            return false;
        }
        // The counter is written by INCRBY; depending on the value serializer it may come
        // back as Integer, Long or String, so normalise instead of casting straight to Long.
        Object raw = RedisBaseUtils.get(balanceKey(userId, hdKey));
        if (null == raw) {
            return false;
        }
        long balance = raw instanceof Number
                ? ((Number) raw).longValue()
                : Long.parseLong(raw.toString().trim());
        return balance > 0;
    }


    /**
     * Runs one second after the previous run finished: drains the in-memory counters,
     * publishes each user's deduction over MQ (for the database) and decrements the Redis
     * balances in one pipeline.
     */
    @Scheduled(fixedDelay = 1000)
    public void flushToRedis() {
        if (pendingCounts.isEmpty()) {
            return;
        }
        // Drain key by key: remove() is atomic with respect to the merge() in sub(), so an
        // increment lands either in this snapshot or in the entry created for the next run.
        // Copying the map and then calling clear() would drop increments made in between.
        Map<Long, Long> userSubMap = new HashMap<>();
        for (Long userId : pendingCounts.keySet()) {
            Long drained = pendingCounts.remove(userId);
            if (null != drained) {
                userSubMap.put(userId, drained);
            }
        }
        if (userSubMap.isEmpty()) {
            return;
        }

        // Publish first so the database is updated; this could also be done in sub().
        for (Map.Entry<Long, Long> userSubEntry : userSubMap.entrySet()) {
            mqService.sendSubBalance( userSubEntry.getKey(),userSubEntry.getValue());
        }

        // Resolve the Redis keys before opening the pipeline: getHdKey() is a blocking read
        // and does not belong inside a pipelined callback.
        Map<String, Long> decrements = new HashMap<>();
        for (Map.Entry<Long, Long> userSubEntry : userSubMap.entrySet()) {
            String hdKey = getHdKey(userSubEntry.getKey());
            if (null == hdKey) {
                // Without a generation id the decrement would only create a junk "...null" key.
                log.warn("No current balance generation for user {}, skipping Redis decrement of {}",
                        userSubEntry.getKey(), userSubEntry.getValue());
                continue;
            }
            decrements.put(balanceKey(userSubEntry.getKey(), hdKey), userSubEntry.getValue());
        }

        if (!decrements.isEmpty()) {
            RedisBaseUtils.getRedisTemplate().executePipelined((RedisCallback<Object>) connection -> {
                for (Map.Entry<String, Long> decrement : decrements.entrySet()) {
                    byte[] key = decrement.getKey().getBytes(StandardCharsets.UTF_8);
                    connection.decrBy(key, decrement.getValue());
                    connection.expire(key, NumConstants.REDIS_KEY_USER_API_COUNT_EXPIRE);
                }
                return null;
            });
        }
        log.debug("批量写Redis,共{}个手机号", userSubMap.size());
    }

    /** Builds the Redis key holding the balance of {@code userId} for generation {@code hdKey}. */
    private static String balanceKey(Long userId, String hdKey) {
        return String.format(NumConstants.REDIS_KEY_USER_API_COUNT_, userId, hdKey);
    }

}