rsiot/components/shared_tasks/fieldbus_execution/
task_fieldbus_execution.rs1use futures::TryFutureExt;
2use tokio::{sync::mpsc, task::JoinSet};
3
4use crate::{
5 components::shared_tasks::mpsc_to_msgbus,
6 components_config::master_device::{
7 self, DeviceTrait, FieldbusRequestWithIndex, FieldbusResponseWithIndex,
8 RequestResponseBound,
9 },
10 executor::{MsgBusLinker, join_set_spawn},
11 message::{Message, MsgDataBound},
12};
13
14use super::filter_identical_data;
15use super::{task_add_index::AddIndex, task_split_responses::SplitResponses};
16
17pub struct FieldbusExecution<'a, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
19where
20 TMsg: MsgDataBound + 'static,
21 TError: Send + Sync + 'static,
22{
23 pub msgbus_linker: MsgBusLinker<TMsg>,
25
26 pub task_set: &'a mut JoinSet<Result<(), TError>>,
28
29 pub error_filter: fn(filter_identical_data::Error) -> TError,
31
32 pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,
34
35 pub error_master_device: fn(master_device::Error) -> TError,
37
38 pub error_tokiompscsend: fn() -> TError,
40
41 pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse>>>,
43}
44
45impl<TMsg, TError, TFieldbusRequest, TFieldbusResponse>
46 FieldbusExecution<'_, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
47where
48 TMsg: MsgDataBound + 'static,
49 TError: Send + Sync + 'static,
50 TFieldbusRequest: RequestResponseBound + 'static,
51 TFieldbusResponse: RequestResponseBound + 'static,
52{
53 pub fn spawn(
57 self,
58 ) -> (
59 mpsc::Receiver<FieldbusRequestWithIndex<TFieldbusRequest>>,
60 mpsc::Sender<FieldbusResponseWithIndex<TFieldbusResponse>>,
61 ) {
62 let devices_count = self.devices.len();
63 let buffer_size = self.msgbus_linker.max_capacity();
64
65 let mut ch_tx_device_to_addindex = vec![];
69 let mut ch_rx_device_to_addindex = vec![];
70
71 for _ in 0..devices_count {
72 let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusRequest>(buffer_size);
73 ch_tx_device_to_addindex.push(ch_tx);
74 ch_rx_device_to_addindex.push(ch_rx);
75 }
76
77 let (ch_tx_addindex_to_fieldbus, ch_rx_addindex_to_fieldbus) =
79 mpsc::channel::<FieldbusRequestWithIndex<TFieldbusRequest>>(buffer_size);
80
81 let (ch_tx_fieldbus_to_split, ch_rx_fieldbus_to_split) =
83 mpsc::channel::<FieldbusResponseWithIndex<TFieldbusResponse>>(buffer_size);
84
85 let mut ch_tx_split_to_devices = vec![];
87 let mut ch_rx_split_to_devices = vec![];
88 for _ in 0..devices_count {
89 let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusResponse>(buffer_size);
90 ch_tx_split_to_devices.push(ch_tx);
91 ch_rx_split_to_devices.push(Some(ch_rx));
92 }
93
94 let (ch_tx_devices_to_filter, ch_rx_devices_to_filter) =
96 mpsc::channel::<Message<TMsg>>(buffer_size);
97
98 let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) =
100 mpsc::channel::<Message<TMsg>>(buffer_size);
101
102 let mut input_vec = vec![];
104 for _ in 0..self.devices.len() {
105 input_vec.push(self.msgbus_linker.input());
106 }
107
108 for (index, device) in self.devices.into_iter().enumerate() {
109 let ch_rx_msgbus_to_devices = input_vec[index].clone();
110 let ch_tx_device_to_addindex = ch_tx_device_to_addindex[index].clone();
111 let Some(ch_rx_fieldbus_to_device) = ch_rx_split_to_devices[index].take() else {
112 panic!("Error configuration in fn_process_master");
113 };
114 let ch_tx_devices_to_filter = ch_tx_devices_to_filter.clone();
115 let task = device.spawn(
116 ch_rx_msgbus_to_devices,
117 ch_tx_device_to_addindex,
118 ch_rx_fieldbus_to_device,
119 ch_tx_devices_to_filter,
120 );
121 join_set_spawn(
122 self.task_set,
123 "fn_process_master | device",
124 task.map_err(self.error_master_device),
125 );
126 }
127
128 for (device_index, ch_rx) in ch_rx_device_to_addindex.into_iter().enumerate() {
130 let task = AddIndex {
131 input: ch_rx,
132 output: ch_tx_addindex_to_fieldbus.clone(),
133 device_index,
134 error_tokiompscsend: self.error_tokiompscsend,
135 };
136 join_set_spawn(self.task_set, "fn_process_master | add_index", task.spawn());
137 }
138
139 let task = SplitResponses {
141 input: ch_rx_fieldbus_to_split,
142 output: ch_tx_split_to_devices,
143 error_tokiompscsend: self.error_tokiompscsend,
144 };
145 join_set_spawn(
146 self.task_set,
147 "fn_process_master | split_responses",
148 task.spawn(),
149 );
150
151 let task = filter_identical_data::FilterIdenticalData {
153 input: ch_rx_devices_to_filter,
154 output: ch_tx_filter_to_msgbus,
155 };
156 join_set_spawn(
157 self.task_set,
158 "fn_process_master | filter_identical_data",
159 task.spawn().map_err(self.error_filter),
160 );
161
162 let task = mpsc_to_msgbus::MpscToMsgBus {
164 input: ch_rx_filter_to_msgbus,
165 output: self.msgbus_linker.output(),
166 };
167 join_set_spawn(
168 self.task_set,
169 "fn_process_master | mpsc_to_msgbus",
170 task.spawn().map_err(self.error_mpsc_to_msgbus),
171 );
172
173 drop(self.msgbus_linker);
174
175 (ch_rx_addindex_to_fieldbus, ch_tx_fieldbus_to_split)
176 }
177}