rsiot/components/cmp_websocket_client_wasm/
fn_process.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use gloo::{net::websocket::futures::WebSocket, timers::future::sleep};
5use tokio::{sync::mpsc, task::JoinSet};
6use tracing::{info, warn};
7use url::Url;
8
9use crate::{
10    components_config::websocket_general::WebsocketMessage,
11    executor::{join_set_spawn, CmpInOut},
12    message::MsgDataBound,
13    serde_utils::SerdeAlg,
14};
15
16use super::{
17    cmp_websocket_client_general::{ConnectionState, WebsocketClientGeneralTasks},
18    tasks, Config, Error,
19};
20
21pub async fn fn_process<TMessage, TServerToClient, TClientToServer>(
22    config: Config<TMessage, TServerToClient, TClientToServer>,
23    input: CmpInOut<TMessage>,
24) -> super::Result<()>
25where
26    TMessage: MsgDataBound + 'static,
27    TServerToClient: WebsocketMessage + 'static,
28    TClientToServer: WebsocketMessage + 'static,
29{
30    info!("Starting cmp_websocket_client_wasm. Config: {config:?}");
31
32    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
33    let (ch_tx_connection_state, ch_rx_connection_state) = mpsc::channel(1000);
34    let task = ConnectionState {
35        input: ch_rx_connection_state,
36        output: input.clone(),
37        fn_connection_state: config.fn_connection_state,
38    };
39    join_set_spawn(&mut task_set, task.spawn());
40
41    loop {
42        let result = task_main(
43            config.clone(),
44            input.clone(),
45            ch_tx_connection_state.clone(),
46        )
47        .await;
48        warn!("End with resilt: {:?}", result);
49        info!("Restarting...");
50        ch_tx_connection_state.send(false).await.unwrap();
51        sleep(Duration::from_secs(2)).await;
52    }
53}
54
55async fn task_main<TMessage, TServerToClient, TClientToServer>(
56    config: Config<TMessage, TServerToClient, TClientToServer>,
57    msg_bus: CmpInOut<TMessage>,
58    ch_tx_connection_state: mpsc::Sender<bool>,
59) -> super::Result<()>
60where
61    TMessage: MsgDataBound + 'static,
62    TServerToClient: WebsocketMessage + 'static,
63    TClientToServer: WebsocketMessage + 'static,
64{
65    let serde_alg = SerdeAlg::new(config.serde_alg);
66
67    let url = Url::parse(&config.url).map_err(Error::BadUrl)?;
68    let url = url.to_string();
69    let ws = WebSocket::open(&url).map_err(|e| Error::SetupConnection(e.to_string()))?;
70    info!("Connection to websocket server established");
71    let (websocket_write, websocket_read) = ws.split();
72
73    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
74
75    // Запуск общих задач
76    let ws_general = WebsocketClientGeneralTasks {
77        msg_bus: msg_bus.clone(),
78        buffer_size: 1000,
79        task_set: &mut task_set,
80        fn_client_to_server: config.fn_client_to_server,
81        fn_server_to_client: config.fn_server_to_client,
82        ch_tx_connection_state,
83        serde_alg,
84    };
85    let (ch_rx_input_to_send, ch_tx_receive_to_output) = ws_general.spawn();
86
87    // Задача отправки текста на сервер
88    let task = tasks::Send {
89        input: ch_rx_input_to_send,
90        websocket_write,
91    };
92    join_set_spawn(&mut task_set, task.spawn());
93
94    // Задача получения текста из сервера
95    let task = tasks::Receive {
96        websocket_read,
97        output: ch_tx_receive_to_output,
98    };
99    join_set_spawn(&mut task_set, task.spawn());
100
101    while let Some(task_result) = task_set.join_next().await {
102        warn!("Task completed with result: {:?}", task_result);
103        task_result??
104    }
105    Ok(())
106}
107
108// /// Задача отправки входящего потока сообщений на Websocker сервер
109// async fn task_input<TMsg, TService>(
110//     config: Config<TMsg>,
111//     mut input: CmpInOut<TMsg, TService>,
112//     mut write_stream: SplitSink<WebSocket, Message>,
113// ) -> super::Result<()>
114// where
115//     TMsg: MsgDataBound,
116//     TService: ServiceBound,
117// {
118//     while let Ok(msg) = input.recv_input().await {
119//         let ws_msg = (config.fn_input)(&msg).map_err(Error::FnInput)?;
120//         let ws_msg = match ws_msg {
121//             Some(val) => val,
122//             None => continue,
123//         };
124//         let ws_msg = Message::Text(ws_msg);
125//         trace!("New message send to Websocker server: {:?}", ws_msg);
126//         write_stream.send(ws_msg).await?;
127//     }
128//     Err(Error::TaskInput)
129// }
130
131// /// Задача получения текста из Websoket сервера и преобразование в исходящий поток сообщений
132// async fn task_output<TMessage, TService>(
133//     config: Config<TMessage>,
134//     output: CmpInOut<TMessage, TService>,
135//     mut read_stream: SplitStream<WebSocket>,
136// ) -> super::Result<()>
137// where
138//     TMessage: MsgDataBound,
139//     TService: ServiceBound,
140// {
141//     let mut first_execution = true;
142//     while let Some(text) = read_stream.next().await {
143//         trace!("New message from Websocket server: {:?}", text);
144//         let text = match text {
145//             Ok(text) => text,
146//             Err(_) => continue,
147//         };
148//         let msg = match text {
149//             Message::Text(value) => value,
150//             Message::Bytes(_) => todo!(),
151//         };
152
153//         let msgs = (config.fn_output)(&msg).map_err(Error::FnOutput);
154//         let msgs = match msgs {
155//             Ok(val) => val,
156//             Err(err) => {
157//                 warn!("{err}");
158//                 continue;
159//             }
160//         };
161
162//         // Соединение установлено
163//         if first_execution {
164//             if let Some(msg) = (config.fn_connection_state)(true) {
165//                 output.send_output(msg).await.map_err(Error::CmpOutput)?;
166//             }
167//             first_execution = false;
168//         }
169
170//         let Some(msgs) = msgs else { continue };
171//         for msg in msgs {
172//             output.send_output(msg).await.map_err(Error::CmpOutput)?;
173//         }
174//     }
175//     // Соединение закрыто
176//     if let Some(msg) = (config.fn_connection_state)(false) {
177//         output.send_output(msg).await.map_err(Error::CmpOutput)?;
178//     }
179//     Err(Error::TaskOutput)
180// }