Elasticsearch 技术分析(四): 分布式工作原理


前言

通过前面章节的了解,我们已经知道 Elasticsearch 是一个实时的分布式搜索分析引擎,它能让你以一个之前从未有过的速度和规模,去探索你的数据。它被用作全文检索、结构化搜索、分析以及这三个功能的组合。 Elasticsearch 可以横向扩展至数百(甚至数千)的服务器节点,同时可以处理PB级数据。

虽然说 Elasticsearch 是分布式的,但是对于我们开发者来说并未过多的参与其中,我们只需启动对应数量的 ES 实例(即节点),并给它们分配相同的 cluster.name 让它们归属于同一个集群,创建索引的时候只需指定索引 分片数副本数 即可,其他的都交给了 ES 内部自己去实现。

这和数据库的分布式和 同源的 solr 实现分布式都是有区别的,数据库分布式(分库分表)需要我们指定路由规则和数据同步策略等,solr的分布式也需依赖 zookeeper,但是 Elasticsearch 完全屏蔽了这些。

所以我们说,Elasticsearch 天生就是分布式的,并且在设计时屏蔽了分布式的复杂性。Elasticsearch 在分布式方面几乎是透明的。我们可以使用笔记本上的单节点轻松地运行Elasticsearch 的程序,但如果你想要在 100 个节点的集群上运行程序,一切也依然顺畅。

Elasticsearch 尽可能地屏蔽了分布式系统的复杂性。这里列举了一些在后台自动执行的操作:

  • 分配文档到不同的容器 或 分片 中,文档可以储存在一个或多个节点中。
  • 按集群节点来均衡分配这些分片,从而对索引和搜索过程进行负载均衡。
  • 复制每个分片以支持数据冗余,从而防止硬件故障导致的数据丢失。
  • 将集群中任一节点的请求路由到存有相关数据的节点。
  • 集群扩容时无缝整合新节点,重新分配分片以便从离群节点恢复。

虽然我们可以不了解 Elasticsearch 分布式内部实现机制也能将Elasticsearch使用的很好,但是了解它们将会从另一个角度帮助我们更完整的学习和理解 Elasticsearch 知识。接下里我们从以下几个部分来详细讲解 Elasticsearch 分布式的内部实现机制。

集群的原理

对于我们之前的分布式经验,我们知道,提升分布式性能可以通过购买性能更强大( 垂直扩容 ,或 纵向扩容 ) 或者数量更多的服务器( 水平扩容 ,或 横向扩容 )来实现。

虽然Elasticsearch 可以获益于更强大的硬件设备,例如将存储硬盘设为SSD,但是 垂直扩容 由于硬件设备的技术和价格限制,垂直扩容 是有极限的。真正的扩容能力是来自于 水平扩容 --为集群添加更多的节点,并且将负载压力和稳定性分散到这些节点中。

对于大多数的数据库而言,通常需要对应用程序进行非常大的改动,才能利用上横向扩容的新增资源。 与之相反的是,ElastiSearch天生就是 分布式的 ,它知道如何通过管理多节点来提高扩容性和可用性。 这也意味着你的应用无需关注这个问题。那么它是如何管理的呢?

主节点

启动一个 ES 实例就是一个节点,节点加入集群是通过配置文件中设置相同的 cluste.name 而实现的。所以集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据。

与其他组件集群(mysql,redis)的 master-slave模式一样,ES集群中也会选举一个节点成为主节点,主节点它的职责是维护全局集群状态,在节点加入或离开集群的时候重新分配分片。具体关于主节点选举的内容可以阅读。

所有主要的文档级别API(索引,删除,搜索)都不与主节点通信,主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。如果集群中就只有一个节点,那么它同时也就是主节点。

所以如果我们使用 kibana 来作为视图操作工具的话,我们只需在kibana.yml的配置文件中,将elasticsearch.url: "http://localhost:9200"设置为主节点就可以了,通过主节点 ES 会自动关联查询所有节点和分片以及副本的信息。所以 kibana 一般都和主节点在同一台服务器上。

作为用户,我们可以将请求发送到 集群中的任何节点 ,包括主节点。 每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。

发现机制

ES 是如何实现只需要配置相同的cluste.name就将节点加入同一集群的呢?答案是发现机制(discovery module)。

发现机制 负责发现集群中的节点,以及选择主节点。每次集群状态发生更改时,集群中的其他节点都会知道状态(具体方式取决于使用的是哪一种发现机制)。

ES目前主要推荐的自动发现机制,有如下几种:

  1. Azure classic discovery 插件方式,多播
  2. EC2 discovery 插件方式,多播
  3. Google Compute Engine (GCE) discovery 插件方式,多播
  4. Zen discovery 默认实现,多播/单播

这里额外介绍下单播,多播,广播的定义和区别,方便我们更好的理解发现机制。

单播,多播,广播的区别:

  • 单播(unicast):网络节点之间的通信就好像是人们之间的对话一样。如果一个人对另外一个人说话,那么用网络技术的术语来描述就是“单播”,此时信息的接收和传递只在两个节点之间进行。例如,你在收发电子邮件、浏览网页时,必须与邮件服务器、Web服务器建立连接,此时使用的就是单播数据传输方式。

  • 多播(multicast):“多播”也可以称为“组播”,多播”可以理解为一个人向多个人(但不是在场的所有人)说话,这样能够提高通话的效率。因为如果采用单播方式,逐个节点传输,有多少个目标节点,就会有多少次传送过程,这种方式显然效率极低,是不可取的。如果你要通知特定的某些人同一件事情,但是又不想让其他人知道,使用电话一个一个地通知就非常麻烦。多播方式,既可以实现一次传送所有目标节点的数据,也可以达到只对特定对象传送数据的目的。多播在网络技术的应用并不是很多,网上视频会议、网上视频点播特别适合采用多播方式。

  • 广播(broadcast):可以理解为一个人通过广播喇叭对在场的全体说话,这样做的好处是通话效率高,信息一下子就可以传递到全体,广播是不区分目标、全部发送的方式,一次可以传送完数据,但是不区分特定数据接收对象。

上面列举的发现机制中, Zen Discovery 是 ES 默认内建发现机制。它提供单播多播的发现方式,并且可以扩展为通过插件支持云环境和其他形式的发现。所以我们接下来重点介绍下 Zen Discovery是如何在Elasticsearch中使用的。

集群是由相同cluster.name的节点组成的。当你在同一台机器上启动了第二个节点时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表。

单播主机列表通过discovery.zen.ping.unicast.hosts来配置。这个配置在 elasticsearch.yml 文件中:

discovery.zen.ping.unicast.hosts: ["host1", "host2:port"]

具体的值是一个主机数组或逗号分隔的字符串。每个值应采用host:porthost的形式(其中port默认为设置transport.profiles.default.port,如果未设置则返回transport.tcp.port)。请注意,必须将IPv6主机置于括号内。此设置的默认值为127.0.0.1,[:: 1]

Elasticsearch 官方推荐我们使用 单播 代替 组播。而且 Elasticsearch 默认被配置为使用 单播 发现,以防止节点无意中加入集群。只有在同一台机器上运行的节点才会自动组成集群。

虽然 组播 仍然作为插件提供, 但它应该永远不被使用在生产环境了,否则你得到的结果就是一个节点意外的加入到了你的生产环境,仅仅是因为他们收到了一个错误的 组播 信号。对于 组播 本身并没有错,组播会导致一些愚蠢的问题,并且导致集群变的脆弱(比如,一个网络工程师正在捣鼓网络,而没有告诉你,你会发现所有的节点突然发现不了对方了)。

使用单播,你可以为 Elasticsearch 提供一些它应该去尝试连接的节点列表。当一个节点联系到单播列表中的成员时,它就会得到整个集群所有节点的状态,然后它会联系 master 节点,并加入集群。

这意味着你的单播列表不需要包含你的集群中的所有节点,它只是需要足够的节点,当一个新节点联系上其中一个并且说上话就可以了。如果你使用 master 候选节点作为单播列表,你只要列出三个就可以了。

关于 Elasticsearch 节点发现的详细信息,请参阅 。

应对故障

对于分布式系统的熟悉,我们应该知道分布式系统设计的目的是为了提高可用性和容错性。在单点系统中的问题在 ES 中同样也会存在。

单节点的问题

如果我们启动了一个单独的节点,里面不包含任何的数据和索引,那我们的集群就是一个包含空内容节点的集群,简称空集群

当集群中只有一个节点在运行时,意味着会有一个单点故障问题——没有冗余。单点的最大问题是系统容错性不高,当单节点所在服务器发生故障后,整个 ES 服务就会停止工作。

让我们在包含一个空节点的集群内创建名为 user 的索引。索引在默认情况下会被分配5个主分片和每个主分片的1个副本, 但是为了演示目的,我们将分配3个主分片和一份副本(每个主分片拥有一个副本分片):

PUT /user
{
   "settings" : {
      "number_of_shards" : 3,
      "number_of_replicas" : 1
   }
}

我们的集群现在是下图所示情况,所有3个主分片都被分配在 Node 1 。

此时检查集群的健康状况GET /_cluster/health,我们会发现:

{
  "cluster_name": "elasticsearch",
  "status": "yellow",                     # 1
  "timed_out": false,
  "number_of_nodes": 1,
  "number_of_data_nodes": 1,
  "active_primary_shards": 3,
  "active_shards": 3,
  "relocating_shards": 0,
  "initializing_shards": 0,
  "unassigned_shards": 3,                 # 2
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks": 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50
}

#1 集群的状态值是 yellow
#2 未分配的副本数是 3

集群的健康状况为 yellow 则表示全部 主 分片都正常运行(集群可以正常服务所有请求),但是 副本 分片没有全部处在正常状态。 实际上,所有3个副本分片都是 unassigned —— 它们都没有被分配到任何节点。 在同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了那个节点,我们也将丢失该节点上的所有副本数据。

主分片和对应的副本分片是不会在同一个节点上的。所以副本分片数的最大值是 n -1(其中n 为节点数)。

虽然当前我们的集群是正常运行的,但是在硬件故障时有丢失数据的风险。

水平扩容

既然单点是有问题的,那我们只需再启动几个节点并加入到当前集群中,这样就可以提高可用性并实现故障转移,这种方式即 水平扩容

还以上面的 user 为例,我们新增一个节点后,新的集群如上图所示。

当第二个节点加入到集群后,3个 副本分片 将会分配到这个节点上——每个主分片对应一个副本分片。 这意味着当集群内任何一个节点出现问题时,我们的数据都完好无损。

所有新近被索引的文档都将会保存在主分片上,然后被并行的复制到对应的副本分片上。这就保证了我们既可以从主分片又可以从副本分片上获得文档。

cluster-health 现在展示的状态为 green ,这表示所有6个分片(包括3个主分片和3个副本分片)都在正常运行。我们的集群现在不仅仅是正常运行的,并且还处于 始终可用 的状态。

动态扩容

产品不断升级,业务不断增长,用户数也会不断新增,也许我们之前设计的索引容量(3个主分片和3个副本分片)已经不够使用了,用户数据的不断增加,每个主分片和副本分片的数据不断累积,达到一定程度之后也会降低搜索性能。那么怎样为我们的正在增长中的应用程序按需扩容呢?

我们将之前的两个节点继续水平扩容,再增加一个节点,此时集群状态如下图所示:

为了分散负载,ES 会对分片进行重新分配。Node 1 和 Node 2 上各有一个分片被迁移到了新的 Node 3 节点,现在每个节点上都拥有2个分片,而不是之前的3个。 这表示每个节点的硬件资源(CPU, RAM, I/O)将被更少的分片所共享,每个分片的性能将会得到提升。

分片是一个功能完整的搜索引擎,它拥有使用一个节点上的所有资源的能力。 我们这个拥有6个分片(3个主分片和3个副本分片)的索引可以最大扩容到6个节点,每个节点上存在一个分片,并且每个分片拥有所在节点的全部资源。

但是如果我们想要扩容超过6个节点怎么办呢?

主分片的数目在索引创建时 就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。(实际大小取决于你的数据、硬件和使用场景。) 但是,读操作——搜索和返回数据——可以同时被主分片 或 副本分片所处理,所以当你拥有越多的副本分片时,也将拥有越高的吞吐量。

索引的主分片数这个值在索引创建后就不能修改了(默认值是 5),但是每个主分片的副本数(默认值是 1 )对于活动的索引库,这个值可以随时修改的。至于索引的主分片数为什么在索引创建之后就不能修改了,我们在下面的文档存储原理章节中说明。

既然在运行中的集群上是可以动态调整副本分片数目的 ,那么我们可以按需伸缩集群。让我们把副本数从默认的 1 增加到 2 :

PUT /user/_settings
{
   "number_of_replicas" : 2
}

如下图 所示, user 索引现在拥有9个分片:3个主分片和6个副本分片。 这意味着我们可以将集群扩容到9个节点,每个节点上一个分片。相比原来3个节点时,集群搜索性能可以提升 3 倍。

当然,如果只是在相同节点数目的集群上增加更多的副本分片并不能提高性能,因为每个分片从节点上获得的资源会变少。 你需要增加更多的硬件资源来提升吞吐量。

但是更多的副本分片数提高了数据冗余量:按照上面的节点配置,我们可以在失去2个节点的情况下不丢失任何数据。

节点故障

如果我们某一个节点发生故障,节点服务器宕机或网络不可用,这里假设主节点1发生故障,这时集群的状态为:

此时我们检查一下集群的健康状况,可以发现状态为 red ,表示不是所有主分片都在正常工作。

我们关闭的节点是一个主节点。而集群必须拥有一个主节点来保证正常工作,所以发生的第一件事情就是选举一个新的主节点: Node 2 。

在我们关闭 Node 1 的同时也失去了主分片 1 和 2 ,并且在缺失主分片的时候索引也不能正常工作。

幸运的是,在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这些分片在 Node 2 和 Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow 。 这个提升主分片的过程是瞬间发生的,如同按下一个开关一般。

为什么我们集群状态是 yellow 而不是 green 呢? 虽然我们拥有所有的三个主分片,但是同时设置了每个主分片需要对应2份副本分片,而此时只存在一份副本分片。 所以集群不能为 green 的状态,不过我们不必过于担心:如果我们同样关闭了 Node 2 ,我们的程序 依然 可以保持在不丢任何数据的情况下运行,因为 Node 3 为每一个分片都保留着一份副本。

如果我们重新启动 Node 1 ,集群可以将缺失的副本分片再次进行分配,那么集群的状态又将恢复到原来的正常状态。 如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们,同时仅从主分片复制发生了修改的数据文件。

处理并发冲突

分布式系统中最麻烦的就是并发冲突,既然 ES 也是分布式的那它是如何处理并发冲突的呢?

通常当我们使用 索引 API 更新文档时 ,可以一次性读取原始文档,做我们的修改,然后重新索引 整个文档 。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。

很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到 Elasticsearch 中并使其可被搜索。也许两个人同时更改相同的文档的几率很小。或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。

但有时丢失了一个变更就是非常严重的 。试想我们使用 Elasticsearch 存储我们网上商城商品库存的数量, 每次我们卖一个商品的时候,我们在 Elasticsearch 中将库存数量减少。

有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个 web 程序并行运行,每一个都同时处理所有商品的销售,那么会造成库存结果不一致的情况。

变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。

乐观并发控制 - 版本号

在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:

  • 悲观锁
    这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。

  • 乐观锁
    Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

Elasticsearch 中对文档的 index , GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。

Elasticsearch 使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用 _version 号来确保应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。

所有文档的更新或删除 API,都可以接受 version 参数,这允许你在代码中使用乐观的并发控制,这是一种明智的做法。

乐观并发控制 - 外部系统

版本号(version)只是其中一个实现方式,我们还可以借助外部系统使用版本控制,一个常见的设置是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索, 这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch ,如果多个进程负责这一数据同步,你可能遇到类似于之前描述的并发问题。

如果你的主数据库已经有了版本号,或一个能作为版本号的字段值比如 timestamp,那么你就可以在 Elasticsearch 中通过增加 version_type=external到查询字符串的方式重用这些相同的版本号,版本号必须是大于零的整数, 且小于 9.2E+18(一个 Java 中 long 类型的正值)。

外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同,而是检查当前_version 是否小于指定的版本号。如果请求成功,外部的版本号作为文档的新 _version 进行存储。

外部版本号不仅在索引和删除请求是可以指定,而且在创建新文档时也可以指定。

例如,要创建一个新的具有外部版本号 5 的博客文章,我们可以按以下方法进行:

PUT /website/blog/2?version=5&version_type=external
{
  "title": "My first external blog entry",
  "text":  "Starting to get the hang of this..."
}

在响应中,我们能看到当前的 _version 版本号是 5 :

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 5,
  "created":  true
}

现在我们更新这个文档,指定一个新的 version 号是 10 :

PUT /website/blog/2?version=10&version_type=external
{
  "title": "My first external blog entry",
  "text":  "This is a piece of cake..."
}

请求成功并将当前 _version 设为 10 :

{
  "_index":   "website",
  "_type":    "blog",
  "_id":      "2",
  "_version": 10,
  "created":  false
}

如果你要重新运行此请求时,它将会失败,并返回像我们之前看到的同样的冲突错误,因为指定的外部版本号不大于 Elasticsearch 的当前版本号。

文档存储原理

创建索引的时候我们只需要指定分片数和副本数,ES 就会自动将文档数据分发到对应的分片和副本中。那么文件究竟是如何分布到集群的,又是如何从集群中获取的呢? Elasticsearch 虽然隐藏这些底层细节,让我们好专注在业务开发中,但是我们深入探索这些核心的技术细节,这能帮助你更好地理解数据如何被存储到这个分布式系统中。

文档是如何路由到分片中的

当索引一个文档的时候,文档会被存储到一个主分片中。 Elasticsearch 如何知道一个文档应该存放到哪个分片中呢?当我们创建文档时,它如何决定这个文档应当被存储在分片 1 还是分片 2 中呢?

首先这肯定不会是随机的,否则将来要获取文档的时候我们就不知道从何处寻找了。实际上,这个过程是根据下面这个公式决定的:

shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到 余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。

这就解释了为什么我们要在创建索引的时候就确定好主分片的数量 并且永远不会改变这个数量:因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。

你可能觉得由于 Elasticsearch 主分片数量是固定的会使索引难以进行扩容,所以在创建索引的时候合理的预分配分片数是很重要的。

所有的文档 API( get 、 index 、 delete 、 bulk 、 update 以及 mget )都接受一个叫做 routing 的路由参数 ,通过这个参数我们可以自定义文档到分片的映射。一个自定义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被存储到同一个分片中。更多路由相关的内容可以访问这里。

主分片和副本分片如何交互

上面介绍了一个文档是如何路由到一个分片中的,那么主分片是如何和副本分片交互的呢?

假设有个集群由三个节点组成, 它包含一个叫 user 的索引,有两个主分片,每个主分片有两个副本分片。相同分片的副本不会放在同一节点,所以我们的集群看起来如下图所示:

我们可以发送请求到集群中的任一节点。每个节点都有能力处理任意请求。每个节点都知道集群中任一文档位置,所以可以直接将请求转发到需要的节点上。 在下面的例子中,将所有的请求发送到 Node 1 ,我们将其称为 协调节点(coordinating node)

当发送请求的时候,为了扩展负载,更好的做法是轮询集群中所有的节点。

对文档的新建、索引和删除请求都是写操作,必须在主分片上面完成之后才能被复制到相关的副本分片。

以下是在主副分片和任何副本分片上面 成功新建,索引和删除文档所需要的步骤顺序:

  1. 客户端向 Node 1 发送新建、索引或者删除请求。
  2. 节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的主分片目前被分配在 Node 3 上。
  3. Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node1 和 Node2 的副本分片上。一旦所有的副本分片都报告成功,Node 3 将向协调节点报告成功,协调节点向客户端报告成功。

在客户端收到成功响应时,文档变更已经在主分片和所有副本分片执行完成,变更是安全的。

在处理读取请求时,协调结点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。

在文档被检索时,已经被索引的文档可能已经存在于主分片上但是还没有复制到副本分片。在这种情况下,副本分片可能会报告文档不存在,但是主分片可能成功返回文档。一旦索引请求成功返回给用户,文档在主分片和副本分片都是可用的。