开发者

java如何实现高并发场景下三级缓存的数据一致性

下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:

1.缓存结构

  • 本地缓存:使用Caffeine实现,最大容量10,000,写入后10分钟过期
  • 分布式缓存:使用Redisson的RMap结构操作Redis
  • 数据库:作为最终数据源

2.数据读取流程

  • 先查本地缓存,命中则返回
  • 未命中则查Redis,命中则更新本地缓存并返回
  • 仍未命中则获取锁,再次检查两级缓存(双重检查)
  • 最后从数据库读取,更新两级缓存后返回

3.数据更新流程

  • 使用分布式锁保证写操作原子性
  • 先更新数据库
  • 删除本地缓存和Redis缓存
  • 通过Redis Pub/Sub发布缓存清除消息给集群内其他节点
  • 执行延迟双删(100毫秒后再次删除Redis缓存)

4.并发控制

  • 读取时使用本地锁(ReentrantLock)防止缓存击穿
  • 更新时使用Redisson分布式锁(RLock)保证跨节点原子性
  • 锁使用完成后从ConcurrentHashMap中移除

5.集群同步

  • 使用Redis的RTopic实现消息发布订阅
  • 接收到清除消息时自动删除本地缓存
  • 确保集群内各节点缓存一致性

该实现综合运用了延迟双删、发布订阅、锁机制和TTL等多种策略,保障了高并发场景下三级缓存的数据一致性,尤其适合分布式微服务架构。

实战代码:

import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.redisson.api.*;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheBuilder;

@Service
public class CacheService {
    // 本地一级缓存(Caffeine)
    private final Cache<String, Object> localCache;
    // Rphpedisson客户端,用于分布式操作
    private final RedissonClient redissonClient;
    // 锁缓存,用于控制并发
    private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
    // 延迟任务执行器
    private final ScheduledExecutorService scheduledExecutorService;
    // 主题订阅,用于接收集群消息
    private final RTopic cacheClearTopic;

    @Autowired
  编程  public CacheService(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
        this.localCache = CacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(10, TimeUnit.MINUTES)
                .build();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(5);
        this.cacheClearTopic = redissonClient.getTopiwww.devze.comc("cache:clear");
        
        // 注册消息监听器
        cacheClearTopic.addListener(String.class, (channel, key) -> {
            localCache.invalidate(key);
        });
    }

    // 读取缓存
    public Object get(String key) {
        // 1. 先查本地缓存
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }

        // 2. 本地缓存未命中,查Redis
        RMap<String, Object> redisMap = redissonClient.getMap("cache");
        value = redisMap.get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }

        // 3. Redis未命中,查数据库
        ReentrantLock lock = lockMap.computephpIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            // 双重检查
            value = localCache.getIfPresent(key);
            if (value != null) {
                return value;
            }
            
            value = redisMap.get(key);
            if (value != null) {
                localCache.put(key, value);
                return value;
            }

            // 从数据库读取
   VMGPmRs         value = readFromDatabase(key);
            if (value != null) {
                // 放入Redis并设置TTL
                redisMap.put(key, value, 300, TimeUnit.SECONDS);
                // 放入本地缓存
                localCache.put(key, value);
            }
            return value;
        } finally {
            lock.unlock();
            lockMap.remove(key);
        }
    }

    // 更新数据
    public void update(String key, Object value) {
        // 使用分布式锁保证写操作的原子性
        RLock lock = redissonClient.getLock("writeLock:" + key);
        lock.lock();
        try {
            // 1. 更新数据库
            boolean success = updateDatabase(key, value);
            if (success) {
                // 2. 先删除本地缓存
                localCache.invalidate(key);
                // 3. 删除Redis缓存
                RMap<String, Object> redisMap = redissonClient.getMap("cache");
                redisMap.remove(key);
                // 4. 发布清除缓存的消息到集群
                cacheClearTopic.publish(key);
                // 5. 延迟双删
                scheduledExecutorService.schedule(() -> {
                    redisMap.remove(key);
                }, 100, TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    // 从数据库读取数据(示例方法)
    private Object readFromDatabase(String key) {
        // 实际实现中会查询数据库
        return "data_from_db_" + key;
    }

    // 更新数据库(示例方法)
    private boolean updateDatabase(String key, Object value) {
        // 实际实现中会更新数据库
        return true;
    }
}    

redisson配置

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        // 单机模式配置
        config.useSingleServer()
              .setAddress("redis://localhost:6379")
              .setConnectionMinimumIdleSize(5)
              .setConnectionPoolSize(50);
        
        // 集群模式配置示例
        /*
        config.useClusterServers()
              .addNodeAddress("redis://node1:6379", "redis://node2:6379")
              .setScanInterval(2000)
              .setMasterConnectionMinimumIdleSize(10)
              .setMasterConnectionPoolSize(64)
              .setSlaveConnectionMinimumIdleSize(10)
              .setSlaveConnectionPoolSize(64);
        */
        
        return Redisson.create(config);
    }
}    

到此这篇关于java如何实现高并发场景下三级缓存的数据一致性的文章就介绍到这了,更多相关java三级缓存内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜