Browse Source

add recipient and dox

biblius 1 year ago
parent
commit
eefa07db83
4 changed files with 199 additions and 98 deletions
  1. 146 39
      src/lib.rs
  2. 20 33
      src/message.rs
  3. 8 9
      src/runtime.rs
  4. 25 17
      src/ws.rs

+ 146 - 39
src/lib.rs

@@ -1,6 +1,6 @@
 use crate::runtime::{ActorRuntime, Runtime};
 use flume::{SendError, Sender};
-use message::{Envelope, Message, MessagePacker, MessageRequest};
+use message::{Envelope, Enveloper, Message, MessageRequest};
 use std::fmt::Debug;
 use tokio::sync::oneshot;
 pub mod debug;
@@ -8,6 +8,8 @@ pub mod message;
 pub mod runtime;
 pub mod ws;
 
+const DEFAULT_CHANNEL_CAPACITY: usize = 16;
+
 pub trait Actor {
     fn start(self) -> ActorHandle<Self>
     where
@@ -18,6 +20,17 @@ pub trait Actor {
     }
 }
 
+/// The main trait to implement on an [Actor] to enable it to handle messages.
+pub trait Handler<M>: Actor
+where
+    M: Message,
+{
+    fn handle(&mut self, message: M) -> Result<M::Response, Error>;
+}
+
+/// A handle to a spawned actor. Obtained when calling `start` on an [Actor] and is used to send messages
+/// to it.
+#[derive(Debug, Clone)]
 pub struct ActorHandle<A>
 where
     A: Actor,
@@ -26,55 +39,155 @@ where
     command_tx: Sender<ActorCommand>,
 }
 
-impl<A> Clone for ActorHandle<A>
+impl<A> ActorHandle<A>
 where
     A: Actor,
 {
-    fn clone(&self) -> Self {
+    pub fn new(message_tx: Sender<Envelope<A>>, command_tx: Sender<ActorCommand>) -> Self {
         Self {
-            message_tx: self.message_tx.clone(),
-            command_tx: self.command_tx.clone(),
+            message_tx,
+            command_tx,
         }
     }
-}
 
-impl<A> ActorHandle<A>
-where
-    A: Actor + 'static,
-{
-    pub async fn send_sync<M>(&self, message: M) -> Result<M::Response, Error>
+    /// Sends a message to the actor and returns a [MessageRequest] that can
+    /// be awaited. This method should be used when one needs a response from the
+    /// actor.
+    pub fn send_wait<M>(&self, message: M) -> Result<MessageRequest<M::Response>, SendError<M>>
     where
-        M: Message + Send + 'static,
-        A: Handler<M> + MessagePacker<A, M>,
+        M: Message + Send,
+        A: Handler<M> + Enveloper<A, M>,
     {
+        if self.message_tx.is_full() {
+            return Err(SendError(message));
+        }
         let (tx, rx) = oneshot::channel();
-        let packed = A::pack(message, Some(tx));
-        self.message_tx
-            .send(packed)
-            .map_err(Error::send_err_boxed)?;
-        MessageRequest { response_rx: rx }.await
+        let _ = self.message_tx.send(A::pack(message, Some(tx)));
+        Ok(MessageRequest { response_rx: rx })
     }
 
-    pub async fn send<M>(&self, message: M) -> Result<(), Error>
+    /// Send a message to the actor without waiting for any response, but still returning an
+    /// error if the channel is full.
+    pub fn send<M>(&self, message: M) -> Result<(), SendError<M>>
     where
         M: Message + Send + 'static,
-        A: Handler<M> + MessagePacker<A, M> + 'static,
+        A: Handler<M> + Enveloper<A, M> + 'static,
     {
-        let packed = A::pack(message, None);
-        self.message_tx.send(packed).map_err(Error::send_err_boxed)
+        if self.message_tx.is_full() {
+            return Err(SendError(message));
+        }
+        let _ = self.message_tx.send(A::pack(message, None));
+        Ok(())
     }
 
-    pub async fn send_cmd(&self, cmd: ActorCommand) -> Result<(), Error> {
-        self.command_tx.send(cmd).unwrap();
-        Ok(())
+    /// Send a message ignoring any errors in the process. The true YOLO way to send messages.
+    pub fn send_forget<M>(&self, message: M)
+    where
+        M: Message + Send + 'static,
+        A: Handler<M> + Enveloper<A, M> + 'static,
+    {
+        let _ = self.message_tx.send(A::pack(message, None));
+    }
+
+    pub fn send_cmd(&self, cmd: ActorCommand) -> Result<(), SendError<ActorCommand>> {
+        self.command_tx.send(cmd)
+    }
+
+    pub fn recipient<M>(&self) -> Recipient<M>
+    where
+        M: Message + Send + 'static,
+        M::Response: Send,
+        A: Handler<M> + 'static,
+    {
+        Recipient {
+            message_tx: Box::new(self.message_tx.clone()),
+            command_tx: self.command_tx.clone(),
+        }
     }
 }
 
-pub trait Handler<M>: Actor
+/// The same as an [ActorHandle], but instead of being tied to a specific actor, it is only
+/// tied to the message type. Can be obtained from an [ActorHandle].
+///
+/// Useful for grouping different types of actors that can handle the same message.
+pub struct Recipient<M>
 where
     M: Message,
 {
-    fn handle(&mut self, message: M) -> Result<M::Response, Error>;
+    message_tx: Box<dyn DynamicSender<M>>,
+    command_tx: Sender<ActorCommand>,
+}
+
+impl<M> Recipient<M>
+where
+    M: Message + Send,
+{
+    pub fn send_wait(&self, message: M) -> Result<MessageRequest<M::Response>, SendError<M>> {
+        self.message_tx.send_sync(message)
+    }
+
+    pub fn send(&self, message: M) -> Result<(), SendError<M>> {
+        self.message_tx.send(message)
+    }
+
+    pub fn send_forget(&self, message: M) {
+        let _ = self.message_tx.send(message);
+    }
+
+    pub fn send_cmd(&self, cmd: ActorCommand) -> Result<(), SendError<ActorCommand>> {
+        self.command_tx.send(cmd)
+    }
+}
+
+/// A helper trait used solely by [Recipient]'s message channel to erase the actor type.
+/// This is achieved by implementing it on [Sender<Envelope<A>].
+trait DynamicSender<M>
+where
+    M: Message + Send,
+{
+    fn send_sync(&self, message: M) -> Result<MessageRequest<M::Response>, SendError<M>>;
+
+    fn send(&self, message: M) -> Result<(), SendError<M>>;
+}
+
+impl<A, M> DynamicSender<M> for Sender<Envelope<A>>
+where
+    M: Message + Send + 'static,
+    M::Response: Send,
+    A: Actor + Handler<M> + Enveloper<A, M>,
+{
+    fn send(&self, message: M) -> Result<(), SendError<M>> {
+        if self.is_full() {
+            return Err(SendError(message));
+        }
+        let _ = self.send(A::pack(message, None));
+        Ok(())
+    }
+
+    fn send_sync(
+        &self,
+        message: M,
+    ) -> Result<MessageRequest<<M as Message>::Response>, SendError<M>> {
+        if self.is_full() {
+            return Err(SendError(message));
+        }
+        let (tx, rx) = oneshot::channel();
+        let _ = self.send(A::pack(message, Some(tx)));
+        Ok(MessageRequest { response_rx: rx })
+    }
+}
+
+impl<A, M> From<ActorHandle<A>> for Recipient<M>
+where
+    M: Message + Send + 'static,
+    M::Response: Send,
+    A: Actor + Handler<M> + Enveloper<A, M> + 'static,
+{
+    /// Just calls `ActorHandler::recipient`, i.e. clones the underlying channels
+    /// into the recipient.
+    fn from(handle: ActorHandle<A>) -> Self {
+        handle.recipient()
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -83,18 +196,10 @@ pub enum Error {
     ActorChannelClosed,
     #[error("Channel closed: {0}")]
     ChannelClosed(#[from] oneshot::error::TryRecvError),
-    #[error("Send error: {0}")]
-    Send(Box<dyn std::error::Error + Send + 'static>),
     #[error("Warp error: {0}")]
     Warp(#[from] warp::Error),
 }
 
-impl Error {
-    fn send_err_boxed<T: Send + 'static>(error: SendError<T>) -> Self {
-        Self::Send(Box::new(error))
-    }
-}
-
 #[derive(Debug)]
 pub enum ActorCommand {
     Stop,
@@ -145,13 +250,15 @@ mod tests {
         let mut res = 0;
         let mut res2 = 0;
         for _ in 0..100 {
-            res += handle.send_sync(Foo {}).await.unwrap();
-            res2 += handle.send_sync(Bar {}).await.unwrap();
+            res += handle.send_wait(Foo {}).unwrap().await.unwrap();
+            res2 += handle.send_wait(Bar {}).unwrap().await.unwrap();
         }
 
-        handle.send_cmd(ActorCommand::Stop).await.unwrap();
+        let rec: Recipient<Foo> = handle.recipient();
+        res += rec.send_wait(Foo {}).unwrap().await.unwrap();
+        handle.send_cmd(ActorCommand::Stop).unwrap();
 
-        assert_eq!(res, 1000);
+        assert_eq!(res, 1010);
         assert_eq!(res2, 1000);
     }
 }

+ 20 - 33
src/message.rs

@@ -1,4 +1,4 @@
-use crate::{runtime::ActorRuntime, Actor, Error, Handler};
+use crate::{Actor, Error, Handler};
 use tokio::sync::oneshot;
 
 /// Represents a message that can be sent to an actor. The response type is what the actor must return in its handler implementation.
@@ -7,20 +7,21 @@ pub trait Message {
 }
 
 /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
-/// type when creating an actor handle - otherwise we would only be able to send a single message type to the actor.
-pub trait PackedMessage<A: Actor> {
+/// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
+pub trait ActorMessage<A: Actor> {
     fn handle(&mut self, actor: &mut A);
 }
 
 /// Used by [ActorHandle][super::ActorHandle]s to pack [Message]s into [Envelope]s so we have a type erased message to send to the actor.
-pub trait MessagePacker<A: Actor, M: Message + Send + 'static> {
+pub trait Enveloper<A: Actor, M: Message> {
+    /// Wrap a message in an envelope with an optional response channel.
     fn pack(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Envelope<A>;
 }
 
 /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor
 /// so long as it implements the necessary handler.
 pub struct Envelope<A: Actor> {
-    message: Box<dyn PackedMessage<A> + Send>,
+    message: Box<dyn ActorMessage<A> + Send>,
 }
 
 impl<A> Envelope<A>
@@ -44,12 +45,12 @@ where
 
 /// The inner parts of the [Envelope] containing the actual message as well as an optional
 /// response channel.
-struct EnvelopeInner<M: Message + Send> {
+struct EnvelopeInner<M: Message> {
     message: Option<M>,
     tx: Option<oneshot::Sender<M::Response>>,
 }
 
-impl<A> PackedMessage<A> for Envelope<A>
+impl<A> ActorMessage<A> for Envelope<A>
 where
     A: Actor,
 {
@@ -58,39 +59,25 @@ where
     }
 }
 
-impl<A, M> PackedMessage<A> for EnvelopeInner<M>
+impl<A, M> ActorMessage<A> for EnvelopeInner<M>
 where
-    M: Message + Send + 'static,
-    M::Response: Send,
+    M: Message,
     A: Actor + Handler<M>,
 {
     fn handle(&mut self, actor: &mut A) {
-        if let Some(message) = self.message.take() {
-            match actor.handle(message) {
-                Ok(result) => {
-                    if let Some(res_tx) = self.tx.take() {
-                        // TODO
-                        let _ = res_tx.send(result);
-                    }
-                }
-                Err(_) => todo!(),
-            };
-        }
-    }
-}
-
-impl<A, M> MessagePacker<A, M> for ActorRuntime<A>
-where
-    A: Actor + Handler<M>,
-    M: Message + Send + 'static,
-    M::Response: Send,
-{
-    fn pack(message: M, tx: Option<oneshot::Sender<<M as Message>::Response>>) -> Envelope<A> {
-        A::pack(message, tx)
+        let Some(message) = self.message.take() else { panic!("Message already processed") };
+        match actor.handle(message) {
+            Ok(result) => {
+                let Some(res_tx) = self.tx.take() else { panic!("Message already processed") };
+                // TODO
+                let _ = res_tx.send(result);
+            }
+            Err(_) => todo!(),
+        };
     }
 }
 
-impl<A, M> MessagePacker<A, M> for A
+impl<A, M> Enveloper<A, M> for A
 where
     A: Actor + Handler<M>,
     M: Message + Send + 'static,

+ 8 - 9
src/runtime.rs

@@ -1,4 +1,7 @@
-use crate::{message::PackedMessage, Actor, ActorCommand, ActorHandle, Envelope, Error};
+use crate::{
+    message::ActorMessage, Actor, ActorCommand, ActorHandle, Envelope, Error,
+    DEFAULT_CHANNEL_CAPACITY,
+};
 use flume::Receiver;
 use futures::Future;
 use pin_project::pin_project;
@@ -13,14 +16,10 @@ pub trait Runtime<A> {
     where
         A: Actor + Send + 'static,
     {
-        let (tx, rx) = flume::unbounded();
-        let (cmd_tx, cmd_rx) = flume::unbounded();
-        let rt = ActorRuntime::new(actor, cmd_rx, rx);
-        tokio::spawn(rt);
-        ActorHandle {
-            message_tx: tx,
-            command_tx: cmd_tx,
-        }
+        let (message_tx, message_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
+        let (command_tx, command_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
+        tokio::spawn(ActorRuntime::new(actor, command_rx, message_rx));
+        ActorHandle::new(message_tx, command_tx)
     }
 }
 

+ 25 - 17
src/ws.rs

@@ -1,11 +1,5 @@
-use std::{
-    collections::VecDeque,
-    pin::Pin,
-    task::{Context, Poll},
-};
-
 use crate::{
-    message::{Envelope, PackedMessage},
+    message::{ActorMessage, Envelope},
     runtime::Runtime,
     Actor, ActorCommand, ActorHandle, Error, Handler,
 };
@@ -15,6 +9,11 @@ use futures::{
     Future, SinkExt, StreamExt,
 };
 use pin_project::pin_project;
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    task::{Context, Poll},
+};
 use warp::ws::WebSocket;
 
 pub struct WebsocketActor {
@@ -43,13 +42,25 @@ impl WebsocketActor {
 pub struct WebsocketRuntime {
     actor: WebsocketActor,
 
+    // Pin these 2 as we are polling them directly so we know they never move
+    /// The receiving end of the websocket
+    #[pin]
     ws_stream: SplitStream<WebSocket>,
+
+    /// The sending end of the websocket
+    #[pin]
     ws_sink: SplitSink<WebSocket, warp::ws::Message>,
 
+    /// Actor message receiver
     message_rx: Receiver<Envelope<WebsocketActor>>,
+
+    /// Actor command receiver
     command_rx: Receiver<ActorCommand>,
 
+    /// Received, but not yet processed messages
     message_queue: VecDeque<Envelope<WebsocketActor>>,
+
+    /// Received, but not yet processed websocket messages
     ws_queue: VecDeque<warp::ws::Message>,
 }
 
@@ -81,7 +92,7 @@ impl Future for WebsocketRuntime {
     type Output = Result<(), Error>;
 
     fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let this = self.project();
+        let mut this = self.project();
 
         loop {
             // Poll command receiver
@@ -93,7 +104,7 @@ impl Future for WebsocketRuntime {
                     }
                 },
                 Poll::Ready(Err(_)) => {
-                    println!("Command stream dropped, ungracefully stopping actor");
+                    println!("Actor stopping"); // TODO drain the queue and all that graceful stuff
                     break Poll::Ready(Err(Error::ActorChannelClosed));
                 }
                 Poll::Pending => {}
@@ -148,6 +159,7 @@ impl Future for WebsocketRuntime {
                     }
                 }
             };
+
             cx.waker().wake_by_ref();
             return Poll::Pending;
         }
@@ -156,14 +168,10 @@ impl Future for WebsocketRuntime {
 
 impl Runtime<WebsocketActor> for WebsocketRuntime {
     fn run(actor: WebsocketActor) -> ActorHandle<WebsocketActor> {
-        let (tx, rx) = flume::unbounded();
-        let (cmd_tx, cmd_rx) = flume::unbounded();
-        let rt = WebsocketRuntime::new(actor, cmd_rx, rx);
-        tokio::spawn(rt);
-        ActorHandle {
-            message_tx: tx,
-            command_tx: cmd_tx,
-        }
+        let (message_tx, message_rx) = flume::unbounded();
+        let (command_tx, command_rx) = flume::unbounded();
+        tokio::spawn(WebsocketRuntime::new(actor, command_rx, message_rx));
+        ActorHandle::new(message_tx, command_tx)
     }
 }