rsiot/components/shared_tasks/
fn_process_master.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! Запуск задач, общих для всех компонентов, выполняющих опрос устройств по шине

use futures::TryFutureExt;
use tokio::{
    sync::{broadcast, mpsc},
    task::JoinSet,
};

use crate::{
    components_config::master_device::{self, AddressBound, DeviceTrait, RequestResponseBound},
    executor::{join_set_spawn, CmpInOut},
    message::{Message, MsgDataBound, ServiceBound},
};

use super::{filter_identical_data, mpsc_to_msgbus, msgbus_to_broadcast};

/// Запуск задач, общих для всех компонентов, выполняющих опрос устройств по шине
pub struct FnProcessMaster<
    'a,
    TMsg,
    TService,
    TError,
    TFieldbusRequest,
    TFieldbusResponse,
    TAddress,
> where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    TError: Send + Sync + 'static,
    TAddress: AddressBound,
{
    /// Шина сообщений
    pub msg_bus: CmpInOut<TMsg, TService>,

    /// Ёмкость очередей сообщений между задачами
    pub buffer_size: usize,

    /// Ссылка на коллекцию задач tokio
    pub task_set: &'a mut JoinSet<Result<(), TError>>,

    /// Ошибка msgbus_to_broadcast
    pub error_msgbus_to_broadcast: fn(msgbus_to_broadcast::Error) -> TError,

    /// Ошибка filter_identical_data
    pub error_filter: fn(filter_identical_data::Error) -> TError,

    /// Ошибка mpsc_to_msgbus
    pub error_mpsc_to_msgbus: fn(mpsc_to_msgbus::Error) -> TError,

    /// Ошибка master_device
    pub error_master_device: fn(master_device::Error) -> TError,

    /// Массив устройств
    pub devices: Vec<Box<dyn DeviceTrait<TMsg, TFieldbusRequest, TFieldbusResponse, TAddress>>>,
}

impl<TMsg, TService, TError, TFieldbusRequest, TFieldbusResponse, TAddress>
    FnProcessMaster<'_, TMsg, TService, TError, TFieldbusRequest, TFieldbusResponse, TAddress>
where
    TMsg: MsgDataBound + 'static,
    TService: ServiceBound + 'static,
    TError: Send + Sync + 'static,
    TFieldbusRequest: RequestResponseBound<TAddress> + 'static,
    TFieldbusResponse: RequestResponseBound<TAddress> + 'static,
    TAddress: 'static + AddressBound,
{
    /// Запуск задач.
    ///
    /// Возвращает кортеж с каналами передачи запросов / ответов
    pub fn spawn(
        self,
    ) -> (
        mpsc::Receiver<TFieldbusRequest>,
        broadcast::Sender<TFieldbusResponse>,
    ) {
        // Создание каналов передачи данных ------------------------------------------------------------

        // Канал передачи со входа компонента на устройства
        let (ch_tx_msgbus_to_devices, ch_rx_msgbus_to_devices) =
            broadcast::channel::<Message<TMsg>>(self.buffer_size);

        // Канал передачи данных из драйверов в канал SPI
        let (ch_tx_devices_to_fieldbus, ch_rx_devices_to_fieldbus) =
            mpsc::channel::<TFieldbusRequest>(self.buffer_size);

        // Канал передачи из канала SPI всем драйверам
        let (ch_tx_fieldbus_to_devices, ch_rx_fieldbus_to_devices) =
            broadcast::channel::<TFieldbusResponse>(self.buffer_size);

        // Канал передачи из устройств на фильтр
        let (ch_tx_devices_to_filter, ch_rx_devices_to_filter) =
            mpsc::channel::<Message<TMsg>>(self.buffer_size);

        // Канал передачи из фильтра на выход компонента
        let (ch_tx_filter_to_msgbus, ch_rx_filter_to_msgbus) =
            mpsc::channel::<Message<TMsg>>(self.buffer_size);

        // Передача входящих сообщений на драйвера устройств -------------------------------------------
        let task = msgbus_to_broadcast::MsgBusToBroadcast {
            msg_bus: self.msg_bus.clone(),
            output: ch_tx_msgbus_to_devices,
        };
        join_set_spawn(
            self.task_set,
            task.spawn().map_err(self.error_msgbus_to_broadcast),
        );

        // Задачи выполнения драйверов устройств -------------------------------------------------------
        for device in self.devices {
            let ch_rx_msgbus_to_devices = ch_rx_msgbus_to_devices.resubscribe();
            let ch_tx_device_to_fieldbus = ch_tx_devices_to_fieldbus.clone();
            let ch_rx_fieldbus_to_device = ch_rx_fieldbus_to_devices.resubscribe();
            let ch_tx_devices_to_filter = ch_tx_devices_to_filter.clone();
            join_set_spawn(
                self.task_set,
                device
                    .spawn(
                        ch_rx_msgbus_to_devices,
                        ch_tx_device_to_fieldbus,
                        ch_rx_fieldbus_to_device,
                        ch_tx_devices_to_filter,
                    )
                    .map_err(self.error_master_device),
            );
        }

        // Фильтрация одинаковых сообщений -------------------------------------------------------------
        let task = filter_identical_data::FilterIdenticalData {
            input: ch_rx_devices_to_filter,
            output: ch_tx_filter_to_msgbus,
        };
        join_set_spawn(self.task_set, task.spawn().map_err(self.error_filter));

        // Создаем исходящие сообщения -----------------------------------------------------------------
        let task = mpsc_to_msgbus::MpscToMsgBus {
            input: ch_rx_filter_to_msgbus,
            msg_bus: self.msg_bus.clone(),
        };
        join_set_spawn(
            self.task_set,
            task.spawn().map_err(self.error_mpsc_to_msgbus),
        );

        (ch_rx_devices_to_fieldbus, ch_tx_fieldbus_to_devices)
    }
}