websocket.rs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. use async_trait::async_trait;
  2. use drama::runtime::Runtime;
  3. use drama::ws::{WebsocketRuntime, WsActor};
  4. use drama::{Actor, ActorHandle, Error, Handler};
  5. use futures::stream::{SplitSink, SplitStream};
  6. use futures::StreamExt;
  7. use std::collections::HashMap;
  8. use std::sync::atomic::AtomicUsize;
  9. use std::sync::{Arc, RwLock};
  10. use tokio::sync::Mutex;
  11. use warp::ws::{Message, WebSocket};
  12. use warp::Filter;
  13. type Arbiter = Arc<RwLock<HashMap<usize, ActorHandle<WebsocketActor>>>>;
  14. static ID: AtomicUsize = AtomicUsize::new(0);
  15. struct WebsocketActor {
  16. websocket: Option<WebSocket>,
  17. hello: ActorHandle<Hello>,
  18. }
  19. impl WebsocketActor {
  20. fn new(ws: WebSocket, handle: ActorHandle<Hello>) -> Self {
  21. Self {
  22. websocket: Some(ws),
  23. hello: handle,
  24. }
  25. }
  26. }
  27. impl Actor for WebsocketActor {
  28. fn start(self) -> ActorHandle<Self> {
  29. WebsocketRuntime::run(self)
  30. }
  31. }
  32. impl WsActor<Message, SplitStream<WebSocket>, SplitSink<WebSocket, Message>> for WebsocketActor {
  33. type Error = warp::Error;
  34. fn websocket(&mut self) -> (SplitSink<WebSocket, Message>, SplitStream<WebSocket>) {
  35. self.websocket
  36. .take()
  37. .expect("Websocket already split")
  38. .split()
  39. }
  40. }
  41. #[async_trait]
  42. impl Handler<Message> for WebsocketActor {
  43. type Response = Option<Message>;
  44. async fn handle(
  45. this: Arc<Mutex<Self>>,
  46. message: Box<Message>,
  47. ) -> Result<Self::Response, Error> {
  48. this.lock()
  49. .await
  50. .hello
  51. .send(crate::Msg {
  52. _content: message.to_str().unwrap().to_owned(),
  53. })
  54. .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
  55. Ok(Some(*message.clone()))
  56. }
  57. }
  58. struct Hello {}
  59. impl Actor for Hello {}
  60. struct Msg {
  61. pub _content: String,
  62. }
  63. #[async_trait]
  64. impl Handler<Msg> for Hello {
  65. type Response = usize;
  66. async fn handle(_: Arc<Mutex<Self>>, _: Box<Msg>) -> Result<usize, Error> {
  67. Ok(10)
  68. }
  69. }
  70. #[tokio::main]
  71. async fn main() {
  72. let pool = Arc::new(RwLock::new(HashMap::new()));
  73. let pool = warp::any().map(move || pool.clone());
  74. let hello = Hello {}.start();
  75. let hello = warp::any().map(move || hello.clone());
  76. // GET /chat -> websocket upgrade
  77. let chat = warp::path("chat")
  78. // The `ws()` filter will prepare Websocket handshake...
  79. .and(warp::ws())
  80. .and(pool)
  81. .and(hello)
  82. .map(
  83. |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
  84. // This will call our function if the handshake succeeds.
  85. ws.on_upgrade(|socket| async move {
  86. let actor = WebsocketActor::new(socket, hello);
  87. let handle = actor.start();
  88. let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  89. println!("Adding actor {id}");
  90. pool.write().unwrap().insert(id, handle);
  91. })
  92. },
  93. );
  94. // GET / -> index html
  95. let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
  96. let routes = index.or(chat);
  97. warp::serve(routes).run(([127, 0, 0, 1], 3030)).await
  98. }
  99. static INDEX_HTML: &str = r#"<!DOCTYPE html>
  100. <html lang="en">
  101. <head>
  102. <title>Warp Chat</title>
  103. </head>
  104. <body>
  105. <h1>Warp chat</h1>
  106. <div id="chat">
  107. <p><em>Connecting...</em></p>
  108. </div>
  109. <input type="text" id="text" />
  110. <button type="button" id="send">Send</button>
  111. <script type="text/javascript">
  112. const chat = document.getElementById('chat');
  113. const text = document.getElementById('text');
  114. const uri = 'ws://' + location.host + '/chat';
  115. const ws = new WebSocket(uri);
  116. let num = 0;
  117. function message(data) {
  118. if (num % 10000 === 0) chat.innerHTML = `${num}`
  119. }
  120. ws.onopen = function() {
  121. chat.innerHTML = '<p><em>Connected!</em></p>';
  122. };
  123. ws.onmessage = function(msg) {
  124. console.log(msg)
  125. num += 1;
  126. message(msg.data);
  127. };
  128. ws.onclose = function() {
  129. chat.getElementsByTagName('em')[0].innerText = 'Disconnected!';
  130. };
  131. send.onclick = function() {
  132. const msg = text.value;
  133. let i = 0;
  134. while (i < 100000) {
  135. ws.send(msg);
  136. i += 1;
  137. }
  138. // text.value = '';
  139. message('<You>: ' + msg);
  140. };
  141. </script>
  142. </body>
  143. </html>
  144. "#;