Akka-Cluster Sharding

 Cluster Sharding

当您需要在集群中的多个节点上分发actors并希望能够使用其逻辑标识符(logical identifier)与它们进行交互时,集群分片是非常有用的,且不必关心actors在集群中的物理位置,它们的物理位置也可能随时间而改变。

例如,Actors可以是代表DDD中的Aggregate Roots。 在这里我们把这些Actors称为“entities”。 这些Actors通常具有persistent(durable)状态,但是这个特征不限于具有持久状态的Actors。

当你有许多有状态的Actors共同消耗大量的资源(例如内存)且不是在同一台机器上时,通常使用群集分片。 如果您只有少数有状态的Actors,那么在Cluster Singleton节点上运行它们可能会更容易。

在这种情况下,分片意味着具有确定标识符的Actors(所谓的entities)可以自动分布在集群中的多个节点上。 每个实体actor只在一个地方运行,消息可以发送给实体,而不需要发送者知道目标actor的位置。 这是通过由ShardRegion扩展提供的Actor发送消息来实现的,该Actor知道如何将带有实体id的消息路由到最终目的地。

如果启用了该功能,那么状态为WeaklyUp的集群成员将不会激活集群分片。

警告:

不要将Cluster Sharding与Automatic Downing一起使用,因为它允许将一个集群分成两个独立的集群,这会导致多个分片和实体的启动,每个集群都有一个! 见Downing

An Example

这是一个实体actor:

上面的actor使用Event Sourcing和AbstractPersistentActor中提供的支持来存储它的状态。 它不一定是一个需要持久化的actor,但若节点之间的实体发生故障或迁移,且它是有价值的,它必须能够恢复它的状态。

需要注意如何定义persistenceId。 actor的名字即是entity identifier(utf-8 URL-encoded)。 你可以用另一种方式来定义它,但它必须是唯一的。

在使用分片扩展时,首先(通常是在系统启动时在集群中的每个节点上)应该使用ClusterSharding.start方法注册受支持的实体类型。 ClusterSharding.start为您提供了可以传递的参数。

messageExtractor定义了特定于应用程序的方法,从传入消息中提取实体标识符(entity identifier )和分片标识符(shard identifier)。

这个例子阐明了在消息中定义实体标识符的两种不同的方式:

  • Get消息自身包含identifier。
  • EntityEnvelope持有identifier,并将发送给实体actor的实际消息包装在信封中

注意这两个消息类型是如何在上面示例的entityId和entityMessage方法中处理的。 发送给实体actor的消息是entityMessage返回的消息,如果需要,可以展开envelopes。

分片是一组统一管理的实体。 分组由上面示例的extractShardId函数定义。 对于明确的实体标识符(entity identifier),分片标识符(shard identifier)必须始终相同。 否则,实体actor可能会在不同的地方同时启动。

创建一个好的分片算法本身就是一个有趣的挑战。 尝试产生一个均匀的分布,即在每个分片中的相同数量的实体。 作为一个经验法则,分片的数量应该比计划的最大群集节点数量大10倍。 少于节点数量的分片会导致某些节点不会托管任何分片。 太多的分片会导致对分片的低效管理,例如, 重新平衡开销,以及由于协调器(coordinator)涉及每个分片的第一条消息的路由而增加的延迟。 正在运行的群集中的所有节点上的分片算法必须相同。 停止群集中的所有节点后才可以更改它。

在大多数情况下,工作正常的简单分片算法是:取实体标识符与分片数量取模的hashCode的绝对值。 方便起见,这个算法由ShardRegion.HashCodeMessageExtractor提供。

消息到实体总是通过本地的ShardRegion发送。 ClusterSharding.start返回指定实体类型的ShardRegion actor引用,也可以使用ClusterSharding.shardRegion检索。 如果ShardRegion还不知道它的位置,ShardRegion将查找实体分片的位置。 ShardRegion将委托消息到正确的节点,并将按需创建实体actor,即当一个特定实体的第一个消息被传递时。

How it works

ShardRegion Actor在群集中的每个节点上启动,或者使用特定角色标记的一组节点上启动。 ShardRegion是用两个特定于应用程序的函数创建的,用于从传入消息中提取实体标识符和分片标识符。 分片是一组统一管理的实体。 对于特定分片中的第一条消息,ShardRegion从中央协调器ShardCoordinator请求分片的位置。

ShardCoordinator决定哪个ShardRegion将拥有Shard,并通知该ShardRegion。 该Shard将确认此请求,并创建Shard supervisor作为子actor。 然后在Shard actor需要时创建个体实体。 传入的消息因此通过ShardRegion和Shard传送到目标实体。

如果shard归属是另一个ShardRegion实例,消息将被转发到该ShardRegion实例。 在解决分片传入消息的位置的同时,还可以在分片归属已知的情况下进行缓存。 到解决的分片的后续消息可以立即传递到目标目的地,而不涉及ShardCoordinator。

情景1:

  1. 传入消息M1到ShardRegion实例R1。
  2. M1被映射到 Shard S1。 R1不知道S1的位置,所以请求Coordinator C寻找S1的位置。
  3. C回答S1的归属是R1。
  4. R1创建子actor作为实体E1,并将S1的缓冲消息发送给E1。
  5. 所有到达R1的传入消息都可以由R1处理,不需要C,它根据需要创建子类实体,并将消息转发给它们。

情景2:

  1. 传入消息M2到R1。
  2. M2被映射到S2。 R1不知道S2的位置,所以向C请求S2的位置。
  3. C回复S2的归属是R2。
  4. R1向R2发送S2的缓冲消息。
  5. 到达R1的S2的所有传入消息都可以由R1处理,且不通过C。R1将消息转发给R2。
  6. R2收到S2的消息,询问C,回答S2的归属是R2,即情景1。

为了确保特定实体Actor的至多一个实例正在集群中的某个位置运行,所有节点对于分片所在的位置具有相同的视图是很重要的。 因此,分片的分配决策由作为集群单例运行的中央分片协调器(ShardCoordinator)执行。

决定分片所在位置的逻辑在可插入的分片分配策略中定义。 默认实现ShardCoordinator.LeastShardAllocationStrategy,为先前最少分片数量的ShardRegion分配新的分片。 这个策略可以被应用程序特定的实现所取代。

为了能够使用群集中新增加的成员,Coordinator会促进重新平衡分片,即将实体从一个节点迁移到另一个节点。在重新平衡过程中,Coordinator首先通知所有ShardRegion actors:一个分片的切换已经开始。这意味着ShardRegion将开始缓冲该分片的传入消息,就像该分片位置未知时的情况。在重新平衡期间,Coordinator将不能回应对分片位置重新平衡的任何请求,即本地缓存将继续进行,直到分片切换完成。负责重新平衡分片的ShardRegion将通过发送指定的handOffStopMessage(默认PoisonPill)来停止分片中的所有实体。当所有实体都被终止时,与实体的ShardRegion将向协调器确认切换已完成。当所有实体都被终止时,该实体的ShardRegion将向Coordinator确认切换已完成。之后,Coordinator将回复对分片位置的请求,为分片分配一个新的归属地,然后将ShardRegion中的缓存消息发送到新的位置。这意味着实体的状态不会被转移或迁移。如果实体的状态是重要的,那么它应该是持久的,例如使用Persistence,以便它可以在新的位置被恢复。

决定要重新平衡哪些分片的逻辑在可插入的分片分配策略中定义。默认实现ShardCoordinator.LeastShardAllocationStrategy,从以前分配的分片数量最多的ShardRegion中选取分片。然后,他们将被分配到分片数量最少的ShardRegion中,即集群中的新成员。有一个可配置的阈值:分片数量差异必须有多大时才开始重新平衡。 这个策略可以被应用程序特定的实现所取代。

ShardCoordinator中Shard的位置状态是持久的,通过Distributed Data or Persistence,使其可以在故障中保存下来。当一个崩溃或无法访问的Coordinator节点已经从群集中删除(通过down)时,一个新的ShardCoordinator单例Actor将接管并恢复状态。在这种故障期间,已知位置的分片仍然可用,而新的(未知)分片的信息将被缓存,直到新的ShardCoordinator变为可用。

只要Sender使用相同的ShardRegion Actor将消息传递给实体Actor,消息的顺序就会被保留。 只要没有达到缓冲区限制,消息就会尽可能以最多一次的传送语义传送,就像普通的消息发送一样。 通过使用AtLeastOnceDelivery in Persistence,可以达成至少交付一次语义,可靠的点到点消息。

由于到Coordinator的查询,对于新的或以前未使用的分片的消息引入了一些额外的延迟。 分片重新平衡也可能增加延迟。 在设计特定于应用程序的分片决议时应该考虑这一点,例如, 避免太细的碎片。

Distributed Data vs. Persistence Mode

Coordinator的状态和分片的实体(Remembering Entities)状态是持久的,以保证可以在故障中保存下来。可用Distributed Data or Persistence进行存储。 默认使用Distributed Data。

使用这两种模式的功能是相同的。 如果你的分片实体本身没有使用Akka Persistence,那么使用Distributed Data模式会更方便,因为你不必为了持久化而设置和操作单独的数据存储(例如Cassandra)。 除此之外,没有什么重要的原因去选择其中一种模式。

在群集中的所有节点上使用相同的模式是非常重要的,无法执行滚动升级来更改此设置。

Distributed Data Mode

此模式启用配置(默认启用):

ShardCoordinator的状态将由具有WriteMajority / ReadMajority一致性的 Distributed Data模块在集群内复制。 Coordinator的状态不是持久的,它不存储到磁盘。 当群集中的所有节点都停止时,状态将会丢失,不再被需要。

Remembering Entities的状态也是持久的,即它被存储到磁盘。 存储的实体在群集完成重启后也会启动。

每个节点角色的群集分片使用其自己的Distributed Data Replicator。通过这种方式,您可以为某些实体类型使用所有节点的子集,为其他实体类型使用另一个子集。 每个这样的Replicator都有一个包含节点角色的名称,因此在群集中的所有节点上配置的角色必须相同,即在执行滚动升级时不能更改角色。

Distributed Data的设置在akka.cluster.sharding.distributed-data部分进行配置。 无法对不同的分片实体类型做不同的设置。

Persistence Mode

该模式启用配置:

由于它在群集中运行,因此必须使用distributed journal来配置Persistence 。

Startup after minimum number of members

群集设置akka.cluster.min-nr-of-members或akka.cluster.role.<role-name>.min-nr-of-members。这将推迟分片的分配,直到至少Regions的数量已经启动并注册到coordinator。 这避免了许多分片被分配给第一个注册的Region,只有后来的才重新分配给其他节点。

请参阅How To Startup when Cluster Size Reached 中关于min-nr-of-members的更多信息。

Proxy Only Mode

ShardRegion Actor也可仅以代理模式启动,即它不会自己管理任何实体,但知道如何将消息委托给正确的位置。 ShardRegion在仅是代理模式下使用方法ClusterSharding.startProxy方法启动。

Passivation

如果实体的状态是持久的,那么可能会停止那些不被使用的实体,来减少内存消耗。 这是通过定义接收超时(context.setReceiveTimeout)的实体actor的应用程序特定实现完成的。 如果一条消息在实体停止时已经入队,则邮箱中入队的消息将被丢弃。 为了支持优雅的钝化而不丢失这样的消息,实体Actor可以发送ShardRegion.Passivate到其父分片。 Passivate中的指定包装消息将被发送回应该自行停止的实体。 在实体终止前,传入消息将由分片缓冲, 此后,这些缓存的消息被传递给实体的新化身。

Remembering Entities

通过在ClusterShardingSettings中将rememberEntities标志设置为true,在调用ClusterSharding.start并确保shardIdExtractor处理Shard.StartEntity(EntityId)时,可以使每个Shard中的实体列表持久化,这意味着ShardId必须可以从EntityId提取。

当设置rememberEntities为true时,每当一个Shard重新平衡到另一个节点上或在崩溃后恢复时,新的Shard它将重新创建之前在那个Shard中运行的所有实体。 如果要永久停止实体,必须向实体Actor的父节点发送Passivate(使钝化)消息,否则在配置中指定的实体重新启动后,该实体将自动重新启动。

使用“Distributed Data”模式时,实体的标识符将存储在“ Durable Storage of Distributed Data”中。 您可能需要更改akka.cluster.sharding.distributed-data.durable.lmdb.dir的配置,因为默认目录包含Actor System的远程端口。 如果使用动态分配的端口(0),则目录每次都会有所不同,并且以前存储的数据将不会被加载。

当rememberEntities设置为false时,Shard不会在重新平衡或从崩溃中恢复后自动重启任何实体。 只有在Shard中收到该实体的第一条消息后,才会启动实体。 如果不使用Passivate消息,实体将不会重新启动。

请注意,实体本身的状态将不会被恢复,除非它们已被持久化,例如, 使用Persistence

当启动/停止实体时,rememberEntities的性能成本相当高,以及何时重新平衡shards。 这个成本随着每个shard的实体数目而增加,并且我们目前不建议每个shard使用超过10000个实体。

Supervision

如果您需要为实体actors使用另一个supervisorStrategy,而不使用默认(重新启动)的策略,则需要创建一个为子实体actor定义supervisorStrategy的中间父级actor。

你以同样的方式开始这样一个supervisor,就好像它是实体actor一样。

请注意,当新的消息被定位到实体时,被停止的实体将再次启动。

Graceful Shutdown

您可以将ShardRegion.gracefulShutdownInstance消息发送给ShardRegion actor,以交出由该ShardRegion承载的所有shards,然后ShardRegion actor将被停止。 你可以监视ShardRegion actor,知道上述工作什么时候完成。在此期间,按照coordinator触发重新平衡相同的方式,其他regions将缓存发送给这些shards的消息。当shards停止时,coordinator将在其他地方分配这些shards。

这由Coordinated Shutdown 自动执行,因此这是群集成员正常离开过程的一部分。

Removal of Internal Cluster Sharding Data

Cluster Sharding coordinator使用Akka Persistence存储shards的位置。 重新启动整个Akka集群时,可以安全地删除这些数据。 请注意,这不是应用程序数据。

有一个实用程序akka.cluster.sharding.RemoveInternalClusterShardingData,去删除这些数据。

警告:

运行正在使用群集分片的Akka群集节点时,切勿使用此程序。 在使用此程序之前应停止所有群集节点。

如果Cluster Sharding coordinator由于数据损坏而无法启动,可能需要删除数据,如果偶然发生两个群集同时运行,可能会发生这种情况。例如,由于使用自动关闭而导致网络分区。

警告:

不要将Cluster Sharding与Automatic Downing一起使用,因为它允许将集群分成两个独立的集群,这又会导致多个shards和实体的启动,每个集群都有一个。See Downing

使用这个程序作为一个独立的Java主程序:

该程序包含在akka-cluster-sharding.jar文件中。 使用与普通应用程序相同的类路径和配置运行它是最容易的。 它可以以类似的方式从sbt或Maven运行。

指定实体类型名称(与您在ClusterSharding的start方法中使用的相同)作为程序参数。

如果你指定-2.3作为第一个程序参数,它也将尝试使用不同的persistenceId去除Akka 2.3.x中的Cluster Sharding存储的数据。

Dependencies

要使用群集分片,您必须在您的项目中添加以下依赖项。

Configuration

ClusterSharding扩展可以使用以下属性进行配置。 这些配置属性由ClusterShardingSettings在使用ActorSystem参数创建时读取。 也可以修改ClusterShardingSettings或从另一个配置部分创建它,其布局如下。 ClusterShardingSettings是ClusterSharding扩展的start方法的一个参数,即每个实体类型可以根据需要配置不同的设置。

自定义shard分配策略可以在ClusterSharding.start的可选参数中定义。 有关如何实施自定义shard分配策略的详细信息,请参阅AbstractShardAllocationStrategy的API文档。

Inspecting cluster sharding state

有两个可用的检查集群状态的请求:

ShardRegion.getShardRegionStateInstance将返回一个ShardRegion.ShardRegionState,其中包含在Region中运行的shards的标识符,以及每个shard中的哪些实体是活动的。

ShardRegion.GetClusterShardingStats将查询集群中的所有regions,并返回一个ShardRegion.ClusterShardingStats,其中包含每个region中运行的shards的标识符以及每个shard中活动的实体的数量。

所有启动的shards的类型名称可以通过ClusterSharding.getShardTypeNames获取。

这些消息的目的是测试和监控,它们不提供直接发送消息给各个实体的权限。

Rolling upgrades

在进行滚动升级时,必须特别小心,不要改变分片的以下任何方面:

  • extractShardId函数
  • ShardRegions运行的角色
  • 持久化模式

如果其中任何一个需要更改,则需要重新启动整个群集。

 

知识共享许可协议
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。

发表评论

电子邮件地址不会被公开。 必填项已用*标注