rsiot/components/cmp_websocket_server/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//! Компонент для подключения через websocket server.
//!
//! Перенаправляет поток входящих сообщений подключенным вебсокет-клиентам
//!

use tokio::{
    net::TcpListener,
    spawn,
    time::{sleep, Duration},
};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::{
    executor::{CmpInOut, ComponentError},
    message::{AuthPermissions, MsgDataBound, ServiceBound},
};

use super::{
    async_task_utils::cancellable_task, config::Config, errors::Error,
    handle_ws_connection::handle_ws_connection,
};

pub async fn fn_process<TMessage, TService>(
    input: CmpInOut<TMessage, TService>,
    config: Config<TMessage>,
) -> Result<(), ComponentError>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    info!(
        "Component cmp_websocket_server started. Config: {:?}",
        config
    );

    let cancel = CancellationToken::new();

    loop {
        let result = task_main(input.clone(), config.clone(), cancel.clone()).await;
        match result {
            Ok(_) => (),
            Err(err) => error!("{:?}", err),
        }
        info!("Restarting...");
        sleep(Duration::from_secs(2)).await;
    }
}

async fn task_main<TMessage, TService>(
    in_out: CmpInOut<TMessage, TService>,
    config: Config<TMessage>,
    cancel: CancellationToken,
) -> super::Result<()>
where
    TMessage: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    let addr = format!("0.0.0.0:{}", config.port);

    let listener = create_tcp_listener(addr).await?;

    // слушаем порт, при получении запроса создаем новое подключение WS
    while let Ok(stream_and_addr) = listener.accept().await {
        let session_name = format!("session_{}", stream_and_addr.1);
        let future = handle_ws_connection(
            in_out.clone_with_new_id(&session_name, AuthPermissions::FullAccess),
            config.clone(),
            stream_and_addr,
        );
        spawn(cancellable_task(future, cancel.clone()));
    }

    Ok(())
}

async fn create_tcp_listener(addr: String) -> super::Result<TcpListener> {
    let listener = TcpListener::bind(&addr).await;
    let listener = match listener {
        Ok(value) => value,
        Err(error) => {
            return Err(Error::BindToPort(error));
        }
    };
    info!("Listening on: {}", addr);
    Ok(listener)
}