Redis中4种延时队列实现方式小结
目录
- 1. 基于Sorted Set的延时队列
- 原理
- 代码实现
- 优缺点
- 2. 基于List + 定时轮询的延时队列
- 原理
- 代码实现
- 优缺点
- 3. 基于发布/订阅(Pub/Sub)的延时队列
- 原理
- 代码实现
- 优缺点
- 4. 基于Redis Stream的延时队列
- 原理
- 代码实现
- 优缺点
- 性能对比与选型建议
- 总结
延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列扮演着关键角色。例如,订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。
Redis作为高性能的内存数据库,具备原子操作、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。
1. 基于Sorted Set的延时队列
原理
利用Redis的Sorted Set(有序集合),将消息ID作为member,执行时间戳作为score进行存储。通过ZRANGEBYSCORE
命令可以获取到达执行时间的任务。
代码实现
public class RedisZSetDelayQueue { private final StringRedisTemplate redisTemplate; private final String queueKey = "delay_queue:tasks"; public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) { this.redisTewww.devze.commplate = redisTemplate; } /** * 添加延时任务 * @param taskId 任务ID * @param tashttp://www.devze.comkInfo 任务信息(jsON字符串) * @param delayTime 延迟时间(秒) */ public void addTask(String taskId, String taskInfo, long delayTime) { // 计算执行时间 long executeTime = System.currentTimeMillis() + delayTime * 1000; // 存储任务详情 redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo); // 添加到延时队列 redisTemplate.opsForZSet().add(queueKey, taskId, executeTime); System.out.println("Task added: " + taskId + ", will execute at: " + executeTime); } /** * 轮询获取到期任务 */ public List<String> pollTasks() { long now = System.currentTimeMillis(); // 获取当前时间之前的任务 Set<String> taskIds = redisTemplate.opsForZSet() .rangeByScore(queueKey, 0, now); if (taskIds == null || taskIds.isEmpty()) { return Collections.emptyList(); } // 获取任务详情 List<String> tasks = new ArrayList<>(); for (String taskId : taskIds) { String taskInfo = (String) redisTemplate.opsForHash() .get("delay_queue:details", taskId); if (taskInfo != null) { tasks.add(taskInfo); // 从集合和详情中移除任务 redisTemplate.opsForZSet().remove(queueKey, taskId); redisTemplate.opsForHash().delete("delay_queue:details", taskId); } } return tasks; } // 定时任务示例 public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 实际任务处理逻辑 } }
优缺点
优点
- 实现简单,易于理解
- 任务按执行时间自动排序
- 支持精确的时间控制
缺点
- 需要轮询获取到期任务,消耗CPU资源
- 大量任务情况下,
ZRANGEBYSCORE
操作可能影响性能 - 没有消费确认机制,需要额外实现
2. 基于List + 定时轮询的延时队列
原理
这种方式使用多个List作为存储容器,按延迟时间的不同将任务分配到不同的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。
代码实现
public class RedisListDelayQueue { private final StringRedisTemplate redisTemplate; private final String readyQueueKey = "delay_queue:ready"; // 待处理队列 private final Map<Integer, String> delayQueueKeys; // 延迟队列,按延时时间分级 public RedisListDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化不同延迟级别的队列 delayQueueKeys = new HashMap<>(); delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒 delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分钟 delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分钟 delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟 } /** * 添加延时任务 */ public void addTask(String taskInfo, int delaySeconds) { // 选择合适的延迟队列 String queueKey = selectDelayQueue(delaySeconds); // 任务元数据,包含任务信息和执行时间 long executeTime = System.currentTimeMillis() + delaySeconds * 1000; String taskData = executeTime + ":" + taskInfo; // 添加到延迟队列 redisTemplate.opsForList().rightPush(queueKey, taskData); System.out.println("Task added to " + jsqueueKey + ": " + taskData); } /** * 选择合适的延迟队列 */ private String selectDelayQueue(int delaySeconds) { // 找到最接近的延迟级别 int closestDelay = delayQueueKeys.keySet().stream() 编程 .filter(delay -> delay >= delaySeconds) .min(Integer::compareTo) .orElse(Collections.max(delayQueueKeys.keySet())); return delayQueueKeys.get(closestDelay); } /** * 移动到期任务到待处理队列 */ public void moveTasksToReadyQueue() { long now = System.currentTimeMillis(); // 遍历所有延迟队列 for (String queueKey : delayQueueKeys.values()) { boolean hasMoreTasks = true; while (hasMoreTasks) { // 查看队列头部任务 String taskData = redisTemplate.opsForList().index(queueKey, 0); if (taskData == null) { hasMoreTasks = false; continue; } // 解析任务执行时间 long executeTime = Long.parseLong(taskData.split(":", 2)[0]); // 检查是否到期 if (executeTime <= now) { // 通过LPOP原子性地移除队列头部任务 String task = redisTemplate.opsForList().leftPop(queueKey); // 任务可能被其他进程处理,再次检查 if (task != null) { // 提取任务信息并添加到待处理队列 String taskInfo = task.split(":", 2)[1]; redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo); System.out.println("Task moved to ready queue: " + taskInfo); } } else { // 队列头部任务未到期,无需检查后面的任务 hasMoreTasks = false; } } } } /** * 获取待处理任务 */ public String getReadyTask() { return redisTemplate.opsForList().leftPop(readyQueueKey); } /** * 启动任务处理器 */ public void startTaskProcessors() { // 定时移动到期任务 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 移动任务线程 scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS); // 处理任务线程 scheduler.scheduleAtFixedRate(() -> { String task = getReadyTask(); if (task != null) { processTask(task); } }, 0, 100, TimeUnit.MILLISECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 实际任务处理逻辑 } }
优缺点
优点
- 分级队列设计,降低单队列压力
- 相比Sorted Set占用内存少
- 支持队列监控和任务优先级
缺点
- 延迟时间精度受轮询频率影响
- 实现复杂度高
- 需要维护多个队列
- 时间判断和队列操作非原子性,需特别处理并发问题
3. 基于发布/订阅(Pub/Sub)的延时队列
原理
结合Redis发布/订阅功能与本地时间轮算法,实现延迟任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。
代码实现
public class RedisPubSubDelayQueue { private final StringRedisTemplate redisTemplate; private final String TASK_TOPIC = "delay_queue:task_channel"; private final String TASK_HASH = "delay_queue:tasks"www.devze.com; private final HashedwheelTimer timer; public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 初始化时间轮,刻度100ms,轮子大小512 this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512); // 启动消息订阅 subscribeTaskChannel(); } /** * 添加延时任务 */ public void addTask(String taskId, String taskInfo, long delaySeconds) { // 存储任务信息到Redis redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo); // 添加到时间轮 timer.newTimeout(timeout -> { // 发布任务就绪消息 redisTemplate.convertAndSend(TASK_TOPIC, taskId); }, delaySeconds, TimeUnit.SECONDS); System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s"); } /** * 订阅任务通道 */ private void subscribeTaskChannel() { redisTemplate.getConnectionFactory().getConnection().subscribe( (message, pattern) -> { String taskId = new String(message.getBody()); // 获取任务信息 String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId); if (taskInfo != null) { // 处理任务 processTask(taskId, taskInfo); // 删除任务 redisTemplate.opsForHash().delete(TASK_HASH, taskId); } }, TASK_TOPIC.getBytes() ); } private void processTask(String taskId, String taskInfo) { System.out.println("Processing task: " + taskId + " - " + taskInfo); // 实际任务处理逻辑 } // 模拟HashedWheelTimer类 public static class HashedWheelTimer { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private final long tickDuration; private final TimeUnit unit; public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) { this.tickDuration = tickDuration; this.unit = unit; } public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) { long delayMillis = timeUnit.toMillis(delay); scheduler.schedule( () -> task.run(null), delayMillis, TimeUnit.MILLISECONDS ); } public interface TimerTask { void run(Timeout timeout); } public interface Timeout { } } }
优缺点
优点:
- 即时触发,无需轮询
- 高效的时间轮算法
- 可以跨应用订阅任务
- 分离任务调度和执行,降低耦合
缺点:
- 依赖本地时间轮,非纯Redis实现
- Pub/Sub模式无消息持久化,可能丢失消息
- 服务重启时需要重建时间轮
- 订阅者需要保持连接
4. 基于Redis Stream的延时队列
原理
Redis 5.0引入的Stream是一个强大的数据结构,专为消息队列设计。结合Stream的消费组和确认机制,可以构建可靠的延时队列。
代码实现
public class RedisStreamDelayQueue { private final StringRedisTemplate redisTemplate; private final String delayQueueKey = "delay_queue:stream"; private final String consumerGroup = "delay_queue_consumers"; private final String consumerId = UUID.randomUUID().toString(); public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; // 创建消费者组 try { redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xGroupCreate( delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true ); return "OK"; }); } catch (Exception e) { // 消费者组可能已存在 System.out.println("Consumer group may already exist: " + e.getMessage()); } } /** * 添加延时任务 */ public void addTask(String taskInfo, long delaySeconds) { long executeTime = System.currentTimeMillis() + delaySeconds * 1000; Map<String, Object> task = new HashMap<>(); task.put("executeTime", String.valueOf(executeTime)); task.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, task); System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime); } /** * 获取待执行的任务 */ public List<String> pollTasks() { long now = System.currentTimeMillis(); List<String> readyTasks = new ArrayList<>(); // 读取尚未处理的消息 List<MapRecord<String, Object, Object>> records = redisTemplate.execute( (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> { return connection.streamCommands().xReadGroup( consumerGroup.getBytes(), consumerId.getBytes(), StreamReadOptions.empty().count(10), StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")) ); } ); if (records != null) { for (MapRecord<String, Object, Object> record : records) { String messageId = record.getId().getValue(); Map<Object, Object> value = record.getValue(); long executeTime = Long.parseLong((String) value.get("executeTime")); String taskInfo = (String) value.get("taskInfo"); // 检查任务是否到期 if (executeTime <= now) { readyTasks.add(taskInfo); // 确认消息已处理 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 可选:从流中删除消息 redisTemplate.opsForStream().delete(delayQueueKey, messageId); } else { // 任务未到期,放回队列 redisTemplate.execute((RedisCallback<String>) connection -> { connection.streamCommands().xAck( delayQueueKey.getBytes(), consumerGroup.getBytes(), messageId.getBytes() ); return "OK"; }); // 重新添加任务(可选:使用延迟重新入队策略) Map<String, Object> newTask = new HashMap<>(); newTask.put("executeTime", String.valueOf(executeTime)); newTask.put("taskInfo", taskInfo); redisTemplate.opsForStream().add(delayQueueKey, newTask); } } } return readyTasks; } /** * 启动任务处理器 */ public void startTaskProcessor() { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { try { List<String> tasks = pollTasks(); for (String task : tasks) { processTask(task); } } catch (Exception e) { e.printStackTrace(); } }, 0, 1, TimeUnit.SECONDS); } private void processTask(String taskInfo) { System.out.println("Processing task: " + taskInfo); // 实际任务处理逻辑 } }
优缺点
优点:
- 支持消费者组和消息确认,提供可靠的消息处理
- 内置消息持久化机制
- 支持多消费者并行处理
- 消息ID包含时间戳,便于排序
缺点:
- 要求Redis 5.0+版本
- 实现相对复杂
- 仍需轮询获取到期任务
- 对未到期任务的处理相对繁琐
性能对比与选型建议
实现方式 | 性能 | 可靠性 | 实现复杂度 | 内存占用 | 适用场景 |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任务量适中,需要精确调度 |
List + 轮询 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并发,延时精度要求不高 |
Pub/Sub + 时间轮 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 实时性要求高,可容忍服务重启丢失 |
Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息确认 |
总结
在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素进行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产环境中稳定运行。
到此这篇关于Redis中4种延时队列实现方式小结的文章就介绍到这了,更多相关Redis延时队列内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论