开发者

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

目录
  • 背景
  • 方案说明
  • 技术实现
    • 前提
    • 不使用Sharding-JDBC场景
    • 使用Sharding-JDBC

背景

公司产品部收到了一些重要客户的需求,他们希望能够依赖独立的数据库存储来支持他们的业务数据。与此同时,仍有许多中小客户,可以继续使用公共库以满足其需求。技术实现方面,此前持久层框架使用的MyBATis-plus,部分业务场景使用到了Sharding-JDBC用于分表,另外,我们的数据库版本控制工具使用的是Flyway。

方案说明

这里将方案进行简要说明,配置统一通过Nacos管理(有需要的可以自行定义租户配置页面)。

1.首先多数据源管理使用Mybatis-Plus官方推荐的dynamic-datasource-spring-boot-starter组件,需要注意的是构建动态多数据源时,我们要把Sharding-JDBC数据源也纳入管理。因为我们的库里面毕竟只有部分表用到了Sharding-JDBC,这样可以复用数据源。

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

2.其次,租户与数据源之间在Nacos建立关系配置,确保根据租户ID能够路由到唯一的租户数据源。我们需要自定义Sharding分片策略和多数据源切换逻辑,根据http请求传入的租户ID,设置正确的数据源。

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

3.动态数据源与Sharding数据源配置做为公共配置在Nacos维护,在业务服务启动时,读取公共配置初始化多数据源,并添加对公共多数据源配置的监听。当配置变更时,重新构造Sharding数据源,并并更新动态多数据源。另外数据库脚本通过自定义flyway配置执行。

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

技术实现

前提

需要在Nacos提前维护租户与数据源关系配置。

不使用Sharding-JDBC场景

1.引入相关组件依赖。

<dependency>
          <groupId>com.alibaba.nacos</groupId>
          <artifactId>nacos-client</artifactId>
          <version>2.1.0</version>
      </dependency>
      <dependency>
          <groupId>org.flywaydb</groupId>
          <artifactId>flyway-core</artifactId>
          &编程客栈lt;version>7.15.0</version>
      </dependency>
      <dependency>
          <groupId>com.baomidou</groupId>
          <artifactId>mybatis-plus-boot-starter</artifactId>
          <version>3.4.1</version>
      </dependency>
      <dependency>
          <groupId>com.baomidou</groupId>
          <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
          <version>3.4.1</version>
      &jslt;/dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>druid</artifactId>
          <version>1.2.6</version>
      </dependency>

2.关闭Flyway自动配置和配置多数据源。

spring:
flyway:
  #关闭flyway自动配置,自定义实现
  enabled: false
datasource:
  dynamic:
    #默认数据源
    primary: ds0
    datasource:
      ds0:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: org.PostgreSQL.Driver
        url: jdbc:postgresql://127.0.0.1:5432/ds0
        username: ds0
        password: ds0123
      ds1:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: org.postgresql.Driver
        url: jdbc:postgresql://127.0.0.1:5432/ds1
        username: ds1
        password: ds1123

3.自定义实现Flyway配置类,对应的flyway脚本目录结构见下图,主库和租户库SQL脚本独立维护。

Java
@Slf4j
@Configuration
@EnableTransactionManagement
public class FlywayConfig {
  @Value("${spring.application.name}")
  private String appName;
  @Autowired
  private DataSource dataSource;
  @Bean
  public void migrate() {
      log.info("flyway开始逐数据源执行脚本");
      DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
      Map<String, DataSource> dataSources = ds.getDataSources();
      dataSources.forEach((k, v) -> {
          if (!"sharding".equals(k)) {
                          // Flyway相关参数建议通过配置管理,以下代码仅供参考
              Flyway flyway = Flyway.configure()
                      .dataSource(v)
                      .table("t_" + k + "_" + appName + "_version")
                      .baselineOnMigrate(true)
                      .outOfOrder(true)
                      .baselineVersion("1.0.0")
                      .baselineDescription(k + "初始化")
                      .locations(CommonConstant.SQL_BASE_LOCATION + (CommonConstant.DEFAULT_DS_NAME.equals(k) ? CommonConstant.MASTER_DB : CommonConstant.TENANT_DB))
                      .load();
              flyway.migrate();
              log.info("flyway在 {} 数据源执行脚本成功", k);
          }
      });
  }
}

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

4.自定义实现数据源切换Filter类。

@Slf4j
@Component
@WebFilter(filterName = "dynamicDatasourceFilter", urlPatterns = {"/*"})
public class DynamicDatasourceFilter implements Filter {
  // 构建演示用租户与数据源关系配置
      private static Map<String, String> tenantDsMap = new HashMap<>();
  static {
      tenantDsMap.put("tenant123", "ds0");
      tenantDsMap.put("tenant456", "ds0");
              tenantDsMap.put("tenant789", "ds1");
  }
  @Override
  public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
      HttpServletRequest httpRequest = (HttpServletRequest) request;
      // 从请求头获取租户ID
      String tenantId = httpRequest.getHeader(CommonConstant.TENANT_HEADER);
      try {
          // 设置数据源
          if (tenantDsMap.get(tenantId) == null) {
              // 如果根据租户ID未找到租户数据源配置,默认走主库
              DynamicDataSourceContextHolder.push(CommonConstant.DEFAULT_DS_NAME);
          } else {
              //注意,如果是分片表,那么需要在分片表Service类或方法上加@DS("sharding")注解,最终由sharding的库分片策略决定SQL在哪个库执行。而这里的设置将会被@DS注解配置覆盖
              DynamicDataSourceContextHolder.push(tenantDsMap.get(tenantId));
          }
          // 执行
          chain.doFilter(request, response);
      } catch (Exception e) {
          log.error("切换数据源失败,tenantId={},请求接口uri={},异常原因:{}", tenantId, httpRequest.getRequestURI(), ExceptionUtils.getStackTrace(e));
      } finally {
          // 清空当前线程数据源
          DynamicDataSourceContextHolder.poll();
      }
  }

Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战

使用Sharding-JDBC

如果微服务还需要使用Sharding分片,那么还需要引入sharding-jdbc组件依赖,并配置sharding数据源和分片规则。如果是多个服务共用数据库,那么建议将Sharding数据源配置做为公共配置在Nacos管理,而Sharding分片规则则做为服务个性化配置单独维护(分片规则基本不需要动态变更),这样当有新租户需要申请开通独立租户库的时候,直接变更Sharding数据源公共配置,服务在监听到公共配置变更后,即可重新构建新的Sharding数据源实例和动态数据源更新,无需重启服务。

1.引入sharding-jdbc组件依赖。

<dependency>
          <groupId>org.apache.shardingsphere</groupId>
          <artifactId>sharding-jdbc-core</artifactId>
          <version>4.1.1</version>
      </dependency>

2.配置Sharding数据源和分片规则。

# sharding数据源配置
dataSources:
ds0: !!com.alibaba.druid.pool.DruidDataSource
  driverClassName: org.postgresql.Driver
  url: jdbc:postgresql://127.0.0.1:5432/ds0
  username: ds0
  password: ds0123
ds1: !!com.alibaba.druid.pool.DruidDataSource
  driverClassName: org.postgresql.Driver
  url: jdbc:postgresql://127.0.0.1:5432/ds1
  username: ds1
  password: ds1123
ds2: !!com.alibaba.druid.pool.DruidDataSource
  driverClassName: org.postgresql.Driver
  url: jdbc:postgresql://127.0.0.1:5432/ds2
  username: ds2
  password: ds2123
# sharding分片规则配置
shardingRule:
tables:
  t_order:
    actualDataNodes: ds$->{0..2}.t_order$->{0..1}
    tableStrategy:
      inline:
        shardingColumn: order_no
        algorithmExpression: t_order$->{order_no.toBigInteger() % 2}
defaultDataSourceName: ds0
# 默认库分片策略
defaultDatabaseStrategy:
  standard:
    shardingColumn: tenant_id
          # 自定义精确分片策略
    preciseAlgorithmClassName: cn.xtstu.demo.config.CustomDataSourcePreciseShardingAlgorithm
  #hint:
          # 
  #  algorithmClassName: cn.xtstu.demo.config.CustomHintShardingAlgorithm
defaultTableStrategy:
  none:
props:
sql.show: true

3.自定义精确分片策略。

public class CustomDataSourcePreciseShardingAlgorithm implements PreciseShardingAlgorithm<String> {

  // 构建VZZdIKv演示用租户与数据源关系配置
      private static Map<String, String> tenantDsMap = new HashMap<>();
  static {
      tenantDsMap.put("tenant123", "ds0");
      tenantDsMap.put("tenant456", "ds0");
              tenantDsMap.put("tenant789", "ds1");
  }
      
  @Override
  public String DOSharding(Collection<String> dataSourceNames, PreciseShardingValue<String> shardingValue) {
          // 库分片策略配置的分片键是字段tenant_id,根据分片键查询配置的数据源
          String dsName = tenantDsMap.get(shardingValue.getValue());
      // 如果如前文所属,Sharding子数据源key与dynamic数据源key保持一致的话,这里直接返回就行了
              return dsName;
      // TODO 需要处理未匹配到数据源的情况
  }
}

4.自定义Hint分片策略(可选),适用于分片键与SQL无关的场景。

public class CustomHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {
  // 构建演示用租户与数据源关系配置
      private static Map<String, String> tenantDsMap = new HashMap<>();
  static {
      tenantDsMap.put("tenant123", "ds0");
      tenantDsMap.put("tenant456", "ds0");
              tenantDsMap.put("tenant789", "ds1");
  }
  @Override
  public Collection<String> doSharding(Collection<String> collection, HintShardingValue<Integer> hintShardingValue) {
      Collection<String> result = new ArrayList<>();
      // 从请求头取到当前租户ID
              HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
      result.add(tenantDsMap.get(request.getHeader("tenantId")));
              // TODO  需要处理未匹配到数据源的情况
      return result;
  }
}

5.自定义动态数据源配置(核心就是将sharding数据源及其子数据源添加到动态数据源一起管理)。

@Slf4j
@Configuration
public class CustomDynamicDataSourceConfig {
  @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
  private String dataId;
  @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
  private String group;
  @Resource
  private DynamicDataSourceProperties properties;
  @Resource
  private NacosHelper nacosHelper;

  /**
   * 启动时通过查询Nacos上sharding数据源及分片规则yaml配置初始化sharding-jdbc数据源
   *
   * @return
   */
  @Bean
  public Sha编程客栈rdingDataSource shardingDataSource() {
      ConfigService configService = nacosHelper.getConfigService();
      if (configService == null) {
          log.error("连接nacos失败");
      }
      String configInfo = null;
      try {
          configInfo = configService.getConfig(dataId, group, 5000);
      } catch (NacosException e) {
          log.error("获取{}配置失败,异常原因:{}", dataId, ExceptionUtils.getStackTrace(e));
      }
      if (StringUtils.isBlank(configInfo)) {
          log.error("{}配置为空,启动失败", dataId);
          throw new NullPointerException(dataId + "配置为空");
      }
      try {
          // 通过工厂类和yaml配置创建Sharding数据源
          return (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
      } catch (Exception e) {
          log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e));
          throw new NullPointerException("sharding-jdbc数据源为空");
      }
  }

  /**
   * 将动态数据源设置为首选的
   * 当spring存在多个数据源时, 自动注入的是首选的对象
   * 设置为主要的数据源之后,就可以支持shardingJdbc原生的配置方式了
   */
  @Primary
  @Bean
  public DataSource dataSource() {
      DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
      dataSource.setPrimary(properties.getPrimary());
      dataSource.setStrict(properties.getStrict());
      dataSource.setStrategy(properties.getStrategy());
      dataSource.setP6spy(properties.getP6spy());
      dataSource.setSeata(properties.getSeata());
      return dataSource;
  }

  /**
   * 初始化动态数据源
   *
   * @return
   */
  @Bean
  public DynamicDataSourceProvider dynamicDataSourceProvider(ShardingDataSource shardingDataSource) {
      return new AbstractDataSourceProvider() {
     js     @Override
          public Map&lt;String, DataSource&gt; loadDataSources() {
              Map&lt;String, DataSource&gt; dataSourceMap = new HashMap&lt;&gt;();
              // 将sharding数据源整体添加到动态数据源里
              dataSourceMap.put(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
              // 同时把sharding内部管理的子数据源也添加到动态数据源里
              Map&lt;String, DataSource&gt; shardingInnerDataSources = shardingDataSource.getDataSourceMap();
              dataSourceMap.putAll(shardingInnerDataSources);
              return dataSourceMap;
          }
      };
  }
}

6.最后给出一份通过监听Nacos配置变更动态更新数据源的示例代码。注意:这份示例代码中只给出了Sharding配置变更时的处理逻辑,如果是dynamic数据源配置的话,有需要的可以参考着自行实现。

@Slf4j
@Configuration
public class NacosShardingConfigListener {
  @Value("${spring.cloud.nacos.config.extension-configs[0].data-id}")
  private String dataId;
  @Value("${spring.cloud.nacos.config.group:DEFAULT_GROUP}")
  private String group;
  @Value("${spring.application.name}")
  private String appName;
  @Autowired
  private DataSource dataSource;
  @Autowired
  private NacosHelper nacosHelper;
  @PostConstruct
  public void shardingConfigListener() throws Exception {
      ConfigService configService = nacosHelper.getConfigService();
      if (configService == null) {
          return;
      }
      configService.addListener(dataId, group, new Listener() {
          @Override
          public Executor getExecutor() {
              return null;
          }
          @Override
          public void receiveConfigInfo(String configInfo) {
              log.info("configInfo:\n{}", configInfo);
              if (StringUtils.isBlank(configInfo)) {
                  log.warn("sharding-jdbc配置为空,不会刷新数据源");
                  return;
              }
              try {
                  if (StringUtils.isNotBlank(configInfo)) {
                      // 通过yaml配置创建sharding数据源(注意:如果分片规则是独立配置文件,那么需要提前合并数据源和分片规则配置)
                      ShardingDataSource shardingDataSource = (ShardingDataSource) YamlShardingDataSourceFactory.createDataSource(configInfo.getBytes(StandardCharsets.UTF_8));
                      Map<String, DataSource> shardingInnerDataSources = shardingDataSource.getDataSourceMap();
                      DynamicRoutingDataSource ds = (DynamicRoutingDataSource) dataSource;
                      // 遍历sharding子数据源
                      for (String poolName : shardingInnerDataSources.keySet()) {
                          // TODO 这里还有个细节,如果yaml配置删减了数据源,对应数据源应该要从ds中remove掉,且主数据源不能被remove。另外其实只有新增的数据源才需要执行flyway脚本
                          // 将sharding子数据源逐个添加到动态数据源
                          ds.addDataSource(poolName, shardingInnerDataSources.get(poolName));
                          // 通过代码完成数据源Flyway配置,并执行迁移操作
                                                      Flyway flyway = Flyway.configure()
                                  .dataSource(dataSource)
                                  .table("t_" + poolName + "_" + appName + "_version")
                                  .baselineOnMigrate(true)
                                  .outOfOrder(true)
                                  .baselineVersion("1.0.0")
                                  .baselineDescription(poolName + "初始化")
                                  .locations(CommonConstant.SQL_BASE_LOCATION + CommonConstant.TENANT_DB)
                                  .load();
                          flyway.migrate();
                      }
                      // 将sharding数据源自身也添加到动态数据源
                      ds.addDataSource(CommonConstant.SHARDING_DS_NAME, shardingDataSource);
                      log.info("动态数据源刷新完成,现有数据源:{}", jsONUtil.toJsonStr(ds.getDataSources().keySet()));
                  }
              } catch (Exception e) {
                  log.error("创建sharding-jdbc数据源异常:{}", ExceptionUtils.getStackTrace(e));
              }
          }
      });
  }
}

以上就是Mybatis-Plus集成Sharding-JDBC与Flyway实现多租户分库分表实战的详细内容,更多关于Mybatis-Plus集成Sharding-JDBC的资料请关注编程客栈(www.devze.com)其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜