rsiot/components/cmp_esp_uart_slave/
fn_process.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::sync::Arc;

use esp_idf_svc::hal::{
    gpio,
    peripheral::Peripheral,
    uart::{self, AsyncUartDriver, Uart},
};
use futures::TryFutureExt;
use tokio::{
    sync::{mpsc, Mutex},
    task::JoinSet,
};

use crate::components_config::uart_general::Parity;
use crate::{
    components::shared_tasks::{filter_identical_data, mpsc_to_msgbus},
    executor::{join_set_spawn, CmpInOut},
    message::{MsgDataBound, ServiceBound},
};

use super::{tasks, Config};

pub async fn fn_process<TMsg, TService, TUart, TPeripheral, TBufferData>(
    config: Config<TMsg, TUart, TPeripheral, TBufferData>,
    msg_bus: CmpInOut<TMsg, TService>,
) -> super::Result<()>
where
    TMsg: 'static + MsgDataBound,
    TService: ServiceBound + 'static,
    TUart: Peripheral<P = TPeripheral> + 'static,
    TPeripheral: Uart,
    TBufferData: 'static,
{
    let uart_config = uart::config::Config::new()
        .baudrate(config.baudrate.into())
        .data_bits(config.data_bits.into())
        .stop_bits(config.stop_bits.into())
        .mode(uart::config::Mode::RS485HalfDuplex);
    let uart_config = match config.parity {
        Parity::None => uart_config.parity_none(),
        Parity::Even => uart_config.parity_even(),
        Parity::Odd => uart_config.parity_odd(),
    };

    let uart = AsyncUartDriver::new(
        config.uart,
        config.pin_tx,
        config.pin_rx,
        Option::<gpio::Gpio0>::None,
        Some(config.pin_rts),
        &uart_config,
    )
    .unwrap();

    let buffer_data = config.buffer_data_default;
    let buffer_data = Arc::new(Mutex::new(buffer_data));

    let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(100);
    let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) = mpsc::channel(100);

    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();

    // Задача обработки входящих сообщений
    let task = tasks::Input {
        msg_bus: msg_bus.clone(),
        fn_input: config.fn_input,
        buffer_data: buffer_data.clone(),
    };
    join_set_spawn(&mut task_set, task.spawn());

    // Задача коммуникации по протоклу UART
    let task = tasks::UartComm {
        address: config.address,
        uart,
        fn_uart_comm: config.fn_uart_comm,
        buffer_data: buffer_data.clone(),
    };
    join_set_spawn(&mut task_set, task.spawn());

    // Задача генерирования исходящих сообщений
    let task = tasks::Output {
        output: ch_tx_output_to_filter,
        buffer_data: buffer_data.clone(),
        fn_output: config.fn_output,
        fn_output_period: config.fn_output_period,
    };
    join_set_spawn(&mut task_set, task.spawn());

    // Задача фильтрации исходящих сообщений
    let task = filter_identical_data::FilterIdenticalData {
        input: ch_rx_output_to_filter,
        output: ch_tx_filter_to_msgbus,
    };
    join_set_spawn(
        &mut task_set,
        task.spawn().map_err(super::Error::TaskFilterIdenticalData),
    );

    // Задача передачи сообщений в шину
    let task = mpsc_to_msgbus::MpscToMsgBus {
        input: ch_rx_filter_to_msgbus,
        msg_bus: msg_bus.clone(),
    };
    join_set_spawn(
        &mut task_set,
        task.spawn().map_err(super::Error::TaskMpscToMsgbus),
    );

    // Ждем выполнения задач
    while let Some(res) = task_set.join_next().await {
        res??;
    }

    Ok(())
}