Browse Source

make actor message trait async, finish basic runtime

Josip Benko-Đaković 1 year ago
parent
commit
51e15557bf
5 changed files with 164 additions and 41 deletions
  1. 83 0
      .vscode/launch.json
  2. 64 3
      src/lib.rs
  3. 13 16
      src/message.rs
  4. 2 11
      src/runtime.rs
  5. 2 11
      src/ws.rs

+ 83 - 0
.vscode/launch.json

@@ -0,0 +1,83 @@
+{
+  // Use IntelliSense to learn about possible attributes.
+  // Hover to view descriptions of existing attributes.
+  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+  "version": "0.2.0",
+  "configurations": [
+    {
+      "type": "lldb",
+      "request": "launch",
+      "name": "Debug unit tests in library 'actors'",
+      "cargo": {
+        "args": [
+          "test",
+          "--no-run",
+          "--lib",
+          "--package=actors"
+        ],
+        "filter": {
+          "name": "actors",
+          "kind": "lib"
+        }
+      },
+      "args": [],
+      "cwd": "${workspaceFolder}"
+    },
+    {
+      "type": "lldb",
+      "request": "launch",
+      "name": "Debug executable 'test-ws'",
+      "cargo": {
+        "args": [
+          "build",
+          "--bin=test-ws",
+          "--package=actors"
+        ],
+        "filter": {
+          "name": "test-ws",
+          "kind": "bin"
+        }
+      },
+      "args": [],
+      "cwd": "${workspaceFolder}"
+    },
+    {
+      "type": "lldb",
+      "request": "launch",
+      "name": "Debug unit tests in executable 'test-ws'",
+      "cargo": {
+        "args": [
+          "test",
+          "--no-run",
+          "--bin=test-ws",
+          "--package=actors"
+        ],
+        "filter": {
+          "name": "test-ws",
+          "kind": "bin"
+        }
+      },
+      "args": [],
+      "cwd": "${workspaceFolder}"
+    },
+    {
+      "type": "lldb",
+      "request": "launch",
+      "name": "Debug integration test 'websocket'",
+      "cargo": {
+        "args": [
+          "test",
+          "--no-run",
+          "--test=websocket",
+          "--package=actors"
+        ],
+        "filter": {
+          "name": "websocket",
+          "kind": "test"
+        }
+      },
+      "args": [],
+      "cwd": "${workspaceFolder}"
+    }
+  ]
+}

+ 64 - 3
src/lib.rs

@@ -60,7 +60,7 @@ where
         M: Message + Send,
         A: Handler<M> + Enveloper<A, M>,
     {
-        if self.message_tx.is_full() {
+        if self.message_tx.is_full() || self.message_tx.is_disconnected() {
             return Err(SendError(message));
         }
         let (tx, rx) = oneshot::channel();
@@ -75,7 +75,7 @@ where
         M: Message + Send + 'static,
         A: Handler<M> + Enveloper<A, M> + 'static,
     {
-        if self.message_tx.is_full() {
+        if self.message_tx.is_full() || self.message_tx.is_disconnected() {
             return Err(SendError(message));
         }
         let _ = self.message_tx.send(A::pack(message, None));
@@ -219,10 +219,12 @@ pub enum ActorCommand {
 #[cfg(test)]
 mod tests {
 
+    use std::{sync::atomic::AtomicUsize, time::Duration};
+
     use super::*;
 
     #[tokio::test]
-    async fn it_works() {
+    async fn it_works_sync() {
         #[derive(Debug)]
         struct Testor {}
 
@@ -267,6 +269,9 @@ mod tests {
             res2 += handle.send_wait(Bar {}).unwrap().await.unwrap();
         }
 
+        handle.send(Foo {}).unwrap();
+        handle.send_forget(Bar {});
+
         let rec: Recipient<Foo> = handle.recipient();
         res += rec.send_wait(Foo {}).unwrap().await.unwrap();
         handle.send_cmd(ActorCommand::Stop).unwrap();
@@ -274,4 +279,60 @@ mod tests {
         assert_eq!(res, 1010);
         assert_eq!(res2, 1000);
     }
+
+    #[tokio::test]
+    async fn it_works_yolo() {
+        #[derive(Debug)]
+        struct Testor {}
+
+        #[derive(Debug)]
+        struct Foo {}
+
+        #[derive(Debug)]
+        struct Bar {}
+
+        impl Message for Foo {
+            type Response = usize;
+        }
+
+        impl Message for Bar {
+            type Response = isize;
+        }
+
+        impl Actor for Testor {}
+
+        static COUNT: AtomicUsize = AtomicUsize::new(0);
+
+        #[async_trait]
+        impl Handler<Foo> for Testor {
+            async fn handle(&mut self, _: Foo) -> Result<usize, Error> {
+                println!("INCREMENTING COUNT FOO");
+                COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                Ok(10)
+            }
+        }
+
+        #[async_trait]
+        impl Handler<Bar> for Testor {
+            async fn handle(&mut self, _: Bar) -> Result<isize, Error> {
+                println!("INCREMENTING COUNT BAR");
+                COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                Ok(10)
+            }
+        }
+
+        let handle = Testor {}.start();
+
+        handle.send_wait(Bar {}).unwrap().await.unwrap();
+        handle.send(Foo {}).unwrap();
+        handle.send_forget(Bar {});
+
+        for _ in 0..100 {
+            let _ = handle.send(Foo {});
+            handle.send_forget(Bar {});
+            tokio::time::sleep(Duration::from_micros(100)).await
+        }
+
+        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 203);
+    }
 }

+ 13 - 16
src/message.rs

@@ -1,6 +1,5 @@
-use std::task::Poll;
-
 use crate::{Actor, Error, Handler};
+use async_trait::async_trait;
 use tokio::sync::oneshot;
 
 /// Represents a message that can be sent to an actor. The response type is what the actor must return in its handler implementation.
@@ -10,8 +9,9 @@ pub trait Message {
 
 /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
 /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
+#[async_trait(?Send)]
 pub trait ActorMessage<A: Actor> {
-    fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()>;
+    async fn handle(&mut self, actor: &mut A);
 }
 
 /// Used by [ActorHandle][super::ActorHandle]s to pack [Message]s into [Envelope]s so we have a type erased message to send to the actor.
@@ -52,30 +52,27 @@ struct EnvelopeInner<M: Message> {
     tx: Option<oneshot::Sender<M::Response>>,
 }
 
+#[async_trait(?Send)]
 impl<A> ActorMessage<A> for Envelope<A>
 where
     A: Actor,
 {
-    fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
-        self.message.handle(actor, cx)
+    async fn handle(&mut self, actor: &mut A) {
+        self.message.handle(actor).await
     }
 }
 
+#[async_trait(?Send)]
 impl<A, M> ActorMessage<A> for EnvelopeInner<M>
 where
     M: Message,
     A: Actor + Handler<M>,
 {
-    fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
+    async fn handle(&mut self, actor: &mut A) {
         let message = self.message.take().expect("Message already processed");
-        match actor.handle(message).as_mut().poll(cx) {
-            Poll::Ready(result) => {
-                if let Some(res_tx) = self.tx.take() {
-                    let _ = res_tx.send(result.unwrap());
-                }
-                Poll::Ready(())
-            }
-            Poll::Pending => Poll::Pending,
+        let result = actor.handle(message).await;
+        if let Some(res_tx) = self.tx.take() {
+            let _ = res_tx.send(result.unwrap());
         }
     }
 }
@@ -104,9 +101,9 @@ impl<R> std::future::Future for MessageRequest<R> {
     ) -> std::task::Poll<Self::Output> {
         println!("Awaiting response");
         match self.as_mut().response_rx.try_recv() {
-            Ok(msg) => {
+            Ok(response) => {
                 println!("Future ready");
-                std::task::Poll::Ready(Ok(msg))
+                std::task::Poll::Ready(Ok(response))
             }
             Err(e) => {
                 println!("Future pending {e}");

+ 2 - 11
src/runtime.rs

@@ -61,17 +61,8 @@ where
             };
 
             // Process all pending messages
-            let mut idx = 0;
-            while idx < this.message_queue.len() {
-                let pending = &mut this.message_queue[idx];
-                match pending.handle(this.actor, cx) {
-                    Poll::Ready(_) => {
-                        this.message_queue.swap_remove_front(idx);
-                        continue;
-                    }
-                    Poll::Pending => idx += 1,
-                }
-            }
+            this.message_queue
+                .retain_mut(|message| message.handle(this.actor).as_mut().poll(cx).is_pending());
 
             // Poll message receiver and continue to process if anything comes up
             while let Poll::Ready(Ok(message)) =

+ 2 - 11
src/ws.rs

@@ -164,17 +164,8 @@ impl Future for WebsocketRuntime {
             let _ = Pin::new(&mut this.ws_sink.flush()).poll(cx);
 
             // Process all messages
-            let mut idx = 0;
-            while idx < this.message_queue.len() {
-                let pending = &mut this.message_queue[idx];
-                match pending.handle(this.actor, cx) {
-                    Poll::Ready(_) => {
-                        this.message_queue.swap_remove_front(idx);
-                        continue;
-                    }
-                    Poll::Pending => idx += 1,
-                }
-            }
+            this.message_queue
+                .retain_mut(|message| message.handle(this.actor).as_mut().poll(cx).is_pending());
 
             // Poll message receiver and continue to process if anything comes up
             while let Poll::Ready(Ok(message)) =