热点新闻
肝了三晚,终于吃透了Druid连接池
2023-07-23 02:41  浏览:2090  搜索引擎搜索“手机财发网”
温馨提示:信息一旦丢失不一定找得到,请务必收藏信息以备急用!本站所有信息均是注册会员发布如遇到侵权请联系文章中的联系方式或客服删除!
联系我时,请说明是在手机财发网看到的信息,谢谢。
展会发布 展会网站大全 报名观展合作 软文发布

前言

作为一个java程序员,数据库的JDBC几乎每天都在做,数据库连接池Druid每天也在使用,但可能用起来太简单了(spring中引入依赖即可),往往忽略了连接池的意义和优化

本文从源码的角度分析Druid的常用配置及原理

连接

当我们程序需要访问数据库时,需要创建一个本地到数据库服务的网络连接,此时本地代码就相当于一个数据库的客户端,可以通过这个连接去访问数据、执行sql,如下

Driver driver = new com.mysql.cj.jdbc.Driver(); // 创建连接 Connection con = driver.connect(JDBC_URL, props); Statement statement = con.createStatement(); ResultSet resultSet = statement.executeQuery("show tables"); while (resultSet.next()) { System.out.println(resultSet.getString(1)); } con.close();

池化技术

由于我们的代码需要不断与数据库交互读取数据,如果每次请求数据都创建一个连接的话,网络开销是很大的,也会导致我们的程序比较慢,同时连接如果太多也会给数据库造成压力

为了解决这个问题,就有了池化技术,把创建好的连接放在池里,用时去池里获取,节省了创建连接的时间,也可以通过配置来限定池的最大连接数等




池化技术

连接池最常用的工具基本就是阿里的Druid了,简单使用如下

// druid 数据源 DruidDataSource druidDataSource = new DruidDataSource(); // 数据源配置 druidDataSource.setUrl(JDBC_URL); druidDataSource.setUsername(USERNAME); druidDataSource.setPassword(PASSWORD); // 初始化 druidDataSource.init(); // 获取表名 Connection con = druidDataSource.getConnection(); Statement statement = con.createStatement(); ResultSet resultSet = statement.executeQuery("show tables"); while (resultSet.next()) { System.out.println(resultSet.getString(1)); } con.close();

可以看到使用了Druid,获取连接不再是直接使用驱动创建连接,而是通过DruidDataSource对象获取连接

DruidDataSource

接下来就分析DruidDataSource的源码,从三个方面入手:配置、存储、线程

配置

首先作为一个连接池工具,首先要支持重要参数的可配置,以下只列举一部分常用的配置和其简单含义,后面的源码分析会实际的分析每个配置的作用

  • maxActive 最大连接数
  • initialSize 初始化连接数
  • minIdle 最小空闲数
  • keepAlive 是否保持连接
  • asyncInit 是否异步初始化
  • timeBetweenEvictionRunsMillis 回收连接任务运行的频率
  • minEvictableIdleTimeMillis 最小闲置时间,连接闲置时间小于这个时间不会被回收,大于有可能被回收
  • maxEvictableIdleTimeMillis 最大闲置时间,连接闲置时间超过这个数是一定被回收的
  • validationQuery 测试是否有效的sql
  • phyTimeoutMillis 连接物理超时时间

有很多配置都是和其它配置配合使用的,所以很多配置单独拿出来说它的作用没有意义,还是要结合代码看一下

存储

DruidDataSource作为一个连接池,内部一定会有一个容器来存储连接,这应该是最重要的属性

private volatile DruidConnectionHolder[] connections; // 当前的所有连接

connections存储的就是所有的数据库连接对象,并封装了一个连接的持有对象DruidConnectionHolder,在持有物理连接的同时,也记录了一些连接的其它属性,比如:

  • connectTimeMillis 连接建立的时间
  • lastActiveTimeMillis 连接上一次被使用的时间

还有非常重要的一点,这个存储连接的容器是有排序的,每次使用连接都从最后拿,这就导致容器尾部的连接是最活跃的,也就导致前面的连接闲置时间肯定是要高于后面的

计数

同时,池内部有很多计数器来存储当前各种维度的数量值

private int poolingCount = 0; // 可用连接数 private int activeCount = 0; // 正在使用连接数 private volatile long discardCount = 0; // 丢弃连接数 private int notEmptyWaitThreadCount = 0; // 等待连接的线程数

线程

DruidDataSource中有几个线程,在初始化方法init被创建并运行,它们分别承担不同的工作

public void init() throws SQLException { // ... createAndLogThread(); // 开启负责日志统计的线程 createAndStartCreatorThread(); // 开启负责创建连接的线程 createAndStartDestroyThread(); // 开启负责负责销毁连接的线程 // ... }

实际上,DruidDataSource就是依靠这些线程来维护整个线程池中连接的创建和销毁任务,它们可以看做是线程池的维护人员

小结

所以Druid池简单来说就是一个连接的容器(connections),可配的参数,状态/计数的存储组成的一个类,在初始化方法中会创建多个线程,这些线程在连接池的生命周期一直运行并监控这当前线程池的状态,并根据配置和计数数据在需要的时候在容器中创建/销毁线程




Druid

连接池中这几个线程是可以被替代的,如果我们设置了调度器,则可以按我们自己的方式去调度创建销毁连接的任务,这属于比较高级的用法了,本文不做探讨

线程源码分析

协调

线程池内部运行的两个主要线程:创建连接的线程和销毁连接的线程,池外部还有我们用户代码中想要获取连接的线程(在此统一称之为用户线程)

各个线程可能都要访问和修改各种计数和连接容器,为了达到线程安全,DruidDataSource内部提供了一个统一的ReentrantLock锁

protected ReentrantLock lock;

各线程也少不了沟通,比如某用户线程想获取连接,如何通知创建线程去创建连接,创建线程创建完连接有如何告知用户线程,为解决这个问题,DruidDataSource内部提了两个主要的Condition

protected Condition notEmpty; protected Condition empty;

其中empty代表空条件,创建线程通过empty.await()即可等待空信号,而用户线程通过empty.signal()即可发送空信号给创建线程,此时用户线程notEmpty.await()开始等待非空条件,而创建线程一般会创建连接,创建完成后通过notEmpty.signal()通知线程创建完毕




Condition

创建连接的线程

CreateConnectionThread是专门负责创建连接的,可以说DruidDataSource中的连接基本都是由它负责实际创建的(也会有特例,比如默认情况下initialSize设置的连接数是在init方法中直接创建的)

大部分情况下CreateConnectionThread是在empty条件上等待空信号,即empty.wait(),当得到信号时再创建连接

接下来就看一下CreateConnectionThread的源码

public class CreateConnectionThread extends Thread { public CreateConnectionThread(String name){ super(name); // 设置守护线程 this.setDaemon(true); } public void run() { initedLatch.countDown(); long lastDiscardCount = 0; int errorCount = 0; // 线程一直运行着 for (;;) { // 一.判断是否需要创建连接 // 获取锁 lock.lockInterruptibly(); // 当前被丢弃的连接数 long discardCount = DruidDataSource.this.discardCount; // 对比上一次记录被丢弃的连接数,看看是否有变化 boolean discardChanged = discardCount - lastDiscardCount > 0; lastDiscardCount = discardCount; try { // 标志是否需要等待空信号 boolean emptyWait = true; // 存在异常,当前池连接数为0,且没有新丢弃的连接 if (createError != null && poolingCount == 0 && !discardChanged) { emptyWait = false; } // 如果设置了异步初始化,且当前创建的连接数少于设置初始连接数,则跳过等待直接创建连接 if (emptyWait && asyncInit && createCount < initialSize) { emptyWait = false; } // 如果没有跳过等待,并不是实际的去等待,而是还有一层判断 if (emptyWait) { // 有三种情况可以跳过这一步的等待 // 1.等待使用连接的线程数大于当前可用连接数 // 2.设置了keeplive=true且当前池的总连接数小于设置最小连接数 // 3.连续失败isFailContinuous(这一项先忽略) // 跳过这一步等待并不代表可以直接创建,还要进行下一步的是否到达最大设置数量的判断 if (poolingCount >= notEmptyWaitThreadCount // && (!(keepAlive && activeCount + poolingCount < minIdle)) && !isFailContinuous() ) { // 等待空信号 empty.await(); } // 如果当前连接数量已超过设置最大数量,则等待空信号,否则就可以去创建连接了 if (activeCount + poolingCount >= maxActive) { empty.await(); // 等待到了空信号,并不是直接创建连接,而是重新判断一次是否需要等待,因为连接数是绝对不能超越maxActive的,所以为了安全,必须重新判断一次 continue; } } } catch (InterruptedException e) { //... } finally { // 释放锁 lock.unlock(); } // 二.开始创建连接 PhysicalConnectionInfo connection = null; try { // 创建物理连接 connection = createPhysicalConnection(); } catch (SQLException e) { //... } // 加入连接池的连接列表,即connections boolean result = put(connection); // 如果连接池关闭,创建连接线程也停止 if (closing || closed) { break; } } } }

代码看起来还是比较复杂,简单总结一下:
<特殊情况>
创建连接的线程有两种特殊情况,这两种情况主要是异步初始化化和处理异常,这种情况下直接跳过等待,也不需考虑maxActive,直接创建连接,这种情况相对特殊暂不做考虑
<常规情况>
大部分情况下,创建连接的线程要根据minIdle,maxActive等配置以及线程池的状态来判断是否需要等待,如果不需要等待也会创建连接

常规情况下有三种条件,满意任意一种就可以不需等待直接创建连接,但还有个大前提就是池中的连接总数不能超过maxActive设置的数量

三种条件分别是

  • 当等待使用连接的线程数(notEmptyWaitThreadCount)大于池中可用连接数(poolingCount),即供不应求时
  • 当线程池设置保持连接(keepAlive=true),且当前池中的总连接数(activeCount + poolingCount)小于设置最小连接数(minIdle),即池中没有保持足够的最小连接数时
  • isFailContinuous 连续失败时

三种条件如果都不满足,则在empty条件上等待索要连接的信号,得到信号则创建连接(还需要判断最大连接数)

如果三个条件满足任意一个,但连接数已到达maxActive,依然在empty条件上等待信号,得到信号重新再判断一次,是为了确保连接数不超过最大配置

画个图梳理一下




CreateConnectionThread.run()

用一句话总结一下:

CreateConnectionThread负责给线程池创建连接,当线程池中供不应求、最小保持连接数不足、连续错误时线程会主动创建连接,否则就会休息节省体力,得到需求信号再创建连接,创建完成后重新开始审视创建的工作, ps:整个过程确保连接数不能超出设定范围

销毁连接的线程

与CreateConnectionThread对应,DestroyConnectionThread承担销毁连接的任务,主要根据配置的参数和当前的技术器,销毁掉需要销毁的连接

public class DestroyConnectionThread extends Thread { public DestroyConnectionThread(String name) { super(name); // 设置守护线程 this.setDaemon(true); } public void run() { initedLatch.countDown(); // 不断执行 for (;;) { try { //... // 根据配置timeBetweenEvictionRunsMillis决定销毁任务执行的间隔 if (timeBetweenEvictionRunsMillis > 0) { Thread.sleep(timeBetweenEvictionRunsMillis); } else { Thread.sleep(1000); } //... // 执行销毁任务 destroyTask.run(); } catch (InterruptedException e) { break; } } } }

销毁连接的任务实时性要求并不是太高,所以可能会隔一段时间才去计算并销毁一次,这个间隔的时间就是配置timeBetweenEvictionRunsMillis

其中DestroyTask的run方法定义如下

public void run() { shrink(true, keepAlive); if (isRemoveAbandoned()) { removeAbandoned(); } }

主要调用的方法即shrink,意指收缩线程池,重点看一下这个方法:

public void shrink(boolean checkTime, boolean keepAlive) { // 获取锁 lock.lockInterruptibly(); // 是否需要补充 boolean needFill = false; // 驱逐的数量 int evictCount = 0; // 需要保活的数量 int keepAliveCount = 0; int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink; fatalErrorCountLastShrink = fatalErrorCount; try { // 未初始化完成不执行 if (!inited) { return; } // 池中可用连接数超出最小连接数的数量 final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); // 循环池中可用连接 for (int i = 0; i < poolingCount; ++i) { DruidConnectionHolder connection = connections[i]; // 异常的处理,暂不做考虑 if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) { keepAliveConnections[keepAliveCount++] = connection; continue; } // 如果检查时间,销毁线程传入的是true if (checkTime) { // 如果设置了物联连接超时时间 if (phyTimeoutMillis > 0) { // 当前连接连接时间过过了超时时间,加入要待回收集合中 long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue; } } // 计算当前连接已闲置的时间 long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; // 如果连接闲置时间比较短,则可不被回收,可以直接跳出循环,因为连接池是尾部更活跃,后面的肯定更短不需要判断了 if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break; } // 如果连接闲置时间超出了设置的 最小闲置时间 if (idleMillis >= minEvictableIdleTimeMillis) { // 如果当前连接的位置在checkCount以内,则加入待回收集合 if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue; // 否则如果已超出最大闲置时间,也要加入待回收集合 } else if (idleMillis > maxEvictableIdleTimeMillis) { evictConnections[evictCount++] = connection; continue; } } // 如果闲置时间超出保活检测时间,且设置了keepAlive,则加入待验证保活的集合中 if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } } else { //... } } // 要删除的连接总数,实际上keepAliveCount只是有可能被删除,还没有最终定论,这里做法是先删除掉,如果验证连接可用后续再加回来即可 int removeCount = evictCount + keepAliveCount; if (removeCount > 0) { // 删除连接池中的废弃连接,由于废弃的连接一定是前removeCount个连接,所以直接使用复制即可删除 System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); // 当前可用连接数变小 poolingCount -= removeCount; } keepAliveCheckCount += keepAliveCount; // 如果设置了保活,且总连接数小于最小连接数,则需要补充 if (keepAlive && poolingCount + activeCount < minIdle) { needFill = true; } } finally { lock.unlock(); } // 如果有要回收的连接 if (evictCount > 0) { // 循环 for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); // 关闭连接 JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } // 清空需要回收的连接集合 Arrays.fill(evictConnections, null); } // 如果有要进行保活的连接 if (keepAliveCount > 0) { // 循环要保活的连接 for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { // 验证链接是否有效,此时要用到配置的validationQuery来验证连接的有效性,如果没设置,就默认有效 this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } } boolean discard = !validate; // 如果连接有效 if (validate) { holer.lastKeepTimeMillis = System.currentTimeMillis(); // 重新加入连接池最左侧 boolean putOk = put(holer, 0L, true); if (!putOk) { discard = true; } } // 如果连接无效 if (discard) { try { // 关闭连接 connection.close(); } catch (Exception e) { // skip } lock.lock(); try { // 记录被丢弃的连接数+1 discardCount++; // 如果且总连接数小于最小连接数,发出空信号 if (activeCount + poolingCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } } } this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); // 清空需要保活的连接集合 Arrays.fill(keepAliveConnections, null); } // 如果需要补充 if (needFill) { lock.lock(); try { // 计算需要补充的数量,createTaskCount是使用自定义调度时的逻辑,暂时忽略 int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); // 发出空信号 for (int i = 0; i < fillCount; ++i) { emptySignal(); } } finally { lock.unlock(); } } else if (onFatalError || fatalErrorIncrement > 0) { // 异常处理 忽略.. } }

核心代码依然相当复杂,还是尝试总结一下

(一) 销毁任务实时性不高,销毁线程执行是一个定时任务,时间间隔可配

(二) 销毁线程只考虑数目为poolingCount的池中可用连接,正在使用的连接不可能被销毁(其实也已不在池中)

(三) 销毁线程会从前往后循环查看所有的池中连接,主要判断是否需要销毁或者保活,主要包含如下逻辑:

  • 循环前会提前计算当前可用连接超出最小限制连接的数量,为checkCount,这个数量其实就是线程池中多余连接的数量,而且按照容器的排序,越前面的连接越不活跃,所以前checkCount就是多余连接,但多余连接不一定会被移除,有可能因为闲置时间(说明刚用完不久)较短而被暂时保留
  • 如果当前连接闲置时间比较短,不需要进行销毁或保活测试,直接跳出循环,因为后面的连接活跃度更高
  • 如果连接闲置时间比较长,比如超过了设置的最大闲置时间,或超过最小闲置时间且当前连接本身就是多余连接,就会从池中移出至待销毁的集合中
  • 如果连接闲置时间比较长,超过了保活测试的设定时间(且keepAlive),就会从池中移出至待测试有效性的集合中
  • 待销毁集合的连接后续会被直接关闭,待测试有效性集合的连接需要测试连接是否可用,如果不可用直接销毁,通过校验加回至连接池中
  • 由于销毁了很多连接,可能导致keepAlive情况下最小连接数不够了,所以需要通过empty.signal通知创建线程补充连接

再画个示意图




CreateConnectionThread

用户线程

用户线程主要是去池中获取连接,上文也提到过,是从最后拿连接,重点方法takeLast

DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { // 发送空信号,让创建线程创建连接 emptySignal(); // send signal to CreateThread create connection // 增加等待线程数 notEmptyWaitThreadCount++; // 等待非空信号 try { notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } //... } } catch (InterruptedException ie) { //... } // 有了可用连接 // 可用连接减一,因为要拿出用了 decrementPoolingCount(); // 取出最后一个连接 DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; // 返回 return last; }

逻辑就是取池中最后一个连接,如果没有通知创建线程创建连接

最后

费了好大劲,基本捋明白了Druid连接池的重要代码,感觉真的很复杂

总结一下Druid的优点

  • 连接的创建销毁异步执行,保证效率
  • 连接池的固定最大连接数避免了连接的过度创建
  • 连接池中连接的存活时间可配置,保证高并发下连接不会被回收,可重复利用
  • 连接池的保活机制,可以固定维持一定数量的连接长期保留在池中,还可以定时检测连接的有效性,固定维持的连接可以在并发骤增的情况下提前预热,避免一次性建立过多连接

其实还是有很多地方并没有想太明白,而且很多结论也很难测试,如果有误,欢迎指正

发布人:63a3****    IP:117.173.23.***     举报/删稿
展会推荐
让朕来说2句
评论
收藏
点赞
转发