NoSql与Rdbms取长补短相辅相成
常见的NoSql
1. 键值数据库 Redis、Memcached、Riak使用键值对,通过散列表实现,用于内存缓存,如会话、配置文件、参数等;频繁读写、拥有简单数据模型的应用; 2. 列族数据库 HBase、Bigtable、Cassandra以列族式存储将同一列数据存在一起,用于分布式数据存储与管理; 3. 文档数据库 MongoDB、CouchDB使用value时JSON结构的文档,用于Web应用,存储面向文档或类似半结构化的数据; 4. 图形数据库 Neo4j、InfoGrid使用图结构,用于社交网络、推荐系统,专注构建关系图谱。
Mysql搭配Redis
Mysql中的name字段做索引,相当于Redis的key,data字段为100字节的数据,相当于redis的Value:
@SpringBootApplication @Slf4j public class CommonMistakesApplication { //模拟10万条数据存到Redis和MySQL public static final int ROWS = 100000; public static final String PAYLOAD = IntStream.rangeClosed(1, 100).mapToObj(__ -> "a").collect(Collectors.joining("")); @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private JdbcTemplate jdbcTemplate; @Autowired private StandardEnvironment standardEnvironment; public static void main(String[] args) { SpringApplication.run(CommonMistakesApplication.class, args); } @PostConstruct public void init() { //使用-Dspring.profiles.active=init启动程序进行初始化 if (Arrays.stream(standardEnvironment.getActiveProfiles()).anyMatch(s -> s.equalsIgnoreCase("init"))) { initRedis(); initMySQL(); } } //填充数据到MySQL private void initMySQL() { //删除表 jdbcTemplate.execute("DROP TABLE IF EXISTS `r`;"); //新建表,name字段做了索引 jdbcTemplate.execute("CREATE TABLE `r` (\n" + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + " `data` varchar(2000) NOT NULL,\n" + " `name` varchar(20) NOT NULL,\n" + " PRIMARY KEY (`id`),\n" + " KEY `name` (`name`) USING BTREE\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"); //批量插入数据 String sql = "INSERT INTO `r` (`data`,`name`) VALUES (?,?)"; jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement preparedStatement, int i) throws SQLException { preparedStatement.setString(1, PAYLOAD); preparedStatement.setString(2, "item" + i); } @Override public int getBatchSize() { return ROWS; } }); log.info("init mysql finished with count {}", jdbcTemplate.queryForObject("SELECT COUNT(*) FROM `r`", Long.class)); } //填充数据到Redis private void initRedis() { IntStream.rangeClosed(1, ROWS).forEach(i -> stringRedisTemplate.opsForValue().set("item" + i, PAYLOAD)); log.info("init redis finished with count {}", stringRedisTemplate.keys("item*")); } }
Mysql和Redis随机读取单条数据的性能:
@Autowired private JdbcTemplate jdbcTemplate; @Autowired private StringRedisTemplate stringRedisTemplate; @GetMapping("redis") public void redis() { //使用随机的Key来查询Value,结果应该等于PAYLOAD Assert.assertTrue(stringRedisTemplate.opsForValue().get("item" + (ThreadLocalRandom.current().nextInt(CommonMistakesApplication.ROWS) + 1)).equals(CommonMistakesApplication.PAYLOAD)); } @GetMapping("mysql") public void mysql() { //根据随机name来查data,name字段有索引,结果应该等于PAYLOAD Assert.assertTrue(jdbcTemplate.queryForObject("SELECT data FROM `r` WHERE name=?", new Object[]{("item" + (ThreadLocalRandom.current().nextInt(CommonMistakesApplication.ROWS) + 1))}, String.class) .equals(CommonMistakesApplication.PAYLOAD)); }
使用wrk加10个线程50个并发连接做压测,可以看到,Mysql90%的请求需要61ms,QPS 为 1460;而 Redis 90% 的请求在 5ms 左右,QPS 达到了 14008,几乎是 MySQL 的十倍;
但Redis薄弱的地方是不擅长做key的搜索。对mysql可以使用like操作前匹配走B+树索引实现快速索引;但对Redis使用keys命令对key的搜索,相当于msyql里做了全表扫描。
@GetMapping("redis2") public void redis2() { Assert.assertTrue(stringRedisTemplate.keys("item71*").size() == 1111); } @GetMapping("mysql2") public void mysql2() { Assert.assertTrue(jdbcTemplate.queryForList("SELECT name FROM `r` WHERE name LIKE 'item71%'", String.class).size() == 1111); }
在 QPS 方面,MySQL 的 QPS 达到了 Redis 的 157 倍;在延迟方面,MySQL 的延迟只有 Redis 的十分之一。
Redis慢的原因:
1. Redis 的 Keys 命令是 O(n) 时间复杂度。如果数据库中 Key 的数量很多,就会非常慢。
2. Redis 是单线程的,对于慢的命令如果有并发,串行执行就会非常耗时。
因此,我们使用Redis都是针对某一个key使用,而不能在业务代码中使用keys命令从Redis中“搜索数据”,因为这不是redis擅长。对于key的搜索,可以先通过关系型数据库进行,然后再从redis存取数据(如果实在需要搜索key可以使用SCAN命令)。在生产环境中,可以配置Redis禁止类似keys这样危险的命令。
Redis 提供了丰富的数据结构(Set、SortedSet、Hash、List),并围绕这些数据结构提供了丰富的 API。如果我们好好利用这个特点的话,可以直接在 Redis 中完成一部分服务端计算,避免“读取缓存 -> 计算数据 -> 保存缓存”三部曲中的读取和保存缓存的开销,进一步提高性能。
Mysql搭配Elasticsearch
Elasticsearch(以下简称 ES),是目前非常流行的分布式搜索和分析数据库,独特的倒排索引结构尤其适合进行全文搜索。
倒排索引可以认为是一个 Map,其 Key 是分词之后的关键字,Value 是文档 ID/ 片段 ID 的列表。我们只要输入需要搜索的单词,就可以直接在这个 Map 中得到所有包含这个单词的文档 ID/ 片段 ID 列表,然后再根据其中的文档 ID/ 片段 ID 查询出实际的文档内容。
简单测试
@Entity @Document(indexName = "news", replicas = 0) //@Document注解定义了这是一个ES的索引,索引名称news,数据不需要冗余 @Table(name = "news", indexes = {@Index(columnList = "cateid")}) //@Table注解定义了这是一个MySQL表,表名news,对cateid列做索引 @Data @AllArgsConstructor @NoArgsConstructor @DynamicUpdate public class News { @Id private long id; @Field(type = FieldType.Keyword) private String category;//新闻分类名称 private int cateid;//新闻分类ID @Column(columnDefinition = "varchar(500)")//@Column注解定义了在MySQL中字段,比如这里定义title列的类型是varchar(500) @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")//@Field注解定义了ES字段的格式,使用ik分词器进行分词 private String title;//新闻标题 @Column(columnDefinition = "text") @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart") private String content;//新闻内容 }
从csv文件加载4000条新闻数据,复制100份,平成40万数据写入
@SpringBootApplication @Slf4j @EnableElasticsearchRepositories(includeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = NewsESRepository.class)) //明确设置哪个是ES的Repository @EnableJpaRepositories(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = NewsESRepository.class)) //其他的是MySQL的Repository public class CommonMistakesApplication { public static void main(String[] args) { Utils.loadPropertySource(CommonMistakesApplication.class, "es.properties"); SpringApplication.run(CommonMistakesApplication.class, args); } @Autowired private StandardEnvironment standardEnvironment; @Autowired private NewsESRepository newsESRepository; @Autowired private NewsMySQLRepository newsMySQLRepository; @PostConstruct public void init() { //使用-Dspring.profiles.active=init启动程序进行初始化 if (Arrays.stream(standardEnvironment.getActiveProfiles()).anyMatch(s -> s.equalsIgnoreCase("init"))) { //csv中的原始数据只有4000条 Listnews = loadData(); AtomicLong atomicLong = new AtomicLong(); news.forEach(item -> item.setTitle("%%" + item.getTitle())); //我们模拟100倍的数据量,也就是40万条 IntStream.rangeClosed(1, 100).forEach(repeat -> { news.forEach(item -> { //重新设置主键ID item.setId(atomicLong.incrementAndGet()); //每次复制数据稍微改一下title字段,在前面加上一个数字,代表这是第几次复制 item.setTitle(item.getTitle().replaceFirst("%%", String.valueOf(repeat))); }); initMySQL(news, repeat == 1); log.info("init MySQL finished for {}", repeat); initES(news, repeat == 1); log.info("init ES finished for {}", repeat); }); } } //从news.csv中解析得到原始数据 private List loadData() { //使用jackson-dataformat-csv实现csv到POJO的转换 CsvMapper csvMapper = new CsvMapper(); CsvSchema schema = CsvSchema.emptySchema().withHeader(); ObjectReader objectReader = csvMapper.readerFor(News.class).with(schema); ClassLoader classLoader = getClass().getClassLoader(); File file = new File(classLoader.getResource("news.csv").getFile()); try (Reader reader = new FileReader(file)) { return objectReader. readValues(reader).readAll(); } catch (Exception e) { e.printStackTrace(); } return null; } //把数据保存到ES中 private void initES(List news, boolean clear) { if (clear) { //首次调用的时候先删除历史数据 newsESRepository.deleteAll(); } newsESRepository.saveAll(news); } //把数据保存到MySQL中 private void initMySQL(List news, boolean clear) { if (clear) { //首次调用的时候先删除历史数据 newsMySQLRepository.deleteAll(); } newsMySQLRepository.saveAll(news); } }
计算符合条件的新闻总数量
@Repository public interface NewsMySQLRepository extends JpaRepository{ //JPA:搜索分类等于cateid参数,且内容同时包含关键字keyword1和keyword2,计算符合条件的新闻总数量 long countByCateidAndContentContainingAndContentContaining(int cateid, String keyword1, String keyword2); } @Repository public interface NewsESRepository extends ElasticsearchRepository { //ES:搜索分类等于cateid参数,且内容同时包含关键字keyword1和keyword2,计算符合条件的新闻总数量 long countByCateidAndContentContainingAndContentContaining(int cateid, String keyword1, String keyword2); }
使用相同条件搜索,搜索分类是1,关键字是社会和评估,输出结果和耗时
//测试MySQL搜索,最后输出耗时和结果 @GetMapping("mysql") public void mysql(@RequestParam(value = "cateid", defaultValue = "1") int cateid, @RequestParam(value = "keyword1", defaultValue = "社会") String keyword1, @RequestParam(value = "keyword2", defaultValue = "苹果") String keyword2) { long begin = System.currentTimeMillis(); Object result = newsMySQLRepository.countByCateidAndContentContainingAndContentContaining(cateid, keyword1, keyword2); log.info("took {} ms result {}", System.currentTimeMillis() - begin, result); } //测试ES搜索,最后输出耗时和结果 @GetMapping("es") public void es(@RequestParam(value = "cateid", defaultValue = "1") int cateid, @RequestParam(value = "keyword1", defaultValue = "社会") String keyword1, @RequestParam(value = "keyword2", defaultValue = "苹果") String keyword2) { long begin = System.currentTimeMillis(); Object result = newsESRepository.countByCateidAndContentContainingAndContentContaining(cateid, keyword1, keyword2); log.info("took {} ms result {}", System.currentTimeMillis() - begin, result); }
ES 耗时仅仅 48ms,MySQL 耗时 6 秒多是 ES 的 100 倍。很遗憾,虽然新闻分类 ID 已经建了索引,但是这个索引只能起到加速过滤分类 ID 这一单一条件的作用,对于文本内容的全文搜索,B+ 树索引无能为力。
但 ES 这种以索引为核心的数据库,也不是万能的,频繁更新就是一个大问题。
MySQL 可以做到仅更新某行数据的某个字段,但 ES 里每次数据字段更新都相当于整个文档索引重建。即便 ES 提供了文档部分更新的功能,但本质上只是节省了提交文档的网络流量,以及减少了更新冲突,其内部实现还是文档删除后重新构建索引。因此,如果要在 ES 中保存一个类似计数器的值,要实现不断更新,其执行效率会非常低。
ES 是一个分布式的全文搜索数据库,所以与 MySQL 相比的优势在于文本搜索,而且因为其分布式的特性,可以使用一个大 ES 集群处理大规模数据的内容搜索。但,由于 ES 的索引是文档维度的,所以不适用于频繁更新的 OLTP 业务。一般而言,我们会把 ES 和 MySQL 结合使用,MySQL 直接承担业务系统的增删改操作,而 ES 作为辅助数据库,直接扁平化保存一份业务数据,用于复杂查询、全文搜索和统计。
Mysql搭配Mongo
使用场景:非重要数据,并且数据结构不固定的,插入量又很大的原始数据(比如爬虫原始数据)可以考虑Mongo。
1. MongoDB 是目前比较火的文档型 NoSQL。 2. 虽然 MongoDB 在 4.0 版本后具有了事务功能,但是它整体的稳定性相比 MySQL 还是有些差距。 3. MongoDB 不太适合作为重要数据的主数据库,但可以用来存储日志、爬虫等数据重要程度不那么高,但写入并发量又很大的场景。 4. 虽然 MongoDB 的写入性能较高,但复杂查询性能却相比 Elasticsearch 来说没啥优势; 5. 虽然 MongoDB 有 Sharding 功能,但是还不太稳定。 6. 在数据写入量不大、更新不频繁,并且不需要考虑事务的情况下,使用 Elasticsearch 来替换 MongoDB。
结合 NoSQL 和 MySQL 应对高并发的复合数据库架构
//每一个存储系统都有其独特的数据结构,数据结构的设计就决定了其擅长和不擅长的场景。 1. 比如,MySQL InnoDB 引擎的 B+ 树对排序和范围查询友好,频繁数据更新的代价不是太大,因此适合 OLTP(On-Line Transaction Processing)。 2. ES 的 Lucene 采用了 FST(Finite State Transducer)索引 + 倒排索引,空间效率高,适合对变动不频繁的数据做索引,实现全文搜索。存储系统本身不可能对一份数据使用多种数据结构保存,因此不可能适用于所有场景。 3. InfluxDB 是一款优秀的时序数据库。使用 InfluxDB 来做的 Metrics 打点;时序数据库的优势,在于处理指标数据的聚合,并且读写效率非常高。但不能把influxDB当作普通数据库: 3.1 InfluxDB 不支持数据更新操作,毕竟时间数据只能随着时间产生新数据,肯定无法对过去的数据做修改; 3.2 从数据结构上说,时间序列数据数据没有单一的主键标识,必须包含时间戳,数据只能和时间戳进行关联,不适合普通业务数据。 所以,时间序列数据库可以作为特定场景(比如监控、统计)的主存储,也可以和关系型数据库搭配使用,作为一个辅助数据源,保存业务系统的指标数据。
例子
1. 包含多个数据库系统的、能应对各种高并发场景的一套数据服务的系统架构
2. 包含了同步写服务、异步写服务和查询服务三部分,分别实现主数据库写入、辅助数据库写入和查询路由。
// 首先要明确的是,重要的业务主数据只能保存在 MySQL 这样的关系型数据库中,原因有三点: 1. RDBMS 经过了几十年的验证,已经非常成熟; 2. RDBMS 的用户数量众多,Bug 修复快、版本稳定、可靠性很高; 3. RDBMS 强调 ACID,能确保数据完整。 //有两种类型的查询任务可以交给 MySQL 来做,性能会比较好,这也是 MySQL 擅长的地方: 1. 按照主键 ID 的查询。直接查询聚簇索引,其性能会很高。但是单表数据量超过亿级后,性能也会衰退,而且单个数据库无法承受超大的查询并发。 因此我们可以把数据表进行 Sharding 操作,均匀拆分到多个数据库实例中保存。我们把这套数据库集群称作 Sharding 集群。 2. 按照各种条件进行范围查询,查出主键 ID。对二级索引进行查询得到主键,只需要查询一棵 B+ 树,效率同样很高。但索引的值不宜过大。
分析
如图上蓝色线所示,写入两种 MySQL 数据表和发送 MQ 消息的这三步,我们用一个同步写服务完成了。我在“异步处理”中提到,所有异步流程都需要补偿,这里的异步流程同样需要。
如图中绿色线所示,有一个异步写服务,监听 MQ 的消息,继续完成辅助数据的更新操作。这里我们选用了 ES 和 InfluxDB 这两种辅助数据库,因此整个异步写数据操作有三步:
1. MQ 消息不一定包含完整的数据,甚至可能只包含一个最新数据的主键 ID,我们需要根据 ID 从查询服务查询到完整的数据。 2. 写入 InfluxDB 的数据一般可以按时间间隔进行简单聚合,定时写入 InfluxDB。因此,这里会进行简单的客户端聚合,然后写入 InfluxDB。 3. ES 不适合在各索引之间做连接(Join)操作,适合保存扁平化的数据。比如,我们可以把订单下的用户、商户、商品列表等信息,作为内嵌对象嵌入整个订单 JSON,然后把整个扁平化的 JSON 直接存入 ES。
对于数据写入操作,我们认为操作返回的时候同步数据一定是写入成功的,但是由于各种原因,异步数据写入无法确保立即成功,会有一定延迟,比如:
1. 异步消息丢失的情况,需要补偿处理;
2. 写入 ES 的索引操作本身就会比较慢;
3. 写入 InfluxDB 的数据需要客户端定时聚合。
如图中红色线所示,对于查询服务,需要根据一定的上下文条件(比如查询一致性要求、时效性要求、搜索的条件、需要返回的数据字段、搜索时间区间等)来把请求路由到合适的数据库,并且做一些聚合处理:
1. 需要根据主键查询单条数据,可以从 MySQL Sharding 集群或 Redis 查询,如果对实时性要求不高也可以从 ES 查询。
2. 按照多个条件搜索订单的场景,可以从 MySQL 索引表查询出主键列表,然后再根据主键从 MySQL Sharding 集群或 Redis 获取数据详情。
3. 各种后台系统需要使用比较复杂的搜索条件,甚至全文搜索来查询订单数据,或是定时分析任务需要一次查询大量数据,这些场景对数据实时性要求都不高,可以到 ES 进行搜索。此外,MySQL 中的数据可以归档,我们可以在 ES 中保留更久的数据,而且查询历史数据一般并发不会很大,可以统一路由到 ES 查询。
4. 监控系统或后台报表系统需要呈现业务监控图表或表格,可以把请求路由到 InfluxDB 查询。
原文链接:https://time.geekbang.org/column/article/234930