rsiot/components/cmp_websocket_client_wasm/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::time::Duration;

use futures::{
    stream::{SplitSink, SplitStream},
    SinkExt, StreamExt,
};
use gloo::{
    net::websocket::{futures::WebSocket, Message},
    timers::future::sleep,
};
use tokio::task::JoinSet;
use tracing::{info, trace, warn};
use url::Url;

use crate::{
    executor::{join_set_spawn, CmpInOut},
    message::{MsgDataBound, ServiceBound},
};

use super::{Config, Error};

pub async fn fn_process<TMessage, TService>(
    config: Config<TMessage>,
    input: CmpInOut<TMessage, TService>,
) -> super::Result
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    info!("Starting cmp_websocket_client_wasm. Config: {config:?}");
    loop {
        let result = task_main(config.clone(), input.clone()).await;
        warn!("End with resilt: {:?}", result);
        info!("Restarting...");
        sleep(Duration::from_secs(2)).await;
    }
}

async fn task_main<TMessage, TService>(
    config: Config<TMessage>,
    msg_bus: CmpInOut<TMessage, TService>,
) -> super::Result
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    let url = Url::parse(&config.url).map_err(Error::BadUrl)?;
    let url = url.to_string();
    let ws = WebSocket::open(&url).map_err(Error::Connect)?;
    info!("Connection to websocket server established");
    let (write_stream, read_stream) = ws.split();

    let mut task_set: JoinSet<super::Result> = JoinSet::new();

    // Отправка входящих сообщений на Websocket сервер
    let task = task_input(config.clone(), msg_bus.clone(), write_stream);
    join_set_spawn(&mut task_set, task);

    // Данные от сервера в исходящий поток сообщений
    let task = task_output(config, msg_bus, read_stream);
    join_set_spawn(&mut task_set, task);

    while let Some(task_result) = task_set.join_next().await {
        task_result??
    }
    Ok(())
}

/// Задача отправки входящего потока сообщений на Websocker сервер
async fn task_input<TMsg, TService>(
    config: Config<TMsg>,
    mut input: CmpInOut<TMsg, TService>,
    mut write_stream: SplitSink<WebSocket, Message>,
) -> super::Result
where
    TMsg: MsgDataBound,
    TService: ServiceBound,
{
    while let Ok(msg) = input.recv_input().await {
        let ws_msg = (config.fn_input)(&msg).map_err(Error::FnInput)?;
        let ws_msg = match ws_msg {
            Some(val) => val,
            None => continue,
        };
        let ws_msg = Message::Text(ws_msg);
        trace!("New message send to Websocker server: {:?}", ws_msg);
        write_stream.send(ws_msg).await?;
    }
    Err(Error::TaskInput)
}

/// Задача получения текста из Websoket сервера и преобразование в исходящий поток сообщений
async fn task_output<TMessage, TService>(
    config: Config<TMessage>,
    output: CmpInOut<TMessage, TService>,
    mut read_stream: SplitStream<WebSocket>,
) -> super::Result
where
    TMessage: MsgDataBound,
    TService: ServiceBound,
{
    let mut first_execution = true;
    while let Some(text) = read_stream.next().await {
        trace!("New message from Websocket server: {:?}", text);
        let text = match text {
            Ok(text) => text,
            Err(_) => continue,
        };
        let msg = match text {
            Message::Text(value) => value,
            Message::Bytes(_) => todo!(),
        };

        let msgs = (config.fn_output)(&msg).map_err(Error::FnOutput);
        let msgs = match msgs {
            Ok(val) => val,
            Err(err) => {
                warn!("{err}");
                continue;
            }
        };

        // Соединение установлено
        if first_execution {
            if let Some(msg) = (config.fn_connection_state)(true) {
                output.send_output(msg).await.map_err(Error::CmpOutput)?;
            }
            first_execution = false;
        }

        let Some(msgs) = msgs else { continue };
        for msg in msgs {
            output.send_output(msg).await.map_err(Error::CmpOutput)?;
        }
    }
    // Соединение закрыто
    if let Some(msg) = (config.fn_connection_state)(false) {
        output.send_output(msg).await.map_err(Error::CmpOutput)?;
    }
    Err(Error::TaskOutput)
}