rsiot/components/shared_tasks/
fn_process_master.rs

1//! Запуск задач, общих для всех компонентов, выполняющих опрос устройств по шине
2
3use std::ops::Index;
4
5use futures::TryFutureExt;
6use tokio::{
7    sync::{broadcast, mpsc},
8    task::JoinSet,
9};
10
11use crate::{
12    components_config::master_device::{
13        self, DeviceTrait, FieldbusRequestWithIndex, FieldbusResponseWithIndex,
14        RequestResponseBound,
15    },
16    executor::{join_set_spawn, CmpInOut},
17    message::{Message, MsgDataBound},
18};
19
20use super::{filter_identical_data, mpsc_to_msgbus, msgbus_to_broadcast};
21
22/// Запуск задач, общих для всех компонентов, выполняющих опрос устройств по шине
23pub struct FnProcessMaster<'a, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
24where
25    TMsg: MsgDataBound + 'static,
26    TError: Send + Sync + 'static,
27{
28    /// Шина сообщений
29    pub msg_bus: CmpInOut<TMsg>,
30
31    /// Ёмкость очередей сообщений между задачами
32    pub buffer_size: usize,
33
34    /// Ссылка на коллекцию задач tokio
35    pub task_set: &'a mut JoinSet<Result<(), TError>>,
36
37    /// Ошибка msgbus_to_broadcast
38    pub error_msgbus_to_broadcast: fn(msgbus_to_broadcast::Error) -> TError,
39
40    /// Ошибка filter_identical_data
41    pub error_filter: fn(filter_identical_data::Error) -> TError,
42
43    /// Ошибка mpsc_to_msgbus
44    pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,
45
46    /// Ошибка master_device
47    pub error_master_device: fn(master_device::Error) -> TError,
48
49    /// Ошибка tokio::mpsc::send
50    pub error_tokiompscsend: fn() -> TError,
51
52    /// Массив устройств
53    pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse>>>,
54}
55
56impl<TMsg, TError, TFieldbusRequest, TFieldbusResponse>
57    FnProcessMaster<'_, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
58where
59    TMsg: MsgDataBound + 'static,
60    TError: Send + Sync + 'static,
61    TFieldbusRequest: RequestResponseBound + 'static,
62    TFieldbusResponse: RequestResponseBound + 'static,
63{
64    /// Запуск задач.
65    ///
66    /// Возвращает кортеж с каналами передачи запросов / ответов
67    pub fn spawn(
68        self,
69    ) -> (
70        mpsc::Receiver<FieldbusRequestWithIndex<TFieldbusRequest>>,
71        mpsc::Sender<FieldbusResponseWithIndex<TFieldbusResponse>>,
72    ) {
73        let devices_count = self.devices.len();
74
75        // Создание каналов передачи данных --------------------------------------------------------
76
77        // Канал передачи со входа компонента на устройства
78        let (ch_tx_msgbus_to_devices, ch_rx_msgbus_to_devices) =
79            broadcast::channel::<Message<TMsg>>(self.buffer_size);
80
81        // Каналы передачи запросов из устройств в задачи добавления индекса
82        let mut ch_tx_device_to_addindex = vec![];
83        let mut ch_rx_device_to_addindex = vec![];
84
85        for _ in 0..devices_count {
86            let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusRequest>(self.buffer_size);
87            ch_tx_device_to_addindex.push(ch_tx);
88            ch_rx_device_to_addindex.push(ch_rx);
89        }
90
91        // Канал передачи запросов из задач добавления индекса в шину
92        let (ch_tx_addindex_to_fieldbus, ch_rx_addindex_to_fieldbus) =
93            mpsc::channel::<FieldbusRequestWithIndex<TFieldbusRequest>>(self.buffer_size);
94
95        // Канал передачи ответов из шины в задачу разделения ответов для устройств
96        let (ch_tx_fieldbus_to_split, ch_rx_fieldbus_to_split) =
97            mpsc::channel::<FieldbusResponseWithIndex<TFieldbusResponse>>(self.buffer_size);
98
99        // Каналы передачи ответов из задачи разделения на устройства
100        let mut ch_tx_split_to_devices = vec![];
101        let mut ch_rx_split_to_devices = vec![];
102        for _ in 0..devices_count {
103            let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusResponse>(self.buffer_size);
104            ch_tx_split_to_devices.push(ch_tx);
105            ch_rx_split_to_devices.push(Some(ch_rx));
106        }
107
108        // Канал передачи сообщений из устройств на фильтр
109        let (ch_tx_devices_to_filter, ch_rx_devices_to_filter) =
110            mpsc::channel::<Message<TMsg>>(self.buffer_size);
111
112        // Канал передачи сообщений из фильтра на выход компонента
113        let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) =
114            mpsc::channel::<Message<TMsg>>(self.buffer_size);
115
116        // Передача входящих сообщений на устройства -----------------------------------------------
117        let task = msgbus_to_broadcast::MsgBusToBroadcast {
118            msg_bus: self.msg_bus.clone(),
119            output: ch_tx_msgbus_to_devices,
120        };
121        join_set_spawn(
122            self.task_set,
123            task.spawn().map_err(self.error_msgbus_to_broadcast),
124        );
125
126        // Задачи выполнения устройств -------------------------------------------------------------
127        for (index, device) in self.devices.into_iter().enumerate() {
128            let ch_rx_msgbus_to_devices = ch_rx_msgbus_to_devices.resubscribe();
129            let ch_tx_device_to_addindex = ch_tx_device_to_addindex[index].clone();
130            let ch_rx_fieldbus_to_device = ch_rx_split_to_devices[index].take().unwrap();
131            let ch_tx_devices_to_filter = ch_tx_devices_to_filter.clone();
132            let task = device.spawn(
133                ch_rx_msgbus_to_devices,
134                ch_tx_device_to_addindex,
135                ch_rx_fieldbus_to_device,
136                ch_tx_devices_to_filter,
137            );
138            join_set_spawn(self.task_set, task.map_err(self.error_master_device));
139        }
140
141        // Задачи добавления индекса
142        for (device_index, ch_rx) in ch_rx_device_to_addindex.into_iter().enumerate() {
143            let task = AddIndex {
144                input: ch_rx,
145                output: ch_tx_addindex_to_fieldbus.clone(),
146                device_index,
147                error_tokiompscsend: self.error_tokiompscsend,
148            };
149            join_set_spawn(self.task_set, task.spawn());
150        }
151
152        // Задача разделения ответов
153        let task = SplitResponses {
154            input: ch_rx_fieldbus_to_split,
155            output: ch_tx_split_to_devices,
156            error_tokiompscsend: self.error_tokiompscsend,
157        };
158        join_set_spawn(self.task_set, task.spawn());
159
160        // Фильтрация одинаковых сообщений ---------------------------------------------------------
161        let task = filter_identical_data::FilterIdenticalData {
162            input: ch_rx_devices_to_filter,
163            output: ch_tx_filter_to_msgbus,
164        };
165        join_set_spawn(self.task_set, task.spawn().map_err(self.error_filter));
166
167        // Создаем исходящие сообщения -------------------------------------------------------------
168        let task = mpsc_to_msgbus::MpscToMsgBus {
169            input: ch_rx_filter_to_msgbus,
170            msg_bus: self.msg_bus.clone(),
171        };
172        join_set_spawn(
173            self.task_set,
174            task.spawn().map_err(self.error_mpsc_to_msgbus),
175        );
176
177        (ch_rx_addindex_to_fieldbus, ch_tx_fieldbus_to_split)
178    }
179}
180
181struct AddIndex<TFieldbusRequest, TError>
182where
183    TFieldbusRequest: RequestResponseBound,
184{
185    pub input: mpsc::Receiver<TFieldbusRequest>,
186    pub output: mpsc::Sender<FieldbusRequestWithIndex<TFieldbusRequest>>,
187    pub device_index: usize,
188    pub error_tokiompscsend: fn() -> TError,
189}
190impl<TFieldbusRequest, TError> AddIndex<TFieldbusRequest, TError>
191where
192    TFieldbusRequest: RequestResponseBound,
193{
194    pub async fn spawn(mut self) -> Result<(), TError> {
195        while let Some(request) = self.input.recv().await {
196            let request_with_index = FieldbusRequestWithIndex {
197                device_index: self.device_index,
198                request,
199            };
200            self.output
201                .send(request_with_index)
202                .await
203                .map_err(|_| (self.error_tokiompscsend)())?;
204        }
205        Ok(())
206    }
207}
208
209struct SplitResponses<TFieldbusResponse, TError>
210where
211    TFieldbusResponse: RequestResponseBound,
212{
213    pub input: mpsc::Receiver<FieldbusResponseWithIndex<TFieldbusResponse>>,
214    pub output: Vec<mpsc::Sender<TFieldbusResponse>>,
215    pub error_tokiompscsend: fn() -> TError,
216}
217impl<TFieldbusResponse, TError> SplitResponses<TFieldbusResponse, TError>
218where
219    TFieldbusResponse: RequestResponseBound,
220{
221    pub async fn spawn(mut self) -> Result<(), TError> {
222        while let Some(response_with_index) = self.input.recv().await {
223            let device_index = response_with_index.device_index;
224            let response = response_with_index.response;
225            self.output[device_index]
226                .send(response)
227                .await
228                .map_err(|_| (self.error_tokiompscsend)())?;
229        }
230        Ok(())
231    }
232}