rsiot/components_config/master_device/device/
device.rs1#![allow(clippy::module_inception)]
2
3use std::sync::Arc;
4
5use futures::TryFutureExt;
6use tokio::{
7 sync::{broadcast, mpsc, Mutex},
8 task::JoinSet,
9};
10
11use crate::{components::shared_tasks, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{config::*, tasks, BufferBound, RequestResponseBound};
15
16pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19 TFieldbusRequest: RequestResponseBound,
20{
21 pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24 pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27 pub fn_msgs_to_buffer: fn(&Message<TMsg>, &mut TBuffer),
39
40 pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
42
43 pub fn_response_to_buffer: fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<()>,
45
46 pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<Message<TMsg>>,
56
57 pub buffer_default: TBuffer,
59}
60
61impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
62where
63 TRequest: 'static + RequestResponseBound,
64 TResponse: 'static + RequestResponseBound,
65 TMsg: MsgDataBound + 'static,
66 TBuffer: 'static + BufferBound,
67{
68 pub async fn spawn(
70 self,
71 ch_rx_msgbus_to_device: broadcast::Receiver<Message<TMsg>>,
72 ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
73 ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
74 ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
75 ) -> super::Result<()> {
76 let buffer = self.buffer_default;
77 let buffer = Arc::new(Mutex::new(buffer));
78
79 let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(100);
80
81 let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
82
83 let task = tasks::InitRequest {
87 buffer: buffer.clone(),
88 fn_init_requests: self.fn_init_requests,
89 ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
90 };
91 task.spawn().await.unwrap();
92
93 let task = tasks::InputRequest {
95 buffer: buffer.clone(),
96 ch_rx_msgbus_to_device,
97 ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
98 fn_msgs_to_buffer: self.fn_msgs_to_buffer,
99 fn_buffer_to_request: self.fn_buffer_to_request,
100 };
101 join_set_spawn(&mut task_set, task.spawn());
102
103 for periodic_request in self.periodic_requests {
105 let task = tasks::PeriodicRequest {
106 buffer: buffer.clone(),
107 period: periodic_request.period,
108 fn_request: periodic_request.fn_requests,
109 ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
110 };
111 join_set_spawn(&mut task_set, task.spawn());
112 }
113
114 let task = tasks::Response {
116 buffer: buffer.clone(),
117 ch_rx_fieldbus_to_device,
118 ch_tx_output_to_filter,
119 fn_response_to_buffer: self.fn_response_to_buffer,
120 fn_buffer_to_msgs: self.fn_buffer_to_msgs,
121 };
122 join_set_spawn(&mut task_set, task.spawn());
123
124 let task = shared_tasks::filter_identical_data::FilterIdenticalData {
126 input: ch_rx_output_to_filter,
127 output: ch_tx_device_to_msgbus,
128 };
129 join_set_spawn(
130 &mut task_set,
131 task.spawn().map_err(super::Error::TaskFilterIdenticalData),
132 );
133
134 while let Some(res) = task_set.join_next().await {
135 res??;
136 }
137 Ok(())
138 }
139}
140
141impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
142 for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
143where
144 TFieldbusRequest: RequestResponseBound,
145 TBuffer: Default,
146{
147 fn default() -> Self {
148 DeviceBase {
149 fn_init_requests: |_| vec![],
150 periodic_requests: vec![],
151 fn_msgs_to_buffer: |_, _| (),
152 fn_buffer_to_request: |_| Ok(vec![]),
153 fn_response_to_buffer: |_, _| Ok(()),
154 fn_buffer_to_msgs: |_| vec![],
155 buffer_default: Default::default(),
156 }
157 }
158}