// rsiot/components/cmp_websocket_client_wasm/fn_process.rs
use std::time::Duration;
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use gloo::{
net::websocket::{futures::WebSocket, Message},
timers::future::sleep,
};
use tokio::task::JoinSet;
use tracing::{info, trace, warn};
use url::Url;
use crate::{
executor::{join_set_spawn, CmpInOut},
message::{MsgDataBound, ServiceBound},
};
use super::{Config, Error};
/// Entry point of the component: keeps the WebSocket client running forever.
///
/// Runs [`task_main`] with clones of `config` and `input`. Whenever it
/// returns, the result is logged, the function waits two seconds and starts
/// it again. The loop has no exit, so this function never returns.
pub async fn fn_process<TMessage, TService>(
    config: Config<TMessage>,
    input: CmpInOut<TMessage, TService>,
) -> super::Result
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    // Pause between the end of one session and the next attempt.
    const RESTART_DELAY: Duration = Duration::from_secs(2);

    info!("Starting cmp_websocket_client_wasm. Config: {config:?}");
    loop {
        // Each attempt gets its own copies so the originals survive restarts.
        let session_config = config.clone();
        let session_bus = input.clone();
        let outcome = task_main(session_config, session_bus).await;
        warn!("End with resilt: {:?}", outcome);
        info!("Restarting...");
        sleep(RESTART_DELAY).await;
    }
}
async fn task_main<TMessage, TService>(
config: Config<TMessage>,
msg_bus: CmpInOut<TMessage, TService>,
) -> super::Result
where
TMessage: MsgDataBound + 'static,
TService: ServiceBound + 'static,
{
let url = Url::parse(&config.url).map_err(Error::BadUrl)?;
let url = url.to_string();
let ws = WebSocket::open(&url).map_err(Error::Connect)?;
info!("Connection to websocket server established");
let (write_stream, read_stream) = ws.split();
let mut task_set: JoinSet<super::Result> = JoinSet::new();
let task = task_input(config.clone(), msg_bus.clone(), write_stream);
join_set_spawn(&mut task_set, task);
let task = task_output(config, msg_bus, read_stream);
join_set_spawn(&mut task_set, task);
while let Some(task_result) = task_set.join_next().await {
task_result??
}
Ok(())
}
async fn task_input<TMsg, TService>(
config: Config<TMsg>,
mut input: CmpInOut<TMsg, TService>,
mut write_stream: SplitSink<WebSocket, Message>,
) -> super::Result
where
TMsg: MsgDataBound,
TService: ServiceBound,
{
while let Ok(msg) = input.recv_input().await {
let ws_msg = (config.fn_input)(&msg).map_err(Error::FnInput)?;
let ws_msg = match ws_msg {
Some(val) => val,
None => continue,
};
let ws_msg = Message::Text(ws_msg);
trace!("New message send to Websocker server: {:?}", ws_msg);
write_stream.send(ws_msg).await?;
}
Err(Error::TaskInput)
}
async fn task_output<TMessage, TService>(
config: Config<TMessage>,
output: CmpInOut<TMessage, TService>,
mut read_stream: SplitStream<WebSocket>,
) -> super::Result
where
TMessage: MsgDataBound,
TService: ServiceBound,
{
let mut first_execution = true;
while let Some(text) = read_stream.next().await {
trace!("New message from Websocket server: {:?}", text);
let text = match text {
Ok(text) => text,
Err(_) => continue,
};
let msg = match text {
Message::Text(value) => value,
Message::Bytes(_) => todo!(),
};
let msgs = (config.fn_output)(&msg).map_err(Error::FnOutput);
let msgs = match msgs {
Ok(val) => val,
Err(err) => {
warn!("{err}");
continue;
}
};
if first_execution {
if let Some(msg) = (config.fn_connection_state)(true) {
output.send_output(msg).await.map_err(Error::CmpOutput)?;
}
first_execution = false;
}
let Some(msgs) = msgs else { continue };
for msg in msgs {
output.send_output(msg).await.map_err(Error::CmpOutput)?;
}
}
if let Some(msg) = (config.fn_connection_state)(false) {
output.send_output(msg).await.map_err(Error::CmpOutput)?;
}
Err(Error::TaskOutput)
}