Browse Source

start adding actor status

biblius 1 year ago
parent
commit
9859e9c218
3 changed files with 20 additions and 8 deletions
  1. 10 1
      src/lib.rs
  2. 4 3
      src/message.rs
  3. 6 4
      src/ws.rs

+ 10 - 1
src/lib.rs

@@ -186,12 +186,21 @@ where
     A: Actor + Handler<M> + Enveloper<A, M> + 'static,
 {
     /// Just calls `ActorHandler::recipient`, i.e. clones the underlying channels
-    /// into the recipient.
+    /// into the recipient and boxes the message one.
     fn from(handle: ActorHandle<A>) -> Self {
         handle.recipient()
     }
 }
 
+#[derive(Debug, PartialEq, Eq)]
+pub enum ActorStatus {
+    Stopped = 0,
+    Starting = 1,
+    Running = 2,
+    Stopping = 3,
+    Idle = 4,
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("Actor channel closed")]

+ 4 - 3
src/message.rs

@@ -67,11 +67,12 @@ where
     A: Actor + Handler<M>,
 {
     fn handle(&mut self, actor: &mut A, cx: &mut std::task::Context<'_>) -> Poll<()> {
-        let Some(message) = self.message.take() else { panic!("Message already processed") };
+        let message = self.message.take().expect("Message already processed");
         match actor.handle(message).as_mut().poll(cx) {
             Poll::Ready(result) => {
-                let Some(res_tx) = self.tx.take() else { panic!("Message already processed") };
-                let _ = res_tx.send(result.unwrap());
+                if let Some(res_tx) = self.tx.take() {
+                    let _ = res_tx.send(result.unwrap());
+                }
                 Poll::Ready(())
             }
             Poll::Pending => Poll::Pending,

+ 6 - 4
src/ws.rs

@@ -1,13 +1,13 @@
 use crate::{
     message::{ActorMessage, Envelope},
     runtime::Runtime,
-    Actor, ActorCommand, ActorHandle, Error, Handler,
+    Actor, ActorCommand, ActorHandle, ActorStatus, Error, Handler,
 };
 use async_trait::async_trait;
 use flume::Receiver;
 use futures::{
     stream::{SplitSink, SplitStream},
-    Future, SinkExt, StreamExt,
+    Future, SinkExt, Stream, StreamExt,
 };
 use pin_project::pin_project;
 use std::{
@@ -46,6 +46,8 @@ static PROCESSED: AtomicUsize = AtomicUsize::new(0);
 pub struct WebsocketRuntime {
     actor: WebsocketActor,
 
+    status: ActorStatus,
+
     // Pin these 2 as we are polling them directly so we know they never move
     /// The receiving end of the websocket
     #[pin]
@@ -92,6 +94,7 @@ impl WebsocketRuntime {
             message_queue: VecDeque::new(),
             request_queue: VecDeque::new(),
             response_queue: VecDeque::new(),
+            status: ActorStatus::Starting,
         }
     }
 }
@@ -119,8 +122,7 @@ impl Future for WebsocketRuntime {
             };
 
             // Poll the websocket stream for any messages and store them to the queue
-            while let Poll::Ready(Some(ws_message)) = Pin::new(&mut this.ws_stream.next()).poll(cx)
-            {
+            while let Poll::Ready(Some(ws_message)) = this.ws_stream.as_mut().poll_next(cx) {
                 match ws_message {
                     Ok(message) => this.request_queue.push_back(message),
                     Err(e) => {