use std::sync::Arc; use crate::{Actor, Error, Handler}; use async_trait::async_trait; use tokio::sync::oneshot; use tokio::sync::Mutex; /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor. #[async_trait] pub trait ActorMessage { async fn handle(self: Box, actor: Arc>); } /// Used by [ActorHandle][super::ActorHandle]s to pack messages into [Envelope]s so we have a type erased message to send to the actor. pub trait Enveloper where A: Handler, { /// Wrap a message in an envelope with an optional response channel. fn pack(message: M, tx: Option>::Response>>) -> Envelope; } /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor /// so long as it implements the necessary handler. pub struct Envelope where A: Actor, { message: Box + Send>, } impl Envelope where A: Actor, { pub fn new(message: M, tx: Option>) -> Self where A: Handler + Send + 'static, A::Response: Send, M: Send + 'static, { Self { message: Box::new(EnvelopeInner { message: Box::new(message), tx, }), } } } #[async_trait] impl ActorMessage for Envelope where A: Actor + Send, { async fn handle(self: Box, actor: Arc>) { ActorMessage::handle(self.message, actor).await } } /// The inner parts of the [Envelope] containing the actual message as well as an optional /// response channel. struct EnvelopeInner { message: Box, tx: Option>, } #[async_trait] impl ActorMessage for EnvelopeInner>::Response> where A: Handler + Send + 'static, A::Response: Send, M: Send, { async fn handle(self: Box, actor: Arc>) { let result = A::handle(actor, self.message).await; if let Some(res_tx) = self.tx { let _ = res_tx.send(result.unwrap()); } } } impl Enveloper for A where A: Handler + Send + 'static, A::Response: Send, M: Send + Sync + 'static, { fn pack(message: M, tx: Option>::Response>>) -> Envelope { Envelope::new(message, tx) } } pub struct MessageRequest { pub response_rx: oneshot::Receiver, } impl std::future::Future for MessageRequest { type Output = Result; fn poll( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { match self.as_mut().response_rx.try_recv() { Ok(response) => std::task::Poll::Ready(Ok(response)), Err(e) => match e { oneshot::error::TryRecvError::Empty => { cx.waker().wake_by_ref(); std::task::Poll::Pending } oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())), }, } } }