Implementing Data Sharding with ShardingSphere in Java

2022-07-21


Preface

Many people are, frankly, a little intimidated by splitting databases and tables, and so was I. It always felt like something for the ops team, something you only face once data volume grows or SQL becomes too complex, and on top of that the available sharding middleware is often not very friendly and is tedious to configure.

Today we use ShardingSphere to demonstrate data sharding, covering both sharding across databases and tables, and sharding tables only within a single database.

If time allows, the next article will cover read/write splitting.

GitHub repository: https://github.com/362460453/boot-sharding-jdbc

Introduction to ShardingSphere

ShardingSphere is an ecosystem of open-source distributed database middleware solutions. It consists of three mutually independent products: Sharding-JDBC, Sharding-Proxy, and Sharding-Sidecar (planned). All of them provide standardized data sharding, distributed transactions, and database governance, and they suit a wide range of scenarios such as Java homogeneous applications, heterogeneous languages, containers, and cloud-native deployments.

What ShardingSphere can do for us

  • Data sharding
  • Read/write splitting
  • Orchestration and governance
  • Distributed transactions

Sharding-JDBC was open-sourced in early 2016 as a Dangdang project and was renamed ShardingSphere after joining Apache. It is a middle layer between the application and the database; although it is fairly intrusive at the code level, it does not change your existing business logic.

For more documentation, see the official website.

Why not MyCat

If you look around, you will find that MyCat and ShardingSphere are the same category of middleware: both can do data sharding and read/write splitting, but they go about it very differently. The word "sharding" literally means splitting data into pieces, so it is no surprise that ShardingSphere is very strong at data sharding. It supports roughly 99% of SQL (the official site documents SQL support in detail; essentially only a few functions such as SUM are not supported) and is compatible with the common ORM frameworks and databases. So my personal advice: if you are doing data sharding, that is, splitting databases and tables, I strongly recommend ShardingSphere. From conversations with friends, MyCat's sharding is not very friendly to multi-table queries and needs strong operations support to run. One more point: MyCat is configured entirely through XML, with no code intrusion, which counts as an advantage. If all you need is read/write splitting, MyCat is perfectly fine.

Preparation

Start MySQL and create two databases named sharding_master and sharding_salve, then run the following SQL in each of them:

create table if not exists `t_order_0` (
  `order_id` int not null,
  `user_id`  int not null,
  primary key (`order_id`)
);
create table if not exists `t_order_1` (
  `order_id` int not null,
  `user_id`  int not null,
  primary key (`order_id`)
);

After these two steps, both databases contain the tables t_order_0 and t_order_1.

Code example

Environment

Tool            Version
JDK             1.8.0_144
Spring Boot     2.0.4.RELEASE
Sharding-JDBC   1.3.3 (as declared in the pom below)
MySQL           5.7

Create a Spring Boot project. We use JdbcTemplate here; using MyBatis instead makes no difference.

The pom dependencies are as follows:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
    </parent>
 
    <properties>
        <java.version>1.8</java.version>
        <druid.version>1.0.26</druid.version>
        <sharding.jdbc.core.version>1.3.3</sharding.jdbc.core.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>${sharding.jdbc.core.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
    </dependencies>

application.yml is configured as follows:

server:
  port: 8050
sharding:
  jdbc: 
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/sharding_master?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
    username: root
    password: 123456
    filters: stat
    maxActive: 100
    initialSize: 1
    maxWait: 15000
    minIdle: 1
    timeBetweenEvictionRunsMillis: 30000
    minEvictableIdleTimeMillis: 180000
    validationQuery: select 'x'
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: false
    maxPoolPreparedStatementPerConnectionSize: 20
    removeAbandoned: true
    removeAbandonedTimeout: 600
    logAbandoned: false
    connectionInitSqls: 
    
    url0: jdbc:mysql://localhost:3306/sharding_master?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
    username0: root
    password0: 123456
    
    url1: jdbc:mysql://localhost:3306/sharding_salve?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
    username1: root
    password1: 123456

Mapping the YAML into a bean

@Data
@ConfigurationProperties(prefix = "sharding.jdbc")
public class ShardDataSourceProperties {
	
	private String driverClassName;
	
	private String url;
	
	private String username;
	
	private String password;
	
	private String url0;
	
	private String username0;
	
	private String password0;
	
	private String url1;
	
	private String username1;
	
	private String password1;
	
	private String filters;
	
	private int maxActive;
	
	private int initialSize;
	
	private int maxWait;
	
	private int minIdle;
	
	private int timeBetweenEvictionRunsMillis;
	
	private int minEvictableIdleTimeMillis;
	
	private String validationQuery;
	
	private boolean testWhileIdle;
	
	private boolean testOnBorrow;
	
	private boolean testOnReturn;
	
	private boolean poolPreparedStatements;
	
	private int maxPoolPreparedStatementPerConnectionSize;
	
	private boolean removeAbandoned;
 
	private int removeAbandonedTimeout;
	
	private boolean logAbandoned;
	
	private List<String> connectionInitSqls;
	// getters and setters omitted (generated by Lombok @Data)
}

Database sharding strategy

// Database sharding is implemented via the SingleKeyDatabaseShardingAlgorithm interface
public class ModuloDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<Integer> {
 
	@Override
	public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) {
		for (String each : availableTargetNames) {
            if (each.endsWith(shardingValue.getValue() % 2 + "")) {
                return each;
            }
        }
        throw new IllegalArgumentException();
	}
 
	@Override
	public Collection<String> doInSharding(Collection<String> availableTargetNames,
			ShardingValue<Integer> shardingValue) {
		Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        for (Integer value : shardingValue.getValues()) {
            for (String targetName : availableTargetNames) {
                if (targetName.endsWith(value % 2 + "")) {
                    result.add(targetName);
                }
            }
        }
        return result;
	}
 
	@Override
	public Collection<String> doBetweenSharding(Collection<String> availableTargetNames,
			ShardingValue<Integer> shardingValue) {
		Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            for (String each : availableTargetNames) {
                if (each.endsWith(i % 2 + "")) {
                    result.add(each);
                }
            }
        }
        return result;
	}
}

Table sharding strategy

public class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
 
    /**
     * Equality operations on the sharding column (including INSERT and UPDATE) all go through this method.
     * For example:
     * <p>
     * select * from t_order where order_id = 11
     * └── select * from t_order_1 where order_id = 11
     * select * from t_order where order_id = 44
     * └── select * from t_order_0 where order_id = 44
     * </p>
     */
	@Override
    public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        for (String each : tableNames) {
            if (each.endsWith(shardingValue.getValue() % 2 + "")) {
                return each;
            }
        }
        throw new IllegalArgumentException();
    }
    
    /**
     * IN operations on the sharding column all go through this method.
     * select * from t_order where order_id in (11,44)
     *         ├── select * from t_order_0 where order_id in (11,44)
     *         └── select * from t_order_1 where order_id in (11,44)
     * select * from t_order where order_id in (11,13,15)
     *         └── select * from t_order_1 where order_id in (11,13,15)
     * select * from t_order where order_id in (22,24,26)
     *         └── select * from t_order_0 where order_id in (22,24,26)
     */
	@Override
    public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(tableNames.size());
        for (Integer value : shardingValue.getValues()) {
            for (String tableName : tableNames) {
                if (tableName.endsWith(value % 2 + "")) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }
    
    /**
     * BETWEEN operations on the sharding column all go through this method.
     * select * from t_order where order_id between 10 and 20
     *         ├── select * from t_order_0 where order_id between 10 and 20
     *         └── select * from t_order_1 where order_id between 10 and 20
     */
	@Override
    public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(tableNames.size());
        Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
        for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            for (String each : tableNames) {
                if (each.endsWith(i % 2 + "")) {
                    result.add(each);
                }
            }
        }
        return result;
    } 
}

Applying the sharding rules to specific databases and tables

In short, the database is chosen by the parity of user_id and the table by the parity of order_id.
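To make that concrete, here is a minimal standalone sketch (it does not use the sharding-jdbc API; the RoutingDemo class and its route helper exist only for illustration) that mimics the "endsWith(value % 2)" matching used by the two algorithms above:

import java.util.Arrays;
import java.util.List;

// Illustration only: the real routing is done by sharding-jdbc through
// ModuloDatabaseShardingAlgorithm / ModuloTableShardingAlgorithm.
public class RoutingDemo {

    static String route(List<String> targets, int shardingValue) {
        for (String each : targets) {
            if (each.endsWith(shardingValue % 2 + "")) {
                return each;
            }
        }
        throw new IllegalArgumentException("no target for " + shardingValue);
    }

    public static void main(String[] args) {
        List<String> dataSources = Arrays.asList("ds_0", "ds_1");
        List<String> tables = Arrays.asList("t_order_0", "t_order_1");

        // user_id decides the database, order_id decides the table
        System.out.println(route(dataSources, 1) + "." + route(tables, 2)); // ds_1.t_order_0
        System.out.println(route(dataSources, 2) + "." + route(tables, 3)); // ds_0.t_order_1
    }
}

Running it prints ds_1.t_order_0 and ds_0.t_order_1: user_id picks the database, order_id picks the table.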

If you shard more than one table, define one TableRule per table (a sketch for a second table follows the configuration below).

Two data sources are configured here, corresponding to the entries in the YAML above; adjust them to your own needs.

@Configuration
@EnableConfigurationProperties(ShardDataSourceProperties.class)
public class ShardDataSourceConfig {
 
	@Autowired
	private ShardDataSourceProperties shardDataSourceProperties;
 
	private DruidDataSource parentDs() throws SQLException {
		DruidDataSource ds = new DruidDataSource();
		ds.setDriverClassName(shardDataSourceProperties.getDriverClassName());
		ds.setUsername(shardDataSourceProperties.getUsername());
		ds.setUrl(shardDataSourceProperties.getUrl());
		ds.setPassword(shardDataSourceProperties.getPassword());
		ds.setFilters(shardDataSourceProperties.getFilters());
		ds.setMaxActive(shardDataSourceProperties.getMaxActive());
		ds.setInitialSize(shardDataSourceProperties.getInitialSize());
		ds.setMaxWait(shardDataSourceProperties.getMaxWait());
		ds.setMinIdle(shardDataSourceProperties.getMinIdle());
		ds.setTimeBetweenEvictionRunsMillis(shardDataSourceProperties.getTimeBetweenEvictionRunsMillis());
		ds.setMinEvictableIdleTimeMillis(shardDataSourceProperties.getMinEvictableIdleTimeMillis());
		ds.setValidationQuery(shardDataSourceProperties.getValidationQuery());
		ds.setTestWhileIdle(shardDataSourceProperties.isTestWhileIdle());
		ds.setTestOnBorrow(shardDataSourceProperties.isTestOnBorrow());
		ds.setTestOnReturn(shardDataSourceProperties.isTestOnReturn());
		ds.setPoolPreparedStatements(shardDataSourceProperties.isPoolPreparedStatements());
		ds.setMaxPoolPreparedStatementPerConnectionSize(
				shardDataSourceProperties.getMaxPoolPreparedStatementPerConnectionSize());
		ds.setRemoveAbandoned(shardDataSourceProperties.isRemoveAbandoned());
		ds.setRemoveAbandonedTimeout(shardDataSourceProperties.getRemoveAbandonedTimeout());
		ds.setLogAbandoned(shardDataSourceProperties.isLogAbandoned());
		ds.setConnectionInitSqls(shardDataSourceProperties.getConnectionInitSqls());
		return ds;
	}
 
	private DataSource ds0() throws SQLException {
		DruidDataSource ds = parentDs();
		ds.setUsername(shardDataSourceProperties.getUsername0());
		ds.setUrl(shardDataSourceProperties.getUrl0());
		ds.setPassword(shardDataSourceProperties.getPassword0());
		return ds;
	}
 
	private DataSource ds1() throws SQLException {
		DruidDataSource ds = parentDs();
		ds.setUsername(shardDataSourceProperties.getUsername1());
		ds.setUrl(shardDataSourceProperties.getUrl1());
		ds.setPassword(shardDataSourceProperties.getPassword1());
		return ds;
	}
 
	private DataSourceRule dataSourceRule() throws SQLException {
		Map<String, DataSource> dataSourceMap = new HashMap<>(2);
		dataSourceMap.put("ds_0", ds0());
		dataSourceMap.put("ds_1", ds1());
		DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
		return dataSourceRule;
	}

	// Rule for the t_order logical table
	private TableRule orderTableRule() throws SQLException {
		TableRule orderTableRule = TableRule.builder("t_order").actualTables(Arrays.asList("t_order_0", "t_order_1"))
				.dataSourceRule(dataSourceRule()).build();
		return orderTableRule;
	}
 
	// Database and table sharding rule
	private ShardingRule shardingRule() throws SQLException {
		ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule())
				.tableRules(Arrays.asList(orderTableRule()))
				.databaseShardingStrategy(
						new DatabaseShardingStrategy("user_id", new ModuloDatabaseShardingAlgorithm()))
				.tableShardingStrategy(new TableShardingStrategy("order_id", new ModuloTableShardingAlgorithm()))
				.build();
		return shardingRule;
	}
 
	@Bean
	public DataSource dataSource() throws SQLException {
		return ShardingDataSourceFactory.createDataSource(shardingRule());
	}
 
 
    @Bean
    public PlatformTransactionManager transactionManager() throws SQLException {
        return new DataSourceTransactionManager(dataSource());
    }
}
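As mentioned above, each additional sharded table gets its own TableRule. As a sketch only: assuming a second logical table t_order_item existed, with actual tables t_order_item_0 and t_order_item_1 created in both databases (they are not part of this demo's schema), its rule could be built the same way and registered alongside the order rule:

// Hypothetical: rule for a second sharded table t_order_item
private TableRule orderItemTableRule() throws SQLException {
    return TableRule.builder("t_order_item")
            .actualTables(Arrays.asList("t_order_item_0", "t_order_item_1"))
            .dataSourceRule(dataSourceRule())
            .build();
}

// shardingRule() would then register both rules:
// .tableRules(Arrays.asList(orderTableRule(), orderItemTableRule()))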

We call the endpoints from a controller to insert and query data.

All of the classes below are used to simulate requests for testing:

@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private OrderDao orderDao;
 
    @RequestMapping(path = "/createOrder/{userId}/{orderId}", method = {RequestMethod.GET})
    public String createOrder(@PathVariable("userId") Integer userId, @PathVariable("orderId") Integer orderId) {
        Order order = new Order();
        order.setOrderId(orderId);
        order.setUserId(userId);
        orderDao.createOrder(order);
        return "success";
    }
 
    @RequestMapping(path = "/{userId}", method = {RequestMethod.GET})
    public List<Order> getOrderListByUserId(@PathVariable("userId") Integer userId) {
        return orderDao.getOrderListByUserId(userId);
    }
}
 
 
---------------------------------------------------
public interface OrderDao {
    List<Order> getOrderListByUserId(Integer userId);
 
    void createOrder(Order order);
}
---------------------------------------------------
@Service
public class OrderDaoImpl implements OrderDao {
    @Autowired
    JdbcTemplate jdbcTemplate;
 
 
    @Override
    public List<Order> getOrderListByUserId(Integer userId) {
        String sql = "select order_id, user_id from t_order where user_id = ?";
        return jdbcTemplate.query(sql, new Object[]{userId},
                new int[]{Types.INTEGER},
                new BeanPropertyRowMapper<Order>(Order.class));
    }
 
    @Override
    public void createOrder(Order order) {
        // a parameterized insert; sharding-jdbc routes it by user_id and order_id
        String sql = "insert into t_order(user_id, order_id) values(?, ?)";
        jdbcTemplate.update(sql, order.getUserId(), order.getOrderId());
    }
}
 
---------------------------------------------------
public class Order implements Serializable {
 
	private int userId;
 
	private int orderId;
 
	// getters and setters omitted
}
---------------------------------------------------
@SpringBootApplication
public class Application {
	
	public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Testing

Start the project and visit: http://localhost:8050/order/createOrder/1/1

Call it several times with different parameters to insert more records, then look at the databases: the rows are distributed exactly according to the sharding rules we defined.
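If you prefer to drive the test from code rather than from the browser, here is a rough sketch (the ShardingSmokeTest class is not part of the repository; it simply assumes the application is running locally on port 8050 as configured above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Fires a few createOrder requests with different user_id/order_id combinations so that
// rows land in different databases and tables according to the parity rules.
public class ShardingSmokeTest {

    static String get(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            return in.readLine();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int userId = 1; userId <= 2; userId++) {
            for (int orderId = 1; orderId <= 4; orderId++) {
                // expected target: ds_{userId % 2}.t_order_{orderId % 2}
                System.out.println(get("http://localhost:8050/order/createOrder/" + userId + "/" + orderId));
            }
        }
    }
}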

Things to note

Sharding-JDBC does not support JdbcTemplate's batch update operations.

Do not prefix table names with the database name. Against a plain data source, writing the database prefix or omitting it makes no difference, but against a sharded table it raises an error.

If you want to shard only tables without sharding databases (see the sketch after this list):

  • Remove (or stop referencing) the ModuloDatabaseShardingAlgorithm class
  • Remove the database sharding strategy line in ShardDataSourceConfig.shardingRule()
  • Reduce the related data source configuration to a single data source
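As a rough sketch of what the adjusted rule might look like (the method name tableOnlyShardingRule is made up for illustration; it reuses the ds0() data source and the ModuloTableShardingAlgorithm shown earlier, and assumes that single database contains t_order_0 and t_order_1):

// Table-only sharding: one data source, no database sharding strategy.
private ShardingRule tableOnlyShardingRule() throws SQLException {
    Map<String, DataSource> dataSourceMap = new HashMap<>(1);
    dataSourceMap.put("ds_0", ds0());                       // single database
    DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);

    TableRule orderTableRule = TableRule.builder("t_order")
            .actualTables(Arrays.asList("t_order_0", "t_order_1"))
            .dataSourceRule(dataSourceRule)
            .build();

    return ShardingRule.builder()
            .dataSourceRule(dataSourceRule)
            .tableRules(Arrays.asList(orderTableRule))
            // no databaseShardingStrategy: everything stays in ds_0
            .tableShardingStrategy(new TableShardingStrategy("order_id", new ModuloTableShardingAlgorithm()))
            .build();
}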

That concludes this article on implementing data sharding with ShardingSphere in Java.
