rsiot/components/cmp_websocket_client/
fn_process.rs1use futures::StreamExt;
2use tokio::{
3 sync::mpsc,
4 task::JoinSet,
5 time::{sleep, Duration},
6};
7use tokio_tungstenite::connect_async;
8use tracing::{error, info, warn};
9use url::Url;
10
11use crate::{
12 components_config::websocket_general::WebsocketMessage,
13 executor::{join_set_spawn, CmpInOut, ComponentError},
14 message::MsgDataBound,
15 serde_utils::SerdeAlg,
16};
17
18use super::{
19 cmp_websocket_client_general::{ConnectionState, WebsocketClientGeneralTasks},
20 config::Config,
21 tasks, Error,
22};
23
24pub async fn fn_process<TMessage, TServerToClient, TClientToServer>(
25 input: CmpInOut<TMessage>,
26 config: Config<TMessage, TServerToClient, TClientToServer>,
27) -> Result<(), ComponentError>
28where
29 TMessage: MsgDataBound + 'static,
30 TServerToClient: 'static + WebsocketMessage,
31 TClientToServer: 'static + WebsocketMessage,
32{
33 info!("cmp_websocket_client starting");
34
35 let serde_alg = SerdeAlg::new(config.serde_alg);
36
37 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
38 let (ch_tx_connection_state, ch_rx_connection_state) = mpsc::channel(1000);
39 let task = ConnectionState {
40 input: ch_rx_connection_state,
41 output: input.clone(),
42 fn_connection_state: config.fn_connection_state,
43 };
44 join_set_spawn(&mut task_set, task.spawn());
45
46 loop {
47 let res = task_connect(
48 input.clone(),
49 config.clone(),
50 ch_tx_connection_state.clone(),
51 serde_alg,
52 )
53 .await;
54 match res {
55 Ok(_) => (),
56 Err(err) => error!("{:?}", err),
57 }
58 warn!("Restaring...");
59 ch_tx_connection_state.send(false).await.unwrap();
60 sleep(Duration::from_millis(2000)).await;
61 }
62}
63
64async fn task_connect<TMessage, TServerToClient, TClientToServer>(
66 in_out: CmpInOut<TMessage>,
67 config: Config<TMessage, TServerToClient, TClientToServer>,
68 ch_tx_connection_state: mpsc::Sender<bool>,
69 serde_alg: SerdeAlg,
70) -> super::Result<()>
71where
72 TMessage: MsgDataBound + 'static,
73 TServerToClient: 'static + WebsocketMessage,
74 TClientToServer: 'static + WebsocketMessage,
75{
76 let url = Url::parse(&config.url)?;
77
78 let (ws_stream, _) = connect_async(url)
79 .await
80 .map_err(|e| Error::SetupConnection(e.to_string()))?;
81
82 let (websocket_write, websocket_read) = ws_stream.split();
83
84 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
85
86 let ws_general = WebsocketClientGeneralTasks {
88 msg_bus: in_out.clone(),
89 buffer_size: 1000,
90 task_set: &mut task_set,
91 fn_client_to_server: config.fn_client_to_server,
92 fn_server_to_client: config.fn_server_to_client,
93 ch_tx_connection_state: ch_tx_connection_state.clone(),
94 serde_alg,
95 };
96 let (ch_rx_input_to_send, ch_tx_receive_to_output) = ws_general.spawn();
97
98 let task = tasks::Send {
100 input: ch_rx_input_to_send,
101 websocket_write,
102 };
103 join_set_spawn(&mut task_set, task.spawn());
104
105 let task = tasks::Receive {
107 websocket_read,
108 output: ch_tx_receive_to_output,
109 };
110 join_set_spawn(&mut task_set, task.spawn());
111
112 while let Some(task_result) = task_set.join_next().await {
113 warn!("Task completed with result: {:?}", task_result);
114 task_set.shutdown().await;
115 }
116 Ok(())
117}