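// rsiot/components_config/master_device/device/device.rs
//
// The module `device` sits inside a directory module that is also named `device`,
// which is what the `clippy::module_inception` lint would otherwise report.
#![allow(clippy::module_inception)]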
2
3use std::{sync::Arc, time::Duration};
4
5use futures::TryFutureExt;
6use tokio::{
7 sync::{Mutex, mpsc},
8 task::JoinSet,
9};
10
11use crate::{components::shared_tasks, executor::MsgBusInput, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{BufferBound, DeviceState, RequestResponseBound, ResponseResult, config::*, tasks};
15
16pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19 TFieldbusRequest: RequestResponseBound,
20{
21 pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24 pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27 pub fn_msgs_to_buffer: fn(&TMsg, &mut TBuffer),
39
40 pub buffer_to_request_period: Duration,
42
43 pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
53
54 pub fn_response_to_buffer:
56 fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<ResponseResult>,
57
58 pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<TMsg>,
68
69 pub fn_device_state: fn(DeviceState) -> Option<TMsg>,
71
72 pub buffer_default: TBuffer,
74}
75
76impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
77where
78 TRequest: 'static + RequestResponseBound,
79 TResponse: 'static + RequestResponseBound,
80 TMsg: MsgDataBound + 'static,
81 TBuffer: 'static + BufferBound,
82{
83 pub async fn spawn(
85 self,
86 id: impl AsRef<str>,
87 ch_rx_msgbus_to_device: MsgBusInput<TMsg>,
88 ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
89 ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
90 ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
91 ) -> super::Result<()> {
92 let buffer = self.buffer_default;
93 let buffer = Arc::new(Mutex::new(buffer));
94
95 let device_state = DeviceState {
96 init_completed: false,
97 response_ok_count: 0,
98 response_err_count: 0,
99 };
100 let device_state = Arc::new(Mutex::new(device_state));
101
102 let (ch_tx_need_request, ch_rx_need_request) = mpsc::channel::<()>(100);
103 let (ch_tx_request, ch_rx_request) = mpsc::channel::<TRequest>(100);
104 let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel::<Message<TMsg>>(500);
105
106 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
107
108 let task = tasks::InitRequest {
112 buffer: buffer.clone(),
113 device_state: device_state.clone(),
114 fn_init_requests: self.fn_init_requests,
115 ch_tx_request: ch_tx_request.clone(),
116 };
117 join_set_spawn(
118 &mut task_set,
119 format!("master_device | init_request | {}", id.as_ref()),
120 task.spawn(),
121 );
122
123 let task = tasks::InputRequest {
125 buffer: buffer.clone(),
126 ch_rx_msgbus_to_device,
127 ch_tx_need_request: ch_tx_need_request.clone(),
128 fn_msgs_to_buffer: self.fn_msgs_to_buffer,
129 };
130 join_set_spawn(
131 &mut task_set,
132 format!("master_device | input_request | {}", id.as_ref()),
133 task.spawn(),
134 );
135
136 let task = tasks::BufferPeriodic {
138 device_state: device_state.clone(),
139 ch_tx_need_request: ch_tx_need_request.clone(),
140 period: self.buffer_to_request_period,
141 };
142 join_set_spawn(
143 &mut task_set,
144 format!("master_device | buffer_periodic | {}", id.as_ref()),
145 task.spawn(),
146 );
147
148 let task = tasks::Request {
150 ch_rx_request,
151 ch_tx_device_to_fieldbus,
152 };
153 join_set_spawn(
154 &mut task_set,
155 format!("master_device | request | {}", id.as_ref()),
156 task.spawn(),
157 );
158
159 let task = tasks::Response {
161 buffer: buffer.clone(),
162 device_state: device_state.clone(),
163 ch_rx_fieldbus_to_device,
164 ch_tx_output_to_filter,
165 ch_tx_need_request: ch_tx_need_request.clone(),
166 fn_response_to_buffer: self.fn_response_to_buffer,
167 fn_buffer_to_msgs: self.fn_buffer_to_msgs,
168 };
169 join_set_spawn(
170 &mut task_set,
171 format!("master_device | response | {}", id.as_ref()),
172 task.spawn(),
173 );
174
175 let task = shared_tasks::filter_identical_data::FilterIdenticalData {
177 input: ch_rx_output_to_filter,
178 output: ch_tx_device_to_msgbus,
179 };
180 join_set_spawn(
181 &mut task_set,
182 format!("master_device | filter_identical_data | {}", id.as_ref()),
183 task.spawn().map_err(super::Error::TaskFilterIdenticalData),
184 );
185
186 for periodic_request in self.periodic_requests {
188 let task = tasks::PeriodicRequest {
189 device_state: device_state.clone(),
190 buffer: buffer.clone(),
191 period: periodic_request.period,
192 fn_request: periodic_request.fn_requests,
193 ch_tx_request: ch_tx_request.clone(),
194 };
195 join_set_spawn(
196 &mut task_set,
197 format!("master_device | periodic_request | {}", id.as_ref()),
198 task.spawn(),
199 );
200 }
201
202 let task = tasks::BufferToRequests {
204 buffer: buffer.clone(),
205 ch_rx_buffer_changed: ch_rx_need_request,
206 ch_tx_request: ch_tx_request.clone(),
207 fn_buffer_to_request: self.fn_buffer_to_request,
208 };
209 join_set_spawn(
210 &mut task_set,
211 format!("master_device | buffer_to_requests | {}", id.as_ref()),
212 task.spawn(),
213 );
214
215 while let Some(res) = task_set.join_next().await {
216 res??;
217 }
218 Ok(())
219 }
220}
221
222impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
223 for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
224where
225 TFieldbusRequest: RequestResponseBound,
226 TBuffer: Default,
227 TMsg: MsgDataBound,
228{
229 fn default() -> Self {
230 DeviceBase {
231 fn_init_requests: |_| vec![],
232 periodic_requests: vec![],
233 fn_msgs_to_buffer: |_, _| (),
234 buffer_to_request_period: Duration::from_millis(1000),
235 fn_buffer_to_request: |_| Ok(vec![]),
236 fn_response_to_buffer: |_, _| ResponseResult::ok(),
237 fn_buffer_to_msgs: |_| vec![],
238 fn_device_state: |_| None,
239 buffer_default: Default::default(),
240 }
241 }
242}