Browse Source

add docs and finalise concept

biblius 1 year ago
parent
commit
e93da91c34
8 changed files with 353 additions and 290 deletions
  1. 1 0
      Cargo.toml
  2. 170 0
      src/actor.rs
  3. 0 15
      src/debug.rs
  4. 26 133
      src/lib.rs
  5. 62 65
      src/message.rs
  6. 62 45
      src/relay.rs
  7. 20 23
      src/runtime.rs
  8. 12 9
      tests/websocket.rs

+ 1 - 0
Cargo.toml

@@ -23,4 +23,5 @@ tokio = { version = "1.28.2", features = [
   "time",
 ] }
 tokio-tungstenite = "0.19.0"
+tracing = "0.1.37"
 warp = "0.3.5"

+ 170 - 0
src/actor.rs

@@ -0,0 +1,170 @@
+use crate::{
+    message::{BoxedActorMessage, Enveloper, Handler, MailboxSender, MessageRequest},
+    runtime::ActorRuntime,
+    ActorCommand,
+};
+
+use flume::{SendError, Sender};
+use std::fmt::Debug;
+use tokio::sync::oneshot;
+
+pub trait Actor: Sized + Send + Sync + 'static {
+    fn start(self) -> ActorHandle<Self> {
+        println!("Starting actor");
+        let (message_tx, message_rx) = flume::unbounded();
+        let (command_tx, command_rx) = flume::unbounded();
+        tokio::spawn(ActorRuntime::new(self, command_rx, message_rx).run());
+        ActorHandle::new(message_tx, command_tx)
+    }
+}
+
+/// A handle to a spawned actor. Obtained when calling `start` on an [Actor] and is used to send messages
+/// to it.
+#[derive(Debug)]
+pub struct ActorHandle<A>
+where
+    A: Actor,
+{
+    message_tx: MailboxSender<A>,
+    command_tx: Sender<ActorCommand>,
+}
+
+impl<A> Clone for ActorHandle<A>
+where
+    A: Actor,
+{
+    fn clone(&self) -> Self {
+        Self {
+            message_tx: self.message_tx.clone(),
+            command_tx: self.command_tx.clone(),
+        }
+    }
+}
+
+impl<A> ActorHandle<A>
+where
+    A: Actor,
+{
+    pub fn new(message_tx: MailboxSender<A>, command_tx: Sender<ActorCommand>) -> Self {
+        Self {
+            message_tx,
+            command_tx,
+        }
+    }
+
+    /// Sends a message to the actor and returns a [MessageRequest] that can
+    /// be awaited. This method should be used when one needs a response from the
+    /// actor.
+    pub async fn send_wait<M>(&self, message: M) -> Result<A::Response, SendError<M>>
+    where
+        M: Send,
+        A: Handler<M> + Enveloper<A, M>,
+    {
+        if self.message_tx.is_disconnected() {
+            return Err(SendError(message));
+        }
+        let (tx, rx) = oneshot::channel();
+        let _ = self.message_tx.send(A::pack(message, Some(tx)));
+        Ok(MessageRequest { response_rx: rx }.await.unwrap())
+    }
+
+    /// Send a message to the actor without returning a response, but still returning an
+    /// error if the channel is full or disconnected.
+    pub fn send<M>(&self, message: M) -> Result<(), SendError<M>>
+    where
+        M: Send,
+        A: Handler<M> + Enveloper<A, M>,
+    {
+        if self.message_tx.is_disconnected() {
+            return Err(SendError(message));
+        }
+        let _ = self.message_tx.send(A::pack(message, None));
+        Ok(())
+    }
+
+    /// Send a message ignoring any errors in the process. The true YOLO way to send messages.
+    pub fn send_forget<M>(&self, message: M)
+    where
+        M: Send,
+        A: Handler<M> + Enveloper<A, M>,
+    {
+        let _ = self.message_tx.send(A::pack(message, None));
+    }
+
+    pub fn send_cmd(&self, cmd: ActorCommand) -> Result<(), SendError<ActorCommand>> {
+        self.command_tx.send(cmd)
+    }
+
+    pub fn recipient<M>(&self) -> Recipient<M>
+    where
+        M: Send + Sync + 'static,
+        A: Handler<M>,
+    {
+        Recipient {
+            message_tx: Box::new(self.message_tx.clone()),
+            command_tx: self.command_tx.clone(),
+        }
+    }
+}
+
+/// The same as an [ActorHandle], but instead of being tied to a specific actor, it is only
+/// tied to the message type. Can be obtained from an [ActorHandle]. Be aware that you cannot
+/// obtain responses from actors with this.
+///
+/// Useful for grouping different types of actors that can handle the same message.
+pub struct Recipient<M> {
+    message_tx: Box<dyn MessageSender<M>>,
+    command_tx: Sender<ActorCommand>,
+}
+
+impl<M> Recipient<M>
+where
+    M: Send,
+{
+    pub fn send(&self, message: M) -> Result<(), SendError<M>> {
+        self.message_tx.send(message)
+    }
+
+    pub fn send_forget(&self, message: M) {
+        let _ = self.message_tx.send(message);
+    }
+
+    pub fn send_cmd(&self, cmd: ActorCommand) -> Result<(), SendError<ActorCommand>> {
+        self.command_tx.send(cmd)
+    }
+}
+
+/// A helper trait used solely by [Recipient]'s message channel to erase the actor type.
+/// This is achieved by implementing it on [Sender<Envelope<A>].
+trait MessageSender<M>
+where
+    M: Send,
+{
+    fn send(&self, message: M) -> Result<(), SendError<M>>;
+}
+
+impl<A, M> MessageSender<M> for Sender<BoxedActorMessage<A>>
+where
+    M: Send + 'static,
+    A: Handler<M> + Enveloper<A, M>,
+{
+    fn send(&self, message: M) -> Result<(), SendError<M>> {
+        if self.is_disconnected() {
+            return Err(SendError(message));
+        }
+        let _ = self.send(A::pack(message, None));
+        Ok(())
+    }
+}
+
+impl<A, M> From<ActorHandle<A>> for Recipient<M>
+where
+    M: Send + Sync + 'static,
+    A: Handler<M> + Enveloper<A, M> + 'static,
+{
+    /// Just calls `ActorHandler::recipient`, i.e. clones the underlying channels
+    /// into the recipient and boxes the message one.
+    fn from(handle: ActorHandle<A>) -> Self {
+        handle.recipient()
+    }
+}

+ 0 - 15
src/debug.rs

@@ -1,15 +0,0 @@
-//! So we don't polute the main lib with debug impls
-
-use crate::{Actor, Envelope, Handler};
-
-impl<M, A> std::fmt::Debug for Envelope<M, A>
-where
-    A: Actor + Handler<M>,
-    M: Clone + Send,
-{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Envelope")
-            .field("message", &"{{..}}")
-            .finish()
-    }
-}

+ 26 - 133
src/lib.rs

@@ -1,146 +1,35 @@
-use crate::runtime::ActorRuntime;
-use async_trait::async_trait;
-use flume::{SendError, Sender};
-use message::MailboxSender;
-use message::{Envelope, Enveloper, MessageRequest};
 use std::fmt::Debug;
 use tokio::sync::oneshot;
 
-pub mod debug;
+pub mod actor;
 pub mod message;
 pub mod relay;
 pub mod runtime;
 
-const DEFAULT_CHANNEL_CAPACITY: usize = 128;
-
-pub trait Actor: Sized + Send + Sync + 'static {
-    fn start(self) -> ActorHandle<Self> {
-        println!("Starting actor");
-        let (message_tx, message_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
-        let (command_tx, command_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
-        tokio::spawn(ActorRuntime::new(self, command_rx, message_rx).runt());
-        ActorHandle::new(message_tx, command_tx)
-    }
-}
-
-/// The main trait to implement on an [Actor] to enable it to handle messages.
-#[async_trait]
-pub trait Handler<M>: Actor {
-    type Response;
-    async fn handle(&mut self, message: M) -> Result<Self::Response, Error>;
-}
-
-/// A handle to a spawned actor. Obtained when calling `start` on an [Actor] and is used to send messages
-/// to it.
 #[derive(Debug)]
-pub struct ActorHandle<A>
-where
-    A: Actor,
-{
-    message_tx: MailboxSender<A>,
-    command_tx: Sender<ActorCommand>,
-}
-
-impl<A> Clone for ActorHandle<A>
-where
-    A: Actor,
-{
-    fn clone(&self) -> Self {
-        Self {
-            message_tx: self.message_tx.clone(),
-            command_tx: self.command_tx.clone(),
-        }
-    }
-}
-
-impl<A> ActorHandle<A>
-where
-    A: Actor,
-{
-    pub fn new(message_tx: MailboxSender<A>, command_tx: Sender<ActorCommand>) -> Self {
-        Self {
-            message_tx,
-            command_tx,
-        }
-    }
-
-    /// Sends a message to the actor and returns a [MessageRequest] that can
-    /// be awaited. This method should be used when one needs a response from the
-    /// actor.
-    pub fn send_wait<M>(&self, message: M) -> Result<MessageRequest<A::Response>, SendError<M>>
-    where
-        M: Send + Clone,
-        A: Handler<M> + Enveloper<A, M>,
-    {
-        if self.message_tx.is_full() || self.message_tx.is_disconnected() {
-            return Err(SendError(message));
-        }
-        let (tx, rx) = oneshot::channel();
-        let _ = self.message_tx.send(A::pack(message, Some(tx)));
-        Ok(MessageRequest { response_rx: rx })
-    }
-
-    /// Send a message to the actor without returning a response, but still returning an
-    /// error if the channel is full or disconnected.
-    pub fn send<M>(&self, message: M) -> Result<(), SendError<M>>
-    where
-        M: Clone + Send,
-        A: Handler<M> + Enveloper<A, M> + 'static,
-    {
-        if self.message_tx.is_full() || self.message_tx.is_disconnected() {
-            return Err(SendError(message));
-        }
-        let _ = self.message_tx.send(A::pack(message, None));
-        Ok(())
-    }
-
-    /// Send a message ignoring any errors in the process. The true YOLO way to send messages.
-    pub fn send_forget<M>(&self, message: M)
-    where
-        M: Clone + Send + 'static,
-        A: Handler<M> + Enveloper<A, M> + 'static,
-    {
-        let _ = self.message_tx.send(A::pack(message, None));
-    }
-
-    pub fn send_cmd(&self, cmd: ActorCommand) -> Result<(), SendError<ActorCommand>> {
-        self.command_tx.send(cmd)
-    }
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum ActorStatus {
-    Stopped = 0,
-    Starting = 1,
-    Running = 2,
-    Stopping = 3,
-    Idle = 4,
+pub enum ActorCommand {
+    Stop,
 }
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("Actor channel closed")]
-    ActorChannelClosed,
+    ActorDisconnected,
+    #[error("Relay stream closed")]
+    RelayDisconnected,
     #[error("Channel closed: {0}")]
     ChannelClosed(#[from] oneshot::error::TryRecvError),
-    #[error("Warp error: {0}")]
-    Warp(#[from] warp::Error),
-}
-
-pub enum SendErr<M> {
-    Full(M),
-    Closed(M),
-}
-
-#[derive(Debug)]
-pub enum ActorCommand {
-    Stop,
 }
 
 #[cfg(test)]
 mod tests {
 
     use super::*;
+    use crate::{
+        actor::{Actor, Recipient},
+        message::Handler,
+    };
+    use async_trait::async_trait;
     use std::{sync::atomic::AtomicUsize, time::Duration};
 
     #[tokio::test]
@@ -159,20 +48,20 @@ mod tests {
         #[async_trait]
         impl Handler<Foo> for Testor {
             type Response = usize;
-            async fn handle(&mut self, _: Foo) -> Result<usize, Error> {
+            async fn handle(&mut self, _: &Foo) -> usize {
                 println!("Handling Foo");
-                Ok(10)
+                10
             }
         }
 
         #[async_trait]
         impl Handler<Bar> for Testor {
             type Response = isize;
-            async fn handle(&mut self, _: Bar) -> Result<isize, Error> {
+            async fn handle(&mut self, _: &Bar) -> isize {
                 for _ in 0..10_000 {
                     println!("Handling Bar");
                 }
-                Ok(10)
+                10
             }
         }
 
@@ -182,14 +71,18 @@ mod tests {
         let handle = Testor {}.start();
         println!("HELLO WORLDS");
         for _ in 0..100 {
-            res += handle.send_wait(Foo {}).unwrap().await.unwrap();
-            res2 += handle.send_wait(Bar {}).unwrap().await.unwrap();
+            res += handle.send_wait(Foo {}).await.unwrap();
+            res2 += handle.send_wait(Bar {}).await.unwrap();
         }
 
         handle.send(Foo {}).unwrap();
         handle.send_forget(Bar {});
         handle.send_cmd(ActorCommand::Stop).unwrap();
 
+        let rec: Recipient<Foo> = handle.recipient();
+        rec.send(Foo {}).unwrap();
+        handle.send_cmd(ActorCommand::Stop).unwrap();
+
         assert_eq!(res, 1000);
         assert_eq!(res2, 1000);
     }
@@ -211,8 +104,8 @@ mod tests {
 
         #[async_trait]
         impl Handler<Foo> for Testor {
-            type Response = usize;
-            async fn handle(&mut self, _: Foo) -> Result<usize, Error> {
+            type Response = Result<usize, Error>;
+            async fn handle(&mut self, _: &Foo) -> Result<usize, Error> {
                 println!("INCREMENTING COUNT FOO");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
@@ -221,8 +114,8 @@ mod tests {
 
         #[async_trait]
         impl Handler<Bar> for Testor {
-            type Response = isize;
-            async fn handle(&mut self, _: Bar) -> Result<isize, Error> {
+            type Response = Result<isize, Error>;
+            async fn handle(&mut self, _: &Bar) -> Result<isize, Error> {
                 println!("INCREMENTING COUNT BAR");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
@@ -231,7 +124,7 @@ mod tests {
 
         let handle = Testor {}.start();
 
-        handle.send_wait(Bar {}).unwrap().await.unwrap();
+        handle.send_wait(Bar {}).await.unwrap().unwrap();
         handle.send(Foo {}).unwrap();
         handle.send_forget(Bar {});
 

+ 62 - 65
src/message.rs

@@ -1,49 +1,46 @@
-use crate::{Actor, Error, Handler};
+use crate::{actor::Actor, Error};
 use async_trait::async_trait;
-use std::marker::PhantomData;
 use tokio::sync::oneshot;
 
+/// The main trait to implement on an [Actor] to enable it to handle messages.
 #[async_trait]
-pub trait MessageHandler<A: Actor>: Send + Sync {
-    async fn handle(&mut self, actor: &mut A);
+pub trait Handler<M>: Actor {
+    type Response: Send;
+    async fn handle(&mut self, message: &M) -> Self::Response;
 }
 
-pub type BoxedMessageHandler<A> = Box<dyn MessageHandler<A>>;
-
-pub type MailboxReceiver<A> = flume::Receiver<BoxedMessageHandler<A>>;
-pub type MailboxSender<A> = flume::Sender<BoxedMessageHandler<A>>;
-
-pub struct ActorMailbox<M, A: Handler<M>> {
-    _phantom_actor: PhantomData<A>,
-    _phantom_msg: PhantomData<M>,
+/// Represents a message that can be handled by an [Actor]. This should never be manually
+/// implemented and is implemented solely by [Envelope] to enable sending
+/// different types of messages to an actor at runtime.
+///
+/// The main purpose of this trait is enabling us to send any message type to a running
+/// actor. Notice how this trait contains no information about the message, only the
+/// actor it is being sent to. This allows us to dynamically call the handle method
+/// during runtime for any message.
+///
+/// The handle implementation in this trait just delegates to the actor's [Handler] implementation
+/// for the message using self as the `message` parameter.
+#[async_trait]
+pub trait ActorMessage<A: Actor>: Send + Sync {
+    async fn handle(&mut self, actor: &mut A);
 }
 
-/// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
-/// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
-/* #[async_trait]
-pub trait ActorMessage<A: Actor> {
-    async fn handle(self, actor: &mut A);
-} */
+/// A type erased message representation that is sent to any
+/// actor that implements a handler for the underlying message.
+pub type BoxedActorMessage<A> = Box<dyn ActorMessage<A>>;
 
-/// Used by [ActorHandle][super::ActorHandle]s to pack messages into [Envelope]s so we have a type erased message to send to the actor.
-pub trait Enveloper<A, M>
-where
-    A: Handler<M>,
-    M: Clone + Send,
-{
-    /// Wrap a message in an envelope with an optional response channel.
-    fn pack(
-        message: M,
-        tx: Option<oneshot::Sender<<A as Handler<M>>::Response>>,
-    ) -> Box<dyn MessageHandler<A>>;
-}
+/// Type erased message receiver. The part of the channel in the actor's runtime.
+pub type MailboxReceiver<A> = flume::Receiver<BoxedActorMessage<A>>;
+
+/// Type erased message sender. The part of the channel given in an [ActorHandle][super::ActorHandle].
+pub type MailboxSender<A> = flume::Sender<BoxedActorMessage<A>>;
 
-/// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor
-/// so long as it implements the necessary handler.
-pub struct Envelope<M, A>
+/// Contains the message in transit and an optional response channel for the message which will be invoked
+/// if the message was sent with `send_wait`
+pub(crate) struct Envelope<M, A>
 where
     A: Handler<M>,
-    M: Clone + Send + 'static,
+    M: Send + 'static,
 {
     message: M,
     response_tx: Option<oneshot::Sender<A::Response>>,
@@ -51,9 +48,8 @@ where
 
 impl<M, A> Envelope<M, A>
 where
-    A: Handler<M> + Send + 'static,
-    A::Response: Send,
-    M: Clone + Send + Sync + 'static,
+    A: Handler<M>,
+    M: Send + Sync + 'static,
 {
     pub fn new(message: M, tx: Option<oneshot::Sender<A::Response>>) -> Self {
         Self {
@@ -64,57 +60,46 @@ where
 }
 
 #[async_trait]
-impl<M, A> MessageHandler<A> for Envelope<M, A>
+impl<M, A> ActorMessage<A> for Envelope<M, A>
 where
-    A: Actor + Handler<M> + Send,
-    M: Clone + Send + Sync,
-    A::Response: Send,
+    A: Handler<M>,
+    M: Send + Sync,
 {
     async fn handle(&mut self, actor: &mut A) {
-        let result = A::handle(actor, self.message.clone()).await;
+        let result = A::handle(actor, &self.message).await;
         if let Some(res_tx) = self.response_tx.take() {
-            let _ = res_tx.send(result.unwrap());
+            let _ = res_tx.send(result);
         }
     }
 }
 
-/// The inner parts of the [Envelope] containing the actual message as well as an optional
-/// response channel.
-/* struct EnvelopeInner<M, R> {
-    message: M,
-    tx: Option<oneshot::Sender<R>>,
-}
-
-#[async_trait]
-impl<A, M> ActorMessage<A> for EnvelopeInner<M, <A as Handler<M>>::Response>
+/// Used by [ActorHandle][super::ActorHandle]s to pack messages into [Envelope]s.
+pub trait Enveloper<A, M>
 where
     A: Handler<M>,
-    A::Response: Send,
-    M: Clone + Send + Sync + 'static,
+    M: Send,
 {
-    async fn handle(self, actor: &mut A) {
-        let result = A::handle(actor, self.message.clone()).await;
-        if let Some(res_tx) = self.tx {
-            let _ = res_tx.send(result.unwrap());
-        }
-    }
+    /// Wrap a message in an envelope with an optional response channel.
+    fn pack(
+        message: M,
+        tx: Option<oneshot::Sender<<A as Handler<M>>::Response>>,
+    ) -> Box<dyn ActorMessage<A>>;
 }
- */
 
 impl<A, M> Enveloper<A, M> for A
 where
-    A: Handler<M> + Send + 'static,
-    A::Response: Send,
-    M: Clone + Send + Sync + 'static,
+    A: Handler<M>,
+    M: Send + Sync + 'static,
 {
     fn pack(
         message: M,
         tx: Option<oneshot::Sender<<A as Handler<M>>::Response>>,
-    ) -> Box<dyn MessageHandler<A>> {
+    ) -> Box<dyn ActorMessage<A>> {
         Box::new(Envelope::new(message, tx))
     }
 }
 
+/// A future that when awaited returns an actor's response to a message.
 pub struct MessageRequest<R> {
     pub response_rx: oneshot::Receiver<R>,
 }
@@ -138,3 +123,15 @@ impl<R> std::future::Future for MessageRequest<R> {
         }
     }
 }
+
+impl<M, A> std::fmt::Debug for Envelope<M, A>
+where
+    A: Actor + Handler<M>,
+    M: Clone + Send,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Envelope")
+            .field("message", &"{{..}}")
+            .finish()
+    }
+}

+ 62 - 45
src/relay.rs

@@ -1,19 +1,38 @@
 use crate::{
-    message::MailboxReceiver, Actor, ActorCommand, ActorHandle, Error, DEFAULT_CHANNEL_CAPACITY,
+    actor::{Actor, ActorHandle},
+    message::MailboxReceiver,
+    ActorCommand, Error,
 };
 use async_trait::async_trait;
-use flume::{r#async::RecvStream, Receiver, Sender};
+use flume::{Receiver, Sender};
 use futures::{Stream, StreamExt};
-use std::{fmt::Display, sync::atomic::AtomicUsize};
+use std::fmt::Display;
 
 /// Represents an actor that has access to a stream and a sender channel
-/// which it can respond to.
+/// which it can use to respond.
 ///
-/// A websocket actor receives messages via the stream and processes them with
-/// its [Handler] implementation. The handler implementation should always return an
-/// `Option<M>` where M is the type used when implementing this trait. A handler that returns
-/// `None` will not forward any response to the sink. If the handler returns `Some(M)` it will
-/// be forwarded to the sink.
+/// The intended usecase for this is to have a receiver channel for responses
+/// that forwards anything it receives to the sink. This can be achieved
+/// by creating a stream from the receiver (many channel implementations provide this)
+/// which forwards to the sink. The receiver stream is then spawned to a runtime.
+///
+/// ### Example
+///
+/// ```ignore
+/// // Imagine if you will, a websocket
+/// ws.on_upgrade(|socket| async move {
+///   let (sink, stream) = socket.split();
+///   let (tx, rx) = flume::unbounded();
+///   
+///   let actor = WebsocketActor::new();
+///   let handle = actor.start_relay(tx);
+///   tokio::spawn(rx.into_stream().map(Ok).forward(si));
+/// })
+/// ```
+///
+/// A relay actor receives messages via its stream and processes them with
+/// its [Relay] implementation. A relay that returns `None` will not forward any
+/// response to the sink. If the relay returns `Some(M)` it will be forwarded to the sink.
 pub trait RelayActor<M, Str>: Actor
 where
     Self: Relay<M>,
@@ -25,47 +44,46 @@ where
     type Error: Display;
 
     fn start_relay(self, stream: Str, sender: Sender<M>) -> ActorHandle<Self> {
-        println!("Starting actor");
-        let (message_tx, message_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
-        let (command_tx, command_rx) = flume::bounded(DEFAULT_CHANNEL_CAPACITY);
-        tokio::spawn(RelayRuntime::new(self, command_rx, message_rx, stream, sender).runt());
+        tracing::trace!("Starting relay actor");
+        let (message_tx, message_rx) = flume::unbounded();
+        let (command_tx, command_rx) = flume::unbounded();
+        tokio::spawn(RelayRuntime::new(self, command_rx, message_rx, stream, sender).run());
         ActorHandle::new(message_tx, command_tx)
     }
 }
 
 #[async_trait]
 pub trait Relay<M>: Actor {
-    async fn handle(&mut self, message: M) -> Result<Option<M>, Error>;
+    async fn process(&mut self, message: M) -> Option<M>;
 }
 
 pub struct RelayRuntime<A, M, Str>
 where
-    A: RelayActor<M, Str> + Relay<M> + 'static,
-    Str: Stream<Item = Result<M, A::Error>> + Send + Unpin + 'static,
+    A: RelayActor<M, Str> + Relay<M>,
+    Str: Stream<Item = Result<M, A::Error>> + Unpin + Send + 'static,
     M: Send + 'static,
     A::Error: Send,
 {
     actor: A,
 
     /// The receiving end of the websocket
-    ws_stream: Str,
+    stream: Str,
 
-    /// The sending end of the websocket. Hooked to a receiver that forwards any
+    /// The sending end of the stream. Hooked to a receiver that forwards any
     /// response sent from here.
-    ws_sender: Sender<M>,
+    sender: Sender<M>,
 
     /// Actor command receiver
-    command_stream: RecvStream<'static, ActorCommand>,
+    commands: Receiver<ActorCommand>,
 
+    /// Actor message receiver
     mailbox: MailboxReceiver<A>,
 }
 
-static PROCESS: AtomicUsize = AtomicUsize::new(0);
-
 impl<A, M, Str> RelayRuntime<A, M, Str>
 where
     Str: Stream<Item = Result<M, A::Error>> + Send + Unpin,
-    A: RelayActor<M, Str> + Send + 'static + Relay<M>,
+    A: RelayActor<M, Str> + Relay<M>,
     M: Send,
     A::Error: Send,
 {
@@ -78,46 +96,45 @@ where
     ) -> Self {
         Self {
             actor,
-            ws_sender: sender,
-            ws_stream: stream,
+            sender,
+            stream,
             mailbox,
-            command_stream: command_rx.into_stream(),
+            commands: command_rx,
         }
     }
 
-    pub async fn runt(mut self) {
+    pub async fn run(mut self) -> Result<(), Error> {
         loop {
+            // Only time we error here is when we disconnect, stream errors are just logged
             tokio::select! {
-            Some(command) = self.command_stream.next() => {
-               match command {
+                command = self.commands.recv_async() => {
+                    let Ok(command) = command else { return Err(Error::ActorDisconnected); };
+                    match command {
                         ActorCommand::Stop => {
-                            println!("actor stopping");
-                            return
+                            tracing::trace!("Relay actor stopping");
+                            return Ok(())
                         },
                     }
-            }
-            message = self.mailbox.recv_async() => {
-                if let Ok(mut message) = message {
+                }
+                message = self.mailbox.recv_async() => {
+                    let Ok(mut message) = message else { return Err(Error::ActorDisconnected); };
                     message.handle(&mut self.actor).await;
-                } else {
-                    break;
                 }
-            }
-            Some(ws_msg) = self.ws_stream.next() => {
+                ws_msg = self.stream.next() => {
+                    let Some(ws_msg) = ws_msg else { return Err(Error::RelayDisconnected) };
                     match ws_msg {
                         Ok(msg) => {
-                            let res = self.actor.handle(msg).await.unwrap();
-                            PROCESS.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
-                            println!("PROCESSED {}", PROCESS.load(std::sync::atomic::Ordering::Relaxed));
+                            let res = self.actor.process(msg).await;
                             if let Some(res) = res {
-                                self.ws_sender.send_async(res).await.unwrap();
+                                let Ok(_) = self.sender.send_async(res).await else { return Err(Error::RelayDisconnected) };
                             }
                         },
-                        Err(_) => todo!(),
+                        Err(e) => {
+                            tracing::error!("Stream error occurred: {e}")
+                        },
                     }
-            }
+                }
             }
         }
-        println!("actor stopping");
     }
 }

+ 20 - 23
src/runtime.rs

@@ -1,21 +1,20 @@
-use crate::{message::MailboxReceiver, Actor, ActorCommand};
-use flume::{r#async::RecvStream, Receiver};
-use futures::StreamExt;
-
-pub const QUEUE_CAPACITY: usize = 128;
+use crate::{actor::Actor, message::MailboxReceiver, ActorCommand, Error};
+use flume::Receiver;
 
+/// The default actor runtime. Spins and listens for incoming commands
+/// and messages.
 pub struct ActorRuntime<A>
 where
-    A: Actor + Send + 'static,
+    A: Actor,
 {
     actor: A,
-    command_stream: RecvStream<'static, ActorCommand>,
+    commands: Receiver<ActorCommand>,
     mailbox: MailboxReceiver<A>,
 }
 
 impl<A> ActorRuntime<A>
 where
-    A: Actor + 'static + Send,
+    A: Actor,
 {
     pub fn new(
         actor: A,
@@ -25,31 +24,29 @@ where
         println!("Building default runtime");
         Self {
             actor,
-            command_stream: command_rx.into_stream(),
+            commands: command_rx,
             mailbox: message_rx,
         }
     }
 
-    pub async fn runt(mut self) {
+    pub async fn run(mut self) -> Result<(), Error> {
         loop {
+            // Only time we error here is when we disconnect
             tokio::select! {
-                Some(command) = self.command_stream.next() => {
-                   match command {
-                            ActorCommand::Stop => {
-                                println!("actor stopping");
-                                return
-                            },
-                        }
+                command = self.commands.recv_async() => {
+                    let Ok(command) = command else { return Err(Error::ActorDisconnected); };
+                    match command {
+                        ActorCommand::Stop => {
+                            tracing::trace!("Relay actor stopping");
+                            return Ok(())
+                        },
+                    }
                 }
                 message = self.mailbox.recv_async() => {
-                    if let Ok(mut message) = message {
-                        message.handle(&mut self.actor).await
-                    } else {
-                         break;
-                     }
+                    let Ok(mut message) = message else { return Err(Error::ActorDisconnected); };
+                    message.handle(&mut self.actor).await;
                 }
             }
         }
-        println!("actor stopping");
     }
 }

+ 12 - 9
tests/websocket.rs

@@ -1,6 +1,8 @@
 use async_trait::async_trait;
+use drama::actor::{Actor, ActorHandle};
+use drama::message::Handler;
 use drama::relay::{Relay, RelayActor};
-use drama::{Actor, ActorHandle, Error, Handler};
+use flume::Sender;
 use futures::stream::SplitStream;
 use futures::StreamExt;
 use parking_lot::RwLock;
@@ -12,11 +14,12 @@ use warp::Filter;
 
 struct WebsocketActor {
     hello: ActorHandle<Hello>,
+    tx: Sender<Message>,
 }
 
 impl WebsocketActor {
-    fn new(handle: ActorHandle<Hello>) -> Self {
-        Self { hello: handle }
+    fn new(handle: ActorHandle<Hello>, tx: Sender<Message>) -> Self {
+        Self { hello: handle, tx }
     }
 }
 
@@ -28,14 +31,14 @@ impl RelayActor<Message, SplitStream<WebSocket>> for WebsocketActor {
 
 #[async_trait]
 impl Relay<Message> for WebsocketActor {
-    async fn handle(&mut self, message: Message) -> Result<Option<Message>, Error> {
+    async fn process(&mut self, message: Message) -> Option<Message> {
         self.hello
             .send(crate::Msg {
                 _content: message.to_str().unwrap().to_owned(),
             })
             .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
-
-        Ok(Some(message))
+        self.tx.send(message.clone()).unwrap();
+        Some(message)
     }
 }
 
@@ -51,8 +54,8 @@ struct Msg {
 #[async_trait]
 impl Handler<Msg> for Hello {
     type Response = usize;
-    async fn handle(&mut self, _: Msg) -> Result<usize, Error> {
-        Ok(10)
+    async fn handle(&mut self, _: &Msg) -> usize {
+        10
     }
 }
 
@@ -80,7 +83,7 @@ async fn main() {
                     let (si, st) = socket.split();
                     let (tx, rx) = flume::unbounded();
 
-                    let actor = WebsocketActor::new(hello);
+                    let actor = WebsocketActor::new(hello, tx.clone());
                     let handle = actor.start_relay(st, tx);
                     tokio::spawn(rx.into_stream().map(Ok).forward(si));
                     let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);