Akka-Networking Cluster Usage

Akka集群使用

有关Akka集群概念的介绍,请参阅集群规范 Akka-Networking Cluster Specification

1.为你的项目支持集群做准备

Akka集群是一个单独的jar包。 确保您的项目具有以下依赖关系:

最新的jar包版本请参考Akka官方网站。

2.一个简单的集群示例

以下配置可以启用群集扩展。 Actor加入集群、订阅集群成员事件并记录它们。

application.conf配置如下所示:

要在Akka项目中启用群集功能,至少应该添加 Remoting 设置。通常还应将akka.cluster.seed-nodes添加到application.conf文件中。

注意:

如果您在Docker容器中运行Akka,或者某些其他原因的节点具有单独的内部和外部IP地址,您必须根据 Akka behind NAT or in a Docker container中配置远程处理。

种子节点被配置为集群初始的、自动的、加入的联系点。

请注意,如果要在不同机器上启动节点,则需要在application.conf中指定机器的ip-addresses或host names,而不是127.0.0.1。

使用群集扩展的Actor如下所示:

Actor将自己注册为某些集群事件的订阅者。它在订阅开始时接收与集群当前状态相对应的事件,然后接收集群中发生的更改的事件。

自己运行这个例子的最简单的方法是参照本教程下载运行Java的 Akka Cluster Sample with Java 。它包含有关如何运行SimpleClusterApp的说明。 该示例的源代码可以在Akka Samples Repository中找到。

3.加入种子节点(Joining to Seed Nodes)

您可以决定是否应手动或自动添加到集群,以配置初始联系点,即种子节点。joining过程之后,种子节点不是一种特殊的存在,它们以与其他节点完全相同的方式参与到集群中。

当一个新节点启动时,它向所有种子节点发送一个消息,然后将join命令发送到首先应答的种子节点。如果没有任何一个种子节点回复(可能种子节点尚未启动),则会重试此过程,直到成功或关闭。

您可以在配置文件configuration (application.conf)中定义种子节点:

当使用以下语法启动JVM时,也可以将其定义为Java系统属性:

这种配置通常由外部工具动态创建,参见:

种子节点可以按任意顺序启动,不需要运行所有的种子节点,但是种子节点的配置列表中第一个节点必须在集群初始启动时启动,否则,其他种子节点将不会被初始化,并且其他节点也不能加入群集。特定第一个种子节点的原因是,避免从空集群开始时形成分离的集群。同时启动所有配置的种子节点是最快的(顺序不重要),否则它会占用seed-node-timeout(通过配置),直到节点可以加入。

一旦启动了两个以上的种子节点,便可以关闭第一个种子节点。如果第一个种子节点重新启动,首先它将尝试加入现有群集中的其他种子节点。请注意,如果同时停止所有种子节点并使用相同的种子节点配置列表重新启动它们,则它们将加入自己,并形成的新群集,而不是通过现有群集的剩余节点加入。这可能是不希望看到的,应该通过列出几个节点作为冗余的种子节点来避免,并且不要同时停止所有节点。

若联系种子节点失败,在配置参数seed-node-timeout中定时的时间之后,会自动重试。在配置的retry-unsuccessful-join-after之后,会自动重新尝试加入特定种子节点。重试意味着它试图联系所有的种子节点,然后加入首先回应的节点。如果种子节点列表中的第一个节点在配置的seed-node-timeout时间内,无法加入配置列表中的任何其他种子节点,他将加入自己。

joining到给定的种子节点,将默认无限期的重试,直到join成功。如果配置了失败超时,该进程可以中止。当中止时,它将运行Coordinated Shutdown(协调关闭),默认情况下会终止ActorSystem,CoordinatedShutdown也可以配置为退出JVM。 如果种子节点是动态组合的,并且在尝试失败之后尝试使用新的种子节点重新开始,那么定义这个失败超时是非常有用的。

如果您未配置种子节点或使用joinSeedNode,则需要手动加入群集,这可以使用JMX or HTTP.执行。

您可以加入群集中的任何节点, 它不必配置为种子节点。 请注意,您只能加入到集群中现有的成员,这意味着为了启动引导,某个节点必须加入自己,然后其他节点可以加入它以组成一个集群。

Actor系统只能加入一次集群,其他尝试将被忽略。 成功加入后,必须重新启动才能加入另一个群集或再次加入同一个群集。 重启后可以使用相同的主机名和端口号,当它成为集群中已有成员的新化身,尝试加入时,已经存在的成员将被从集群中删除,然后才被允许加入。

注意:

ActorSystem的名称必须与群集的所有成员相同。 这个名字是在你启动ActorSystem的时候给出的。

4.Downing

当故障检测器认为成员unreachable时,Leader不能执行其职责,例如将新加入成员的状态更改为“Up”。unreachable节点必须先重新可达(become reachable),否则unreachable成员的状态必须更改为“Down”。可以自动或手动将节点状态更改为“Down”,默认必须使用JMX或 HTTP手动完成。

也可以使用Cluster.get(system).down(address)以编码方式执行。

Split Brain Resolver 是downing问题的预包装解决方案,是 Lightbend Reactive Platform的一部分。如果您不使用RP,您应该仔细阅读Split Brain Resolver的文档documentation,并确保您使用的解决方案能够处理在此描述的问题。

Auto-downing (DO NOT USE)

有一个自动停机功能,不应该在生产中使用。 为了测试目的,可以通过配置启用它:

这意味着在配置的不可达时间之后,集群Leader会将unreachable节点状态自动更改为down。

这是从群集成员中删除不可达到的节点的不可靠的方法。它适用于崩溃和短暂的瞬态网络分区,但不适用于长网络分区。网络分区的两端都会将另一端视为无法访问,在一段时间后,将其从集群成员中删除。由于这是在两端都会发生的,结果是创建了两个分离的断开的集群。这也可能是由于长时间的GC暂停或系统过载导致。

警告:

我们建议不要在生产中使用Akka Cluster的自动关闭功能。如果您使用 Cluster SingletonCluster Sharding,尤其是与Akka Persistence一起,正确的行为至关重要。对于使用群集分片的Akka持久化,在网络分区的情况下可能导致数据损坏。

5.Leaving

从集群中删除成员有两种方法。

您可以只停止Actor系统(或JVM进程)。 如上所述,在自动或手动downing后,它将被检测为unreachable并被移除。

应通过更为优雅的退出方式,告诉集群某个节点应该离开。 这可以使用 JMX 或 HTTP来执行。 它也可以以编码方式执行:

请注意,此命令可以发布到集群中的任何成员,而不一定是要离开的成员。

当集群节点将自身视为Exiting时,Coordinated Shutdown 将自动运行,即从另一个节点上离开将会在leaving节点上触发关机过程。当使用Akka群集时,会自动添加优雅地leaving集群的任务(包括Cluster Singletons和Cluster Sharding的正常关闭)。即,如果关机过程还没有进行,则节点的关闭过程也将触发正常leaving 。

通常这是自动处理的,若在此过程中出现网络故障,仍可能需要将节点的状态设置为Down,以完成删除。

6.WeaklyUp Members

如果一个节点unreachable,那么gossip convergence是不可能的,因此Leader任何的行动也是不可能的。 但是,在这种情况下,我们仍然可能需要新节点加入群集。

如果不能达成convergence,Joining的成员将被提升为WeaklyUp,并成为集群的一部分。 一旦达成gossip convergence,Leader将把WeaklyUp成员转移到Up。

此功能默认启用,但可以使用配置选项禁用:

您可以订阅WeaklyUp成员事件以使用处于此状态的成员,但您应该知道网络分区另一端的成员并不知道新成员的存在。 例如,您不应将WeaklyUp成员计入集群的法定人数。

7.Subscribe to Cluster Events

您可以使用Cluster.get(system).subscribe订阅群集成员发生变化的通知。

集群完整状态的快照:akka.cluster.ClusterEvent.CurrentClusterState,作为第一条消息发送给订阅者,然后发送增量更新的事件消息。

请注意,如果在初始加入过程完成之前启动预订,则可能会收到一个空的CurrentClusterState,其中不包含任何集群成员。 这是可预期的行为。 当节点在集群中被接受时,您将收到该节点的MemberUp以及其他节点。

如果您发现处理CurrentClusterState不方便,则可以使用ClusterEvent.initialStateAsEvents()作为参数进行订阅。 这意味着,接收到的第一条消息不是CurrentClusterState,而是与当前状态相对应的事件,以模仿在过去发生事件时是否正在监听事件。 请注意,这些初始事件仅与当前状态相对应,并不是实际发生在集群中的所有变化的完整历史记录。

跟踪集群成员生命周期的事件是:

  • ClusterEvent.MemberJoined – 新成员正在加入群集,其状态已更改为Joining。
  • ClusterEvent.MemberUp – 新成员已加入群集,其状态已更改为Up。
  • ClusterEvent.MemberExited – 成员正在离开群集,其状态已更改为“Exiting ”,请注意,当此事件在另一个节点上发布时,该节点可能已经关闭。
  • ClusterEvent.MemberRemoved – 成员完全从集群中删除。
  • ClusterEvent.UnreachableMember – 成员被视为是unreachable,由至少一个其他节点的故障检测器检测到。
  • ClusterEvent.ReachableMember – 成员被视为unreachable后,当所有之前检测到的节点都将其检测为可再次访问,该unreachable节点可被视为可再次访问(reachable)。

有更多类型的更改事件,请参阅扩展类akka.cluster.ClusterEvent.ClusterDomainEvent的API文档以获取有关事件的详细信息。

有时候只需要通过Cluster.get(system).state()获得完整的成员状态就可以了,而不是通过订阅集群事件。请注意,此状态不一定与发布到集群订阅的事件同步。

Worker Dial-in Example

我们来看一个例子,命名为后端的工作人员如何检测并注册到新的称为前端的主节点。

示例应用程序提供了一个转换文本的服务。 当某些文本发送到其中一个前端服务时,它将被委托给一个执行转换作业的后端工作人员,并将结果发送回原始客户端。 新的后端节点以及新的前端节点可以在集群中动态添加或移除。

Messages:

执行转换作业的后端worker:

请注意,TransformationBackend actor订阅集群事件以检测新的、潜在的前端节点,并向其发送注册消息,以便让他们知道可以使用后端worker。

前端Actor,接收用户作业并委托给其中一个注册的后端worker:

请注意,TransformationFrontend actor监视注册的后端,能够从可用的后端工作者列表中移除它。 Death watch使用群集故障检测器来检测群集中的节点,除了监视角色的正常终止外,还可以检测网络故障和JVM崩溃。 当无法访问的集群节点被关闭并被移除时,死亡监视器向正在监视的Actor发送Terminated消息。

运行Worker Dial-in Example示例的最简单方法是下载准备运行Akka Cluster Sample with Java 和阅读本教程。 它包含有关如何运行“Worker Dial-in Example”示例的说明。 此示例的源代码可以在 Akka Samples Repository中找到。

8.Node Roles

并不是所有节点都需要执行相同的功能:可能有一个运行Web前端的子集,一个运行数据访问层,一个运行数据运算。 部署actors时(例如通过集群感知路由器:cluster-aware routers)可以考虑节点角色来实现这种责任分配。

一个节点的角色在名为akka.cluster.roles的配置属性中定义,通常在启动脚本中定义为系统属性或环境变量。

节点的角色是可以订阅的MemberEvent中成员信息的一部分。

9.How To Startup when Cluster Size Reached

一个常见的用例是在集群初始化后、成员已经加入、集群达到一定的大小时,启动actors。

使用配置选项,您可以在Leader将“Joining”成员的成员状态更改为“Up”之前定义所需的成员数量。

以类似的方式,您可以在Leader将“Joining”成员的成员状态更改为“Up”之前,定义特定角色成员的所需数量。

您可以在registerOnMemberUp回调中启动actors,当当前成员状态更改为“Up”时,即在集群中至少具有定义数量的成员时,将调用此参数。

除了启动actors以外,这个回调还可以用于的其他事情。

10.How To Cleanup when Member is Removed

您可以在registerOnMemberRemoved回调中进行一些清理,当当前成员状态更改为“Removed”或群集已关闭时,将会调用该回调。

另一种方法是将任务注册到 Coordinated Shutdown

注意:

在已经关闭的集群上注册一个OnMemberRemoved回调,回调将在调用者线程上立即被调用,否则将在当前成员状态改变为“Removed”时被调用。 您可能想要在群集启动后设置一些清理处理,但群集在设置时可能已经关闭,并且依赖的种族并不健康。

11.Cluster Singleton

集群单例以确保您恰好有一个特定类型的actor在群集中的某处运行,对于一些使用情况来说,这很方便,有时也是强制性的。

这可以通过订阅成员事件来实现,但有几个极端状况需要考虑。这个特定的用例详细内容可以访问 Cluster Singleton

12.Cluster Sharding

将actors分布在集群中的多个节点上,并支持使用其逻辑标识符与actors进行交互,且不必关心他们在集群中的物理位置。

详细内容请查看Cluster Sharding

13.Distributed Publish Subscribe

在群集中的actors之间发布 – 订阅消息,以及使用actors逻辑路径发送的点对点消息。即,发送者不必知道目标actor正在哪个节点上运行。

详细内容请查看 Distributed Publish Subscribe in Cluster

14.Cluster Client

从不属于集群一部分的Actor系统到在集群中运行的Actor之间的通信。 客户端不必知道目标actor在哪个节点上运行。

详细内容请查看 Cluster Client

15.Distributed Data

当您需要在Akka集群中的节点之间共享数据时,Akka分布式数据非常有用。 数据通过提供像key-value存储的API这样的actor来访问的。

详细内容请查看 Distributed Data

16.Failure Detector

在一个集群中,每个节点被其他几个节点进行(默认最多5个)监视,当这些节点中的任何一个检测到节点unreachable时,该信息将通过gossip传播到集群的其余部分。 换句话说,只有一个节点需要标记unreachable的节点,以使群集的其余部分标记该节点unreachable。

故障检测器还将检测节点是否再次reachable。 当监控unreachable节点的所有节点将其检测为可再次reachable时,在gossip传播之后,将其视为reachable。

如果系统消息无法传递到某一节点,该节点会被隔离,然后无法恢复。 如果存在太多未确认的系统消息(例如,watch,Terminated,remote actor deployment, failures of actors supervised by remote parent),则可能发生这种情况。 然后需要将节点移动到down或removed状态,被隔离节点的Actor系统必须重新启动才能重新加入集群。

群集中的节点通过发送心跳来相互监视,以检测节点是否与群集的其余部分不可达。 心跳到达时间由The Phi Accrual Failure Detector的实现来解释。

怀疑的失败水平是由phi值给出的。 phi故障检测器的基本思想是通过动态调整以反映当前网络状况的等级来表示phi的值。

phi值的计算如下:

其中F是历史心跳到达间隔时间估计的均值和标准差的正态分布的累积分布函数。

在配置(configuration)中,您可以调整akka.cluster.failure-detector.threshold以定义何时将phi值视为失败。

低阈值可能会产生许多误报,但确保在真实崩溃事件中快速检测。 相反,高阈值产生较少的错误,但需要更多的时间来检测实际的崩溃。 默认阈值为8,适用于大多数情况。 但是,在云环境(例如Amazon EC2)中,可能会将此值增加到12,以解决此类平台上偶尔出现的网络问题。

下图说明了自上次心跳以来phi值如何随着时间的增加而增加。

phi1.png

 

 

 

 

 

 

 

 

Phi是根据历史到达时间的平均值和标准差计算出来的。 上图是200 ms标准偏差的一个例子。 如果心跳以较小偏差到达,则曲线变得更陡,即可以更快地确定失败。 下图曲线是标准偏差为100毫秒的样子。

phi2.png

为了能够承受突如其来的异常情况,例如GC暂停和暂时的网络故障,故障检测器配置有余量:akka.cluster.failure-detector.acceptable-heartbeat-pause。 您可能需要根据您的环境调整此配置。 下图曲线是将“acceptable-heartbeat-pause”配置为3秒时的样子。

phi3.png

Death watch使用Cluster Failure Detector来检测群集中的节点,即除了监视actor的正常终止外,还可以检测网络故障和JVM崩溃。 当无法访问的集群节点被关闭并被移除时,Death watch向正在监视的actor生成一个Terminated消息。

如果在系统低负载的情况下遇到可疑的误报,则应按Cluster Dispatcher中所述为集群的actors定义一个单独的dispatcher。

17.Cluster Metrics

群集的成员节点可以收集系统健康度量标准,并借助群集度量将其发布到其他群集节点和系统事件总线上的已注册用户。

18.How to Test

目前使用sbt-multi-jvm插件进行测试,仅记录在Scala文档中。 有关详细信息,请转到此页面的Scala版本

19.Management

HTTP

集群的信息和管理可以使用HTTP API。 请参阅Akka Management的文档。

JMX

群集的信息和管理可用作具有root name – “akka.Cluster”的JMX MBean。 JMX信息可以用普通的JMX控制台(如JConsole或JVisualVM)来显示。

通过JMX可以:

  • 看看哪些成员是集群的一部分
  • 查看该节点的状态
  • 看到每个成员的角色
  • 将此节点加入群集中的另一个节点
  • 将群集中的任何节点标记为down
  • 告诉集群中的任何节点离开

成员节点由他们的地址标识,格式为akka.:@@ :。

20.Configuration

有几个集群的配置属性。 建议参考reference configuration了解更多信息。

Cluster Info Logging

您可以使用配置属性来消除info级别的群集事件日志记录:

Cluster Dispatcher

集群扩展是通过actor来实现的,可能有必要为这些actor创建一个隔仓,以避免其他actor的干扰。 特别是用于故障检测的心跳监视actor,如果没有定期运行的机会,可能会产生误报。 为此,您可以为群集actor定义一个单独的dispatcher:

注意:

通常,不需要为群集配置单独的调度程序。 default-dispatcher应该足以执行群集任务,即不应该更改akka.cluster.use-dispatcher。 如果在使用default-dispatcher时遇到与集群有关的问题,这通常表示您的程序正在阻塞或在default-dispatcher上运行CPU密集型actors/tasks。 使用专门的dispatchers来执行这些actors/tasks,而不是在default-dispatcher上运行它们,因为这可能会导致系统内部任务瘫痪。 相关配置属性:akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher。 对应的默认值:akka.cluster.use-dispatcher =。

知识共享许可协议
本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。

发表评论

电子邮件地址不会被公开。 必填项已用*标注