message.rs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. use std::task::Poll;
  2. use crate::{Actor, Error, Handler};
  3. use tokio::sync::oneshot;
  4. /// Represents a message that can be sent to an actor. The response type is what the actor must return in its handler implementation.
  5. pub trait Message {
  6. type Response;
  7. }
  8. /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
  9. /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
  10. pub trait ActorMessage<A: Actor> {
  11. fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()>;
  12. }
  13. /// Used by [ActorHandle][super::ActorHandle]s to pack [Message]s into [Envelope]s so we have a type erased message to send to the actor.
  14. pub trait Enveloper<A: Actor, M: Message> {
  15. /// Wrap a message in an envelope with an optional response channel.
  16. fn pack(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Envelope<A>;
  17. }
  18. /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor
  19. /// so long as it implements the necessary handler.
  20. pub struct Envelope<A: Actor> {
  21. message: Box<dyn ActorMessage<A> + Send>,
  22. }
  23. impl<A> Envelope<A>
  24. where
  25. A: Actor,
  26. {
  27. pub fn new<M>(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Self
  28. where
  29. A: Handler<M>,
  30. M: Message + Send + 'static,
  31. M::Response: Send,
  32. {
  33. Self {
  34. message: Box::new(EnvelopeInner {
  35. message: Some(message),
  36. tx,
  37. }),
  38. }
  39. }
  40. }
  41. /// The inner parts of the [Envelope] containing the actual message as well as an optional
  42. /// response channel.
  43. struct EnvelopeInner<M: Message> {
  44. message: Option<M>,
  45. tx: Option<oneshot::Sender<M::Response>>,
  46. }
  47. impl<A> ActorMessage<A> for Envelope<A>
  48. where
  49. A: Actor,
  50. {
  51. fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
  52. self.message.handle(actor, cx)
  53. }
  54. }
  55. impl<A, M> ActorMessage<A> for EnvelopeInner<M>
  56. where
  57. M: Message,
  58. A: Actor + Handler<M>,
  59. {
  60. fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
  61. let message = self.message.take().expect("Message already processed");
  62. match actor.handle(message).as_mut().poll(cx) {
  63. Poll::Ready(result) => {
  64. if let Some(res_tx) = self.tx.take() {
  65. let _ = res_tx.send(result.unwrap());
  66. }
  67. Poll::Ready(())
  68. }
  69. Poll::Pending => Poll::Pending,
  70. }
  71. }
  72. }
  73. impl<A, M> Enveloper<A, M> for A
  74. where
  75. A: Actor + Handler<M>,
  76. M: Message + Send + 'static,
  77. M::Response: Send,
  78. {
  79. fn pack(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Envelope<A> {
  80. Envelope::new(message, tx)
  81. }
  82. }
  83. pub struct MessageRequest<R> {
  84. pub response_rx: oneshot::Receiver<R>,
  85. }
  86. impl<R> std::future::Future for MessageRequest<R> {
  87. type Output = Result<R, Error>;
  88. fn poll(
  89. mut self: std::pin::Pin<&mut Self>,
  90. cx: &mut std::task::Context<'_>,
  91. ) -> std::task::Poll<Self::Output> {
  92. println!("Awaiting response");
  93. match self.as_mut().response_rx.try_recv() {
  94. Ok(msg) => {
  95. println!("Future ready");
  96. std::task::Poll::Ready(Ok(msg))
  97. }
  98. Err(e) => {
  99. println!("Future pending {e}");
  100. match e {
  101. oneshot::error::TryRecvError::Empty => {
  102. cx.waker().wake_by_ref();
  103. std::task::Poll::Pending
  104. }
  105. oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())),
  106. }
  107. }
  108. }
  109. }
  110. }