state.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. use crate::{
  2. request::{self, HttpRequestParameters, HttpResponse},
  3. AppResult,
  4. };
  5. use futures::FutureExt;
  6. use serde::Serialize;
  7. use std::{collections::HashMap, time::Instant};
  8. use tauri::ipc::Channel;
  9. use tauri_plugin_log::log;
  10. use tokio::select;
  11. use tokio_stream::{StreamExt, StreamMap};
  12. pub struct AppState {
  13. /// Sqlite database. Just an Arc so cheap to clone.
  14. pub db: sqlx::sqlite::SqlitePool,
  15. pub client: reqwest::Client,
  16. pub req_tx: tokio::sync::mpsc::Sender<OutboundRequest>,
  17. pub cancel_tx: tokio::sync::mpsc::Sender<i64>,
  18. }
  19. impl AppState {
  20. pub async fn new() -> Self {
  21. log::info!("Connecting to DB");
  22. let db = crate::db::init("sqlite:/home/biblius/codium/rusty/rquest/rquest.db").await;
  23. let client = reqwest::Client::new();
  24. let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::<OutboundRequest>(128);
  25. let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel(128);
  26. let c = client.clone();
  27. tokio::spawn(async move {
  28. log::info!("Spawning request queue runtime");
  29. let mut timers = HashMap::<i64, Instant>::new();
  30. let mut return_channels = HashMap::<i64, Channel<ResponseResult>>::new();
  31. let mut outbound = StreamMap::new();
  32. loop {
  33. select! {
  34. Some(req) = req_rx.recv() => {
  35. let send = Box::pin(request::send(c.clone(), req.req));
  36. outbound.insert(req.req_id, send.into_stream());
  37. timers.insert(req.req_id, Instant::now());
  38. return_channels.insert(req.req_id, req.return_channel);
  39. },
  40. Some(cancel) = cancel_rx.recv() => {
  41. if outbound.remove(&cancel).is_some(){
  42. let Some(start) = timers.remove(&cancel) else {
  43. continue;
  44. };
  45. log::debug!("cancelled request {cancel}; took {}ms", Instant::now().duration_since(start).as_millis());
  46. } else {
  47. log::warn!("cannot cancel request, does not exist: {cancel}");
  48. continue;
  49. };
  50. },
  51. Some((req_id, res)) = outbound.next() => {
  52. if let Some(start) = timers.remove(&req_id) {
  53. log::debug!("request complete {req_id}; took {}ms", Instant::now().duration_since(start).as_millis());
  54. }
  55. let Some(channel) = return_channels.remove(&req_id) else {
  56. log::warn!("missing return channel for {req_id}");
  57. continue;
  58. };
  59. channel.send(res.into()).unwrap();
  60. },
  61. }
  62. }
  63. });
  64. Self {
  65. db,
  66. client,
  67. req_tx,
  68. cancel_tx,
  69. }
  70. }
  71. pub async fn queue_request(
  72. &self,
  73. req_id: i64,
  74. req: HttpRequestParameters,
  75. channel: Channel<ResponseResult>,
  76. ) -> AppResult<()> {
  77. log::info!("Queueing request {req_id}: {req:?}");
  78. let _ = self
  79. .req_tx
  80. .send(OutboundRequest {
  81. req_id,
  82. req,
  83. return_channel: channel,
  84. })
  85. .await;
  86. Ok(())
  87. }
  88. pub async fn cancel_request(&self, req_id: i64) -> AppResult<()> {
  89. log::info!("Cancelling request {req_id}");
  90. let _ = self.cancel_tx.send(req_id).await;
  91. Ok(())
  92. }
  93. }
  94. pub struct OutboundRequest {
  95. req_id: i64,
  96. req: HttpRequestParameters,
  97. return_channel: Channel<ResponseResult>,
  98. }
  99. #[derive(Debug, Serialize)]
  100. #[serde(tag = "type", content = "data")]
  101. pub enum ResponseResult {
  102. Ok(HttpResponse),
  103. Err(String),
  104. }
  105. impl From<AppResult<HttpResponse>> for ResponseResult {
  106. fn from(value: AppResult<HttpResponse>) -> Self {
  107. match value {
  108. Ok(res) => Self::Ok(res),
  109. Err(e) => Self::Err(e.to_string()),
  110. }
  111. }
  112. }