开发者

Druid之连接创建及销毁示例详解

目录
  • 前言
  • 正文
    • 一. DruidDataSource连接创建
    • 二. DruidDataSource连接销毁
  • 总结

    前言

    Druid是阿里开源的数据库连接池,是阿里监控系统Dragoon的副产品,提供了强大的可监控性和基于Filter-Chain的可扩展性。

    本篇文章将对Druid数据库连接池的连接创建销毁进行分析。分析Druid数据库连接池的源码前,需要明确几个概念。

    • Druid数据库连接池中可用的连接存放在一个数组connections中;
    • Druid数据库连接池做并发控制,主要靠一把可重入锁以及和这把锁关联的两个Condition对象;
    public DruidAbstractDataSource(boolean lockFair) {
       lock = new ReentrantLock(lockFair);
       notEmpty = lock.newCondition();
       empty = lock.newCondition();
    }
    
    • 连接池没有可用连接时,应用线程会在notEmpty上等待,连接池已满时,生产连接的线程会在empty上等待;
    • 对连接保活,就是每间隔一定时间,对达到了保活间隔周期的连接进行有效性校验,可以将无效连接销毁,也可以防止连接长时间不与数据库服务端通信。

    Druid版本:1.2.11

    正文

    开发者_JAVA

    一. DruidDataSource连接创建

    DruidDataSource连接的创建由CreateConnectionThread线程完成,其run() 方法如下所示。

    public void run() {
        initedLatch.countDown();
        long lastDiscardCount = 0;
        int errorCount = 0;
        for (; ; ) {
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e2) {
                break;
            }
            long discardCount = DruidDataSource.this.discardCount;
            boolean discardChanged = discardCount - lastDiscardCount > 0;
            lastDiscardCount = discardCount;
            try {
                // emptyWait为true表示生产连接线程需要等待,无需生产连接
                boolean emptyWait = true;
                // 发生了创建错误,且池中已无连接,且丢弃连接的统计没有改变
                // 此时生产连接线程需要生产连接
                if (createError != null
                        && poolingCount == 0
                        && !discardChanged) {
                    emptyWait = false;
                }
                if (emptyWait
                        && asyncInit && createCount < initialSize) {
                    emptyWait = false;
                }
                if (emptyWait) {
                    // 池中已有连接数大于等于正在等待连接的应用线程数
                    // 且当前是非keepAlive场景
                    // 且当前是非连续失败
                    // 此时生产连接的线程在empty上等待
                    // keepAlive && activeCount + poolingCount < minIdle时会在shrink()方法中触发emptySingal()来添加连接
                    // isFailContinuous()返回true表示连续失败,即多次(默认2次)创建物理连接失败
                    if (poolingCoupythonnt >= notEmptyWaitThreadCount
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                    ) {
                        empty.await();
                    }
                    // 防止创建超过maxActive数量的连接
                    if (activeCount + poolingCount >= maxActive) {
                        empty.await();
                        continue;
                    }
                }
            } catch (InterruptedException e) {
                // 省略
            } finally {
                lock.unlock();
            }
            PhysicalConnectionInfo connection = null;
            try {
                connection = createPhysicalConnection();
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl
                        + ", errorCode " + e.getErrorCode()
                        + ", state " + e.getSQLState(), e);
                errorCount++;
                if (errorCount > connectionErrorRetryAttempts
                        && timeBetweenConnectErrorMillis > 0) {
                    // 多次创建失败
                    setFailContinuous(true);
                    // 如果配置了快速失败,就唤醒所有在notEmpty上等待的应用线程
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }
                    if (breakAfterAcquireFailure) {
                        break;
                    }
                    try {
                        Thread.sleep(timeBetweenConnectErrorMillis);
                    } catch (InterruptedException interruptEx) {
                        break;
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                LOG.error("create connection Error", e);
                setFailContinuous(true);
                break;
            }
            if (connection == null) {
                continue;
            }
            // 把连接添加到连接池
            boolean result = put(connection);
            if (!result) {
                JdbcUtils.close(connection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }
            errorCount = 0;
            if (closing || closed) {
                break;
            }
        }
    }
    

    CreateConnectionThreadrun() 方法整体就是在一个死循环中不断的等待,被唤醒,然后创建线程。当一个物理连接被创建出来后,会调用DruidDataSource#put方法将其放到连接池connections中,put() 方法源码如下所示。

    protected boolean put(PhysicalConnec编程tionInfo physicalConnectionInfo) {
        DruidConnectionHolder holder = null;
        try {
            holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
        } catch (SQLException ex) {
            // 省略
            return false;
        }
        return put(holder, physicalConnectionInfo.createTaskId, false);
    }
    private boolean put(DruidConnectionHolder holder,
                        long createTaskId, boolean checkExists) {
        // 涉及到连接池中连接数量改变的操作,都需要加锁
        lock.lock();
        try {
            if (this.closing || this.closed) {
                return false;
            }
            // 池中已有连接数已经大于等于最大连接数,则不再把连接加到连接池并直接返回false
            if (poolingCount >= maxActive) {
                if (createScheduler != null) {
                    clearCreateTask(createTaskId);
                }
                return false;
            }
            // 检查重复添加
            if (checkExists) {
                for (int i = 0; i < poolingCount; i++) {
                    if (connections[i] == holder) {
                        return false;
                    }
                }
            }
            // 连接放入连接池
            connections[poolingCount] = holder;
            // poolingCount++
            incrementPoolingCount();
            if (poolingCount > poolingPeak) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }
            // 唤醒在notEmpty上等待连接的应用线程
            notEmpty.signal();
            notEmptySignalCount++;
            if (createScheduler != null) {
                clearCreateTask(createTaskId);
                if (poolingCount + createTaskCount < notEmptyWaitThreadCount
                        && activeCount + poolingCount + createTaskCount < maxActive) {
                    emptySignal();
                }
            }
        } finally {
            lock.unlock();
        }
        return true;
    }
    

    put() 方法会先将物理连接从PhysicalConnectionInfo中获取出来并封装成一个DruidConnectionHolderDruidConnectionHolder就是Druid连接池中的连接。新添加的连接会存放在连接池数组connectionspoolingCount位置,然后poolingCount会加1,也就是poolingCount代表着连接池中可以获取的连接的数量。

    二. DruidDataSource连接销毁

    DruidDataSource连接的销毁由DestroyConnectionThread线程完成,其run() 方法如下所示。

    public void run() {
        // run()方法只要执行了,就调用initedLatch#countDown
        initedLatch.countDown();
        for (; ; ) {
            // 每间隔timeBetweenEvictionRunsMillis执行一次DestroyTask的run()方法
            try {
                if (closed || closing) {
                    break;
                }
                if (timeBetweenEvictionRunsMillis > 0) {
                    Thread.sleep(timeBetweenEvictionRunsMillis);
                } else {
                    Thread.sleep(1000);
                }
                if (Thread.interrupted()) {
                    break;
                }
                // 执行DestroyTask的run()方法来销毁需要销毁的连接
                destroyTask.run();
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    

    DestroyConnectionThreadrun() 方法就是在一个死循环中每间隔timeBetweenEvictionRunsMillis的时间就执行一次DestroyTaskrun() 方法。DestroyTask#run方法实现如下所示。

    public void run() {
        // 根据一系列条件判断并销毁连接
        shrink(true, keepAlive);
        // RemoveAbandoned机制
        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
    

    DestroyTask#run方法中会调用DruidDataSource#shrink方法来根据设定的条件来判断出需要销毁和保活的连接。DruidDataSource#shrink方法如下所示。

    // checkTime参数表示在将一个连接进行销毁前,是否需要判断一下空闲时间
    public void shrink(boolean checkTime, boolean keepAlive) {
        // 加锁
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }
        // needFill = keepAlive && poolingCount + activeCount < minIdle
        // needFill为true时,会调用empty.signal()唤醒生产连接的线程来生产连接
        boolean needFill = false;
        // evictCount记录需要销毁的连接数
        // keepAliveCount记录需要保活的连接数
        int evictCount = 0;
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;
        try {
            if (!inited) {
                return;
            }
            // checkCount = 池中已有连接数 - 最小空闲连接数
            // 正常情况下,最多能够将前checkCount个连接进行销毁
            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            // 正常情况下,需要遍历池中所有连接
            // 从前往后遍历,i为数组索引
            for (int i = 0; i < poolingCount; ++i) {
                DruidConnectionHolder connection = connections[i];
                // 如果发生了致命错误(onFatalError == true)且致命错误发生时间(lastFatalErrorTimeMillis)在连接建立时间之后
                // 把连接加入到保活连接数组中
                if ((onFatalError || fatalErrorIncrement > 0)
                        && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }
                if (checkTime) {
                    // phyTimeoutMillis表示连接的物理存活超时时间,默认值是-1
                    if (phyTimeoutMillis > 0) {
                        // phyConnectTimeMillis表示连接的物理存活时间
                        long phyConnectTimeMillis = currentTimeMillis
                                - connection.connectTimeMillis;
                        // 连接的物理存活时间大于phyTimeoutMillis,则将这个连接放入evictConnections数组
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    // idleMillis表示连接的空闲时间
                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                    // minEvictableIdleTimeMillis表示连接允许的最小空闲时间,默认是30分钟
                    // keepAliveBetweenTimeMillis表示保活间隔时间,默认是2分钟
                    // 如果连接的空闲时间小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
                    // 则connections数组中当前连接之后的连接都会满足空闲时间小于minEvictableIdleTimeMillis且还小于keepAliveBetweenTimeMillis
                    // 此时跳出遍历,不再检查其余的连接
                    if (idleMillis < minEvictableIdleTimeMillis
                            && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }
                    // 连接的空闲时间大于等于允许的最小空闲时间
                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        if (checkTime && i < checkCount) {
                            // i &ljavascriptt; checkCount这个条件的理解如下:
                            // 每次shrink()方法执行时,connections数组中只有索引0到checkCount-1的连接才允许被销毁
                            // 这样才能保证销毁完连接后,connections数组中至少还有minIdle个连接
                            evictConnections[evictCount++] = connection;
                            continue;
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            // 如果空闲时间过久,已经大于了允许的最大空闲时间(默认7小时)
                            // 那么无论如何都要销毁这个连接
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    // 如果开启了保活机制,且连接空闲时间大于等于了保活间隔时间
                    // 此时将连接加入到保活连接数组中
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    // checkTime为false,那么前checkCount个连接直接进行销毁,不再判断这些连接的空闲时间是否超过阈值
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }
            // removeCount = 销毁连接数 + 保活连接数
            // removeCount表示本次从connections数组中拿掉的连接数
            // 注:一定是从前往后拿,正常情况下最后minIdle个连接是安全的
            int removeCount = evictCount + keepAliveCount;
            if (removeCount > 0) {
                // [0, 1, 2, 3, 4, null, null, null] -> [3, 4, 2, 3, 4, null, null, null]
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                // [3, 4, 2, 3, 4, null, null, null] -> [3, 4, null, null, null, null, null, null, null]
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                // 更新池中连接数
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;
            // 如果池中连接数加上活跃连接数(借出去的连接)小于最小空闲连接数
            // 则将needFill设为true,后续需要唤醒生产连接的线程来生产连接
            if (keepAlive && poolingCount + activeCount < minIdle) {
                needFill = true;
            }
        } finally {
            lock.unlock();
        }
        if (evictCount > 0) {
            // 遍历evictConnections数组,销毁其中的连接
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }
        if (keepAliveCount > 0) {
            // 遍历keepAliveConnections数组,对其中的连接做可用性校验
            // 校验通过连接就放入connections数组,没通过连接就销毁
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();
                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                }
                boolean discard = !validate;
                if (validate) {
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        discard = true;
                    }
                }
                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                    }
                    lock.lock();
                    try {
                        discardCount++;
                        if (activeCount + poolingCount <= minIdle) {
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                   python }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }
        // 如果needFill为true则唤醒生产连接的线程来生产连接
        if (needFill) {
            lock.lock();
            try {
                // 计算需要生产连接的个数
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }
    

    DruidDataSource#shrink方法中,核心逻辑是遍历connections数组中的连接,并判断这些连接是需要销毁还是需要保活。通常情况下,connections数组中的前checkCount(checkCountjavascript = poolingCount - minIdle) 个连接是危险的,因为这些连接只要满足了:空闲时间 >= minEvictableIdleTimeMillis(允许的最小空闲时间),那么就需要被销毁,而connections数组中的最后minIdle个连接是相对安全的,因为这些连接只有在满足:空闲时间 > maxEvictableIdleTimeMillis(允许的最大空闲时间) 时,才会被销毁。这么判断的原因,主要就是需要让连接池里能够保证至少有minIdle个空闲连接可以让应用线程获取。

    当确定好了需要销毁和需要保活的连接后,此时会先将connections数组清理,只保留安全的连接,这个过程示意图如下。

    Druid之连接创建及销毁示例详解

    最后,会遍历evictConnections数组,销毁数组中的连接,遍历keepAliveConnections数组,对其中的每个连接做可用性校验,如果校验可用,那么就重新放回connections数组,否则销毁。

    总结

    连接的创建由一个叫做CreateConnectionThread的线程完成,整体流程就是在一个死循环中不断的等待,被唤醒,然后创建连接。每一个被创建出来的物理连接Java.sql.Connection会被封装为一个DruidConnectionHolder,然后存放到connections数组中。

    连接的销毁由一个叫做DestroyConnectionThread的线程完成,核心逻辑是周期性的遍历connections数组中的连接,并判断这些连接是需要销毁还是需要保活,需要销毁的连接最后会被物理销毁,需要保活的连接最后会进行一次可用性校验,如果校验不通过,则进行物理销毁。

    以上就是Druid之连接创建及销毁示例详解的详细内容,更多关于Druid连接创建销毁的资料请关注我们其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜