rsiot/components/cmp_esp_uart_slave/
fn_process.rs1use std::sync::Arc;
2
3use esp_idf_svc::hal::{
4 gpio,
5 peripheral::Peripheral,
6 uart::{self, AsyncUartDriver, Uart},
7};
8use futures::TryFutureExt;
9use tokio::{
10 sync::{mpsc, Mutex},
11 task::JoinSet,
12};
13
14use crate::components_config::uart_general::Parity;
15use crate::{
16 components::shared_tasks::{filter_identical_data, mpsc_to_msgbus},
17 executor::{join_set_spawn, CmpInOut},
18 message::MsgDataBound,
19};
20
21use super::{tasks, Config};
22
23pub async fn fn_process<TMsg, TUart, TPeripheral, TBufferData>(
24 config: Config<TMsg, TUart, TPeripheral, TBufferData>,
25 msg_bus: CmpInOut<TMsg>,
26) -> super::Result<()>
27where
28 TMsg: 'static + MsgDataBound,
29 TUart: Peripheral<P = TPeripheral> + 'static,
30 TPeripheral: Uart,
31 TBufferData: 'static,
32{
33 let uart_config = uart::config::Config::new()
34 .baudrate(config.baudrate.into())
35 .data_bits(config.data_bits.into())
36 .stop_bits(config.stop_bits.into())
37 .mode(uart::config::Mode::RS485HalfDuplex);
38 let uart_config = match config.parity {
39 Parity::None => uart_config.parity_none(),
40 Parity::Even => uart_config.parity_even(),
41 Parity::Odd => uart_config.parity_odd(),
42 };
43
44 let uart = AsyncUartDriver::new(
45 config.uart,
46 config.pin_tx,
47 config.pin_rx,
48 Option::<gpio::Gpio0>::None,
49 Some(config.pin_rts),
50 &uart_config,
51 )
52 .unwrap();
53
54 let buffer_data = config.buffer_data_default;
55 let buffer_data = Arc::new(Mutex::new(buffer_data));
56
57 let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(100);
58 let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) = mpsc::channel(100);
59
60 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
61
62 let task = tasks::Input {
64 msg_bus: msg_bus.clone(),
65 fn_input: config.fn_input,
66 buffer_data: buffer_data.clone(),
67 };
68 join_set_spawn(&mut task_set, task.spawn());
69
70 let task = tasks::UartComm {
72 uart,
73 fn_uart_comm: config.fn_uart_comm,
74 buffer_data: buffer_data.clone(),
75 };
76 join_set_spawn(&mut task_set, task.spawn());
77
78 let task = tasks::Output {
80 output: ch_tx_output_to_filter,
81 buffer_data: buffer_data.clone(),
82 fn_output: config.fn_output,
83 fn_output_period: config.fn_output_period,
84 };
85 join_set_spawn(&mut task_set, task.spawn());
86
87 let task = filter_identical_data::FilterIdenticalData {
89 input: ch_rx_output_to_filter,
90 output: ch_tx_filter_to_msgbus,
91 };
92 join_set_spawn(
93 &mut task_set,
94 task.spawn().map_err(super::Error::TaskFilterIdenticalData),
95 );
96
97 let task = mpsc_to_msgbus::MpscToMsgBus {
99 input: ch_rx_filter_to_msgbus,
100 msg_bus: msg_bus.clone(),
101 };
102 join_set_spawn(
103 &mut task_set,
104 task.spawn().map_err(super::Error::TaskMpscToMsgbus),
105 );
106
107 while let Some(res) = task_set.join_next().await {
109 res??;
110 }
111
112 Ok(())
113}