rsiot/components/cmp_add_input_stream/
mod.rs

1//! Компонент для добавления сообщений из побочного потока
2
3use async_trait::async_trait;
4use tokio::sync::broadcast;
5
6use crate::{
7    executor::{CmpInOut, Component, ComponentError, IComponentProcess},
8    message::*,
9};
10
11/// Настройки компонента cmp_add_input_stream
12#[derive(Debug)]
13pub struct Config<TMessage> {
14    /// Внешний канал broadcast, на который происходит подписка
15    pub channel: broadcast::Receiver<Message<TMessage>>,
16}
17
18/// Компонент для добавления сообщений из побочного потока
19#[cfg_attr(not(feature = "single-thread"), async_trait)]
20#[cfg_attr(feature = "single-thread", async_trait(?Send))]
21impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
22where
23    TMsg: MsgDataBound + 'static,
24{
25    async fn process(
26        &self,
27        mut config: Config<TMsg>,
28        in_out: CmpInOut<TMsg>,
29    ) -> Result<(), ComponentError> {
30        while let Ok(msg) = config.channel.recv().await {
31            in_out
32                .send_output(msg)
33                .await
34                .map_err(|err| ComponentError::Execution(err.to_string()))?;
35        }
36        Ok(())
37    }
38}
39
40/// Компонент cmp_add_input_stream
41pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;