rsiot/components/cmp_websocket_client_wasm/fn_process.rs
1use std::time::Duration;
2
3use futures::StreamExt;
4use gloo::{net::websocket::futures::WebSocket, timers::future::sleep};
5use tokio::{sync::mpsc, task::JoinSet};
6use tracing::{info, warn};
7use url::Url;
8
9use crate::{
10 components_config::websocket_general::WebsocketMessage,
11 executor::{join_set_spawn, CmpInOut},
12 message::MsgDataBound,
13 serde_utils::SerdeAlg,
14};
15
16use super::{
17 cmp_websocket_client_general::{ConnectionState, WebsocketClientGeneralTasks},
18 tasks, Config, Error,
19};
20
21pub async fn fn_process<TMessage, TServerToClient, TClientToServer>(
22 config: Config<TMessage, TServerToClient, TClientToServer>,
23 input: CmpInOut<TMessage>,
24) -> super::Result<()>
25where
26 TMessage: MsgDataBound + 'static,
27 TServerToClient: WebsocketMessage + 'static,
28 TClientToServer: WebsocketMessage + 'static,
29{
30 info!("Starting cmp_websocket_client_wasm. Config: {config:?}");
31
32 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
33 let (ch_tx_connection_state, ch_rx_connection_state) = mpsc::channel(1000);
34 let task = ConnectionState {
35 input: ch_rx_connection_state,
36 output: input.clone(),
37 fn_connection_state: config.fn_connection_state,
38 };
39 join_set_spawn(&mut task_set, task.spawn());
40
41 loop {
42 let result = task_main(
43 config.clone(),
44 input.clone(),
45 ch_tx_connection_state.clone(),
46 )
47 .await;
48 warn!("End with resilt: {:?}", result);
49 info!("Restarting...");
50 ch_tx_connection_state.send(false).await.unwrap();
51 sleep(Duration::from_secs(2)).await;
52 }
53}
54
55async fn task_main<TMessage, TServerToClient, TClientToServer>(
56 config: Config<TMessage, TServerToClient, TClientToServer>,
57 msg_bus: CmpInOut<TMessage>,
58 ch_tx_connection_state: mpsc::Sender<bool>,
59) -> super::Result<()>
60where
61 TMessage: MsgDataBound + 'static,
62 TServerToClient: WebsocketMessage + 'static,
63 TClientToServer: WebsocketMessage + 'static,
64{
65 let serde_alg = SerdeAlg::new(config.serde_alg);
66
67 let url = Url::parse(&config.url).map_err(Error::BadUrl)?;
68 let url = url.to_string();
69 let ws = WebSocket::open(&url).map_err(|e| Error::SetupConnection(e.to_string()))?;
70 info!("Connection to websocket server established");
71 let (websocket_write, websocket_read) = ws.split();
72
73 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
74
75 // Запуск общих задач
76 let ws_general = WebsocketClientGeneralTasks {
77 msg_bus: msg_bus.clone(),
78 buffer_size: 1000,
79 task_set: &mut task_set,
80 fn_client_to_server: config.fn_client_to_server,
81 fn_server_to_client: config.fn_server_to_client,
82 ch_tx_connection_state,
83 serde_alg,
84 };
85 let (ch_rx_input_to_send, ch_tx_receive_to_output) = ws_general.spawn();
86
87 // Задача отправки текста на сервер
88 let task = tasks::Send {
89 input: ch_rx_input_to_send,
90 websocket_write,
91 };
92 join_set_spawn(&mut task_set, task.spawn());
93
94 // Задача получения текста из сервера
95 let task = tasks::Receive {
96 websocket_read,
97 output: ch_tx_receive_to_output,
98 };
99 join_set_spawn(&mut task_set, task.spawn());
100
101 while let Some(task_result) = task_set.join_next().await {
102 warn!("Task completed with result: {:?}", task_result);
103 task_result??
104 }
105 Ok(())
106}
107
108// /// Задача отправки входящего потока сообщений на Websocker сервер
109// async fn task_input<TMsg, TService>(
110// config: Config<TMsg>,
111// mut input: CmpInOut<TMsg, TService>,
112// mut write_stream: SplitSink<WebSocket, Message>,
113// ) -> super::Result<()>
114// where
115// TMsg: MsgDataBound,
116// TService: ServiceBound,
117// {
118// while let Ok(msg) = input.recv_input().await {
119// let ws_msg = (config.fn_input)(&msg).map_err(Error::FnInput)?;
120// let ws_msg = match ws_msg {
121// Some(val) => val,
122// None => continue,
123// };
124// let ws_msg = Message::Text(ws_msg);
125// trace!("New message send to Websocker server: {:?}", ws_msg);
126// write_stream.send(ws_msg).await?;
127// }
128// Err(Error::TaskInput)
129// }
130
131// /// Задача получения текста из Websoket сервера и преобразование в исходящий поток сообщений
132// async fn task_output<TMessage, TService>(
133// config: Config<TMessage>,
134// output: CmpInOut<TMessage, TService>,
135// mut read_stream: SplitStream<WebSocket>,
136// ) -> super::Result<()>
137// where
138// TMessage: MsgDataBound,
139// TService: ServiceBound,
140// {
141// let mut first_execution = true;
142// while let Some(text) = read_stream.next().await {
143// trace!("New message from Websocket server: {:?}", text);
144// let text = match text {
145// Ok(text) => text,
146// Err(_) => continue,
147// };
148// let msg = match text {
149// Message::Text(value) => value,
150// Message::Bytes(_) => todo!(),
151// };
152
153// let msgs = (config.fn_output)(&msg).map_err(Error::FnOutput);
154// let msgs = match msgs {
155// Ok(val) => val,
156// Err(err) => {
157// warn!("{err}");
158// continue;
159// }
160// };
161
162// // Соединение установлено
163// if first_execution {
164// if let Some(msg) = (config.fn_connection_state)(true) {
165// output.send_output(msg).await.map_err(Error::CmpOutput)?;
166// }
167// first_execution = false;
168// }
169
170// let Some(msgs) = msgs else { continue };
171// for msg in msgs {
172// output.send_output(msg).await.map_err(Error::CmpOutput)?;
173// }
174// }
175// // Соединение закрыто
176// if let Some(msg) = (config.fn_connection_state)(false) {
177// output.send_output(msg).await.map_err(Error::CmpOutput)?;
178// }
179// Err(Error::TaskOutput)
180// }