Преглед на файлове

add readme and tidy up exports

biblius преди 1 година
родител
ревизия
451631acf2
променени са 6 файла, в които са добавени 168 реда и са изтрити 88 реда
  1. 61 0
      README.md
  2. 61 4
      src/actor.rs
  3. 21 17
      src/lib.rs
  4. 21 9
      src/relay.rs
  5. 0 52
      src/runtime.rs
  6. 4 6
      tests/websocket.rs

+ 61 - 0
README.md

@@ -0,0 +1,61 @@
+# Drama
+
+A dramatically minimal actor library. Check out the test(s) for websocket usage examples.
+
+```rust
+#[derive(Debug)]
+struct Testor {
+  foos: usize,
+  bars: isize,
+}
+
+#[derive(Debug, Clone)]
+struct Foo {}
+
+#[derive(Debug, Clone)]
+struct Bar {}
+
+impl Actor for Testor {}
+
+#[async_trait]
+impl Handler<Foo> for Testor {
+  type Response = usize;
+  async fn handle(&mut self, _: &Foo) -> usize {
+      self.foos += 1;
+      10
+  }
+}
+    
+
+#[async_trait]
+impl Handler<Bar> for Testor {
+  type Response = isize;
+  async fn handle(&mut self, _: &Bar) -> isize {
+      self.bars += 1;
+      if self.foos == 100 {
+        assert_eq!(self.bars, 100);
+      }
+      10
+  }
+}
+
+let mut res = 0;
+let mut res2 = 0;
+
+let handle = Testor { foos: 0, bars: 0 }.start();
+
+for _ in 0..100 {
+  res += handle.send_wait(Foo {}).await.unwrap();
+  res2 += handle.send_wait(Bar {}).await.unwrap();
+}
+
+handle.send(Foo {}).unwrap();
+handle.send_forget(Bar {});
+
+let rec: Recipient<Foo> = handle.recipient();
+rec.send(Foo {}).unwrap();
+handle.send_cmd(ActorCommand::Stop).unwrap();
+
+assert_eq!(res, 1000);
+assert_eq!(res2, 1000);
+```

+ 61 - 4
src/actor.rs

@@ -1,16 +1,17 @@
 use crate::{
-    message::{BoxedActorMessage, Enveloper, Handler, MailboxSender, MessageRequest},
-    runtime::ActorRuntime,
+    message::{
+        BoxedActorMessage, Enveloper, Handler, MailboxReceiver, MailboxSender, MessageRequest,
+    },
     ActorCommand,
 };
-
+use flume::Receiver;
 use flume::{SendError, Sender};
 use std::fmt::Debug;
 use tokio::sync::oneshot;
 
 pub trait Actor: Sized + Send + Sync + 'static {
     fn start(self) -> ActorHandle<Self> {
-        println!("Starting actor");
+        tracing::trace!("Starting actor");
         let (message_tx, message_rx) = flume::unbounded();
         let (command_tx, command_rx) = flume::unbounded();
         tokio::spawn(ActorRuntime::new(self, command_rx, message_rx).run());
@@ -168,3 +169,59 @@ where
         handle.recipient()
     }
 }
+
+/// The default actor runtime. Spins and listens for incoming commands
+/// and messages.
+struct ActorRuntime<A>
+where
+    A: Actor,
+{
+    actor: A,
+    commands: Receiver<ActorCommand>,
+    mailbox: MailboxReceiver<A>,
+}
+
+impl<A> ActorRuntime<A>
+where
+    A: Actor,
+{
+    pub fn new(
+        actor: A,
+        command_rx: Receiver<ActorCommand>,
+        message_rx: MailboxReceiver<A>,
+    ) -> Self {
+        tracing::trace!("Building default runtime");
+        Self {
+            actor,
+            commands: command_rx,
+            mailbox: message_rx,
+        }
+    }
+
+    pub async fn run(mut self) {
+        loop {
+            // Only time we error here is when we disconnect
+            tokio::select! {
+                command = self.commands.recv_async() => {
+                    let Ok(command) = command else {
+                        tracing::trace!("Actor channel disconnected, stopping");
+                        return;
+                    };
+                    match command {
+                        ActorCommand::Stop => {
+                            tracing::trace!("Actor command received, stopping");
+                            return
+                        },
+                    }
+                }
+                message = self.mailbox.recv_async() => {
+                    let Ok(mut message) = message else {
+                        tracing::trace!("Actor channel disconnected, stopping");
+                        return;
+                    };
+                    message.handle(&mut self.actor).await;
+                }
+            }
+        }
+    }
+}

+ 21 - 17
src/lib.rs

@@ -1,10 +1,13 @@
 use std::fmt::Debug;
 use tokio::sync::oneshot;
 
-pub mod actor;
-pub mod message;
-pub mod relay;
-pub mod runtime;
+mod actor;
+mod message;
+mod relay;
+
+pub use actor::{Actor, ActorHandle, Recipient};
+pub use message::Handler;
+pub use relay::{Relay, RelayActor};
 
 #[derive(Debug)]
 pub enum ActorCommand {
@@ -13,10 +16,6 @@ pub enum ActorCommand {
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("Actor channel closed")]
-    ActorDisconnected,
-    #[error("Relay stream closed")]
-    RelayDisconnected,
     #[error("Channel closed: {0}")]
     ChannelClosed(#[from] oneshot::error::TryRecvError),
 }
@@ -35,7 +34,10 @@ mod tests {
     #[tokio::test]
     async fn it_works_sync() {
         #[derive(Debug)]
-        struct Testor {}
+        struct Testor {
+            foos: usize,
+            bars: isize,
+        }
 
         #[derive(Debug, Clone)]
         struct Foo {}
@@ -49,7 +51,8 @@ mod tests {
         impl Handler<Foo> for Testor {
             type Response = usize;
             async fn handle(&mut self, _: &Foo) -> usize {
-                println!("Handling Foo");
+                tracing::trace!("Handling Foo");
+                self.foos += 1;
                 10
             }
         }
@@ -58,8 +61,10 @@ mod tests {
         impl Handler<Bar> for Testor {
             type Response = isize;
             async fn handle(&mut self, _: &Bar) -> isize {
-                for _ in 0..10_000 {
-                    println!("Handling Bar");
+                tracing::trace!("Handling Bar");
+                self.bars += 1;
+                if self.foos == 100 {
+                    assert_eq!(self.bars, 100);
                 }
                 10
             }
@@ -68,8 +73,8 @@ mod tests {
         let mut res = 0;
         let mut res2 = 0;
 
-        let handle = Testor {}.start();
-        println!("HELLO WORLDS");
+        let handle = Testor { foos: 0, bars: 0 }.start();
+        tracing::trace!("HELLO WORLDS");
         for _ in 0..100 {
             res += handle.send_wait(Foo {}).await.unwrap();
             res2 += handle.send_wait(Bar {}).await.unwrap();
@@ -77,7 +82,6 @@ mod tests {
 
         handle.send(Foo {}).unwrap();
         handle.send_forget(Bar {});
-        handle.send_cmd(ActorCommand::Stop).unwrap();
 
         let rec: Recipient<Foo> = handle.recipient();
         rec.send(Foo {}).unwrap();
@@ -106,7 +110,7 @@ mod tests {
         impl Handler<Foo> for Testor {
             type Response = Result<usize, Error>;
             async fn handle(&mut self, _: &Foo) -> Result<usize, Error> {
-                println!("INCREMENTING COUNT FOO");
+                tracing::trace!("INCREMENTING COUNT FOO");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
             }
@@ -116,7 +120,7 @@ mod tests {
         impl Handler<Bar> for Testor {
             type Response = Result<isize, Error>;
             async fn handle(&mut self, _: &Bar) -> Result<isize, Error> {
-                println!("INCREMENTING COUNT BAR");
+                tracing::trace!("INCREMENTING COUNT BAR");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
             }

+ 21 - 9
src/relay.rs

@@ -1,7 +1,7 @@
 use crate::{
     actor::{Actor, ActorHandle},
     message::MailboxReceiver,
-    ActorCommand, Error,
+    ActorCommand,
 };
 use async_trait::async_trait;
 use flume::{Receiver, Sender};
@@ -57,7 +57,7 @@ pub trait Relay<M>: Actor {
     async fn process(&mut self, message: M) -> Option<M>;
 }
 
-pub struct RelayRuntime<A, M, Str>
+struct RelayRuntime<A, M, Str>
 where
     A: RelayActor<M, Str> + Relay<M>,
     Str: Stream<Item = Result<M, A::Error>> + Unpin + Send + 'static,
@@ -103,30 +103,42 @@ where
         }
     }
 
-    pub async fn run(mut self) -> Result<(), Error> {
+    pub async fn run(mut self) {
         loop {
             // Only time we error here is when we disconnect, stream errors are just logged
             tokio::select! {
                 command = self.commands.recv_async() => {
-                    let Ok(command) = command else { return Err(Error::ActorDisconnected); };
+                    let Ok(command) = command else {
+                        tracing::trace!("Actor channel disconnected, stopping");
+                        return;
+                    };
                     match command {
                         ActorCommand::Stop => {
-                            tracing::trace!("Relay actor stopping");
-                            return Ok(())
+                            tracing::trace!("Actor command received, stopping");
+                            return;
                         },
                     }
                 }
                 message = self.mailbox.recv_async() => {
-                    let Ok(mut message) = message else { return Err(Error::ActorDisconnected); };
+                    let Ok(mut message) = message else {
+                        tracing::trace!("Actor channel disconnected, stopping");
+                        return;
+                     };
                     message.handle(&mut self.actor).await;
                 }
                 ws_msg = self.stream.next() => {
-                    let Some(ws_msg) = ws_msg else { return Err(Error::RelayDisconnected) };
+                    let Some(ws_msg) = ws_msg else {
+                        tracing::trace!("Actor stream finished, stopping");
+                        return;
+                     };
                     match ws_msg {
                         Ok(msg) => {
                             let res = self.actor.process(msg).await;
                             if let Some(res) = res {
-                                let Ok(_) = self.sender.send_async(res).await else { return Err(Error::RelayDisconnected) };
+                                let Ok(_) = self.sender.send_async(res).await else {
+                                    tracing::trace!("Actor stream finished, stopping");
+                                    return;
+                                };
                             }
                         },
                         Err(e) => {

+ 0 - 52
src/runtime.rs

@@ -1,52 +0,0 @@
-use crate::{actor::Actor, message::MailboxReceiver, ActorCommand, Error};
-use flume::Receiver;
-
-/// The default actor runtime. Spins and listens for incoming commands
-/// and messages.
-pub struct ActorRuntime<A>
-where
-    A: Actor,
-{
-    actor: A,
-    commands: Receiver<ActorCommand>,
-    mailbox: MailboxReceiver<A>,
-}
-
-impl<A> ActorRuntime<A>
-where
-    A: Actor,
-{
-    pub fn new(
-        actor: A,
-        command_rx: Receiver<ActorCommand>,
-        message_rx: MailboxReceiver<A>,
-    ) -> Self {
-        println!("Building default runtime");
-        Self {
-            actor,
-            commands: command_rx,
-            mailbox: message_rx,
-        }
-    }
-
-    pub async fn run(mut self) -> Result<(), Error> {
-        loop {
-            // Only time we error here is when we disconnect
-            tokio::select! {
-                command = self.commands.recv_async() => {
-                    let Ok(command) = command else { return Err(Error::ActorDisconnected); };
-                    match command {
-                        ActorCommand::Stop => {
-                            tracing::trace!("Relay actor stopping");
-                            return Ok(())
-                        },
-                    }
-                }
-                message = self.mailbox.recv_async() => {
-                    let Ok(mut message) = message else { return Err(Error::ActorDisconnected); };
-                    message.handle(&mut self.actor).await;
-                }
-            }
-        }
-    }
-}

+ 4 - 6
tests/websocket.rs

@@ -1,7 +1,5 @@
 use async_trait::async_trait;
-use drama::actor::{Actor, ActorHandle};
-use drama::message::Handler;
-use drama::relay::{Relay, RelayActor};
+use drama::{Actor, ActorHandle, Handler, Relay, RelayActor};
 use flume::Sender;
 use futures::stream::SplitStream;
 use futures::StreamExt;
@@ -36,7 +34,7 @@ impl Relay<Message> for WebsocketActor {
             .send(crate::Msg {
                 _content: message.to_str().unwrap().to_owned(),
             })
-            .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
+            .unwrap_or_else(|e| tracing::trace!("FUKEN HELL M8 {e}"));
         self.tx.send(message.clone()).unwrap();
         Some(message)
     }
@@ -87,7 +85,7 @@ async fn main() {
                     let handle = actor.start_relay(st, tx);
                     tokio::spawn(rx.into_stream().map(Ok).forward(si));
                     let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-                    println!("Adding actor {id}");
+                    tracing::trace!("Adding actor {id}");
                     pool.write().insert(id, handle);
                 })
             },
@@ -143,7 +141,7 @@ static INDEX_HTML: &str = r#"<!DOCTYPE html>
         send.onclick = function() {
             const msg = text.value;
             let i = 0;
-             while (i < 100000) {
+             while (i < 10000) {
                 ws.send(msg);
                  i += 1;
              }