rsiot/components/shared_tasks/
fn_process_master.rs1use std::ops::Index;
4
5use futures::TryFutureExt;
6use tokio::{
7 sync::{broadcast, mpsc},
8 task::JoinSet,
9};
10
11use crate::{
12 components_config::master_device::{
13 self, DeviceTrait, FieldbusRequestWithIndex, FieldbusResponseWithIndex,
14 RequestResponseBound,
15 },
16 executor::{join_set_spawn, CmpInOut},
17 message::{Message, MsgDataBound},
18};
19
20use super::{filter_identical_data, mpsc_to_msgbus, msgbus_to_broadcast};
21
22pub struct FnProcessMaster<'a, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
24where
25 TMsg: MsgDataBound + 'static,
26 TError: Send + Sync + 'static,
27{
28 pub msg_bus: CmpInOut<TMsg>,
30
31 pub buffer_size: usize,
33
34 pub task_set: &'a mut JoinSet<Result<(), TError>>,
36
37 pub error_msgbus_to_broadcast: fn(msgbus_to_broadcast::Error) -> TError,
39
40 pub error_filter: fn(filter_identical_data::Error) -> TError,
42
43 pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,
45
46 pub error_master_device: fn(master_device::Error) -> TError,
48
49 pub error_tokiompscsend: fn() -> TError,
51
52 pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse>>>,
54}
55
56impl<TMsg, TError, TFieldbusRequest, TFieldbusResponse>
57 FnProcessMaster<'_, TMsg, TError, TFieldbusRequest, TFieldbusResponse>
58where
59 TMsg: MsgDataBound + 'static,
60 TError: Send + Sync + 'static,
61 TFieldbusRequest: RequestResponseBound + 'static,
62 TFieldbusResponse: RequestResponseBound + 'static,
63{
64 pub fn spawn(
68 self,
69 ) -> (
70 mpsc::Receiver<FieldbusRequestWithIndex<TFieldbusRequest>>,
71 mpsc::Sender<FieldbusResponseWithIndex<TFieldbusResponse>>,
72 ) {
73 let devices_count = self.devices.len();
74
75 let (ch_tx_msgbus_to_devices, ch_rx_msgbus_to_devices) =
79 broadcast::channel::<Message<TMsg>>(self.buffer_size);
80
81 let mut ch_tx_device_to_addindex = vec![];
83 let mut ch_rx_device_to_addindex = vec![];
84
85 for _ in 0..devices_count {
86 let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusRequest>(self.buffer_size);
87 ch_tx_device_to_addindex.push(ch_tx);
88 ch_rx_device_to_addindex.push(ch_rx);
89 }
90
91 let (ch_tx_addindex_to_fieldbus, ch_rx_addindex_to_fieldbus) =
93 mpsc::channel::<FieldbusRequestWithIndex<TFieldbusRequest>>(self.buffer_size);
94
95 let (ch_tx_fieldbus_to_split, ch_rx_fieldbus_to_split) =
97 mpsc::channel::<FieldbusResponseWithIndex<TFieldbusResponse>>(self.buffer_size);
98
99 let mut ch_tx_split_to_devices = vec![];
101 let mut ch_rx_split_to_devices = vec![];
102 for _ in 0..devices_count {
103 let (ch_tx, ch_rx) = mpsc::channel::<TFieldbusResponse>(self.buffer_size);
104 ch_tx_split_to_devices.push(ch_tx);
105 ch_rx_split_to_devices.push(Some(ch_rx));
106 }
107
108 let (ch_tx_devices_to_filter, ch_rx_devices_to_filter) =
110 mpsc::channel::<Message<TMsg>>(self.buffer_size);
111
112 let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) =
114 mpsc::channel::<Message<TMsg>>(self.buffer_size);
115
116 let task = msgbus_to_broadcast::MsgBusToBroadcast {
118 msg_bus: self.msg_bus.clone(),
119 output: ch_tx_msgbus_to_devices,
120 };
121 join_set_spawn(
122 self.task_set,
123 task.spawn().map_err(self.error_msgbus_to_broadcast),
124 );
125
126 for (index, device) in self.devices.into_iter().enumerate() {
128 let ch_rx_msgbus_to_devices = ch_rx_msgbus_to_devices.resubscribe();
129 let ch_tx_device_to_addindex = ch_tx_device_to_addindex[index].clone();
130 let ch_rx_fieldbus_to_device = ch_rx_split_to_devices[index].take().unwrap();
131 let ch_tx_devices_to_filter = ch_tx_devices_to_filter.clone();
132 let task = device.spawn(
133 ch_rx_msgbus_to_devices,
134 ch_tx_device_to_addindex,
135 ch_rx_fieldbus_to_device,
136 ch_tx_devices_to_filter,
137 );
138 join_set_spawn(self.task_set, task.map_err(self.error_master_device));
139 }
140
141 for (device_index, ch_rx) in ch_rx_device_to_addindex.into_iter().enumerate() {
143 let task = AddIndex {
144 input: ch_rx,
145 output: ch_tx_addindex_to_fieldbus.clone(),
146 device_index,
147 error_tokiompscsend: self.error_tokiompscsend,
148 };
149 join_set_spawn(self.task_set, task.spawn());
150 }
151
152 let task = SplitResponses {
154 input: ch_rx_fieldbus_to_split,
155 output: ch_tx_split_to_devices,
156 error_tokiompscsend: self.error_tokiompscsend,
157 };
158 join_set_spawn(self.task_set, task.spawn());
159
160 let task = filter_identical_data::FilterIdenticalData {
162 input: ch_rx_devices_to_filter,
163 output: ch_tx_filter_to_msgbus,
164 };
165 join_set_spawn(self.task_set, task.spawn().map_err(self.error_filter));
166
167 let task = mpsc_to_msgbus::MpscToMsgBus {
169 input: ch_rx_filter_to_msgbus,
170 msg_bus: self.msg_bus.clone(),
171 };
172 join_set_spawn(
173 self.task_set,
174 task.spawn().map_err(self.error_mpsc_to_msgbus),
175 );
176
177 (ch_rx_addindex_to_fieldbus, ch_tx_fieldbus_to_split)
178 }
179}
180
181struct AddIndex<TFieldbusRequest, TError>
182where
183 TFieldbusRequest: RequestResponseBound,
184{
185 pub input: mpsc::Receiver<TFieldbusRequest>,
186 pub output: mpsc::Sender<FieldbusRequestWithIndex<TFieldbusRequest>>,
187 pub device_index: usize,
188 pub error_tokiompscsend: fn() -> TError,
189}
190impl<TFieldbusRequest, TError> AddIndex<TFieldbusRequest, TError>
191where
192 TFieldbusRequest: RequestResponseBound,
193{
194 pub async fn spawn(mut self) -> Result<(), TError> {
195 while let Some(request) = self.input.recv().await {
196 let request_with_index = FieldbusRequestWithIndex {
197 device_index: self.device_index,
198 request,
199 };
200 self.output
201 .send(request_with_index)
202 .await
203 .map_err(|_| (self.error_tokiompscsend)())?;
204 }
205 Ok(())
206 }
207}
208
209struct SplitResponses<TFieldbusResponse, TError>
210where
211 TFieldbusResponse: RequestResponseBound,
212{
213 pub input: mpsc::Receiver<FieldbusResponseWithIndex<TFieldbusResponse>>,
214 pub output: Vec<mpsc::Sender<TFieldbusResponse>>,
215 pub error_tokiompscsend: fn() -> TError,
216}
217impl<TFieldbusResponse, TError> SplitResponses<TFieldbusResponse, TError>
218where
219 TFieldbusResponse: RequestResponseBound,
220{
221 pub async fn spawn(mut self) -> Result<(), TError> {
222 while let Some(response_with_index) = self.input.recv().await {
223 let device_index = response_with_index.device_index;
224 let response = response_with_index.response;
225 self.output[device_index]
226 .send(response)
227 .await
228 .map_err(|_| (self.error_tokiompscsend)())?;
229 }
230 Ok(())
231 }
232}