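Resolving the Redis "Subscribe timeout" error
Contents
- Introduction
- Root cause
- Reproduction
Introduction
Redisson version: 2.8.2
Recently our system occasionally reported org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms).
The stack trace pointed to a place where Redisson's Redis lock is used. With the business logic removed, the code is essentially the following: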
public void mockLock(String phoneNum) {
    // assumed: threadName is the current thread's name (its declaration is not shown in the original excerpt)
    String threadName = Thread.currentThread().getName();
    log.info("{} - prepare lock", threadName);
    RLock lock = redissonClient.getLock("redis_cache_test" + phoneNum);
    try {
        lock.lock();
        log.info("{} - get lock", threadName);
        // sleep for 10s
        Thread.sleep(10000);
    } catch (Exception e) {
        log.info("{} - exception", threadName, e);
    } finally {
        log.info("{} - unlock lock", threadName);
        lock.unlock();
    }
}
The exception is raised inside lock.lock(), which ends up in the following syncSubscription implementation:
@Override
public void syncSubscription(RFuture<?> future) {
    MasterSlaveServersConfig config = connectionManager.getConfig();
    try {
        int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
        if (!future.await(timeout)) {
            throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    future.syncUninterruptibly();
}
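To make the number in the error message concrete, the arithmetic in syncSubscription can be worked through in a few lines. This is only a sketch: the class and method names are hypothetical, and the sample values (a 3000 ms or 1000 ms timeout, a 1500 ms retry interval, 3 retry attempts) are assumed for illustration rather than read from any real configuration.

```java
// Hypothetical helper that mirrors the timeout arithmetic in syncSubscription.
final class SubscribeTimeoutCalc {

    // timeout + retryInterval * retryAttempts, all in milliseconds
    static int subscribeTimeoutMillis(int timeout, int retryInterval, int retryAttempts) {
        return timeout + retryInterval * retryAttempts;
    }

    public static void main(String[] args) {
        // Assumed sample values, not taken from a real configuration.
        System.out.println(subscribeTimeoutMillis(3000, 1500, 3)); // prints 7500
        System.out.println(subscribeTimeoutMillis(1000, 1500, 3)); // prints 5500
    }
}
```

The point is that the value in "Subscribe timeout: (...ms)" is not a fixed constant; it follows whatever timeout, retryInterval and retryAttempts the client was configured with.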
Root cause
The future passed to syncSubscription is the one returned by RedissonLock.subscribe(long threadId):
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
From this we can tell that it obtains a subscription from PUBSUB. Reading further into the source:
public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    // holder for the listener
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    // get the subscription queue (semaphore) for this lock's channel
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    // promise whose cancel() withdraws the queued subscription
    final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {
        @Override
        public void run() {
            // check whether an entry with the same name already exists
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }

            // none found, so create a new one
            E value = createEntry(newPromise);
            value.aquire();

            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }

            // listener for the corresponding entry
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            // subscribe to the channel
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
        }
    };

    // the semaphore manages the listener queue, because several threads may be waiting for the same lock
    semaphore.acquire(listener);
    // make the listener available to the cancel logic above
    listenerHolder.set(listener);

    return newPromise;
}
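As we can see, this method only defines a Runnable named listener. The call semaphore.acquire(listener); ensures that, for a given channel, only one thread at a time goes on to subscribe while the others keep waiting. The real subscription logic is in connectionManager.subscribe.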
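The queueing behaviour of semaphore.acquire(listener) is easier to follow with a toy model. The class below is a hypothetical, simplified stand-in written for this article, not Redisson's AsyncSemaphore: acquire never blocks the caller, it either runs the listener straight away or parks it until someone calls release.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified asynchronous semaphore (illustration only).
final class MiniAsyncSemaphore {
    private int permits;
    private final Queue<Runnable> waiting = new ArrayDeque<>();

    MiniAsyncSemaphore(int permits) {
        this.permits = permits;
    }

    // Run the listener immediately if a permit is free, otherwise queue it.
    void acquire(Runnable listener) {
        boolean runNow;
        synchronized (this) {
            runNow = permits > 0;
            if (runNow) {
                permits--;
            } else {
                waiting.add(listener);
            }
        }
        if (runNow) {
            listener.run();
        }
    }

    // Hand the permit to the next queued listener, or return it to the pool.
    void release() {
        Runnable next;
        synchronized (this) {
            next = waiting.poll();
            if (next == null) {
                permits++;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    // Drop a listener that is still queued, e.g. when its promise is cancelled.
    synchronized boolean remove(Runnable listener) {
        return waiting.remove(listener);
    }

    // Demo: three listeners compete for one permit; returns the order they ran in.
    static List<String> demo() {
        MiniAsyncSemaphore semaphore = new MiniAsyncSemaphore(1);
        List<String> ran = new ArrayList<>();
        semaphore.acquire(() -> ran.add("A")); // runs at once
        semaphore.acquire(() -> ran.add("B")); // queued
        semaphore.acquire(() -> ran.add("C")); // queued
        semaphore.release();                   // hands the permit to B
        return ran;                            // C is still waiting
    }
}
```

In this model a queued listener only ever runs if some earlier holder calls release. If that release never happens, everything behind it waits indefinitely, which is the shape of the problem traced in the rest of this section.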
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
    if (connEntry != null) {
        connEntry.addListener(channelName, listener);
        connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                lock.release();
                promise.trySuccess(connEntry);
            }
        });
        return;
    }

    freePubSubLock.acquire(new Runnable() {
        @Override
        public void run() {
            if (promise.isDone()) {
                return;
            }

            // no shared free connection available: open a new one and return
            final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
            if (freeEntry == null) {
                connect(codec, channelName, listener, promise, type, lock);
                return;
            }

            // the entry has a counter initialised from subscriptionsPerConnection;
            // -1 is an error here because the 0 case is handled below
            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }

            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
            if (oldEntry != null) {
                freeEntry.release();
                freePubSubLock.release();

                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }

            // when the remaining count reaches 0, take the entry out of the shared free pool
            if (remainFreeAmount == 0) {
                freePubSubConnections.poll();
            }
            freePubSubLock.release();

            freeEntry.addListener(channelName, listener);
            freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(freeEntry);
                }
            });

            if (PubSubType.PSUBSCRIBE == type) {
                freeEntry.psubscribe(codec, channelName);
            } else {
                freeEntry.subscribe(codec, channelName);
            }
        }
    });
}
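The -1 and 0 checks on freeEntry.tryAcquire() make more sense once the counter is spelled out. The sketch below is a hypothetical model of such a per-connection counter, written from the behaviour the comments describe (start at subscriptionsPerConnection, return what is left after taking a slot, return -1 when nothing is left). It is an assumption about the semantics, not a copy of PubSubConnectionEntry.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a per-connection subscription counter (illustration only).
final class SubscriptionSlots {
    private final AtomicInteger remaining;

    SubscriptionSlots(int subscriptionsPerConnection) {
        this.remaining = new AtomicInteger(subscriptionsPerConnection);
    }

    // Takes one slot and returns how many are left afterwards, or -1 if none were free.
    int tryAcquire() {
        while (true) {
            int value = remaining.get();
            if (value == 0) {
                return -1;
            }
            // Retry on contention so that two threads never take the same slot.
            if (remaining.compareAndSet(value, value - 1)) {
                return value - 1;
            }
        }
    }

    // Gives a slot back, e.g. when another thread registered the channel first.
    void release() {
        remaining.incrementAndGet();
    }
}
```

Under this model, a connection created with subscriptionsPerConnection = 1 returns 0 on its first tryAcquire, so it would be polled out of the free pool after a single subscription.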
When no free connection is available, execution goes into connect(codec, channelName, listener, promise, type, lock):
private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final int slot = calcSlot(channelName);
    // obtain the next connection, bounded by subscriptionConnectionPoolSize
    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
    connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
        @Override
        public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
            if (!future.isSuccess()) {
                freePubSubLock.release();
                lock.release();
                promise.tryFailure(future.cause());
                return;
            }

            RedisPubSubConnection conn = future.getNow();
            final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
            entry.tryAcquire();

            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                releaseSubscribeConnection(slot, entry);
                freePubSubLock.release();

                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }

            freePubSubConnections.add(entry);
            freePubSubLock.release();

            entry.addListener(channelName, listener);
            entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(entry);
                }
            });

            if (PubSubType.PSUBSCRIBE == type) {
                entry.psubscribe(codec, channelName);
            } else {
                entry.subscribe(codec, channelName);
            }
        }
    });
}
The call RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot); eventually reaches freeSubscribeConnectionsCounter.acquire(runnable) inside ClientConnectionsEntry#acquireSubscribeConnection.
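At this point we have found the cause. Once the number of lock subscriptions waiting at the same time reaches subscriptionConnectionPoolSize * subscriptionsPerConnection, one more subscription can never obtain a connection, so freePubSubLock in MasterSlaveConnectionManager is never released. In addition, in the timeout scenario MasterSlaveConnectionManager caches the connection it took from the pool and does not return that subscription connection to the pool, so freeSubscribeConnectionsCounter keeps waiting and the result is a deadlock.
The visible symptom is org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms).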
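The capacity argument can be checked with a small simulation. This is a sketch under stated assumptions, not Redisson code: every waiting subscription is modelled as holding one slot of a plain java.util.concurrent.Semaphore and never giving it back, the number of slots is subscriptionConnectionPoolSize * subscriptionsPerConnection, and a timed tryAcquire stands in for future.await(timeout). All class and method names are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of subscription capacity (illustration only).
final class SubscriptionCapacityDemo {

    static int capacity(int subscriptionConnectionPoolSize, int subscriptionsPerConnection) {
        return subscriptionConnectionPoolSize * subscriptionsPerConnection;
    }

    // Stand-in for waiting on the subscribe future with a timeout.
    static boolean subscribe(Semaphore slots, long timeoutMillis) {
        try {
            return slots.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Lets `waiters` subscriptions take slots without releasing any
    // and reports how many of them timed out.
    static int countTimeouts(int poolSize, int perConnection, int waiters, long timeoutMillis) {
        Semaphore slots = new Semaphore(capacity(poolSize, perConnection));
        int timedOut = 0;
        for (int i = 0; i < waiters; i++) {
            if (!subscribe(slots, timeoutMillis)) {
                timedOut++;
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        // One connection with one subscription each: of three waiters, two time out.
        System.out.println(countTimeouts(1, 1, 3, 50)); // prints 2
    }
}
```

The model ignores everything else the connection manager does, but it shows why the timeout only appears once the number of concurrent waiters exceeds the product of the two settings.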
Reproduction
Redis configuration
public RedissonClient redissonClient(RedisConfig redisConfig) {
    Config config = new Config();
    config.useSingleServer()
            .setAddress(redisConfig.getHost() + ":" + redisConfig.getPort())
            .setPassword(redisConfig.getPassword())
            .setDatabase(redisConfig.getDatabase())
            .setConnectTimeout(redisConfig.getConnectionTimeout())
            .setTimeout(redisConfig.getTimeout())
            // set both subscription settings to 1 so the limit is easy to hit
            .setSubscriptionConnectionPoolSize(1)
            .setSubscriptionsPerConnection(1);
    return Redisson.create(config);
}
Test method
void contextLoads() throws InterruptedException {
    Runnable runnable = () -> {
        redissonLock.tryRedissonLock();
    };
    new Thread(runnable, "thread-1").start();
    new Thread(runnable, "thread-12").start();
    new Thread(runnable, "thread-23").start();
    new Thread(runnable, "thread-21").start();
    // keep the test alive long enough for the worker threads to time out
    Thread.sleep(200000);
}
Result
org.redisson.client.RedisTimeoutException: Subscribe timeout: (5500ms)
    at org.redisson.command.CommandAsyncService.syncSubscription(CommandAsyncService.java:126) ~[redisson-2.8.2.jar:na]
    at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:121) ~[redisson-2.8.2.jar:na]
    at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:108) ~[redisson-2.8.2.jar:na]
    at org.redisson.RedissonLock.lock(RedissonLock.java:90) ~[redisson-2.8.2.jar:na]
    at com.rick.redislock.lock.RedissonLock.registerPersonalMember(RedissonLock.java:30) ~[classes/:na]
    at com.rick.redislock.RedisLockApplicationTests.lambda$contextLoads$0(RedisLockApplicationTests.java:15) [test-classes/:na]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]
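This matches the expectation. The message shows 5500ms here rather than 7500ms, presumably because the value is computed from the configured timeout, retryInterval and retryAttempts, and the test configuration uses a different timeout.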