rsiot/components_config/master_device/device/
device.rs1#![allow(clippy::module_inception)]
2
3use std::{sync::Arc, time::Duration};
4
5use futures::TryFutureExt;
6use tokio::{
7 sync::{Mutex, mpsc},
8 task::JoinSet,
9};
10
11use crate::{components::shared_tasks, executor::MsgBusInput, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{BufferBound, RequestResponseBound, config::*, tasks};
15
16pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19 TFieldbusRequest: RequestResponseBound,
20{
21 pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24 pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27 pub fn_msgs_to_buffer: fn(&TMsg, &mut TBuffer),
39
40 pub buffer_to_request_period: Duration,
42
43 pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
53
54 pub fn_response_to_buffer: fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<bool>,
56
57 pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<TMsg>,
67
68 pub buffer_default: TBuffer,
70}
71
72impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
73where
74 TRequest: 'static + RequestResponseBound,
75 TResponse: 'static + RequestResponseBound,
76 TMsg: MsgDataBound + 'static,
77 TBuffer: 'static + BufferBound,
78{
79 pub async fn spawn(
81 self,
82 id: impl AsRef<str>,
83 ch_rx_msgbus_to_device: MsgBusInput<TMsg>,
84 ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
85 ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
86 ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
87 ) -> super::Result<()> {
88 let buffer = self.buffer_default;
89 let buffer = Arc::new(Mutex::new(buffer));
90
91 let (ch_tx_buffer, ch_rx_buffer) = mpsc::channel::<()>(100);
92 let (ch_tx_request, ch_rx_request) = mpsc::channel::<TRequest>(100);
93 let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel::<Message<TMsg>>(500);
94
95 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
96
97 let task = tasks::InitRequest {
101 buffer: buffer.clone(),
102 fn_init_requests: self.fn_init_requests,
103 ch_tx_request: ch_tx_request.clone(),
104 };
105 task.spawn().await?;
106
107 for periodic_request in self.periodic_requests {
109 let task = tasks::PeriodicRequest {
110 buffer: buffer.clone(),
111 period: periodic_request.period,
112 fn_request: periodic_request.fn_requests,
113 ch_tx_request: ch_tx_request.clone(),
114 };
115 join_set_spawn(
116 &mut task_set,
117 format!("master_device | periodic_request | {}", id.as_ref()),
118 task.spawn(),
119 );
120 }
121
122 let task = tasks::InputRequest {
124 buffer: buffer.clone(),
125 ch_rx_msgbus_to_device,
126 ch_tx_buffer: ch_tx_buffer.clone(),
127 fn_msgs_to_buffer: self.fn_msgs_to_buffer,
128 };
129 join_set_spawn(
130 &mut task_set,
131 format!("master_device | input_request | {}", id.as_ref()),
132 task.spawn(),
133 );
134
135 let task = tasks::BufferPeriodic {
137 ch_tx_buffer: ch_tx_buffer.clone(),
138 period: self.buffer_to_request_period,
139 };
140 join_set_spawn(
141 &mut task_set,
142 format!("master_device | buffer_periodic | {}", id.as_ref()),
143 task.spawn(),
144 );
145
146 let task = tasks::BufferToRequests {
148 buffer: buffer.clone(),
149 ch_rx_buffer,
150 ch_tx_request: ch_tx_request.clone(),
151 fn_buffer_to_request: self.fn_buffer_to_request,
152 };
153 join_set_spawn(
154 &mut task_set,
155 format!("master_device | buffer_to_requests | {}", id.as_ref()),
156 task.spawn(),
157 );
158
159 let task = tasks::Request {
161 ch_rx_request,
162 ch_tx_device_to_fieldbus,
163 };
164 join_set_spawn(
165 &mut task_set,
166 format!("master_device | request | {}", id.as_ref()),
167 task.spawn(),
168 );
169
170 let task = tasks::Response {
172 buffer: buffer.clone(),
173 ch_rx_fieldbus_to_device,
174 ch_tx_output_to_filter,
175 ch_tx_buffer: ch_tx_buffer.clone(),
176 fn_response_to_buffer: self.fn_response_to_buffer,
177 fn_buffer_to_msgs: self.fn_buffer_to_msgs,
178 };
179 join_set_spawn(
180 &mut task_set,
181 format!("master_device | response | {}", id.as_ref()),
182 task.spawn(),
183 );
184
185 let task = shared_tasks::filter_identical_data::FilterIdenticalData {
187 input: ch_rx_output_to_filter,
188 output: ch_tx_device_to_msgbus,
189 };
190 join_set_spawn(
191 &mut task_set,
192 format!("master_device | filter_identical_data | {}", id.as_ref()),
193 task.spawn().map_err(super::Error::TaskFilterIdenticalData),
194 );
195
196 while let Some(res) = task_set.join_next().await {
197 res??;
198 }
199 Ok(())
200 }
201}
202
203impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
204 for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
205where
206 TFieldbusRequest: RequestResponseBound,
207 TBuffer: Default,
208{
209 fn default() -> Self {
210 DeviceBase {
211 fn_init_requests: |_| vec![],
212 periodic_requests: vec![],
213 fn_msgs_to_buffer: |_, _| (),
214 buffer_to_request_period: Duration::from_millis(1000),
215 fn_buffer_to_request: |_| Ok(vec![]),
216 fn_response_to_buffer: |_, _| Ok(false),
217 fn_buffer_to_msgs: |_| vec![],
218 buffer_default: Default::default(),
219 }
220 }
221}