Skip to main content

rsiot/components/shared_tasks/fieldbus_execution/
task_fieldbus_execution.rs

1use futures::TryFutureExt;
2use tokio::{sync::mpsc, task::JoinSet};
3
4use crate::{
5    components::shared_tasks::mpsc_to_msgbus,
6    components_config::master_device::{
7        self, DeviceTrait, FieldbusRequestWithIndex, FieldbusResponseWithIndex,
8        RequestResponseBound,
9    },
10    executor::{MsgBusLinker, join_set_spawn},
11    message::{Message, MsgDataBound},
12};
13
14use super::filter_identical_data;
15use super::{task_add_index::AddIndex, task_split_responses::SplitResponses};
16
17/// Запуск задач, общих для всех компонентов, выполняющих опрос устройств по шине
18pub struct FieldbusExecution<'a, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
19where
20    TMsg: MsgDataBound + 'static,
21    TError: Send + Sync + 'static,
22{
23    /// Подключение к шине MsgBus
24    pub msgbus_linker: MsgBusLinker<TMsg>,
25
26    /// Ссылка на коллекцию задач tokio
27    pub task_set: &'a mut JoinSet<Result<(), TError>>,
28
29    /// Ошибка filter_identical_data
30    pub error_filter: fn(filter_identical_data::Error) -> TError,
31
32    /// Ошибка mpsc_to_msgbus
33    pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,
34
35    /// Ошибка master_device
36    pub error_master_device: fn(master_device::Error) -> TError,
37
38    /// Ошибка tokio::mpsc::send
39    pub error_tokiompscsend: fn() -> TError,
40
41    /// Массив устройств
42    pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse>>>,
43}
44
45impl<TMsg, TError, TFieldbusRequest, TFieldbusResponse>
46    FieldbusExecution<'_, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
47where
48    TMsg: MsgDataBound + 'static,
49    TError: Send + Sync + 'static,
50    TFieldbusRequest: RequestResponseBound + 'static,
51    TFieldbusResponse: RequestResponseBound + 'static,
52{
53    /// Запуск задач.
54    ///
55    /// Возвращает кортеж с каналами передачи запросов / ответов
56    pub fn spawn(
57        self,
58    ) -> (
59        mpsc::Receiver<FieldbusRequestWithIndex<TFieldbusRequest>>,
60        mpsc::Sender<FieldbusResponseWithIndex<TFieldbusResponse>>,
61    ) {
62        let devices_count = self.devices.len();
63        let buffer_size = self.msgbus_linker.max_capacity();
64
65        // Создание каналов передачи данных --------------------------------------------------------
66
67        // Каналы передачи запросов из устройств в задачи добавления индекса
68        let mut ch_tx_device_to_addindex = vec![];
69        let mut ch_rx_device_to_addindex = vec![];
70
71        for _ in 0..devices_count {
72            let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusRequest>(buffer_size);
73            ch_tx_device_to_addindex.push(ch_tx);
74            ch_rx_device_to_addindex.push(ch_rx);
75        }
76
77        // Канал передачи запросов из задач добавления индекса в шину
78        let (ch_tx_addindex_to_fieldbus, ch_rx_addindex_to_fieldbus) =
79            mpsc::channel::<FieldbusRequestWithIndex<TFieldbusRequest>>(buffer_size);
80
81        // Канал передачи ответов из шины в задачу разделения ответов для устройств
82        let (ch_tx_fieldbus_to_split, ch_rx_fieldbus_to_split) =
83            mpsc::channel::<FieldbusResponseWithIndex<TFieldbusResponse>>(buffer_size);
84
85        // Каналы передачи ответов из задачи разделения на устройства
86        let mut ch_tx_split_to_devices = vec![];
87        let mut ch_rx_split_to_devices = vec![];
88        for _ in 0..devices_count {
89            let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusResponse>(buffer_size);
90            ch_tx_split_to_devices.push(ch_tx);
91            ch_rx_split_to_devices.push(Some(ch_rx));
92        }
93
94        // Канал передачи сообщений из устройств на фильтр
95        let (ch_tx_devices_to_filter, ch_rx_devices_to_filter) =
96            mpsc::channel::<Message<TMsg>>(buffer_size);
97
98        // Канал передачи сообщений из фильтра на выход компонента
99        let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) =
100            mpsc::channel::<Message<TMsg>>(buffer_size);
101
102        // Задачи выполнения устройств -------------------------------------------------------------
103        let mut input_vec = vec![];
104        for _ in 0..self.devices.len() {
105            input_vec.push(self.msgbus_linker.input());
106        }
107
108        for (index, device) in self.devices.into_iter().enumerate() {
109            let ch_rx_msgbus_to_devices = input_vec[index].clone();
110            let ch_tx_device_to_addindex = ch_tx_device_to_addindex[index].clone();
111            let Some(ch_rx_fieldbus_to_device) = ch_rx_split_to_devices[index].take() else {
112                panic!("Error configuration in fn_process_master");
113            };
114            let ch_tx_devices_to_filter = ch_tx_devices_to_filter.clone();
115            let task = device.spawn(
116                ch_rx_msgbus_to_devices,
117                ch_tx_device_to_addindex,
118                ch_rx_fieldbus_to_device,
119                ch_tx_devices_to_filter,
120            );
121            join_set_spawn(
122                self.task_set,
123                "fn_process_master | device",
124                task.map_err(self.error_master_device),
125            );
126        }
127
128        // Задачи добавления индекса
129        for (device_index, ch_rx) in ch_rx_device_to_addindex.into_iter().enumerate() {
130            let task = AddIndex {
131                input: ch_rx,
132                output: ch_tx_addindex_to_fieldbus.clone(),
133                device_index,
134                error_tokiompscsend: self.error_tokiompscsend,
135            };
136            join_set_spawn(self.task_set, "fn_process_master | add_index", task.spawn());
137        }
138
139        // Задача разделения ответов
140        let task = SplitResponses {
141            input: ch_rx_fieldbus_to_split,
142            output: ch_tx_split_to_devices,
143            error_tokiompscsend: self.error_tokiompscsend,
144        };
145        join_set_spawn(
146            self.task_set,
147            "fn_process_master | split_responses",
148            task.spawn(),
149        );
150
151        // Фильтрация одинаковых сообщений ---------------------------------------------------------
152        let task = filter_identical_data::FilterIdenticalData {
153            input: ch_rx_devices_to_filter,
154            output: ch_tx_filter_to_msgbus,
155        };
156        join_set_spawn(
157            self.task_set,
158            "fn_process_master | filter_identical_data",
159            task.spawn().map_err(self.error_filter),
160        );
161
162        // Создаем исходящие сообщения -------------------------------------------------------------
163        let task = mpsc_to_msgbus::MpscToMsgBus {
164            input: ch_rx_filter_to_msgbus,
165            output: self.msgbus_linker.output(),
166        };
167        join_set_spawn(
168            self.task_set,
169            "fn_process_master | mpsc_to_msgbus",
170            task.spawn().map_err(self.error_mpsc_to_msgbus),
171        );
172
173        drop(self.msgbus_linker);
174
175        (ch_rx_addindex_to_fieldbus, ch_tx_fieldbus_to_split)
176    }
177}