rsiot/components/cmp_esp_uart_slave/
fn_process.rs

1use std::sync::Arc;
2
3use esp_idf_svc::hal::{
4    gpio,
5    peripheral::Peripheral,
6    uart::{self, AsyncUartDriver, Uart},
7};
8use futures::TryFutureExt;
9use tokio::{
10    sync::{mpsc, Mutex},
11    task::JoinSet,
12};
13
14use crate::components_config::uart_general::Parity;
15use crate::{
16    components::shared_tasks::{filter_identical_data, mpsc_to_msgbus},
17    executor::{join_set_spawn, CmpInOut},
18    message::MsgDataBound,
19};
20
21use super::{tasks, Config};
22
23pub async fn fn_process<TMsg, TUart, TPeripheral, TBufferData>(
24    config: Config<TMsg, TUart, TPeripheral, TBufferData>,
25    msg_bus: CmpInOut<TMsg>,
26) -> super::Result<()>
27where
28    TMsg: 'static + MsgDataBound,
29    TUart: Peripheral<P = TPeripheral> + 'static,
30    TPeripheral: Uart,
31    TBufferData: 'static,
32{
33    let uart_config = uart::config::Config::new()
34        .baudrate(config.baudrate.into())
35        .data_bits(config.data_bits.into())
36        .stop_bits(config.stop_bits.into())
37        .mode(uart::config::Mode::RS485HalfDuplex);
38    let uart_config = match config.parity {
39        Parity::None => uart_config.parity_none(),
40        Parity::Even => uart_config.parity_even(),
41        Parity::Odd => uart_config.parity_odd(),
42    };
43
44    let uart = AsyncUartDriver::new(
45        config.uart,
46        config.pin_tx,
47        config.pin_rx,
48        Option::<gpio::Gpio0>::None,
49        Some(config.pin_rts),
50        &uart_config,
51    )
52    .unwrap();
53
54    let buffer_data = config.buffer_data_default;
55    let buffer_data = Arc::new(Mutex::new(buffer_data));
56
57    let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(100);
58    let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) = mpsc::channel(100);
59
60    let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
61
62    // Задача обработки входящих сообщений
63    let task = tasks::Input {
64        msg_bus: msg_bus.clone(),
65        fn_input: config.fn_input,
66        buffer_data: buffer_data.clone(),
67    };
68    join_set_spawn(&mut task_set, task.spawn());
69
70    // Задача коммуникации по протоклу UART
71    let task = tasks::UartComm {
72        uart,
73        fn_uart_comm: config.fn_uart_comm,
74        buffer_data: buffer_data.clone(),
75    };
76    join_set_spawn(&mut task_set, task.spawn());
77
78    // Задача генерирования исходящих сообщений
79    let task = tasks::Output {
80        output: ch_tx_output_to_filter,
81        buffer_data: buffer_data.clone(),
82        fn_output: config.fn_output,
83        fn_output_period: config.fn_output_period,
84    };
85    join_set_spawn(&mut task_set, task.spawn());
86
87    // Задача фильтрации исходящих сообщений
88    let task = filter_identical_data::FilterIdenticalData {
89        input: ch_rx_output_to_filter,
90        output: ch_tx_filter_to_msgbus,
91    };
92    join_set_spawn(
93        &mut task_set,
94        task.spawn().map_err(super::Error::TaskFilterIdenticalData),
95    );
96
97    // Задача передачи сообщений в шину
98    let task = mpsc_to_msgbus::MpscToMsgBus {
99        input: ch_rx_filter_to_msgbus,
100        msg_bus: msg_bus.clone(),
101    };
102    join_set_spawn(
103        &mut task_set,
104        task.spawn().map_err(super::Error::TaskMpscToMsgbus),
105    );
106
107    // Ждем выполнения задач
108    while let Some(res) = task_set.join_next().await {
109        res??;
110    }
111
112    Ok(())
113}