使用Redis实现延时消息队列(Sorted Set)

2022-07-24,,,,

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 一、场景设计
  • 二、延时队列实现
        • Sorted Set相关命令
        • RedisDelayQueue
        • Consumer
  • 三、演示
  • 总结

一、场景设计

1.用户下单15分钟未付款,取消订单恢复库存.

二、延时队列实现

  1. 订单创建的时候,订单ID和当前时间戳分别作为Sorted Set的member和score添加到订单队列Sorted Set中
  2. 通过Sorted Set的命令ZREVRANGEBYSCORE返回指定分数区间内的所有订单ID进行处理。
  3. 同时删除返回指定分数区间内的所有数据,并且由于指定分数区间内的所有数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
  4. 第二步和第三部必须要保存原子性所有必须在lua脚本中执行

Sorted Set相关命令

  1. ZADD命令 - 将一个或多个成员元素及其分数值加入到有序集当中
    zadd key score1 value1… scoren valuen

  2. ZREVRANGEBYSCORE命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
    zrevrangebyscore key max min [withscores] [limit offset count]

  3. ZREM命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略
    zrem key member [member …]

RedisDelayQueue

延迟队列接口


/**
 * 延迟队列接口
 *
 * @Author:hjk
 * @Date:2021/3/1 14:11
 */
public interface IDelayQueue<T> {

    /**
     * 入队
     *
     * @param message
     */
    void enqueue(T message);

    /**
     * 出队
     *
     * @param min    分数区间 - 最大分数
     * @param max    分数区间 - 最小分数
     * @param offset offset和limit原理和MySQL的LIMIT offset一致,如果不指定此参数则返回整个集合的数据
     * @param limit
     * @return
     */
    List<T> dequeue(String min, String max, String offset, String limit);

    /**
     * 出队
     *
     * @return
     */
    List<T> dequeue();
}

延迟队列接口 实现



/**
 * 延迟队列接口 实现
 *
 * @Author:hjk
 * @Date:2021/3/1 14:14
 */
@Component
public class OrderDelayQueue implements IDelayQueue<Order>, InitializingBean {

    /**
     * 分数区间 - 最小分数 默认最小值
     */
    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    /**
     * 延迟队列名称
     */
    private static final String ORDER_QUEUE = "ORDER_DELAY_QUEUE";
    private static final String DEQUEUE_LUA = "dequeue.lua";
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();


    @Resource
    public RedisTemplate<Object, Object> redisTemplate;

    @Override
    public void enqueue(Order order) {
        //60秒后执行
        String s = String.valueOf(order.getCreateTime().getTime() + 60 * 1000);
        redisTemplate
                .opsForZSet()
                .add(ORDER_QUEUE, JSON.toJSONString(order), Double.parseDouble(s));
    }

    @Override
    public List<Order> dequeue(String min, String max, String offset, String limit) {
        RedisScript<List<String>> redisScript = RedisScript.of(DEQUEUE_LUA_SHA.get(), List.class);
        List<Object> keys = Lists.newArrayList();
        keys.add(ORDER_QUEUE);
        keys.add(min);
        keys.add(max);
        keys.add(offset);
        keys.add(limit);
        List<String> list = redisTemplate.execute(redisScript, keys);
        List<Order> result = new ArrayList<>();
        if (!CollectionUtils.isEmpty(list)) {
            for (String order : list) {
                if (StringUtils.isNotBlank(order)) {
                    result.add(JSON.parseObject(order, Order.class));
                }
            }
        }
        return result;
    }

    @Override
    public List<Order> dequeue() {
        //zset 分数区间 - 最大分数 为当前时间戳
        String maxScore = String.valueOf(System.currentTimeMillis());
        return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT);
    }

    /**
     * 实现 InitializingBean 在初始化bean的时候都会执行该方法
     *
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ClassPathResource resource = new ClassPathResource(DEQUEUE_LUA);
        String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
        //如果当前值 ==为预期值,则将luaContent设置为给定的更新值
        DEQUEUE_LUA_SHA.compareAndSet(null, luaContent);
    }

}


lua

local zset_key = KEYS[1]
local min_score = KEYS[2]
local max_score = KEYS[3]
local offset = KEYS[4]
local limit = KEYS[5]
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            redis.call('ZREM', zset_key, unpack(list))
            return list
            else
        end
    end
end


Consumer

@Component
public class OrderConsumer {
    private static final Logger log = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private IDelayQueue<Order> iDelayQueue;

    public void consumption() {
        boolean result = false;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        List<Order> messages = iDelayQueue.dequeue();
        if (!CollectionUtils.isEmpty(messages)) {
            result = true;
            log.info("订单消息处理定时任务开始执行......");
            // 集合等分放到线程池中执行
            List<List<Order>> partition = Lists.partition(messages, 2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<Order> p : partition) {
                async(new OrderConsumer.ConsumeTask(p, latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                log.error("InterruptedException====>>", ignore);
            }
        }
        if (result) {
            stopWatch.stop();
            log.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis());
        }
    }


    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<Order> orders;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                for (Order order : orders) {
                    try {
                        log.info("延迟处理成功!订单信息:{}", order);
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }
}

三、演示

总结

基于Redis和Scheduled短轮询给出了一个完整的示例。如果需要在生产环境运行还是建议把Scheduled换成Quartz。当前的示例只是处于可运行的状态,代码中的参数需要根据环境来配置。如果队列数据很大则可以根据id取模分片到不同队列。

感谢您的阅读

如果你发现了错误的地方,可以在留言区提出来,我对其加以修改。

本文地址:https://blog.csdn.net/qq_39140300/article/details/114261063

《使用Redis实现延时消息队列(Sorted Set).doc》

下载本文的Word格式文档,以方便收藏与打印。