背景:
我有一个名为 的 Actor Peer
。在我的应用程序中,Peer
表示创建它的服务器与另一台服务器之间的连接。它本质上是一个http-client
。
首次运行实例时,它PeerManager
会查询数据库以获取服务器条目列表。然后,它会Peer
为该服务器创建一个参与者。状态Peer
取决于其RegistrationStatus
字段(PENDING_OUT、PENDING_IN、REGISTERED,...)
APeer
有一个默认的接收函数,如下所示:
override def receive: Receive = {
case FirstTick =>
system.log.info("receive FirstTick")
timers.startTimerWithFixedDelay(TickKey, Tick, FiniteDuration(2, TimeUnit.SECONDS))
case Tick =>
system.log.info("receive Tick")
// process every state until all are complete
var statesLeft = false
states = states.sortWith{ (s1, s2) => s1.order > s2.order }
for (state <- states) {
if (!state.complete) {
statesLeft = true
if (state.waitForPing) {
context.become(pingService(state))
} else {
context.become(state.behavior)
}
}
}
if (!statesLeft) context.become(pingService(null))
}
这基本上允许Peer
通过状态列表进行转换,执行每个状态的行为直到状态完成。
一组状态可能看起来像这样:
case PENDING_OUT =>
Seq(
new InitState(0, peer),
new SessionHandshake(1, peer),
new WaitMessages(2, peer)
)
其中一个状态的示例:
class WaitAccept(order: Int, peer: Peer)(implicit system: ActorSystem) extends PeerState(order, false, true) {
override def behavior: Receive = {
case Tick =>
system.log.info("Waiting for accept")
if (complete) peer.getContext.unbecome()
case rA@RegistrationAccept(serverID, serverIP, _) =>
system.log.info(s"Processing RegistrationAccept: ${rA.toProtoString}")
complete = true
}
}
PeerState
超类(superclass):
abstract case class PeerState(var order: Int, var complete: Boolean, var waitForPing: Boolean) {
def behavior: Receive // <- must be overridden
}
此状态使对等方坐等“RegistrationAccept”消息,该消息在用户单击网页上的“接受”时生成。
问题:
某些州(例如,Handshake
负责交换加密密钥的州)会向为其生成 的 发送消息http
。如果接收消息的实例发现 具有所请求的,则会将该消息转发给 ,如下所示:server
Peer
Peer
serverID
PeerManager
Peer
ask
case peerRequest@PeerRequest(serverID, SessionRequest(sessionRequest), _) =>
// get the relevant peer for the requested serverID
val peerEntry = peerMap.get(serverID)
if (peerEntry.isDefined) {
// ask the peer Actor for a response to the sessionRequest, block and wait for this response, then send it back to the peerManager
val peerState = queryPeerState(peerEntry.get) // <-- the issue
if (peerState.isInstanceOf[SessionHandshake]) { // <-- the issue
try {
sender() ! Await.result((peerEntry.get ? (self, sessionRequest)).mapTo[PeerResponse.Response], timeout.duration)
} catch {
case e: TimeoutException =>
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
}
} else {
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
} else {
sender() ! PeerResponse.Response.StatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
当其他实例Peer
处于接收消息的不正确状态时,就会出现问题。如果发生这种情况,我希望用适当的 做出响应HydraStatusCode
。对我来说,最正常的方法是在转发消息之前以某种方式询问Peer
它处于什么状态,如果处于错误状态,则返回适当的 StatusCode。
但是,为了实现这一点,我必须QueryState
在有Receive
行为的每个地方实现案例。这看起来笨重而混乱,作为一名熟练的工程师,我觉得这不是正确的做法。
我希望以某种方式将其实现到PeerState
类中作为始终被调用的默认行为QueryState
,或者以某种方式允许直接PeerManager
调用对Peer
对象的引用而不是对 ActorRef 的引用(但我知道这是不可能的)
还有其他方法可以解决这个问题吗?或者混乱的方法可能是最好的?