大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。
Akka FSM 源代码分析
有限状态机本身不是啥新奇东西,在GoF的设计模式一书中就有状态模式, 也给出了实现的建议。各种语言对状态机模式都有非常多种实现的方式。我自己以前用C++和java实现过,也以前把 apache mina 源代码中的一个状态机实现抠出来单独使用。
但Akka的状态机是我见过的最简洁美丽实现。充分利用了Scala的很多先进的语言机制让代码更加简洁清晰。利用了Akka Actor实现并发。用户基本不用考虑线程安全的问题。所有实现才短短760行代码(含凝视)。
Akka FSM 有一个简单的官方文档。这里有中文翻译。只是这文档也说得云山雾罩的。看文档不如直接看代码,因为Akka FSM的代码非常短。也花不了多少时间。本文将会对Akka FSM 的代码做详尽的分析,我们一起花点功夫,保证你能对FSM的实现了如指掌。这样才干使用起来得心应手。
本文基于Akka 2.2.3源代码。建议你阅读时也看着Akka FSM实现的源代码,这里有传送门。
官方文档中将状态机抽象成例如以下的一种表达:
State(S) x Event(E) -> Actions (A), State(S’)
这些关系的意思能够这样理解:
假设我们当前处于状态S。发生了E事件, 我们应运行操作A,然后将状态转换为S’。
定义一个状态机。我们就要决定这个状态机中有多少种状态,每一个状态可以响应什么样的事件,收到事件后会做如何的处理(比方说改动数据),是否要转换到下一个状态。
我们有两个地方来表达我们的业务逻辑:
-
在某个状态下。收到事件 E 时,要做什么业务动作 Acton
-
当状态从 A 转换 到 B 时。要做什么事情
这也是我们观察一个状态机的两个视角。
那么这里就引申出实现一个状态机的第一个关键问题Q1:怎样表达状态、事件、动作、转换之间的关系。
这个问题事实上包括两个子问题:
-
Q1.1:怎样设计设计一个数据结构。可以通用的表达状态机的内部结构(状态、事件、动作、转换)
-
Q1.2:怎样针对一个特定的状态机初始化这个数据结构
问题Q1完毕了状态机的静态描写叙述,也就是说,我们可以描写叙述出这个状态机应该是个什么样子,什么时候该做什么事情。应该怎样在各个状态之间跳转。
可是这个状态机还运转不起来。
我们还必须解决第二个关键问题 Q2:状态机怎样动态的运转。问题Q2也包括下面子问题:
-
Q2.1 怎样处理某个状态下收到的事件,并转换到下一个状态
-
Q2.2 怎样在状态转换过程中实现我们的业务逻辑
-
Q2.3 状态的超时怎样处理
-
Q2.4 某些状态转换是定时发生的,怎样引入并处理定时器
-
Q2.5 状态机怎样终止(善终是个大问题。谁也不希望自己的程序不得好死)
-
Q2.6 怎样处理业务数据(状态机不是自己转着玩的,终于目的还是要处理数据)
-
Q2.7 线程安全性。怎样防止多线程情况下状态机出问题
-
Q2.8 更好的外部交互性。主要是指状态变化时。外部系统怎样可以得到通知
-
Q2.9 怎样调试(决定程序猿幸福程度的关键因素)
Akka FSM 对这些问题都有非常好的解决方式,回答问题也是我们剖析 Akka FSM 源代码的线索。全部的这些问题的处理都浓缩在一个源文件,短短的700多行代码中。所以这部分代码也是学习Scala Akka 编程的经典范例。
Text
为了行文方便,我们须要约定一些术语:
- 状态名:
-
业务状态名称,是我们用来表达业务逻辑的状态名称
- 状态实例:
-
程序中实例化的一个状态对象,能够从中取得状态名
- FSM Trait:
-
指 akka.actor.FSM 特质。这个特质也是你的程序中使用FSM应该混入的特质。
- FSM Object:
-
FSM Trait 的伴生对象。这里定义了用于FSM Trait 的一些数据结构
- 事件处理函数:
-
某个状态下,收到事件之后的处理动作,其输入參数是事件,输出下一个状态实例。
- 转换处理函数:
-
从当前状态名A转换到下一个状态名B时须要运行的操作,其输入參数(A,B)的元组。无返回值。
首先我们看一下 FSM Trait 的定义:
trait FSM[S, D] extends Actor with Listeners with ActorLogging { // S 是状态名,我们定义的每个状态名都应该是这个S类型或者其子类型 import FSM._ type State = FSM.State[S, D] //状态实例的类型定义 type StateFunction = scala.PartialFunction[Event, State] //事件处理函数类型定义 type Timeout = Option[FiniteDuration] type TransitionHandler = PartialFunction[(S, S), Unit] //转换处理函数类型定义
我们先弄清楚前面说到的两个术语,状态名 和 状态实例。
FSM[S, D] 中的模板參数 S 是状态名的类型。我们定义的每个状态名都应该是这个S类型或者其子类型。 FSM.State[S, D]是定义在 FSM Object中的数据类型(一个case class),它是真正在状态机实现中代表一个状态的实例。
251行对给这个类型定义了一个简称。
252行和254行定义了事件处理函数和转换处理函数的类型。注意:这两个函数的类型是 PartialFunction ,这意味着我们能用orElse方法把多个同类型函数串接起来。
为了解决这个问题Q1.1,我们须要设计数据结构来保存状态名、状态名相应的事件处理函数,还有转换处理函数。看代码:
private val stateFunctions = mutable.Map[S, StateFunction]() private val stateTimeouts = mutable.Map[S, Timeout]()
private var transitionEvent: List[TransitionHandler] = Nil
private var currentState: State = _
这是几个私有的数据定义。stateFunctions 是以状态名为key 。事件处理函数为值创建的可改变的Map,初始值为空Map。这一个数据结构就保存了全部的状态名和事件处理函数。
每一个状态名相应一个可选的超时时间,保存在 stateTimeouts 映射中。
transitionEvent 保存了全部的转换处理函数。
currentState 是仅仅状态机当前状态的状态实例。初始值为 null 。
这几个重要的作用结构保存了状态机的静态结构,问题Q1.1解决。
Akka FSM 提供了一些内部DSL语法来协助装配状态机,也就是用来把你的状态机结构用前面的数据结构定义出来。
使用DSL机制有个特点。你要是明确DSL详细是如何干活的。使用起来就简单方便,心里非常踏实。要是不明确就会感觉有点神奇主义,用了也会不踏实。感觉没着没落的。
所以我们要把它弄明确。事实上也非常easy。
final def when(stateName: S, stateTimeout: FiniteDuration = null)(stateFunction: StateFunction): Unit =
register(stateName, stateFunction, Option(stateTimeout))
private def register(name: S, function: StateFunction, timeout: Timeout): Unit = { if (stateFunctions contains name) { stateFunctions(name) = stateFunctions(name) orElse function stateTimeouts(name) = timeout orElse stateTimeouts(name) } else { stateFunctions(name) = function stateTimeouts(name) = timeout } }
when 语法主要工作就是将状态函数和超时函数放到 stateFunctions 和stateTimeouts 两个Map 中。(肉戏在register函数)
register的动作是保存状态函数和超时。假设状态函数已经存在,会把给定的函数和已有的函数串接起来(PartialFunction 的 orElse 方法。超时设置会用新的取代旧的,新的没有指定就用旧的。
when 语法有两个參数列表,第一个列表两个參数,状态名和可选超时,第二个列表是该状态的事件处理函数(记住。是一个 PartialFunction)。
利用Scala 的语言机制。我们能够这样来写when函数。
class Buncher extends Actor with FSM[State, Data] { startWith(Idle, Uninitialized) when(Idle) { case Event(SetTarget(ref), Uninitialized) => stay using Todo(ref, Vector.empty) } // transition elided ... when(Active, stateTimeout = 1 second) { case Event(Flush | StateTimeout, t: Todo) => goto(Idle) using t.copy(queue = Vector.empty) } // unhandled elided ... initialize() }
注意 when 函数的调用方法。这里使用了两个scala 的语法机制 :
-
当參数列表中仅仅有一个參数时。能够使用花括号取代圆括号。“其目的是让客户程序猿能写出包括在花括号内的函数字面量”(Programing in Scala 9.4)
-
模式匹配匿名函数(Programing in Scala 15.7,Scala 语言规范8,5)
因为when 的第二和參数列表仅仅有一个參数。所以能够写成用花括号包围,花括号里的代码是一个匿名函数。编译器依据类型判断知道期待的类型是偏函数 StateFunction 类型,编译器会自己主动生成正确的函数对象,详情见Scala 语言规范8,5,一定要读一读。
when 语法构造了状态机的结构,可是我们还不知道初始状态是哪个,变量 currentState 的值还为 null 。
final def startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit =
currentState = FSM.State(stateName, stateData, timeout)
startWith干的活很easy,就是给 currentState 赋值。这里的要害事实上还是要准确理解状态名和状态实例的差别。Akka FSM 是用状态名来定义状态机的结构。用状态实例来跟踪状态机的运转。当前状态是状态机的运转中的概念,所以 currentState 的类型是状态实例(FSM.State[S,D),状态机运转的起点一般而言当然是状态机的初始状态。startWith 就是把把当前状态设置为状态机的初始状态。
有时候 startWith 指定的状态未必是状态机的初始状态,比方当我们的状态机运转到某一个中间状态时,被持久化到了数据库中。但从数据库中恢复时。startWith 应该将当前状态恢复到持久化之前的状态。
以下我们来看看状态机是怎样运转起来的。
FSM Trait 混入了 Akka Actor 。全部FSM的事件处理、超时、定时的处理、状态的转换都是通过Actor 的消息来实现的,这就攻克了Q2.7有关线程安全的问题。
正常状态事件的处理。这里要解决两个问题。Q2.1和Q2.2。也就是事件处理函数怎样被调用,状态转换函数怎样被调用。
FSM Trate 的 receive 方法的最后一个case语句处理正常的事件消息(上面的case处理定时器,listen等)。
case value => { if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 processMsg(value, sender) }
value 是收到的Akka 原始消息。
private def processMsg(value: Any, source: AnyRef): Unit = { val event = Event(value, currentState.stateData) processEvent(event, source) }
processMsg 将Akka原始消息封装成 状态事件 Event 调用 processEvent .
private[akka] def processEvent(event: Event, source: AnyRef): Unit = { val stateFunc = stateFunctions(currentState.stateName) val nextState = if (stateFunc isDefinedAt event) { stateFunc(event) } else { // handleEventDefault ensures that this is always defined handleEvent(event) } applyState(nextState) }
processEvent 函数很关键,先找到当前状态的事件处理函数(593行)。假设找到的事件处理函数可以处理收到的事件(594行)。那就调用事件处理函数(595行),不能处理该事件则做缺省处理(598行。后文再细说)。
事件处理函数会返回下一个状态实例 nextStat,然后调用 applyState 函数试图转换到下一个状态(600行)。
private[akka] def applyState(nextState: State): Unit = { nextState.stopReason match { case None => makeTransition(nextState) case _ => nextState.replies.reverse foreach { r => sender ! r } terminate(nextState) context.stop(self) } }
applyState 先检查目标状态是否是终止状态(604行),假设不是则调用makeTransition 函数运行状态转换(605行)。否则启动终止流程(608行)。
private[akka] def makeTransition(nextState: State): Unit = { if (!stateFunctions.contains(nextState.stateName)) { terminate(stay withStopReason Failure("Next state %s does not exist".format(nextState.stateName))) } else { nextState.replies.reverse foreach { r => sender ! r } if (currentState.stateName != nextState.stateName) { this.nextState = nextState handleTransition(currentState.stateName, nextState.stateName) gossip(Transition(self, currentState.stateName, nextState.stateName)) this.nextState = null } currentState = nextState ... } }
makeTransition 先检查目标状态是否存在(614行),不存在则启动状态机终止流程。然后用当前状态和目标状态的元组做參数调用 handleTransition 方法。
private def handleTransition(prev: S, next: S) { val tuple = (prev, next) for (te ← transitionEvent) { if (te.isDefinedAt(tuple)) te(tuple) } }
handleTransition 从全部保存的状态处理函数中找到匹配的进行调用(538行)。从这里能够看出来,假设我们把onTransition 函数调用了两次。每次给的參数是一样的(都是从状态A转换到状态B)。那么这两次注冊的转换处理函数都会被调用。顺序就是依照注冊的顺序。
makeTransition 函数会在状态转换函数被运行完毕之后才将当前状态设置为目标状态(624行)。
processEvent 中假设发现当前状态的事件处理函数不能处理某个消息会调用 handleEvent(event) 函数。
private val handleEventDefault: StateFunction = { case Event(value, stateData) ⇒ log.warning("unhandled event " + value + " in state " + stateName) stay } private var handleEvent: StateFunction = handleEventDefault
handleEvent 是一个可变变量初始值被设置为 handleEventDefault ,缺省实现是输出一行日志。然后保留 在当前状态。我们能够用 whenUnhandled (DSL)来改变这个缺省行为。
final def whenUnhandled(stateFunction: StateFunction): Unit =
handleEvent = stateFunction orElse handleEventDefault
whenUnhandled 提供一个未知事件处理函数,并把这个函数置于handleEventDefault 之前被调用,假设我们提供的未知事件处理函数被调用了。缺省的函数就不会被调用。
状态事件处理函数要求我们返回下一个状态实例,这里是提出状态转换要求的唯一地方。
你能提出的要求非常easy。要么留下(stay)。要么走人(goto)。stay 事实上是goto 到当前状态。
/** * Produce transition to other state. Return this from a state function in * order to effect the transition. * * @param nextStateName state designator for the next state * @return state transition descriptor */ final def goto(nextStateName: S): State = FSM.State(nextStateName, currentState.stateData) /** * Produce "empty" transition descriptor. Return this from a state function * when no state change is to be effected. * * @return descriptor for staying in current state */ final def stay(): State = goto(currentState.stateName) // cannot directly use currentState because of the timeout field
Akka FSM 的状态超时指的是“进入某个状态后一定时间内没有收到不论什么事件”。
-
在FSM的伴生对象中定义了
case object StateTimeout
这是超时事件,事件处理函数中能够匹配并处理它。
-
FSM Trait 中定义了一个不变量
val StateTimeout = FSM.StateTimeout
应该是方便使用。
-
FSM Trait 中定义了一个类型简称
type Timeout = Option[FiniteDuration]
-
FSM 伴生对象中定义了一个私有类型
private case class TimeoutMarker(generation: Long)
-
FSM Trait 中保存有每一个状态名相应的可选超时
private val stateTimeouts = mutable.Map[S, Timeout]()
-
FSM Trait 中定义了一个可变量保存用于取消超时调度的句柄
private var timeoutFuture: Option[Cancellable] = None
-
FSM.State[S, D] 有一个成员 timeout: Option[FiniteDuration] ,这是一个超时的可选项,缺省值是 None。
when dsl 可以可选的指定每一个状态名的超时时间。通过register函数将超时时间保存在 stateTimeouts 中 FSM.State 中的 timeout 同意在实例化 FSM.State 时指定超时时间
状态迁跃完毕之后(makeTransition 函数中。 currentState = nextState 被运行之后。625行),会马上检查新的 FSM.State 中的超时定义。假设没有就检查 状态名关联的超时定义(保存在 stateTimeouts Map 中),假设该状态有超时定义,则调度一个定时器用于超时的触发。
if (timeout.isDefined) { val t = timeout.get if (t.isFinite && t.length >= 0) { import context.dispatcher timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } }
Actor 会在超时时间到达后给自己发送消息 TimeoutMarker(generation) ,receive函数检測到 TimeoutMarker 消息,超时被触发,调用 processMsg(StateTimeout, “state timeout”) 状态的事件处理函数中应该有对 StateTimeout 事件的处理。
FSM Actor 收到不论什么一个有效的事件消息时,会在消息处理之前把超时调度取消(receive 函数末尾)。
在FSM的伴生对象中定义了定时器类型。自带了调度和取消的函数。
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) extends NoSerializationVerificationNeeded { private var ref: Option[Cancellable] = _ //用于取消定时调度的句柄 private val scheduler = context.system.scheduler private implicit val executionContext = context.dispatcher def schedule(actor: ActorRef, timeout: FiniteDuration): Unit = ref = Some( if (repeat) scheduler.schedule(timeout, timeout, actor, this) else scheduler.scheduleOnce(timeout, actor, this)) def cancel(): Unit = if (ref.isDefined) { ref.get.cancel() ref = None } }
定时时间调度使用的是 ActorSystem 中的调度器。每一个定时器都有自己的名称 (name)。schedule 函数用来运行调度。并保存用于取消的句柄。
99~100行调用ActorSystem 的调度器设定给Actor 发送消息的时间,注意,发送的消息就是 Timer 对象自己。
FSM Trait 中定义了一个Map用来依据名字保存定时器。
private val timers = mutable.Map[String, Timer]()
用于外部调用的 API 是setTimer 函数 和 cancelTimer函数。
setTimer 构造Timer实例。保存在 timers Map中 通过Timer的schedule方法设定让Actor在一定时间后收到消息,消息内容就是Timer对象自己。消息的接收者就是当前FSM的Actor。
final def setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean): Unit = { if (debugEvent) log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) if (timers contains name) { timers(name).cancel } val timer = Timer(name, msg, repeat, timerGen.next)(context) timer.schedule(self, timeout) timers(name) = timer } /** * Cancel named timer, ensuring that the message is not subsequently delivered (no race). * @param name of the timer to cancel */ final def cancelTimer(name: String): Unit = { if (debugEvent) log.debug("canceling timer '" + name + "'") if (timers contains name) { timers(name).cancel timers -= name } }
FSM Trait 的 receive函数收到 Timer 消息后,取出Timer中的业务事件,调用状态机的事件处理函数。定时器处理会取消装的超时机制。
case t @ Timer(name, msg, repeat, gen) => if ((timers contains name) && (timers(name).generation == gen)) { if (timeoutFuture.isDefined) { timeoutFuture.get.cancel() timeoutFuture = None } generation += 1 if (!repeat) { timers -= name } processMsg(msg, t) }
私有的 terminate 函数会启动终止流程。搜索这个函数出现的地方就能够发现状态机进入终止流程的原因:
-
状态转换的目标状态不存在(makeTransition 函数中。615行)
-
状态的事件处理函数返回的下一个状态中包括了终止状态机的原因 (applyState 函数中,606行)
-
Actor 终止 (postStop 函数中,649行)
来看看 terminate 函数:
private def terminate(nextState: State): Unit = { if (currentState.stopReason.isEmpty) { //当前状态必须没要求终止 val reason = nextState.stopReason.get //下一个状态必须有终止原因 logTermination(reason) for (timer ← timers.values) timer.cancel() //取消全部定时器 timers.clear() currentState = nextState val stopEvent = StopEvent(reason, currentState.stateName, currentState.stateData) if (terminateEvent.isDefinedAt(stopEvent)) terminateEvent(stopEvent) //再给用户一个机会来处理状态机终止事件 } }
终止流程会取消全部的定时器(657行)。同一时候构造一个 stopEvent 事件。让用户可以对终止进行最后的处理。
要处理 stopEvent。你须要使用onTermination DSL 语法注冊一个终止事件处理函数。
final def onTermination(terminationHandler: PartialFunction[StopEvent, Unit]): Unit =
terminateEvent = terminationHandler
private var terminateEvent: PartialFunction[StopEvent, Unit] = NullFunction
从使用者角度看。有两个地方能够对终止流程进行控制:
-
通过在状态的事件处理函数返回的下一个状态中增加终止原因,启动终止流程
-
通过 onTermination 函数(DSL 机制)定制终止事件的处理完毕必要的清理工作
假设是因为 Actor 终止(出现异常或正常退出Actor),postStop函数会以当前状态启动终止流程(650行)。
FSM.State 的using 方法是唯一能够改变状态数据的函数 在状态的事件处理函数中,收到事件后,通过currentState 获取当前的状态数据,构造下一个状态的 FSM.State实例, 假设须要改变数据就把数据放进去。
不须要改变就 goto 或 stay。
在转换处理函数中 (onTransition) ,能够通过 stateData / nextStateData 活动当前状态和下一个状态的数据。记住, 转换出炉函数运行完毕之后。状态才会被切换到下一个状态。
可是在转换处理函数中是无法改动状态数据的。当然你能够通过 nextStateData 得到数据,然后改动当中的 var 变量。可是最好别这么干,你的状态数据假设都定义成 val 的,那就完美了。
状态实例 FSM.Stat[S,D]中有一个 replies 參数,类型是 List[Any],也提供了一个方法操作设个数据:
def replying(replyValue: Any): State[S, D] = {
copy(replies = replyValue :: replies)
}
replying 会返回一个新的状态实例将提供的參数replyValue增加到新实例的replies列表中。
在状态转换时。在前一个状态的事件处理函数被调用之后,转换处理函数被调用之前。会给消息的发送者(sender )回复下一个状态实例的replies中保存的全部内容(makeTransition。617行)。终止流程启动前也会做这件事情(applyState。607行)。
由上面的分析可见。reply机制能够用来在状态转换前向消息的发送者回复不论什么信息。指定信息的方式是在状态的事件处理函数返回下一个状态时将要回复的信息保存在下一个状态的 replies 列表中。
Note | |
---|---|
有非常多函数返回的类型都是状态实例 FSM.Stat[S,D]。如FSM.Stat[S,D]的全部内部方法,goto ,stop。stay 等DSL函数等等。 这样就能够使用一种级联的语法,如 goto Processing using msg forMax 5seconds replying WillDo 。 |
Akka FSM 提供了两套消息来供外部订阅状态的转换。
-
FSM 混入了 akka.routing.Listeners 。这是Akka routing 的监听机制,支持使用 Listen / Deafen 消息进行订阅和取消
-
FSM Object 中定义消息 SubscribeTransitionCallBack / UnsubscribeTransitionCallBack 进行订阅和取消
订阅和取消都是操作 akka.routing.Listeners 中的 listeners: Set[ActorRef] 成员。状态转换时。在转换处理函数运行完毕之后会给监听者发送转换通知(makeTransition,621行)。
gossip(Transition(self, currentState.stateName, nextState.stateName))
protected def gossip(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { val i = listeners.iterator while (i.hasNext) i.next ! msg }
版权声明:本文博客原创文章,博客,未经同意,不得转载。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/117699.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...