rsiot/components/cmp_websocket_client/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use futures::{
    stream::{SplitSink, SplitStream},
    SinkExt, StreamExt,
};
use tokio::{
    net::TcpStream,
    task::JoinSet,
    time::{sleep, Duration},
};
use tokio_tungstenite::{
    connect_async, tungstenite::Message as TungsteniteMessage, MaybeTlsStream, WebSocketStream,
};
use tracing::{error, info, warn};
use url::Url;

use crate::{
    executor::{CmpInOut, ComponentError},
    message::{Message, MsgDataBound, ServiceBound},
};

use super::{
    config::{Config, FnOutput},
    error::Error,
};

pub async fn fn_process<TMessage, TService>(
    input: CmpInOut<TMessage, TService>,
    config: Config<TMessage>,
) -> Result<(), ComponentError>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    info!("cmp_websocket_client starting");

    loop {
        let res = task_connect(input.clone(), config.clone()).await;
        match res {
            Ok(_) => (),
            Err(err) => error!("{:?}", err),
        }
        warn!("Restaring...");
        sleep(Duration::from_secs(2)).await;
    }
}

/// Подключаемся к серверу и запускаем потоки получения и отправки
async fn task_connect<TMessage, TService>(
    in_out: CmpInOut<TMessage, TService>,
    config: Config<TMessage>,
) -> Result<(), Error<TMessage>>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    let url = Url::parse(&config.url).map_err(Error::BadUrl)?;

    let (ws_stream, _) = connect_async(url).await?;
    let (write, read) = ws_stream.split();

    let mut task_set: JoinSet<Result<(), Error<TMessage>>> = JoinSet::new();
    task_set.spawn(task_send(in_out.clone(), write, config.fn_input));
    task_set.spawn(task_recv(in_out, read, config.fn_output));

    while let Some(task_result) = task_set.join_next().await {
        task_result??
    }
    Ok(())
}

/// Задача отправки данных на сервер Websocket
async fn task_send<TMessage, TService>(
    mut input: CmpInOut<TMessage, TService>,
    mut write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
    fn_send: fn(&Message<TMessage>) -> anyhow::Result<Option<String>>,
) -> Result<(), Error<TMessage>>
where
    TMessage: MsgDataBound,
    TService: ServiceBound,
{
    while let Ok(msg) = input.recv_input().await {
        let text = (fn_send)(&msg).map_err(Error::FnInput)?;
        if let Some(text) = text {
            let text = TungsteniteMessage::Text(text);
            write.send(text).await?;
        }
    }
    Ok(())
}

/// Задача приема данных с сервера Websocket
async fn task_recv<TMessage, TService>(
    output: CmpInOut<TMessage, TService>,
    mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    fn_recv: FnOutput<TMessage>,
) -> Result<(), Error<TMessage>>
where
    TMessage: MsgDataBound,
    TService: ServiceBound,
{
    while let Some(msg) = read.next().await {
        let data = msg?.into_text()?;
        let msgs = (fn_recv)(&data).map_err(|err| Error::FnOutput(err))?;
        let msgs = match msgs {
            Some(msgs) => msgs,
            None => continue,
        };
        for msg in msgs {
            output.send_output(msg).await.map_err(Error::CmpOutput)?;
        }
    }
    Ok(())
}