Skip to main content

rsiot/components_config/master_device/device/
device.rs

1#![allow(clippy::module_inception)]
2
3use std::{sync::Arc, time::Duration};
4
5use futures::TryFutureExt;
6use tokio::{
7    sync::{Mutex, mpsc},
8    task::JoinSet,
9};
10
11use crate::{components::shared_tasks, executor::MsgBusInput, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{BufferBound, RequestResponseBound, config::*, tasks};
15
16/// Базовое устройство для опроса по шине.
17pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19    TFieldbusRequest: RequestResponseBound,
20{
21    /// Запросы при инициализации устройства
22    pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24    /// Периодические запросы
25    pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27    /// Обновление буфера на основе входящих сообщений
28    ///
29    /// Обычно соответствует параметру fn_input конфигурации устройства
30    ///
31    /// # Пример
32    ///
33    /// ```rust
34    /// let msgs = [Custom::AllInputs(buffer.all_inputs)];
35    /// let msgs = msgs.iter().map(|m| Message::new_custom(*m)).collect();
36    /// msgs
37    /// ```
38    pub fn_msgs_to_buffer: fn(&TMsg, &mut TBuffer),
39
40    /// Периодическое формирование запросов на основе fn_buffer_to_request
41    pub buffer_to_request_period: Duration,
42
43    /// Преобразование данных из буфера в массив запросов на шине
44    ///
45    /// Вызывается несколькими способами:
46    ///
47    /// - при обновлении буфера на основе входящих сообщений функцией fn_msgs_to_buffer
48    ///
49    /// - при расшифровке ответа от устройства, при возвращении true из функции fn_response_to_buffer
50    ///
51    /// - периодически с периодом buffer_to_request_period
52    pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
53
54    /// Обновление буфера на основе данных, полученных с устройства
55    pub fn_response_to_buffer: fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<bool>,
56
57    /// Функция создания сообщений на основе буфера
58    ///
59    /// Обычно соответствует параметру fn_output кофигурации устройства
60    ///
61    /// Пример:
62    ///
63    /// ```rust
64    /// |buffer| {vec![]}
65    /// ```
66    pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<TMsg>,
67
68    /// Значения в буфере при инициализации
69    pub buffer_default: TBuffer,
70}
71
72impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
73where
74    TRequest: 'static + RequestResponseBound,
75    TResponse: 'static + RequestResponseBound,
76    TMsg: MsgDataBound + 'static,
77    TBuffer: 'static + BufferBound,
78{
79    /// Запустить работу
80    pub async fn spawn(
81        self,
82        id: impl AsRef<str>,
83        ch_rx_msgbus_to_device: MsgBusInput<TMsg>,
84        ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
85        ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
86        ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
87    ) -> super::Result<()> {
88        let buffer = self.buffer_default;
89        let buffer = Arc::new(Mutex::new(buffer));
90
91        let (ch_tx_buffer, ch_rx_buffer) = mpsc::channel::<()>(100);
92        let (ch_tx_request, ch_rx_request) = mpsc::channel::<TRequest>(100);
93        let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel::<Message<TMsg>>(500);
94
95        let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
96
97        // Задача выполнения первоначальных запросов
98        //
99        // Приостанавливаем выполнение, пока не будет выполнена задача
100        let task = tasks::InitRequest {
101            buffer: buffer.clone(),
102            fn_init_requests: self.fn_init_requests,
103            ch_tx_request: ch_tx_request.clone(),
104        };
105        task.spawn().await?;
106
107        // Задача создания периодических запросов
108        for periodic_request in self.periodic_requests {
109            let task = tasks::PeriodicRequest {
110                buffer: buffer.clone(),
111                period: periodic_request.period,
112                fn_request: periodic_request.fn_requests,
113                ch_tx_request: ch_tx_request.clone(),
114            };
115            join_set_spawn(
116                &mut task_set,
117                format!("master_device | periodic_request | {}", id.as_ref()),
118                task.spawn(),
119            );
120        }
121
122        // Задача создания запросов на основе входящих сообщений
123        let task = tasks::InputRequest {
124            buffer: buffer.clone(),
125            ch_rx_msgbus_to_device,
126            ch_tx_buffer: ch_tx_buffer.clone(),
127            fn_msgs_to_buffer: self.fn_msgs_to_buffer,
128        };
129        join_set_spawn(
130            &mut task_set,
131            format!("master_device | input_request | {}", id.as_ref()),
132            task.spawn(),
133        );
134
135        // Задача периодического формирования запросов на основе буфера
136        let task = tasks::BufferPeriodic {
137            ch_tx_buffer: ch_tx_buffer.clone(),
138            period: self.buffer_to_request_period,
139        };
140        join_set_spawn(
141            &mut task_set,
142            format!("master_device | buffer_periodic | {}", id.as_ref()),
143            task.spawn(),
144        );
145
146        // Задача формирования запросов на основе буфера
147        let task = tasks::BufferToRequests {
148            buffer: buffer.clone(),
149            ch_rx_buffer,
150            ch_tx_request: ch_tx_request.clone(),
151            fn_buffer_to_request: self.fn_buffer_to_request,
152        };
153        join_set_spawn(
154            &mut task_set,
155            format!("master_device | buffer_to_requests | {}", id.as_ref()),
156            task.spawn(),
157        );
158
159        // Задача отправки запросов
160        let task = tasks::Request {
161            ch_rx_request,
162            ch_tx_device_to_fieldbus,
163        };
164        join_set_spawn(
165            &mut task_set,
166            format!("master_device | request | {}", id.as_ref()),
167            task.spawn(),
168        );
169
170        // Задача обработки ответа
171        let task = tasks::Response {
172            buffer: buffer.clone(),
173            ch_rx_fieldbus_to_device,
174            ch_tx_output_to_filter,
175            ch_tx_buffer: ch_tx_buffer.clone(),
176            fn_response_to_buffer: self.fn_response_to_buffer,
177            fn_buffer_to_msgs: self.fn_buffer_to_msgs,
178        };
179        join_set_spawn(
180            &mut task_set,
181            format!("master_device | response | {}", id.as_ref()),
182            task.spawn(),
183        );
184
185        // Задачи фильтрации одинаковых сообщений
186        let task = shared_tasks::filter_identical_data::FilterIdenticalData {
187            input: ch_rx_output_to_filter,
188            output: ch_tx_device_to_msgbus,
189        };
190        join_set_spawn(
191            &mut task_set,
192            format!("master_device | filter_identical_data | {}", id.as_ref()),
193            task.spawn().map_err(super::Error::TaskFilterIdenticalData),
194        );
195
196        while let Some(res) = task_set.join_next().await {
197            res??;
198        }
199        Ok(())
200    }
201}
202
203impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
204    for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
205where
206    TFieldbusRequest: RequestResponseBound,
207    TBuffer: Default,
208{
209    fn default() -> Self {
210        DeviceBase {
211            fn_init_requests: |_| vec![],
212            periodic_requests: vec![],
213            fn_msgs_to_buffer: |_, _| (),
214            buffer_to_request_period: Duration::from_millis(1000),
215            fn_buffer_to_request: |_| Ok(vec![]),
216            fn_response_to_buffer: |_, _| Ok(false),
217            fn_buffer_to_msgs: |_| vec![],
218            buffer_default: Default::default(),
219        }
220    }
221}