rsiot/components_config/master_device/device/
device.rs

1#![allow(clippy::module_inception)]
2
3use std::sync::Arc;
4
5use futures::TryFutureExt;
6use tokio::{
7    sync::{broadcast, mpsc, Mutex},
8    task::JoinSet,
9};
10
11use crate::{components::shared_tasks, message::Message};
12use crate::{executor::join_set_spawn, message::MsgDataBound};
13
14use super::{config::*, tasks, BufferBound, RequestResponseBound};
15
16/// Базовое устройство для опроса по шине
17pub struct DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
18where
19    TFieldbusRequest: RequestResponseBound,
20{
21    /// Запросы при инициализации устройства
22    pub fn_init_requests: fn(&TBuffer) -> Vec<TFieldbusRequest>,
23
24    /// Периодические запросы
25    pub periodic_requests: Vec<ConfigPeriodicRequest<TFieldbusRequest, TBuffer>>,
26
27    /// Обновление буфера на основе входящих сообщений
28    ///
29    /// Обычно соответствует параметру fn_input конфигурации устройства
30    ///
31    /// # Пример
32    ///
33    /// ```rust
34    /// let msgs = [Custom::AllInputs(buffer.all_inputs)];
35    /// let msgs = msgs.iter().map(|m| Message::new_custom(*m)).collect();
36    /// msgs
37    /// ```
38    pub fn_msgs_to_buffer: fn(&Message<TMsg>, &mut TBuffer),
39
40    /// Преобразование данных из буфера в массив запросов на шине
41    pub fn_buffer_to_request: fn(&TBuffer) -> anyhow::Result<Vec<TFieldbusRequest>>,
42
43    /// Обновление буфера на основе данных, полученных с устройства
44    pub fn_response_to_buffer: fn(TFieldbusResponse, &mut TBuffer) -> anyhow::Result<()>,
45
46    /// Функция создания сообщений на основе буфера
47    ///
48    /// Обычно соответствует параметру fn_output кофигурации устройства
49    ///
50    /// Пример:
51    ///
52    /// ```rust
53    /// |buffer| {vec![]}
54    /// ```
55    pub fn_buffer_to_msgs: fn(&mut TBuffer) -> Vec<Message<TMsg>>,
56
57    /// Значения в буфере при инициализации
58    pub buffer_default: TBuffer,
59}
60
61impl<TMsg, TRequest, TResponse, TBuffer> DeviceBase<TMsg, TRequest, TResponse, TBuffer>
62where
63    TRequest: 'static + RequestResponseBound,
64    TResponse: 'static + RequestResponseBound,
65    TMsg: MsgDataBound + 'static,
66    TBuffer: 'static + BufferBound,
67{
68    /// Запустить работу
69    pub async fn spawn(
70        self,
71        ch_rx_msgbus_to_device: broadcast::Receiver<Message<TMsg>>,
72        ch_tx_device_to_fieldbus: mpsc::Sender<TRequest>,
73        ch_rx_fieldbus_to_device: mpsc::Receiver<TResponse>,
74        ch_tx_device_to_msgbus: mpsc::Sender<Message<TMsg>>,
75    ) -> super::Result<()> {
76        let buffer = self.buffer_default;
77        let buffer = Arc::new(Mutex::new(buffer));
78
79        let (ch_tx_output_to_filter, ch_rx_output_to_filter) = mpsc::channel(100);
80
81        let mut task_set: JoinSet<super::Result<()>> = JoinSet::new();
82
83        // Задача выполнения первоначальных запросов
84        //
85        // Приостанавливаем выполнение, пока не будет выполнена задача
86        let task = tasks::InitRequest {
87            buffer: buffer.clone(),
88            fn_init_requests: self.fn_init_requests,
89            ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
90        };
91        task.spawn().await.unwrap();
92
93        // Задача создания запросов на основе входящих сообщений
94        let task = tasks::InputRequest {
95            buffer: buffer.clone(),
96            ch_rx_msgbus_to_device,
97            ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
98            fn_msgs_to_buffer: self.fn_msgs_to_buffer,
99            fn_buffer_to_request: self.fn_buffer_to_request,
100        };
101        join_set_spawn(&mut task_set, task.spawn());
102
103        // Задача создания периодических запросов
104        for periodic_request in self.periodic_requests {
105            let task = tasks::PeriodicRequest {
106                buffer: buffer.clone(),
107                period: periodic_request.period,
108                fn_request: periodic_request.fn_requests,
109                ch_tx_device_to_fieldbus: ch_tx_device_to_fieldbus.clone(),
110            };
111            join_set_spawn(&mut task_set, task.spawn());
112        }
113
114        // Задача обработки ответа
115        let task = tasks::Response {
116            buffer: buffer.clone(),
117            ch_rx_fieldbus_to_device,
118            ch_tx_output_to_filter,
119            fn_response_to_buffer: self.fn_response_to_buffer,
120            fn_buffer_to_msgs: self.fn_buffer_to_msgs,
121        };
122        join_set_spawn(&mut task_set, task.spawn());
123
124        // Задачи фильтрации одинаковых сообщений
125        let task = shared_tasks::filter_identical_data::FilterIdenticalData {
126            input: ch_rx_output_to_filter,
127            output: ch_tx_device_to_msgbus,
128        };
129        join_set_spawn(
130            &mut task_set,
131            task.spawn().map_err(super::Error::TaskFilterIdenticalData),
132        );
133
134        while let Some(res) = task_set.join_next().await {
135            res??;
136        }
137        Ok(())
138    }
139}
140
141impl<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer> Default
142    for DeviceBase<TMsg, TFieldbusRequest, TFieldbusResponse, TBuffer>
143where
144    TFieldbusRequest: RequestResponseBound,
145    TBuffer: Default,
146{
147    fn default() -> Self {
148        DeviceBase {
149            fn_init_requests: |_| vec![],
150            periodic_requests: vec![],
151            fn_msgs_to_buffer: |_, _| (),
152            fn_buffer_to_request: |_| Ok(vec![]),
153            fn_response_to_buffer: |_, _| Ok(()),
154            fn_buffer_to_msgs: |_| vec![],
155            buffer_default: Default::default(),
156        }
157    }
158}