|
@@ -6,9 +6,11 @@ use crate::{
|
|
|
use async_trait::async_trait;
|
|
|
use flume::Receiver;
|
|
|
use futures::{
|
|
|
+ sink::Feed,
|
|
|
stream::{SplitSink, SplitStream},
|
|
|
Future, SinkExt, Stream, StreamExt,
|
|
|
};
|
|
|
+use parking_lot::Mutex;
|
|
|
use pin_project::pin_project;
|
|
|
use std::{
|
|
|
collections::VecDeque,
|
|
@@ -16,46 +18,134 @@ use std::{
|
|
|
sync::atomic::AtomicUsize,
|
|
|
task::{Context, Poll},
|
|
|
};
|
|
|
-use warp::ws::WebSocket;
|
|
|
+use std::{sync::Arc, time::Duration};
|
|
|
+use warp::ws::{Message, WebSocket};
|
|
|
|
|
|
pub struct WebsocketActor {
|
|
|
websocket: Option<WebSocket>,
|
|
|
}
|
|
|
|
|
|
+impl WebsocketActor {
|
|
|
+ pub fn new(ws: WebSocket) -> Self {
|
|
|
+ Self {
|
|
|
+ websocket: Some(ws),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl Actor for WebsocketActor {
|
|
|
- fn start(self) -> ActorHandle<Self>
|
|
|
- where
|
|
|
- Self: Sized + Send + 'static,
|
|
|
- {
|
|
|
- println!("Starting websocket actor");
|
|
|
- WebsocketRuntime::run(self)
|
|
|
+ fn start(self) -> ActorHandle<Self> {
|
|
|
+ WebsocketRuntime::run(Arc::new(Mutex::new(self)))
|
|
|
}
|
|
|
}
|
|
|
+type WsFuture = Pin<Box<dyn Future<Output = Result<Option<Message>, Error>> + Send>>;
|
|
|
|
|
|
-impl WebsocketActor {
|
|
|
- pub fn new(ws: WebSocket) -> Self {
|
|
|
+struct ActorItem {
|
|
|
+ message: Option<Message>,
|
|
|
+ future: Option<WsFuture>,
|
|
|
+}
|
|
|
+
|
|
|
+impl ActorItem {
|
|
|
+ fn poll(
|
|
|
+ mut self: Pin<&mut Self>,
|
|
|
+ actor: Arc<Mutex<WebsocketActor>>,
|
|
|
+ cx: &mut std::task::Context<'_>,
|
|
|
+ ) -> Poll<Result<Option<Message>, warp::Error>> {
|
|
|
+ // let mut this = self.project();
|
|
|
+ let message = self.as_mut().message.take();
|
|
|
+
|
|
|
+ match message {
|
|
|
+ Some(message) => {
|
|
|
+ let fut = WebsocketActor::handle(actor, message);
|
|
|
+ self.future = Some(fut);
|
|
|
+ // let Some(ref mut fut) = self.future else {panic!("Rust no longer works")};
|
|
|
+ match self.future.as_mut().unwrap().as_mut().poll(cx) {
|
|
|
+ Poll::Ready(result) => match result {
|
|
|
+ Ok(response) => Poll::Ready(Ok(response)),
|
|
|
+ Err(e) => {
|
|
|
+ println!("Shit's fucked son {e}");
|
|
|
+ Poll::Ready(Ok(None))
|
|
|
+ }
|
|
|
+ },
|
|
|
+ Poll::Pending => Poll::Pending,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ None => match self.future {
|
|
|
+ Some(ref mut fut) => match fut.as_mut().poll(cx) {
|
|
|
+ Poll::Ready(result) => match result {
|
|
|
+ Ok(response) => Poll::Ready(Ok(response)),
|
|
|
+ Err(e) => {
|
|
|
+ println!("Shit's fucked son {e}");
|
|
|
+ Poll::Ready(Ok(None))
|
|
|
+ }
|
|
|
+ },
|
|
|
+ Poll::Pending => Poll::Pending,
|
|
|
+ },
|
|
|
+ None => Poll::Pending,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl ActorItem {
|
|
|
+ pub fn new(message: Message) -> Self {
|
|
|
Self {
|
|
|
- websocket: Some(ws),
|
|
|
+ message: Some(message),
|
|
|
+ future: None,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+/*
|
|
|
+trait WsActorFuture {
|
|
|
+ fn poll(
|
|
|
+ self: Pin<&mut Self>,
|
|
|
+ actor: &mut WebsocketActor,
|
|
|
+ sink: Pin<&mut SplitSink<WebSocket, Message>>,
|
|
|
+ cx: &mut std::task::Context<'_>,
|
|
|
+ ) -> Poll<Result<(), warp::Error>>;
|
|
|
+}
|
|
|
|
|
|
+impl WsActorFuture for Message {
|
|
|
+ fn poll(
|
|
|
+ mut self: Pin<&mut Self>,
|
|
|
+ actor: &mut WebsocketActor,
|
|
|
+ mut sink: Pin<&mut SplitSink<WebSocket, Message>>,
|
|
|
+ cx: &mut std::task::Context<'_>,
|
|
|
+ ) -> Poll<Result<(), warp::Error>> {
|
|
|
+ match actor.handle(self.as_mut().clone()).as_mut().poll(cx) {
|
|
|
+ Poll::Ready(result) => match result {
|
|
|
+ Ok(response) => {
|
|
|
+ if let Some(response) = response {
|
|
|
+ Pin::new(&mut sink.feed(response)).poll(cx)
|
|
|
+ } else {
|
|
|
+ Poll::Pending
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ println!("Shit's fucked son {e}");
|
|
|
+ Poll::Ready(Ok(()))
|
|
|
+ }
|
|
|
+ },
|
|
|
+ Poll::Pending => Poll::Pending,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+ */
|
|
|
static PROCESSED: AtomicUsize = AtomicUsize::new(0);
|
|
|
|
|
|
#[pin_project]
|
|
|
pub struct WebsocketRuntime {
|
|
|
- actor: WebsocketActor,
|
|
|
+ actor: Arc<Mutex<WebsocketActor>>,
|
|
|
|
|
|
status: ActorStatus,
|
|
|
|
|
|
- // Pin these 2 as we are polling them directly so we know they never move
|
|
|
/// The receiving end of the websocket
|
|
|
#[pin]
|
|
|
ws_stream: SplitStream<WebSocket>,
|
|
|
|
|
|
/// The sending end of the websocket
|
|
|
#[pin]
|
|
|
- ws_sink: SplitSink<WebSocket, warp::ws::Message>,
|
|
|
+ ws_sink: SplitSink<WebSocket, Message>,
|
|
|
|
|
|
/// Actor message receiver
|
|
|
message_rx: Receiver<Envelope<WebsocketActor>>,
|
|
@@ -67,19 +157,20 @@ pub struct WebsocketRuntime {
|
|
|
message_queue: VecDeque<Envelope<WebsocketActor>>,
|
|
|
|
|
|
/// Received, but not yet processed websocket messages
|
|
|
- request_queue: VecDeque<warp::ws::Message>,
|
|
|
+ processing_queue: VecDeque<ActorItem>,
|
|
|
|
|
|
- /// Processed websocket messages ready to be flushed in the sink
|
|
|
- response_queue: VecDeque<warp::ws::Message>,
|
|
|
+ /// Processed websocket messages being flushed to the sink
|
|
|
+ response_queue: VecDeque<Message>,
|
|
|
}
|
|
|
|
|
|
impl WebsocketRuntime {
|
|
|
pub fn new(
|
|
|
- mut actor: WebsocketActor,
|
|
|
+ actor: Arc<Mutex<WebsocketActor>>,
|
|
|
command_rx: Receiver<ActorCommand>,
|
|
|
message_rx: Receiver<Envelope<WebsocketActor>>,
|
|
|
) -> Self {
|
|
|
let (ws_sink, ws_stream) = actor
|
|
|
+ .lock()
|
|
|
.websocket
|
|
|
.take()
|
|
|
.expect("Websocket runtime already started")
|
|
@@ -92,7 +183,7 @@ impl WebsocketRuntime {
|
|
|
message_rx,
|
|
|
command_rx,
|
|
|
message_queue: VecDeque::new(),
|
|
|
- request_queue: VecDeque::new(),
|
|
|
+ processing_queue: VecDeque::new(),
|
|
|
response_queue: VecDeque::new(),
|
|
|
status: ActorStatus::Starting,
|
|
|
}
|
|
@@ -106,7 +197,7 @@ impl Future for WebsocketRuntime {
|
|
|
let mut this = self.project();
|
|
|
|
|
|
loop {
|
|
|
- // Poll command receiver
|
|
|
+ // Poll command receiver and immediatelly process it
|
|
|
match Pin::new(&mut this.command_rx.recv_async()).poll(cx) {
|
|
|
Poll::Ready(Ok(message)) => match message {
|
|
|
ActorCommand::Stop => {
|
|
@@ -124,48 +215,54 @@ impl Future for WebsocketRuntime {
|
|
|
// Poll the websocket stream for any messages and store them to the queue
|
|
|
while let Poll::Ready(Some(ws_message)) = this.ws_stream.as_mut().poll_next(cx) {
|
|
|
match ws_message {
|
|
|
- Ok(message) => this.request_queue.push_back(message),
|
|
|
+ Ok(message) => {
|
|
|
+ this.processing_queue.push_back(ActorItem::new(message));
|
|
|
+ if this.processing_queue.len() > 500 {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
Err(e) => {
|
|
|
eprintln!("WS error occurred {e}")
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Respond to any queued and processed websocket messages
|
|
|
let mut idx = 0;
|
|
|
- while idx < this.request_queue.len() {
|
|
|
- let ws_message = &this.request_queue[idx];
|
|
|
- match this.actor.handle(ws_message.to_owned()).as_mut().poll(cx) {
|
|
|
+ // let actor = &mut this.actor.lock();
|
|
|
+ while idx < this.processing_queue.len() {
|
|
|
+ let job = &mut this.processing_queue[idx];
|
|
|
+ match ActorItem::poll(Pin::new(job), this.actor.clone(), cx) {
|
|
|
Poll::Ready(result) => match result {
|
|
|
Ok(response) => {
|
|
|
if let Some(response) = response {
|
|
|
- match Pin::new(&mut this.ws_sink.feed(response)).poll(cx) {
|
|
|
- Poll::Ready(result) => {
|
|
|
- result?;
|
|
|
- this.request_queue.swap_remove_front(idx);
|
|
|
- PROCESSED
|
|
|
- .fetch_add(1, std::sync::atomic::Ordering::Acquire);
|
|
|
- }
|
|
|
- Poll::Pending => idx += 1,
|
|
|
+ let feed = &mut this.ws_sink.feed(response);
|
|
|
+ let mut feed = Pin::new(feed);
|
|
|
+ while feed.as_mut().poll(cx).is_pending() {
|
|
|
+ let _ = feed.as_mut().poll(cx);
|
|
|
}
|
|
|
}
|
|
|
+ PROCESSED.fetch_add(1, std::sync::atomic::Ordering::Acquire);
|
|
|
+ this.processing_queue.swap_remove_front(idx);
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ println!("Shit's fucked my dude {e}")
|
|
|
}
|
|
|
- Err(e) => return Poll::Ready(Err(e)),
|
|
|
},
|
|
|
Poll::Pending => idx += 1,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
println!(
|
|
|
- "PROCESSED {}",
|
|
|
- PROCESSED.load(std::sync::atomic::Ordering::Acquire)
|
|
|
+ "PROCESSED {} CURRENT IN QUEUE {}",
|
|
|
+ PROCESSED.load(std::sync::atomic::Ordering::Acquire),
|
|
|
+ this.processing_queue.len(),
|
|
|
);
|
|
|
|
|
|
let _ = Pin::new(&mut this.ws_sink.flush()).poll(cx);
|
|
|
|
|
|
// Process all messages
|
|
|
- this.message_queue
|
|
|
- .retain_mut(|message| message.handle(this.actor).as_mut().poll(cx).is_pending());
|
|
|
+ /* this.message_queue
|
|
|
+ .retain_mut(|message| message.handle(actor).as_mut().poll(cx).is_pending()); */
|
|
|
|
|
|
// Poll message receiver and continue to process if anything comes up
|
|
|
while let Poll::Ready(Ok(message)) =
|
|
@@ -198,7 +295,7 @@ impl Future for WebsocketRuntime {
|
|
|
}
|
|
|
|
|
|
impl Runtime<WebsocketActor> for WebsocketRuntime {
|
|
|
- fn run(actor: WebsocketActor) -> ActorHandle<WebsocketActor> {
|
|
|
+ fn run(actor: Arc<Mutex<WebsocketActor>>) -> ActorHandle<WebsocketActor> {
|
|
|
let (message_tx, message_rx) = flume::unbounded();
|
|
|
let (command_tx, command_rx) = flume::unbounded();
|
|
|
tokio::spawn(WebsocketRuntime::new(actor, command_rx, message_rx));
|
|
@@ -206,17 +303,18 @@ impl Runtime<WebsocketActor> for WebsocketRuntime {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl crate::Message for warp::ws::Message {
|
|
|
- type Response = Option<warp::ws::Message>;
|
|
|
+impl crate::Message for Message {
|
|
|
+ type Response = Option<Message>;
|
|
|
}
|
|
|
|
|
|
#[async_trait]
|
|
|
-impl Handler<warp::ws::Message> for WebsocketActor {
|
|
|
+impl Handler<Message> for WebsocketActor {
|
|
|
async fn handle(
|
|
|
- &mut self,
|
|
|
- message: warp::ws::Message,
|
|
|
- ) -> Result<<warp::ws::Message as crate::message::Message>::Response, crate::Error> {
|
|
|
- // println!("Actor received message {message:?}");
|
|
|
+ this: Arc<Mutex<Self>>,
|
|
|
+ message: Message,
|
|
|
+ ) -> Result<<Message as crate::message::Message>::Response, crate::Error> {
|
|
|
+ println!("Actor received message {message:?}");
|
|
|
+ // tokio::time::sleep(Duration::from_micros(500)).await;
|
|
|
if message.is_text() {
|
|
|
Ok(Some(message))
|
|
|
} else {
|