// websocket.rs

  1. use async_trait::async_trait;
  2. use drama::actor::{Actor, ActorHandle};
  3. use drama::message::Handler;
  4. use drama::relay::{Relay, RelayActor};
  5. use flume::Sender;
  6. use futures::stream::SplitStream;
  7. use futures::StreamExt;
  8. use parking_lot::RwLock;
  9. use std::collections::HashMap;
  10. use std::sync::atomic::AtomicUsize;
  11. use std::sync::Arc;
  12. use warp::ws::{Message, WebSocket};
  13. use warp::Filter;
  14. struct WebsocketActor {
  15. hello: ActorHandle<Hello>,
  16. tx: Sender<Message>,
  17. }
  18. impl WebsocketActor {
  19. fn new(handle: ActorHandle<Hello>, tx: Sender<Message>) -> Self {
  20. Self { hello: handle, tx }
  21. }
  22. }
  23. impl Actor for WebsocketActor {}
  24. impl RelayActor<Message, SplitStream<WebSocket>> for WebsocketActor {
  25. type Error = warp::Error;
  26. }
  27. #[async_trait]
  28. impl Relay<Message> for WebsocketActor {
  29. async fn process(&mut self, message: Message) -> Option<Message> {
  30. self.hello
  31. .send(crate::Msg {
  32. _content: message.to_str().unwrap().to_owned(),
  33. })
  34. .unwrap_or_else(|e| println!("FUKEN HELL M8 {e}"));
  35. self.tx.send(message.clone()).unwrap();
  36. Some(message)
  37. }
  38. }
  39. struct Hello {}
  40. impl Actor for Hello {}
  /// Message delivered to the `Hello` actor for each frame a client sends.
  #[derive(Clone)]
  struct Msg {
      /// Text payload of the frame. The leading underscore reflects that the `Hello` handler
      /// in this file never reads it.
      pub _content: String,
  }
  45. #[async_trait]
  46. impl Handler<Msg> for Hello {
  47. type Response = usize;
  48. async fn handle(&mut self, _: &Msg) -> usize {
  49. 10
  50. }
  51. }
  52. type Arbiter = Arc<RwLock<HashMap<usize, ActorHandle<WebsocketActor>>>>;
  /// Process-wide counter used to hand out connection ids, starting at 0.
  static ID: AtomicUsize = AtomicUsize::new(0);
  54. #[tokio::main]
  55. async fn main() {
  56. let pool = Arc::new(RwLock::new(HashMap::new()));
  57. let pool = warp::any().map(move || pool.clone());
  58. let hello = Hello {}.start();
  59. let hello = warp::any().map(move || hello.clone());
  60. // GET /chat -> websocket upgrade
  61. let chat = warp::path("chat")
  62. // The `ws()` filter will prepare Websocket handshake...
  63. .and(warp::ws())
  64. .and(pool)
  65. .and(hello)
  66. .map(
  67. |ws: warp::ws::Ws, pool: Arbiter, hello: ActorHandle<Hello>| {
  68. // This will call our function if the handshake succeeds.
  69. ws.on_upgrade(|socket| async move {
  70. let (si, st) = socket.split();
  71. let (tx, rx) = flume::unbounded();
  72. let actor = WebsocketActor::new(hello, tx.clone());
  73. let handle = actor.start_relay(st, tx);
  74. tokio::spawn(rx.into_stream().map(Ok).forward(si));
  75. let id = ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
  76. println!("Adding actor {id}");
  77. pool.write().insert(id, handle);
  78. })
  79. },
  80. );
  81. // GET / -> index html
  82. let index = warp::path::end().map(|| warp::reply::html(INDEX_HTML));
  83. let routes = index.or(chat);
  84. warp::serve(routes).run(([127, 0, 0, 1], 3030)).await
  85. }
  /// Test page served at `/`.
  ///
  /// The page opens a WebSocket to `/chat`, sends the text box contents 100000 times per click
  /// of "Send", and shows the received-message count each time it is a multiple of 10000.
  ///
  /// Two fixes relative to the previous markup:
  /// * The close handler used to dereference the first `<em>` inside `#chat` unconditionally.
  ///   `message()` replaces the contents of `#chat` with the counter, so that element is often
  ///   gone and the handler threw a `TypeError`. It now recreates the status line when missing.
  /// * The "Send" button is looked up explicitly, like `chat` and `text`, instead of relying on
  ///   the implicit global created from its `id`.
  static INDEX_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
<title>Warp Chat</title>
</head>
<body>
<h1>Warp chat</h1>
<div id="chat">
<p><em>Connecting...</em></p>
</div>
<input type="text" id="text" />
<button type="button" id="send">Send</button>
<script type="text/javascript">
const chat = document.getElementById('chat');
const text = document.getElementById('text');
const send = document.getElementById('send');
const uri = 'ws://' + location.host + '/chat';
const ws = new WebSocket(uri);
let num = 0;
function message(data) {
if (num % 10000 === 0) chat.innerHTML = `${num}`
}
ws.onopen = function() {
chat.innerHTML = '<p><em>Connected!</em></p>';
};
ws.onmessage = function(msg) {
console.log(msg)
num += 1;
message(msg.data);
};
ws.onclose = function() {
const statusEl = chat.getElementsByTagName('em')[0];
if (statusEl) {
statusEl.innerText = 'Disconnected!';
} else {
chat.innerHTML = '<p><em>Disconnected!</em></p>';
}
};
send.onclick = function() {
const msg = text.value;
let i = 0;
while (i < 100000) {
ws.send(msg);
i += 1;
}
// text.value = '';
message('<You>: ' + msg);
};
</script>
</body>
</html>
"#;