Browse Source

fix message dropping when sink is overloaded

Josip Benko-Đaković 1 year ago
parent
commit
69d2ed0641
2 changed files with 9 additions and 9 deletions
  1. 7 6
      src/ws.rs
  2. 2 3
      tests/websocket.rs

+ 7 - 6
src/ws.rs

@@ -128,10 +128,12 @@ where
                         if let Some(response) = response {
                             let feed = &mut this.ws_sink.feed(response);
                             let mut feed = Pin::new(feed);
-                            let _ = feed.as_mut().poll(cx);
+                            while feed.as_mut().poll(cx).is_pending() {
+                                // Yikes, but too dumb to figure out a better solution
+                                cx.waker().wake_by_ref();
+                            }
                         }
-
-                        PROCESSED.fetch_add(1, std::sync::atomic::Ordering::Acquire);
+                        PROCESSED.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
                         this.response_queue.swap_remove_front(idx);
                     }
                     Err(e) => {
@@ -141,7 +143,6 @@ where
                 Poll::Pending => idx += 1,
             }
         }
-
         this.process_messages(cx)?;
 
         println!(
@@ -202,7 +203,7 @@ where
     A: Handler<M>,
 {
     message: Option<Box<M>>,
-    future: Option<WsFuture<M, A>>,
+    future: Option<WsFuture<A, M>>,
     __a: PhantomData<A>,
 }
 
@@ -253,7 +254,7 @@ where
     }
 }
 
-type WsFuture<M, A> =
+type WsFuture<A, M> =
     Pin<Box<dyn Future<Output = Result<<A as Handler<M>>::Response, Error>> + Send>>;
 
 static PROCESSED: AtomicUsize = AtomicUsize::new(0);

+ 2 - 3
tests/websocket.rs

@@ -58,7 +58,7 @@ impl Handler<Message> for WebsocketActor {
             .send(crate::Msg {
                 _content: message.to_str().unwrap().to_owned(),
             })
-            .unwrap_or_else(|e| println!("{e}"));
+            .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
 
         Ok(Some(*message.clone()))
     }
@@ -76,7 +76,6 @@ struct Msg {
 impl Handler<Msg> for Hello {
     type Response = usize;
     async fn handle(_: Arc<Mutex<Self>>, _: Box<Msg>) -> Result<usize, Error> {
-        println!("Handling message Hello");
         Ok(10)
     }
 }
@@ -161,7 +160,7 @@ static INDEX_HTML: &str = r#"<!DOCTYPE html>
                 ws.send(msg);
                 i += 1;
             }
-            text.value = '';
+            // text.value = '';
 
             message('<You>: ' + msg);
         };