message.rs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. use std::sync::Arc;
  2. use crate::{Actor, Error, Handler};
  3. use async_trait::async_trait;
  4. use tokio::sync::oneshot;
  5. use tokio::sync::Mutex;
  6. /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
  7. /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
  8. #[async_trait]
  9. pub trait ActorMessage<A: Actor> {
  10. async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>);
  11. }
  12. /// Used by [ActorHandle][super::ActorHandle]s to pack messages into [Envelope]s so we have a type erased message to send to the actor.
  13. pub trait Enveloper<A, M>
  14. where
  15. A: Handler<M>,
  16. {
  17. /// Wrap a message in an envelope with an optional response channel.
  18. fn pack(message: M, tx: Option<oneshot::Sender<<A as Handler<M>>::Response>>) -> Envelope<A>;
  19. }
  20. /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor
  21. /// so long as it implements the necessary handler.
  22. pub struct Envelope<A>
  23. where
  24. A: Actor,
  25. {
  26. message: Box<dyn ActorMessage<A> + Send>,
  27. }
  28. impl<A> Envelope<A>
  29. where
  30. A: Actor,
  31. {
  32. pub fn new<M>(message: M, tx: Option<oneshot::Sender<A::Response>>) -> Self
  33. where
  34. A: Handler<M> + Send + 'static,
  35. A::Response: Send,
  36. M: Send + 'static,
  37. {
  38. Self {
  39. message: Box::new(EnvelopeInner {
  40. message: Box::new(message),
  41. tx,
  42. }),
  43. }
  44. }
  45. }
  46. #[async_trait]
  47. impl<A> ActorMessage<A> for Envelope<A>
  48. where
  49. A: Actor + Send,
  50. {
  51. async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>) {
  52. ActorMessage::handle(self.message, actor).await
  53. }
  54. }
  55. /// The inner parts of the [Envelope] containing the actual message as well as an optional
  56. /// response channel.
  57. struct EnvelopeInner<M, R> {
  58. message: Box<M>,
  59. tx: Option<oneshot::Sender<R>>,
  60. }
  61. #[async_trait]
  62. impl<A, M> ActorMessage<A> for EnvelopeInner<M, <A as Handler<M>>::Response>
  63. where
  64. A: Handler<M> + Send + 'static,
  65. A::Response: Send,
  66. M: Send,
  67. {
  68. async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>) {
  69. let result = A::handle(actor, self.message).await;
  70. if let Some(res_tx) = self.tx {
  71. let _ = res_tx.send(result.unwrap());
  72. }
  73. }
  74. }
  75. impl<A, M> Enveloper<A, M> for A
  76. where
  77. A: Handler<M> + Send + 'static,
  78. A::Response: Send,
  79. M: Send + Sync + 'static,
  80. {
  81. fn pack(message: M, tx: Option<oneshot::Sender<<A as Handler<M>>::Response>>) -> Envelope<A> {
  82. Envelope::new(message, tx)
  83. }
  84. }
  85. pub struct MessageRequest<R> {
  86. pub response_rx: oneshot::Receiver<R>,
  87. }
  88. impl<R> std::future::Future for MessageRequest<R> {
  89. type Output = Result<R, Error>;
  90. fn poll(
  91. mut self: std::pin::Pin<&mut Self>,
  92. cx: &mut std::task::Context<'_>,
  93. ) -> std::task::Poll<Self::Output> {
  94. match self.as_mut().response_rx.try_recv() {
  95. Ok(response) => std::task::Poll::Ready(Ok(response)),
  96. Err(e) => match e {
  97. oneshot::error::TryRecvError::Empty => {
  98. cx.waker().wake_by_ref();
  99. std::task::Poll::Pending
  100. }
  101. oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())),
  102. },
  103. }
  104. }
  105. }