rsiot/components/cmp_websocket_client/
fn_process.rs

1use futures::StreamExt;
2use tokio::{
3    sync::mpsc,
4    task::JoinSet,
5    time::{sleep, Duration},
6};
7use tokio_tungstenite::connect_async;
8use tracing::{error, info, warn};
9use url::Url;
10
11use crate::{
12    components_config::websocket_general::WebsocketMessage,
13    executor::{join_set_spawn, CmpInOut, ComponentError},
14    message::MsgDataBound,
15    serde_utils::SerdeAlg,
16};
17
18use super::{
19    cmp_websocket_client_general::{ConnectionState, WebsocketClientGeneralTasks},
20    config::Config,
21    tasks, Error,
22};
23
24pub async fn fn_process<TMessage, TServerToClient, TClientToServer>(
25    input: CmpInOut<TMessage>,
26    config: Config<TMessage, TServerToClient, TClientToServer>,
27) -> Result<(), ComponentError>
28where
29    TMessage: MsgDataBound + 'static,
30    TServerToClient: 'static + WebsocketMessage,
31    TClientToServer: 'static + WebsocketMessage,
32{
33    info!("cmp_websocket_client starting");
34
35    let serde_alg = SerdeAlg::new(config.serde_alg);
36
37    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
38    let (ch_tx_connection_state, ch_rx_connection_state) = mpsc::channel(1000);
39    let task = ConnectionState {
40        input: ch_rx_connection_state,
41        output: input.clone(),
42        fn_connection_state: config.fn_connection_state,
43    };
44    join_set_spawn(&mut task_set, task.spawn());
45
46    loop {
47        let res = task_connect(
48            input.clone(),
49            config.clone(),
50            ch_tx_connection_state.clone(),
51            serde_alg,
52        )
53        .await;
54        match res {
55            Ok(_) => (),
56            Err(err) => error!("{:?}", err),
57        }
58        warn!("Restaring...");
59        ch_tx_connection_state.send(false).await.unwrap();
60        sleep(Duration::from_millis(2000)).await;
61    }
62}
63
64/// Подключаемся к серверу и запускаем потоки получения и отправки
65async fn task_connect<TMessage, TServerToClient, TClientToServer>(
66    in_out: CmpInOut<TMessage>,
67    config: Config<TMessage, TServerToClient, TClientToServer>,
68    ch_tx_connection_state: mpsc::Sender<bool>,
69    serde_alg: SerdeAlg,
70) -> super::Result<()>
71where
72    TMessage: MsgDataBound + 'static,
73    TServerToClient: 'static + WebsocketMessage,
74    TClientToServer: 'static + WebsocketMessage,
75{
76    let url = Url::parse(&config.url)?;
77
78    let (ws_stream, _) = connect_async(url)
79        .await
80        .map_err(|e| Error::SetupConnection(e.to_string()))?;
81
82    let (websocket_write, websocket_read) = ws_stream.split();
83
84    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
85
86    // Запуск общих задач
87    let ws_general = WebsocketClientGeneralTasks {
88        msg_bus: in_out.clone(),
89        buffer_size: 1000,
90        task_set: &mut task_set,
91        fn_client_to_server: config.fn_client_to_server,
92        fn_server_to_client: config.fn_server_to_client,
93        ch_tx_connection_state: ch_tx_connection_state.clone(),
94        serde_alg,
95    };
96    let (ch_rx_input_to_send, ch_tx_receive_to_output) = ws_general.spawn();
97
98    // Задача отправки текста на сервер
99    let task = tasks::Send {
100        input: ch_rx_input_to_send,
101        websocket_write,
102    };
103    join_set_spawn(&mut task_set, task.spawn());
104
105    // Задача получения текста из сервера
106    let task = tasks::Receive {
107        websocket_read,
108        output: ch_tx_receive_to_output,
109    };
110    join_set_spawn(&mut task_set, task.spawn());
111
112    while let Some(task_result) = task_set.join_next().await {
113        warn!("Task completed with result: {:?}", task_result);
114        task_set.shutdown().await;
115    }
116    Ok(())
117}