use crate::{ request::{self, HttpRequestParameters, HttpResponse}, AppResult, }; use futures::FutureExt; use serde::Serialize; use std::{collections::HashMap, time::Instant}; use tauri::ipc::Channel; use tauri_plugin_log::log; use tokio::select; use tokio_stream::{StreamExt, StreamMap}; pub struct AppState { /// Sqlite database. Just an Arc so cheap to clone. pub db: sqlx::sqlite::SqlitePool, pub client: reqwest::Client, pub req_tx: tokio::sync::mpsc::Sender, pub cancel_tx: tokio::sync::mpsc::Sender, } impl AppState { pub async fn new() -> Self { log::info!("Connecting to DB"); let db = crate::db::init("sqlite:/home/biblius/codium/rusty/rquest/rquest.db").await; let client = reqwest::Client::new(); let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::(128); let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel(128); let c = client.clone(); tokio::spawn(async move { log::info!("Spawning request queue runtime"); let mut timers = HashMap::::new(); let mut return_channels = HashMap::>::new(); let mut outbound = StreamMap::new(); loop { select! { Some(req) = req_rx.recv() => { let send = Box::pin(request::send(c.clone(), req.req)); outbound.insert(req.req_id, send.into_stream()); timers.insert(req.req_id, Instant::now()); return_channels.insert(req.req_id, req.return_channel); }, Some(cancel) = cancel_rx.recv() => { if outbound.remove(&cancel).is_some(){ let Some(start) = timers.remove(&cancel) else { continue; }; log::debug!("cancelled request {cancel}; took {}ms", Instant::now().duration_since(start).as_millis()); } else { log::warn!("cannot cancel request, does not exist: {cancel}"); continue; }; }, Some((req_id, res)) = outbound.next() => { if let Some(start) = timers.remove(&req_id) { log::debug!("request complete {req_id}; took {}ms", Instant::now().duration_since(start).as_millis()); } let Some(channel) = return_channels.remove(&req_id) else { log::warn!("missing return channel for {req_id}"); continue; }; channel.send(res.into()).unwrap(); }, } } }); Self { db, client, req_tx, cancel_tx, } } pub async fn queue_request( &self, req_id: i64, req: HttpRequestParameters, channel: Channel, ) -> AppResult<()> { log::info!("Queueing request {req_id}: {req:?}"); let _ = self .req_tx .send(OutboundRequest { req_id, req, return_channel: channel, }) .await; Ok(()) } pub async fn cancel_request(&self, req_id: i64) -> AppResult<()> { log::info!("Cancelling request {req_id}"); let _ = self.cancel_tx.send(req_id).await; Ok(()) } } pub struct OutboundRequest { req_id: i64, req: HttpRequestParameters, return_channel: Channel, } #[derive(Debug, Serialize)] #[serde(tag = "type", content = "data")] pub enum ResponseResult { Ok(HttpResponse), Err(String), } impl From> for ResponseResult { fn from(value: AppResult) -> Self { match value { Ok(res) => Self::Ok(res), Err(e) => Self::Err(e.to_string()), } } }