// rsiot/components/cmp_http_server_esp/fn_process.rs

use std::time::Duration;

use embedded_svc::{
    http::{Headers, Method},
    io::{Read, Write},
};
use esp_idf_svc::http::server::{Configuration as HttpServerConfiguration, EspHttpServer};
use tokio::time::sleep;
use tracing::{info, trace, warn};

use crate::{
    executor::CmpInOut,
    message::{system_messages, Message, MsgData, MsgDataBound, ServiceBound},
};

use super::Config;

/// Response headers that enable CORS.
///
/// The same set is passed to `into_response` by every `/messages` handler in
/// this file, so each answer carries all four headers.
// NOTE(review): "OPTIONS" is advertised below, but no OPTIONS handler is
// registered in the visible part of this file — confirm that preflight
// requests are answered elsewhere.
const HEADERS: [(&str, &str); 4] = [
    // Any origin may call the API.
    ("Access-Control-Allow-Origin", "*"),
    // Lifetime, in seconds, a client may cache the preflight result.
    ("Access-Control-Max-Age", "600"),
    // HTTP methods advertised to cross-origin callers.
    ("Access-Control-Allow-Methods", "PUT,POST,GET,OPTIONS"),
    // Any request header is accepted.
    ("Access-Control-Allow-Headers", "*"),
];

pub async fn fn_process<TMsg, TService>(
    mut in_out: CmpInOut<TMsg, TService>,
    config: Config<TMsg>,
) -> super::Result<()>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
{
    // Необходимо подождать, пока поднимется Wi-Fi
    while let Ok(msg) = in_out.recv_input().await {
        match msg.data {
            MsgData::System(system_messages::System::EspWifiConnected) => break,
            _ => continue,
        }
    }
    info!("Starting cmp_esp_http_server");

    let http_config = HttpServerConfiguration {
        http_port: config.port,
        ..Default::default()
    };

    let mut server = loop {
        info!("trying to create EspHttpServer");
        let server = EspHttpServer::new(&http_config);
        match server {
            Ok(server) => break server,
            Err(err) => {
                let err = format! {"Error EspHttpServer creation: {}", err};
                warn!("{}", err);
            }
        }
        sleep(Duration::from_secs(2)).await;
    };

    // Запрос чтения всех сообщений
    let cache_clone = in_out.cache.clone();
    server
        .fn_handler("/messages", Method::Get, move |request| {
            trace!("Get request, all messages");
            let mut msgs_json: Vec<String> = vec![];
            {
                let lock = cache_clone.blocking_read();
                for msg in lock.values() {
                    if !msg.is_route_enabled(&config.this_service, &config.client_service) {
                        continue;
                    }
                    let msg_json = msg.serialize().unwrap();
                    msgs_json.push(msg_json);
                }
            }
            let json = msgs_json.join(",");
            let json = format!("[{}]", json);
            let mut response = request.into_response(200, None, &HEADERS).unwrap();
            response.write_all(json.as_bytes()).unwrap();
            Ok(()) as super::Result<()>
        })
        .unwrap();

    // Запись одного сообщения
    let in_out_clone = in_out.clone();
    server
        .fn_handler("/messages", Method::Put, move |mut request| {
            trace!("Put request");

            let len = request.content_len().unwrap_or(0) as usize;
            let mut buf = vec![0; len];
            request.read_exact(&mut buf).unwrap();
            let buf_str = String::from_utf8_lossy(&buf);
            let mut response = request.into_response(200, None, &HEADERS).unwrap();
            let msg = Message::deserialize(&buf_str);
            match msg {
                Ok(val) => in_out_clone.send_output_blocking(val).unwrap(),
                Err(err) => {
                    let err = format!("{:?}", err);
                    response.write_all(err.as_bytes()).unwrap();
                }
            }

            Ok(()) as super::Result<()>
        })
        .unwrap();

    // Запись одного сообщения (копия PUT)
    let in_out_clone = in_out.clone();
    server
        .fn_handler("/messages", Method::Post, move |mut request| {
            trace!("Post request");

            let len = request.content_len().unwrap_or(0) as usize;
            let mut buf = vec![0; len];
            request.read_exact(&mut buf).unwrap();
            let buf_str = String::from_utf8_lossy(&buf);
            let mut response = request.into_response(200, None, &HEADERS).unwrap();
            let msg = Message::deserialize(&buf_str);
            match msg {
                Ok(val) => in_out_clone.send_output_blocking(val).unwrap(),
                Err(err) => {
                    let err = format!("{:?}", err);
                    response.write_all(err.as_bytes()).unwrap();
                }
            }

            Ok(()) as super::Result<()>
        })
        .unwrap();

    loop {
        sleep(Duration::from_secs(2)).await;
    }
}