use std::task::Poll; use crate::{Actor, Error, Handler}; use tokio::sync::oneshot; /// Represents a message that can be sent to an actor. The response type is what the actor must return in its handler implementation. pub trait Message { type Response; } /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor. pub trait ActorMessage { fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()>; } /// Used by [ActorHandle][super::ActorHandle]s to pack [Message]s into [Envelope]s so we have a type erased message to send to the actor. pub trait Enveloper { /// Wrap a message in an envelope with an optional response channel. fn pack(message: M, tx: Option>) -> Envelope; } /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor /// so long as it implements the necessary handler. pub struct Envelope { message: Box + Send>, } impl Envelope where A: Actor, { pub fn new(message: M, tx: Option>) -> Self where A: Handler, M: Message + Send + 'static, M::Response: Send, { Self { message: Box::new(EnvelopeInner { message: Some(message), tx, }), } } } /// The inner parts of the [Envelope] containing the actual message as well as an optional /// response channel. struct EnvelopeInner { message: Option, tx: Option>, } impl ActorMessage for Envelope where A: Actor, { fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> { self.message.handle(actor, cx) } } impl ActorMessage for EnvelopeInner where M: Message, A: Actor + Handler, { fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> { let message = self.message.take().expect("Message already processed"); match actor.handle(message).as_mut().poll(cx) { Poll::Ready(result) => { if let Some(res_tx) = self.tx.take() { let _ = res_tx.send(result.unwrap()); } Poll::Ready(()) } Poll::Pending => Poll::Pending, } } } impl Enveloper for A where A: Actor + Handler, M: Message + Send + 'static, M::Response: Send, { fn pack(message: M, tx: Option>) -> Envelope { Envelope::new(message, tx) } } pub struct MessageRequest { pub response_rx: oneshot::Receiver, } impl std::future::Future for MessageRequest { type Output = Result; fn poll( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { println!("Awaiting response"); match self.as_mut().response_rx.try_recv() { Ok(msg) => { println!("Future ready"); std::task::Poll::Ready(Ok(msg)) } Err(e) => { println!("Future pending {e}"); match e { oneshot::error::TryRecvError::Empty => { cx.waker().wake_by_ref(); std::task::Poll::Pending } oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())), } } } } }