Browse Source

fix queue, finalise prototype

biblius 1 year ago
parent
commit
5ce0586528
6 changed files with 306 additions and 313 deletions
  1. 1 1
      Cargo.toml
  2. 54 21
      src/lib.rs
  3. 28 34
      src/message.rs
  4. 89 99
      src/runtime.rs
  5. 116 144
      src/ws.rs
  6. 18 14
      tests/websocket.rs

+ 1 - 1
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 edition = "2021"
-name = "actors"
+name = "drama"
 version = "0.1.0"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

+ 54 - 21
src/lib.rs

@@ -2,23 +2,44 @@ use crate::runtime::{ActorRuntime, Runtime};
 use async_trait::async_trait;
 use flume::{SendError, Sender};
 use message::{Envelope, Enveloper, Message, MessageRequest};
-use parking_lot::Mutex;
 use std::{fmt::Debug, sync::Arc};
 use tokio::sync::oneshot;
+use tokio::sync::Mutex;
+
 pub mod debug;
 pub mod message;
 pub mod runtime;
 pub mod ws;
 
-const DEFAULT_CHANNEL_CAPACITY: usize = 16;
+pub struct Hello {}
+
+impl Actor for Hello {}
+
+pub struct Msg {
+    pub content: String,
+}
+impl Message for Msg {
+    type Response = usize;
+}
+
+#[async_trait]
+impl Handler<Msg> for Hello {
+    async fn handle(_: Arc<Mutex<Self>>, _: Box<Msg>) -> Result<usize, Error> {
+        println!("Handling message Hello");
+        Ok(10)
+    }
+}
 
+const DEFAULT_CHANNEL_CAPACITY: usize = 128;
+
+#[async_trait]
 pub trait Actor {
-    fn start(self) -> ActorHandle<Self>
+    async fn start(self) -> ActorHandle<Self>
     where
         Self: Sized + Send + 'static,
     {
         println!("Starting actor");
-        ActorRuntime::run(Arc::new(Mutex::new(self)))
+        ActorRuntime::run(Arc::new(Mutex::new(self))).await
     }
 }
 
@@ -28,12 +49,12 @@ pub trait Handler<M>: Actor
 where
     M: Message,
 {
-    async fn handle(this: Arc<Mutex<Self>>, message: M) -> Result<M::Response, Error>;
+    async fn handle(this: Arc<Mutex<Self>>, message: Box<M>) -> Result<M::Response, Error>;
 }
 
 /// A handle to a spawned actor. Obtained when calling `start` on an [Actor] and is used to send messages
 /// to it.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct ActorHandle<A>
 where
     A: Actor,
@@ -42,6 +63,18 @@ where
     command_tx: Sender<ActorCommand>,
 }
 
+impl<A> Clone for ActorHandle<A>
+where
+    A: Actor,
+{
+    fn clone(&self) -> Self {
+        Self {
+            message_tx: self.message_tx.clone(),
+            command_tx: self.command_tx.clone(),
+        }
+    }
+}
+
 impl<A> ActorHandle<A>
 where
     A: Actor,
@@ -69,8 +102,8 @@ where
         Ok(MessageRequest { response_rx: rx })
     }
 
-    /// Send a message to the actor without waiting for any response, but still returning an
-    /// error if the channel is full.
+    /// Send a message to the actor without returning a response, but still returning an
+    /// error if the channel is full or disconnected.
     pub fn send<M>(&self, message: M) -> Result<(), SendError<M>>
     where
         M: Message + Send + 'static,
@@ -98,9 +131,9 @@ where
 
     pub fn recipient<M>(&self) -> Recipient<M>
     where
-        M: Message + Send + 'static,
+        M: Message + Send + 'static + Sync,
         M::Response: Send,
-        A: Handler<M> + 'static,
+        A: Handler<M> + Send + 'static,
     {
         Recipient {
             message_tx: Box::new(self.message_tx.clone()),
@@ -117,7 +150,7 @@ pub struct Recipient<M>
 where
     M: Message,
 {
-    message_tx: Box<dyn DynamicSender<M>>,
+    message_tx: Box<dyn MessageSender<M>>,
     command_tx: Sender<ActorCommand>,
 }
 
@@ -144,7 +177,7 @@ where
 
 /// A helper trait used solely by [Recipient]'s message channel to erase the actor type.
 /// This is achieved by implementing it on [Sender<Envelope<A>].
-trait DynamicSender<M>
+trait MessageSender<M>
 where
     M: Message + Send,
 {
@@ -153,7 +186,7 @@ where
     fn send(&self, message: M) -> Result<(), SendError<M>>;
 }
 
-impl<A, M> DynamicSender<M> for Sender<Envelope<A>>
+impl<A, M> MessageSender<M> for Sender<Envelope<A>>
 where
     M: Message + Send + 'static,
     M::Response: Send,
@@ -182,9 +215,9 @@ where
 
 impl<A, M> From<ActorHandle<A>> for Recipient<M>
 where
-    M: Message + Send + 'static,
+    M: Message + Send + 'static + Sync,
     M::Response: Send,
-    A: Actor + Handler<M> + Enveloper<A, M> + 'static,
+    A: Actor + Handler<M> + Enveloper<A, M> + Send + 'static,
 {
     /// Just calls `ActorHandler::recipient`, i.e. clones the underlying channels
     /// into the recipient and boxes the message one.
@@ -249,7 +282,7 @@ mod tests {
 
         #[async_trait]
         impl Handler<Foo> for Testor {
-            async fn handle(this: Arc<Mutex<Testor>>, _: Foo) -> Result<usize, Error> {
+            async fn handle(_: Arc<Mutex<Self>>, _: Box<Foo>) -> Result<usize, Error> {
                 println!("Handling Foo");
                 Ok(10)
             }
@@ -257,7 +290,7 @@ mod tests {
 
         #[async_trait]
         impl Handler<Bar> for Testor {
-            async fn handle(this: Arc<Mutex<Testor>>, _: Bar) -> Result<isize, Error> {
+            async fn handle(_: Arc<Mutex<Self>>, _: Box<Bar>) -> Result<isize, Error> {
                 for _ in 0..10_000 {
                     println!("Handling Bar");
                 }
@@ -268,7 +301,7 @@ mod tests {
         let mut res = 0;
         let mut res2 = 0;
 
-        let handle = Testor {}.start();
+        let handle = Testor {}.start().await;
         println!("HELLO WORLDS");
         for _ in 0..100 {
             res += handle.send_wait(Foo {}).unwrap().await.unwrap();
@@ -311,7 +344,7 @@ mod tests {
 
         #[async_trait]
         impl Handler<Foo> for Testor {
-            async fn handle(this: Arc<Mutex<Testor>>, _: Foo) -> Result<usize, Error> {
+            async fn handle(_: Arc<Mutex<Testor>>, _: Box<Foo>) -> Result<usize, Error> {
                 println!("INCREMENTING COUNT FOO");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
@@ -320,7 +353,7 @@ mod tests {
 
         #[async_trait]
         impl Handler<Bar> for Testor {
-            async fn handle(this: Arc<Mutex<Testor>>, _: Bar) -> Result<isize, Error> {
+            async fn handle(_: Arc<Mutex<Testor>>, _: Box<Bar>) -> Result<isize, Error> {
                 println!("INCREMENTING COUNT BAR");
                 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                 Ok(10)
@@ -334,7 +367,7 @@ mod tests {
         let local_set = LocalSet::new();
 
         let task = async {
-            let handle = Testor {}.start();
+            let handle = Testor {}.start().await;
 
             handle.send_wait(Bar {}).unwrap().await.unwrap();
             handle.send(Foo {}).unwrap();

+ 28 - 34
src/message.rs

@@ -2,8 +2,8 @@ use std::sync::Arc;
 
 use crate::{Actor, Error, Handler};
 use async_trait::async_trait;
-use parking_lot::Mutex;
 use tokio::sync::oneshot;
+use tokio::sync::Mutex;
 
 /// Represents a message that can be sent to an actor. The response type is what the actor must return in its handler implementation.
 pub trait Message {
@@ -12,9 +12,9 @@ pub trait Message {
 
 /// Represents a type erased message that ultimately gets stored in an [Envelope]. We need this indirection so we can abstract away the concrete message
 /// type when creating an actor handle, otherwise we would only be able to send a single message type to the actor.
-#[async_trait(?Send)]
+#[async_trait]
 pub trait ActorMessage<A: Actor> {
-    async fn handle(&mut self, actor: Arc<Mutex<A>>);
+    async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>);
 }
 
 /// Used by [ActorHandle][super::ActorHandle]s to pack [Message]s into [Envelope]s so we have a type erased message to send to the actor.
@@ -26,7 +26,7 @@ pub trait Enveloper<A: Actor, M: Message> {
 /// A type erased wrapper for messages. This wrapper essentially enables us to send any message to the actor
 /// so long as it implements the necessary handler.
 pub struct Envelope<A: Actor> {
-    message: Box<dyn ActorMessage<A> + Send>,
+    message: Box<dyn ActorMessage<A> + Send + Sync>,
 }
 
 impl<A> Envelope<A>
@@ -35,13 +35,13 @@ where
 {
     pub fn new<M>(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Self
     where
-        A: Handler<M> + 'static,
-        M: Message + Send + 'static,
+        A: Handler<M> + Send + 'static,
+        M: Message + Send + 'static + Sync,
         M::Response: Send,
     {
         Self {
             message: Box::new(EnvelopeInner {
-                message: Some(message),
+                message: Some(Box::new(message)),
                 tx,
             }),
         }
@@ -51,30 +51,31 @@ where
 /// The inner parts of the [Envelope] containing the actual message as well as an optional
 /// response channel.
 struct EnvelopeInner<M: Message> {
-    message: Option<M>,
+    message: Option<Box<M>>,
     tx: Option<oneshot::Sender<M::Response>>,
 }
 
-#[async_trait(?Send)]
+#[async_trait]
 impl<A> ActorMessage<A> for Envelope<A>
 where
-    A: Actor,
+    A: Actor + Send,
 {
-    async fn handle(&mut self, actor: Arc<Mutex<A>>) {
-        self.message.handle(actor).await
+    async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>) {
+        ActorMessage::handle(self.message, actor).await
     }
 }
 
-#[async_trait(?Send)]
+#[async_trait]
 impl<A, M> ActorMessage<A> for EnvelopeInner<M>
 where
-    M: Message,
-    A: Actor + Handler<M> + 'static,
+    M: Message + Send + Sync,
+    M::Response: Send,
+    A: Actor + Handler<M> + Send + 'static,
 {
-    async fn handle(&mut self, actor: Arc<Mutex<A>>) {
-        let message = self.message.take().expect("Message already processed");
+    async fn handle(self: Box<Self>, actor: Arc<Mutex<A>>) {
+        let Some(message) = self.message else { panic!("Handle already called") };
         let result = A::handle(actor, message).await;
-        if let Some(res_tx) = self.tx.take() {
+        if let Some(res_tx) = self.tx {
             let _ = res_tx.send(result.unwrap());
         }
     }
@@ -82,8 +83,8 @@ where
 
 impl<A, M> Enveloper<A, M> for A
 where
-    A: Actor + Handler<M> + 'static,
-    M: Message + Send + 'static,
+    A: Actor + Handler<M> + Send + 'static,
+    M: Message + Send + 'static + Sync,
     M::Response: Send,
 {
     fn pack(message: M, tx: Option<oneshot::Sender<M::Response>>) -> Envelope<A> {
@@ -102,22 +103,15 @@ impl<R> std::future::Future for MessageRequest<R> {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Self::Output> {
-        println!("Awaiting response");
         match self.as_mut().response_rx.try_recv() {
-            Ok(response) => {
-                println!("Future ready");
-                std::task::Poll::Ready(Ok(response))
-            }
-            Err(e) => {
-                println!("Future pending {e}");
-                match e {
-                    oneshot::error::TryRecvError::Empty => {
-                        cx.waker().wake_by_ref();
-                        std::task::Poll::Pending
-                    }
-                    oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())),
+            Ok(response) => std::task::Poll::Ready(Ok(response)),
+            Err(e) => match e {
+                oneshot::error::TryRecvError::Empty => {
+                    cx.waker().wake_by_ref();
+                    std::task::Poll::Pending
                 }
-            }
+                oneshot::error::TryRecvError::Closed => std::task::Poll::Ready(Err(e.into())),
+            },
         }
     }
 }

+ 89 - 99
src/runtime.rs

@@ -2,21 +2,23 @@ use crate::{
     message::ActorMessage, Actor, ActorCommand, ActorHandle, Envelope, Error,
     DEFAULT_CHANNEL_CAPACITY,
 };
+use async_trait::async_trait;
 use flume::Receiver;
 use futures::Future;
-use parking_lot::Mutex;
 use pin_project::pin_project;
 use std::{
     collections::VecDeque,
     pin::Pin,
     sync::Arc,
-    task::{Context, Poll},
+    task::{ready, Context, Poll},
 };
+use tokio::sync::Mutex;
 
-const DEFAULT_QUEUE_CAPACITY: usize = 16;
+const QUEUE_CAPACITY: usize = 128;
 
+#[async_trait]
 pub trait Runtime<A> {
-    fn run(actor: Arc<Mutex<A>>) -> ActorHandle<A>
+    async fn run(actor: Arc<Mutex<A>>) -> ActorHandle<A>
     where
         A: Actor + Send + 'static,
     {
@@ -40,6 +42,47 @@ where
 
 impl<A> Runtime<A> for ActorRuntime<A> where A: Actor + Send + 'static {}
 
+type ActorFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+struct ActorJob<A>
+where
+    A: Actor,
+{
+    message: Option<Envelope<A>>,
+    future: Option<ActorFuture>,
+}
+
+impl<A> ActorJob<A>
+where
+    A: Actor + Send + 'static,
+{
+    fn new(message: Envelope<A>) -> Self {
+        Self {
+            message: Some(message),
+            future: None,
+        }
+    }
+
+    fn poll(
+        mut self: Pin<&mut Self>,
+        actor: Arc<Mutex<A>>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<()> {
+        match self.as_mut().message.take() {
+            Some(message) => {
+                let fut = Box::new(message).handle(actor);
+                self.future = Some(fut);
+                ready!(self.future.as_mut().unwrap().as_mut().poll(cx));
+                Poll::Ready(())
+            }
+            None => {
+                ready!(self.future.as_mut().unwrap().as_mut().poll(cx));
+                Poll::Ready(())
+            }
+        }
+    }
+}
+
 impl<A> Future for ActorRuntime<A>
 where
     A: Actor + Send + 'static,
@@ -48,60 +91,52 @@ where
 
     fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
-        loop {
-            // Poll command receiver
-            match Pin::new(&mut this.command_rx.recv_async()).poll(cx) {
-                Poll::Ready(Ok(message)) => match message {
-                    ActorCommand::Stop => {
-                        println!("Actor stopping");
-                        break Poll::Ready(Ok(())); // TODO drain the queue and all that graceful stuff
-                    }
-                },
-                Poll::Ready(Err(_)) => {
-                    println!("Command channel closed, ungracefully stopping actor");
-                    break Poll::Ready(Err(Error::ActorChannelClosed));
-                }
-                Poll::Pending => {}
-            };
-
-            // Process the pending futures
-            this.process_queue
-                .retain_mut(|job| job.poll(&mut this.actor.lock(), cx).is_pending());
-
-            // Poll message receiver
-            while let Poll::Ready(Ok(message)) =
-                Pin::new(&mut this.message_rx.recv_async()).poll(cx)
-            {
-                // this.process_queue.push_back(ActorJob::new(message));
-                if this.process_queue.len() >= DEFAULT_QUEUE_CAPACITY {
-                    break;
+
+        // Poll command receiver
+        match Pin::new(&mut this.command_rx.recv_async()).poll(cx) {
+            Poll::Ready(Ok(message)) => match message {
+                ActorCommand::Stop => {
+                    println!("Actor stopping");
+                    return Poll::Ready(Ok(())); // TODO drain the queue and all that graceful stuff
                 }
+            },
+            Poll::Ready(Err(_)) => {
+                println!("Command channel closed, ungracefully stopping actor");
+                return Poll::Ready(Err(Error::ActorChannelClosed));
             }
+            Poll::Pending => {}
+        };
+
+        // Process the pending futures
+        this.process_queue
+            .retain_mut(|job| Pin::new(job).poll(this.actor.clone(), cx).is_pending());
+
+        // Poll message receiver
+        while let Poll::Ready(Ok(message)) = Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
+            this.process_queue.push_back(ActorJob::new(message));
+            if this.process_queue.len() >= QUEUE_CAPACITY {
+                break;
+            }
+        }
 
-            // Process pending futures again after we've potentially received some
-            this.process_queue
-                .retain_mut(|job| job.poll(&mut this.actor.lock(), cx).is_pending());
+        // Process pending futures again after we've potentially received some
+        this.process_queue
+            .retain_mut(|job| Pin::new(job).poll(this.actor.clone(), cx).is_pending());
 
-            // Poll again and process new messages if any
-            match Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
-                Poll::Ready(Ok(message)) => {
-                    // this.process_queue.push_back(ActorJob::new(message));
-                    continue;
-                }
-                Poll::Ready(Err(_)) => {
-                    println!("Message channel closed, ungracefully stopping actor");
-                    break Poll::Ready(Err(Error::ActorChannelClosed));
-                }
-                Poll::Pending => {
-                    if !this.process_queue.is_empty() {
-                        continue;
-                    }
-                }
-            };
+        // Poll again and process new messages if any
+        match Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
+            Poll::Ready(Ok(message)) => {
+                this.process_queue.push_back(ActorJob::new(message));
+            }
+            Poll::Ready(Err(_)) => {
+                println!("Message channel closed, ungracefully stopping actor");
+                return Poll::Ready(Err(Error::ActorChannelClosed));
+            }
+            Poll::Pending => {}
+        };
 
-            cx.waker().wake_by_ref();
-            return Poll::Pending;
-        }
+        cx.waker().wake_by_ref();
+        Poll::Pending
     }
 }
 
@@ -119,52 +154,7 @@ where
             actor,
             command_rx,
             message_rx,
-            process_queue: VecDeque::with_capacity(DEFAULT_QUEUE_CAPACITY),
+            process_queue: VecDeque::with_capacity(QUEUE_CAPACITY),
         }
     }
 }
-
-pub trait ActorFuture<A>
-where
-    A: Actor,
-{
-    type Output;
-    fn poll(
-        self: Pin<&mut Self>,
-        actor: &mut A,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Self::Output>;
-}
-
-struct ActorJob<A>(pub Pin<Box<dyn ActorFuture<A, Output = ()> + Send + 'static>>);
-
-impl<A> ActorJob<A>
-where
-    A: Actor,
-{
-    fn new<F>(future: F) -> Self
-    where
-        F: ActorFuture<A, Output = ()> + Send + 'static,
-    {
-        Self(Box::pin(future))
-    }
-
-    fn poll(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
-        self.0.as_mut().poll(actor, cx)
-    }
-}
-
-/* impl<A> ActorFuture<A> for Envelope<A>
-where
-    A: Actor,
-{
-    type Output = ();
-
-    fn poll(
-        self: Pin<&mut Self>,
-        actor: &mut A,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Self::Output> {
-        self.get_mut().handle(actor).as_mut().poll(cx)
-    }
-} */

+ 116 - 144
src/ws.rs

@@ -1,73 +1,80 @@
 use crate::{
-    message::{ActorMessage, Envelope},
-    runtime::Runtime,
-    Actor, ActorCommand, ActorHandle, ActorStatus, Error, Handler,
+    message::Envelope, runtime::Runtime, Actor, ActorCommand, ActorHandle, ActorStatus, Error,
+    Handler, Hello,
 };
 use async_trait::async_trait;
 use flume::Receiver;
 use futures::{
-    sink::Feed,
     stream::{SplitSink, SplitStream},
     Future, SinkExt, Stream, StreamExt,
 };
-use parking_lot::Mutex;
 use pin_project::pin_project;
 use std::{
     collections::VecDeque,
     pin::Pin,
     sync::atomic::AtomicUsize,
     task::{Context, Poll},
+    time::Duration,
 };
-use std::{sync::Arc, time::Duration};
+use std::{sync::Arc, task::ready};
+use tokio::sync::Mutex;
 use warp::ws::{Message, WebSocket};
 
+const WS_QUEUE_SIZE: usize = 128;
+
 pub struct WebsocketActor {
     websocket: Option<WebSocket>,
+    hello: ActorHandle<Hello>,
 }
 
 impl WebsocketActor {
-    pub fn new(ws: WebSocket) -> Self {
+    pub fn new(ws: WebSocket, handle: ActorHandle<Hello>) -> Self {
         Self {
             websocket: Some(ws),
+            hello: handle,
         }
     }
 }
 
+#[async_trait]
 impl Actor for WebsocketActor {
-    fn start(self) -> ActorHandle<Self> {
-        WebsocketRuntime::run(Arc::new(Mutex::new(self)))
+    async fn start(self) -> ActorHandle<Self> {
+        WebsocketRuntime::run(Arc::new(Mutex::new(self))).await
     }
 }
 type WsFuture = Pin<Box<dyn Future<Output = Result<Option<Message>, Error>> + Send>>;
 
 struct ActorItem {
-    message: Option<Message>,
+    message: Option<Box<Message>>,
     future: Option<WsFuture>,
 }
 
 impl ActorItem {
+    pub fn new(message: Message) -> Self {
+        Self {
+            message: Some(Box::new(message)),
+            future: None,
+        }
+    }
+
     fn poll(
         mut self: Pin<&mut Self>,
         actor: Arc<Mutex<WebsocketActor>>,
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Result<Option<Message>, warp::Error>> {
-        //         let mut this = self.project();
         let message = self.as_mut().message.take();
 
         match message {
             Some(message) => {
                 let fut = WebsocketActor::handle(actor, message);
                 self.future = Some(fut);
-                // let Some(ref mut fut) = self.future else {panic!("Rust no longer works")};
-                match self.future.as_mut().unwrap().as_mut().poll(cx) {
-                    Poll::Ready(result) => match result {
-                        Ok(response) => Poll::Ready(Ok(response)),
-                        Err(e) => {
-                            println!("Shit's fucked son {e}");
-                            Poll::Ready(Ok(None))
-                        }
-                    },
-                    Poll::Pending => Poll::Pending,
+                let result = ready!(self.future.as_mut().unwrap().as_mut().poll(cx));
+                match result {
+                    Ok(response) => Poll::Ready(Ok(response)),
+                    Err(e) => {
+                        println!("Shit's fucked son {e}");
+                        Poll::Ready(Ok(None))
+                    }
                 }
             }
             None => match self.future {
@@ -79,59 +86,20 @@ impl ActorItem {
                             Poll::Ready(Ok(None))
                         }
                     },
-                    Poll::Pending => Poll::Pending,
-                },
-                None => Poll::Pending,
-            },
-        }
-    }
-}
-
-impl ActorItem {
-    pub fn new(message: Message) -> Self {
-        Self {
-            message: Some(message),
-            future: None,
-        }
-    }
-}
-/*
-trait WsActorFuture {
-    fn poll(
-        self: Pin<&mut Self>,
-        actor: &mut WebsocketActor,
-        sink: Pin<&mut SplitSink<WebSocket, Message>>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), warp::Error>>;
-}
-
-impl WsActorFuture for Message {
-    fn poll(
-        mut self: Pin<&mut Self>,
-        actor: &mut WebsocketActor,
-        mut sink: Pin<&mut SplitSink<WebSocket, Message>>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), warp::Error>> {
-        match actor.handle(self.as_mut().clone()).as_mut().poll(cx) {
-            Poll::Ready(result) => match result {
-                Ok(response) => {
-                    if let Some(response) = response {
-                        Pin::new(&mut sink.feed(response)).poll(cx)
-                    } else {
+                    Poll::Pending => {
+                        PENDING.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+                        // println!("Websocket Future pending - COUNT {PENDING:?}");
                         Poll::Pending
                     }
-                }
-                Err(e) => {
-                    println!("Shit's fucked son {e}");
-                    Poll::Ready(Ok(()))
-                }
+                },
+                None => panic!("Impossibru"),
             },
-            Poll::Pending => Poll::Pending,
         }
     }
 }
- */
+
 static PROCESSED: AtomicUsize = AtomicUsize::new(0);
+static PENDING: AtomicUsize = AtomicUsize::new(0);
 
 #[pin_project]
 pub struct WebsocketRuntime {
@@ -164,13 +132,14 @@ pub struct WebsocketRuntime {
 }
 
 impl WebsocketRuntime {
-    pub fn new(
+    pub async fn new(
         actor: Arc<Mutex<WebsocketActor>>,
         command_rx: Receiver<ActorCommand>,
         message_rx: Receiver<Envelope<WebsocketActor>>,
     ) -> Self {
         let (ws_sink, ws_stream) = actor
             .lock()
+            .await
             .websocket
             .take()
             .expect("Websocket runtime already started")
@@ -196,28 +165,31 @@ impl Future for WebsocketRuntime {
     fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let mut this = self.project();
 
-        loop {
-            // Poll command receiver and immediatelly process it
-            match Pin::new(&mut this.command_rx.recv_async()).poll(cx) {
-                Poll::Ready(Ok(message)) => match message {
-                    ActorCommand::Stop => {
-                        println!("Actor stopping");
-                        break Poll::Ready(Ok(())); // TODO drain the queue and all that graceful stuff
+        // Poll command receiver and immediatelly process it
+        if let Poll::Ready(result) = Pin::new(&mut this.command_rx.recv_async()).poll(cx) {
+            match result {
+                Ok(command) => {
+                    match command {
+                        ActorCommand::Stop => {
+                            println!("Actor stopping");
+                            return Poll::Ready(Ok(())); // TODO drain the queue and all that graceful stuff
+                        }
                     }
-                },
-                Poll::Ready(Err(_)) => {
-                    println!("Actor stopping"); // TODO drain the queue and all that graceful stuff
-                    break Poll::Ready(Err(Error::ActorChannelClosed));
                 }
-                Poll::Pending => {}
-            };
+                Err(e) => {
+                    println!("Actor stopping - {e}"); // TODO drain the queue and all that graceful stuff
+                    return Poll::Ready(Err(Error::ActorChannelClosed));
+                }
+            }
+        };
 
-            // Poll the websocket stream for any messages and store them to the queue
+        // Poll the websocket stream for any messages and store them to the queue
+        if this.processing_queue.is_empty() {
             while let Poll::Ready(Some(ws_message)) = this.ws_stream.as_mut().poll_next(cx) {
                 match ws_message {
                     Ok(message) => {
                         this.processing_queue.push_back(ActorItem::new(message));
-                        if this.processing_queue.len() > 500 {
+                        if this.processing_queue.len() >= WS_QUEUE_SIZE {
                             break;
                         }
                     }
@@ -226,79 +198,70 @@ impl Future for WebsocketRuntime {
                     }
                 }
             }
+        }
 
-            let mut idx = 0;
-            //             let actor = &mut this.actor.lock();
-            while idx < this.processing_queue.len() {
-                let job = &mut this.processing_queue[idx];
-                match ActorItem::poll(Pin::new(job), this.actor.clone(), cx) {
-                    Poll::Ready(result) => match result {
-                        Ok(response) => {
-                            if let Some(response) = response {
-                                let feed = &mut this.ws_sink.feed(response);
-                                let mut feed = Pin::new(feed);
-                                while feed.as_mut().poll(cx).is_pending() {
-                                    let _ = feed.as_mut().poll(cx);
-                                }
-                            }
-                            PROCESSED.fetch_add(1, std::sync::atomic::Ordering::Acquire);
-                            this.processing_queue.swap_remove_front(idx);
-                        }
-                        Err(e) => {
-                            println!("Shit's fucked my dude {e}")
+        let mut idx = 0;
+        while idx < this.processing_queue.len() {
+            let job = Pin::new(&mut this.processing_queue[idx]);
+            match ActorItem::poll(job, this.actor.clone(), cx) {
+                Poll::Ready(result) => match result {
+                    Ok(response) => {
+                        if let Some(response) = response {
+                            let feed = &mut this.ws_sink.feed(response);
+                            let mut feed = Pin::new(feed);
+                            let _ = feed.as_mut().poll(cx);
                         }
-                    },
-                    Poll::Pending => idx += 1,
-                }
+                        PROCESSED.fetch_add(1, std::sync::atomic::Ordering::Acquire);
+                        this.processing_queue.swap_remove_front(idx);
+                    }
+                    Err(e) => {
+                        println!("Shit's fucked my dude {e}")
+                    }
+                },
+                Poll::Pending => idx += 1,
             }
+        }
 
-            println!(
-                "PROCESSED {} CURRENT IN QUEUE {}",
-                PROCESSED.load(std::sync::atomic::Ordering::Acquire),
-                this.processing_queue.len(),
-            );
+        // println!(
+        //     "PROCESSED {} CURRENT IN QUEUE {}",
+        //     PROCESSED.load(std::sync::atomic::Ordering::Acquire),
+        //     this.processing_queue.len(),
+        // );
 
-            let _ = Pin::new(&mut this.ws_sink.flush()).poll(cx);
+        let _ = Pin::new(&mut this.ws_sink.flush()).poll(cx);
 
-            // Process all messages
-            /*             this.message_queue
-            .retain_mut(|message| message.handle(actor).as_mut().poll(cx).is_pending()); */
+        // Process all messages
+        /*             this.message_queue
+        .retain_mut(|message| message.handle(actor).as_mut().poll(cx).is_pending()); */
+
+        // Poll message receiver and continue to process if anything comes up
+        while let Poll::Ready(Ok(message)) = Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
+            this.message_queue.push_back(message);
+        }
 
-            // Poll message receiver and continue to process if anything comes up
-            while let Poll::Ready(Ok(message)) =
-                Pin::new(&mut this.message_rx.recv_async()).poll(cx)
-            {
+        // Poll again and process new messages if any
+        match Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
+            Poll::Ready(Ok(message)) => {
                 this.message_queue.push_back(message);
             }
+            Poll::Ready(Err(_)) => {
+                println!("Message channel closed, ungracefully stopping actor");
+                return Poll::Ready(Err(Error::ActorChannelClosed));
+            }
+            Poll::Pending => {}
+        };
 
-            // Poll again and process new messages if any
-            match Pin::new(&mut this.message_rx.recv_async()).poll(cx) {
-                Poll::Ready(Ok(message)) => {
-                    this.message_queue.push_back(message);
-                    continue;
-                }
-                Poll::Ready(Err(_)) => {
-                    println!("Message channel closed, ungracefully stopping actor");
-                    break Poll::Ready(Err(Error::ActorChannelClosed));
-                }
-                Poll::Pending => {
-                    if !this.message_queue.is_empty() {
-                        continue;
-                    }
-                }
-            };
-
-            cx.waker().wake_by_ref();
-            return Poll::Pending;
-        }
+        cx.waker().wake_by_ref();
+        Poll::Pending
     }
 }
 
+#[async_trait]
 impl Runtime<WebsocketActor> for WebsocketRuntime {
-    fn run(actor: Arc<Mutex<WebsocketActor>>) -> ActorHandle<WebsocketActor> {
+    async fn run(actor: Arc<Mutex<WebsocketActor>>) -> ActorHandle<WebsocketActor> {
         let (message_tx, message_rx) = flume::unbounded();
         let (command_tx, command_rx) = flume::unbounded();
-        tokio::spawn(WebsocketRuntime::new(actor, command_rx, message_rx));
+        tokio::spawn(WebsocketRuntime::new(actor, command_rx, message_rx).await);
         ActorHandle::new(message_tx, command_tx)
     }
 }
@@ -311,12 +274,21 @@ impl crate::Message for Message {
 impl Handler<Message> for WebsocketActor {
     async fn handle(
         this: Arc<Mutex<Self>>,
-        message: Message,
+        message: Box<Message>,
     ) -> Result<<Message as crate::message::Message>::Response, crate::Error> {
-        println!("Actor received message {message:?}");
-        // tokio::time::sleep(Duration::from_micros(500)).await;
+        //let mut act = this.lock().await;
+        this.lock()
+            .await
+            .hello
+            .send(crate::Msg {
+                content: message.to_str().unwrap().to_owned(),
+            })
+            .unwrap_or_else(|e| println!("{e}"));
+        //        println!("Actor retreived lock and sent message got response {res}");
+        tokio::time::sleep(Duration::from_micros(1)).await;
+        //act.wait().await;
         if message.is_text() {
-            Ok(Some(message))
+            Ok(Some(*message.clone()))
         } else {
             Ok(None)
         }

+ 18 - 14
tests/websocket.rs

@@ -1,5 +1,5 @@
-use actors::ws::WebsocketActor;
-use actors::{Actor, ActorHandle};
+use drama::ws::WebsocketActor;
+use drama::{Actor, ActorHandle, Hello};
 use std::collections::HashMap;
 use std::sync::atomic::AtomicUsize;
 use std::sync::{Arc, RwLock};
@@ -14,22 +14,26 @@ async fn main() {
     let pool = Arc::new(RwLock::new(HashMap::new()));
     let pool = warp::any().map(move || pool.clone());
 
+    let hello = Hello {}.start().await;
+    let hello = warp::any().map(move || hello.clone());
     // GET /chat -> websocket upgrade
     let chat = warp::path("chat")
         // The `ws()` filter will prepare Websocket handshake...
         .and(warp::ws())
         .and(pool)
-        .map(|ws: warp::ws::Ws, pool: Arbiter| {
-            // This will call our function if the handshake succeeds.
-            ws.on_upgrade(move |socket| {
-                let actor = WebsocketActor::new(socket);
-                let handle = actor.start();
-                // let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-                // println!("Adding actor {id}");
-                pool.write().unwrap().insert(0, handle);
-                futures::future::ready(())
-            })
-        });
+        .and(hello)
+        .map(
+            |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
+                // This will call our function if the handshake succeeds.
+                ws.on_upgrade(|socket| async move {
+                    let actor = WebsocketActor::new(socket, hello);
+                    let handle = actor.start().await;
+                    let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    println!("Adding actor {id}");
+                    pool.write().unwrap().insert(id, handle);
+                })
+            },
+        );
 
     // GET / -> index html
 
@@ -69,8 +73,8 @@ static INDEX_HTML: &str = r#"<!DOCTYPE html>
         };
 
         ws.onmessage = function(msg) {
-            message(msg.data);
             num += 1;
+            message(msg.data);
         };
 
         ws.onclose = function() {