| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- use crate::{
- request::{self, HttpRequestParameters, HttpResponse},
- AppResult,
- };
- use futures::FutureExt;
- use serde::Serialize;
- use std::{collections::HashMap, time::Instant};
- use tauri::ipc::Channel;
- use tauri_plugin_log::log;
- use tokio::select;
- use tokio_stream::{StreamExt, StreamMap};
- pub struct AppState {
- /// Sqlite database. Just an Arc so cheap to clone.
- pub db: sqlx::sqlite::SqlitePool,
- pub client: reqwest::Client,
- pub req_tx: tokio::sync::mpsc::Sender<OutboundRequest>,
- pub cancel_tx: tokio::sync::mpsc::Sender<i64>,
- }
- impl AppState {
- pub async fn new() -> Self {
- log::info!("Connecting to DB");
- let db = crate::db::init("sqlite:/home/biblius/codium/rusty/rquest/rquest.db").await;
- let client = reqwest::Client::new();
- let (req_tx, mut req_rx) = tokio::sync::mpsc::channel::<OutboundRequest>(128);
- let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel(128);
- let c = client.clone();
- tokio::spawn(async move {
- log::info!("Spawning request queue runtime");
- let mut timers = HashMap::<i64, Instant>::new();
- let mut return_channels = HashMap::<i64, Channel<ResponseResult>>::new();
- let mut outbound = StreamMap::new();
- loop {
- select! {
- Some(req) = req_rx.recv() => {
- let send = Box::pin(request::send(c.clone(), req.req));
- outbound.insert(req.req_id, send.into_stream());
- timers.insert(req.req_id, Instant::now());
- return_channels.insert(req.req_id, req.return_channel);
- },
- Some(cancel) = cancel_rx.recv() => {
- if outbound.remove(&cancel).is_some(){
- let Some(start) = timers.remove(&cancel) else {
- continue;
- };
- log::debug!("cancelled request {cancel}; took {}ms", Instant::now().duration_since(start).as_millis());
- } else {
- log::warn!("cannot cancel request, does not exist: {cancel}");
- continue;
- };
- },
- Some((req_id, res)) = outbound.next() => {
- if let Some(start) = timers.remove(&req_id) {
- log::debug!("request complete {req_id}; took {}ms", Instant::now().duration_since(start).as_millis());
- }
- let Some(channel) = return_channels.remove(&req_id) else {
- log::warn!("missing return channel for {req_id}");
- continue;
- };
- channel.send(res.into()).unwrap();
- },
- }
- }
- });
- Self {
- db,
- client,
- req_tx,
- cancel_tx,
- }
- }
- pub async fn queue_request(
- &self,
- req_id: i64,
- req: HttpRequestParameters,
- channel: Channel<ResponseResult>,
- ) -> AppResult<()> {
- log::info!("Queueing request {req_id}: {req:?}");
- let _ = self
- .req_tx
- .send(OutboundRequest {
- req_id,
- req,
- return_channel: channel,
- })
- .await;
- Ok(())
- }
- pub async fn cancel_request(&self, req_id: i64) -> AppResult<()> {
- log::info!("Cancelling request {req_id}");
- let _ = self.cancel_tx.send(req_id).await;
- Ok(())
- }
- }
- pub struct OutboundRequest {
- req_id: i64,
- req: HttpRequestParameters,
- return_channel: Channel<ResponseResult>,
- }
- #[derive(Debug, Serialize)]
- #[serde(tag = "type", content = "data")]
- pub enum ResponseResult {
- Ok(HttpResponse),
- Err(String),
- }
- impl From<AppResult<HttpResponse>> for ResponseResult {
- fn from(value: AppResult<HttpResponse>) -> Self {
- match value {
- Ok(res) => Self::Ok(res),
- Err(e) => Self::Err(e.to_string()),
- }
- }
- }
|