Skip to main content

rsiot/components_config/master_device/device/
device.rs

1#![allow(clippy::module_inception)]
2
3use std::{sync::Arc, time::Duration};
4
5use futures::TryFutureExt;
6use tokio::{
7    sync::{Mutex, mpsc},
8    task::JoinSet,
9};
10
11use crate::{components::shared_tasks, executor::MsgBusInput, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{BufferBound, DeviceState, RequestResponseBound, ResponseResult, config::*, tasks};
15
16/// Базовое устройство для опроса по шине.
17pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19    TFieldbusRequest: RequestResponseBound,
20{
21    /// Запросы при инициализации устройства
22    pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24    /// Периодические запросы
25    pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27    /// Обновление буфера на основе входящих сообщений
28    ///
29    /// Обычно соответствует параметру fn_input конфигурации устройства
30    ///
31    /// # Пример
32    ///
33    /// ```rust
34    /// let msgs = [Custom::AllInputs(buffer.all_inputs)];
35    /// let msgs = msgs.iter().map(|m| Message::new_custom(*m)).collect();
36    /// msgs
37    /// ```
38    pub fn_msgs_to_buffer: fn(&TMsg, &mut TBuffer),
39
40    /// Периодическое формирование запросов на основе fn_buffer_to_request
41    pub buffer_to_request_period: Duration,
42
43    /// Преобразование данных из буфера в массив запросов на шине
44    ///
45    /// Вызывается несколькими способами:
46    ///
47    /// - при обновлении буфера на основе входящих сообщений функцией fn_msgs_to_buffer
48    ///
49    /// - при расшифровке ответа от устройства, при возвращении true из функции fn_response_to_buffer
50    ///
51    /// - периодически с периодом buffer_to_request_period
52    pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
53
54    /// Обновление буфера на основе данных, полученных с устройства
55    pub fn_response_to_buffer:
56        fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<ResponseResult>,
57
58    /// Функция создания сообщений на основе буфера
59    ///
60    /// Обычно соответствует параметру fn_output кофигурации устройства
61    ///
62    /// Пример:
63    ///
64    /// ```rust
65    /// |buffer| {vec![]}
66    /// ```
67    pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<TMsg>,
68
69    /// Состояние устройства
70    pub fn_device_state: fn(DeviceState) -> Option<TMsg>,
71
72    /// Значения в буфере при инициализации
73    pub buffer_default: TBuffer,
74}
75
76impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
77where
78    TRequest: 'static + RequestResponseBound,
79    TResponse: 'static + RequestResponseBound,
80    TMsg: MsgDataBound + 'static,
81    TBuffer: 'static + BufferBound,
82{
83    /// Запустить работу
84    pub async fn spawn(
85        self,
86        id: impl AsRef<str>,
87        ch_rx_msgbus_to_device: MsgBusInput<TMsg>,
88        ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
89        ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
90        ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
91    ) -> super::Result<()> {
92        let buffer = self.buffer_default;
93        let buffer = Arc::new(Mutex::new(buffer));
94
95        let device_state = DeviceState {
96            init_completed: false,
97            response_ok_count: 0,
98            response_err_count: 0,
99        };
100        let device_state = Arc::new(Mutex::new(device_state));
101
102        let (ch_tx_need_request, ch_rx_need_request) = mpsc::channel::<()>(100);
103        let (ch_tx_request, ch_rx_request) = mpsc::channel::<TRequest>(100);
104        let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel::<Message<TMsg>>(500);
105
106        let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
107
108        // Задача выполнения первоначальных запросов
109        //
110        // Приостанавливаем выполнение, пока не будет выполнена задача
111        let task = tasks::InitRequest {
112            buffer: buffer.clone(),
113            device_state: device_state.clone(),
114            fn_init_requests: self.fn_init_requests,
115            ch_tx_request: ch_tx_request.clone(),
116        };
117        join_set_spawn(
118            &mut task_set,
119            format!("master_device | init_request | {}", id.as_ref()),
120            task.spawn(),
121        );
122
123        // Задача создания запросов на основе входящих сообщений
124        let task = tasks::InputRequest {
125            buffer: buffer.clone(),
126            ch_rx_msgbus_to_device,
127            ch_tx_need_request: ch_tx_need_request.clone(),
128            fn_msgs_to_buffer: self.fn_msgs_to_buffer,
129        };
130        join_set_spawn(
131            &mut task_set,
132            format!("master_device | input_request | {}", id.as_ref()),
133            task.spawn(),
134        );
135
136        // Задача периодического формирования запросов на основе буфера
137        let task = tasks::BufferPeriodic {
138            device_state: device_state.clone(),
139            ch_tx_need_request: ch_tx_need_request.clone(),
140            period: self.buffer_to_request_period,
141        };
142        join_set_spawn(
143            &mut task_set,
144            format!("master_device | buffer_periodic | {}", id.as_ref()),
145            task.spawn(),
146        );
147
148        // Задача отправки запросов
149        let task = tasks::Request {
150            ch_rx_request,
151            ch_tx_device_to_fieldbus,
152        };
153        join_set_spawn(
154            &mut task_set,
155            format!("master_device | request | {}", id.as_ref()),
156            task.spawn(),
157        );
158
159        // Задача обработки ответа
160        let task = tasks::Response {
161            buffer: buffer.clone(),
162            device_state: device_state.clone(),
163            ch_rx_fieldbus_to_device,
164            ch_tx_output_to_filter,
165            ch_tx_need_request: ch_tx_need_request.clone(),
166            fn_response_to_buffer: self.fn_response_to_buffer,
167            fn_buffer_to_msgs: self.fn_buffer_to_msgs,
168        };
169        join_set_spawn(
170            &mut task_set,
171            format!("master_device | response | {}", id.as_ref()),
172            task.spawn(),
173        );
174
175        // Задачи фильтрации одинаковых сообщений
176        let task = shared_tasks::filter_identical_data::FilterIdenticalData {
177            input: ch_rx_output_to_filter,
178            output: ch_tx_device_to_msgbus,
179        };
180        join_set_spawn(
181            &mut task_set,
182            format!("master_device | filter_identical_data | {}", id.as_ref()),
183            task.spawn().map_err(super::Error::TaskFilterIdenticalData),
184        );
185
186        // Задача создания периодических запросов
187        for periodic_request in self.periodic_requests {
188            let task = tasks::PeriodicRequest {
189                device_state: device_state.clone(),
190                buffer: buffer.clone(),
191                period: periodic_request.period,
192                fn_request: periodic_request.fn_requests,
193                ch_tx_request: ch_tx_request.clone(),
194            };
195            join_set_spawn(
196                &mut task_set,
197                format!("master_device | periodic_request | {}", id.as_ref()),
198                task.spawn(),
199            );
200        }
201
202        // Задача формирования запросов на основе буфера
203        let task = tasks::BufferToRequests {
204            buffer: buffer.clone(),
205            ch_rx_buffer_changed: ch_rx_need_request,
206            ch_tx_request: ch_tx_request.clone(),
207            fn_buffer_to_request: self.fn_buffer_to_request,
208        };
209        join_set_spawn(
210            &mut task_set,
211            format!("master_device | buffer_to_requests | {}", id.as_ref()),
212            task.spawn(),
213        );
214
215        while let Some(res) = task_set.join_next().await {
216            res??;
217        }
218        Ok(())
219    }
220}
221
222impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
223    for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
224where
225    TFieldbusRequest: RequestResponseBound,
226    TBuffer: Default,
227    TMsg: MsgDataBound,
228{
229    fn default() -> Self {
230        DeviceBase {
231            fn_init_requests: |_| vec![],
232            periodic_requests: vec![],
233            fn_msgs_to_buffer: |_, _| (),
234            buffer_to_request_period: Duration::from_millis(1000),
235            fn_buffer_to_request: |_| Ok(vec![]),
236            fn_response_to_buffer: |_, _| ResponseResult::ok(),
237            fn_buffer_to_msgs: |_| vec![],
238            fn_device_state: |_| None,
239            buffer_default: Default::default(),
240        }
241    }
242}