Skip to main content

rsiot/components/cmp_external_fn_process/
mod.rs

1//! Тестирование документации:
2//!
3//! ```bash
4//! cargo test components::cmp_external_fn_process --features="executor" --target="x86_64-unknown-linux-gnu";
5//! cargo test --doc components::cmp_external_fn_process --features="executor" --target="x86_64-unknown-linux-gnu";
6//!
7//! cargo test components::cmp_external_fn_process --features="executor, single-thread" --target="x86_64-unknown-linux-gnu";
8//! cargo test --doc components::cmp_external_fn_process --features="executor, single-thread" --target="x86_64-unknown-linux-gnu";
9//! ```
10
11use async_trait::async_trait;
12
13#[cfg(feature = "single-thread")]
14pub use futures::future::LocalBoxFuture;
15
16#[cfg(not(feature = "single-thread"))]
17pub use futures::future::BoxFuture;
18
19use crate::{
20    executor::{
21        CmpResult, Component, ComponentError, IComponentProcess, MsgBusInput, MsgBusLinker,
22        MsgBusOutput,
23    },
24    message::*,
25};
26
27/// Название компонента
28pub const COMPONENT_NAME: &str = "cmp_external_fn_process";
29
30#[cfg(feature = "single-thread")]
31type FnProcess<TMsg> =
32    Box<dyn Fn(MsgBusInput<TMsg>, MsgBusOutput<TMsg>) -> LocalBoxFuture<'static, CmpResult>>;
33
34#[cfg(not(feature = "single-thread"))]
35type FnProcess<TMsg> = Box<
36    dyn Fn(MsgBusInput<TMsg>, MsgBusOutput<TMsg>) -> BoxFuture<'static, CmpResult> + Send + Sync,
37>;
38
39/// Настройки cmp_external_fn_process
40pub struct Config<TMsg>
41where
42    TMsg: MsgDataBound,
43{
44    /// Внешняя функция для выполнения
45    ///
46    /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
47    #[cfg(feature = "single-thread")]
48    pub fn_process: FnProcess<TMsg>,
49
50    /// Внешняя функция для выполнения
51    ///
52    /// Выполняемую асинхронную функцию `fn_external` необходимо обернуть в функцию.
53    #[cfg(not(feature = "single-thread"))]
54    pub fn_process: FnProcess<TMsg>,
55}
56
57#[cfg_attr(not(feature = "single-thread"), async_trait)]
58#[cfg_attr(feature = "single-thread", async_trait(?Send))]
59#[async_trait(?Send)]
60impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
61where
62    TMsg: MsgDataBound,
63{
64    async fn process(
65        &self,
66        config: Config<TMsg>,
67        msgbus_linker: MsgBusLinker<TMsg>,
68    ) -> Result<(), ComponentError> {
69        let (msgbus_input, msgbus_output) = msgbus_linker.init(COMPONENT_NAME).input_output();
70        (config.fn_process)(msgbus_input, msgbus_output).await
71    }
72}
73
74/// Компонент cmp_external_fn_process
75pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;
76
77#[cfg(test)]
78mod tests {
79    use std::time::Duration;
80
81    use tracing::info;
82
83    use crate::{
84        components::cmp_external_fn_process,
85        executor::{CmpResult, sleep},
86        message::{example_message::*, *},
87    };
88
89    use super::*;
90
91    #[cfg(feature = "single-thread")]
92    #[test]
93    fn single_thread() {
94        use futures::future::LocalBoxFuture;
95
96        fn fn_process_wrapper<TMsg>(
97            input: MsgBusInput<TMsg>,
98            output: MsgBusOutput<TMsg>,
99        ) -> LocalBoxFuture<'static, CmpResult>
100        where
101            TMsg: MsgDataBound + 'static,
102        {
103            Box::pin(async { fn_process(input, output).await })
104        }
105        async fn fn_process<TMsg>(
106            _input: MsgBusInput<TMsg>,
107            _output: MsgBusOutput<TMsg>,
108        ) -> CmpResult
109        where
110            TMsg: MsgDataBound,
111        {
112            loop {
113                info!("External fn process");
114                sleep(Duration::from_secs(2)).await;
115            }
116        }
117
118        let _config = cmp_external_fn_process::Config {
119            fn_process: Box::new(fn_process_wrapper::<Custom>),
120        };
121    }
122
123    #[cfg(not(feature = "single-thread"))]
124    #[test]
125    fn multi_thread() {
126        use futures::future::BoxFuture;
127
128        fn fn_process_wrapper<TMsg>(
129            input: MsgBusInput<TMsg>,
130            output: MsgBusOutput<TMsg>,
131        ) -> BoxFuture<'static, CmpResult>
132        where
133            TMsg: MsgDataBound + 'static,
134        {
135            Box::pin(async { fn_process(input, output).await })
136        }
137
138        async fn fn_process<TMsg>(
139            _input: MsgBusInput<TMsg>,
140            _output: MsgBusOutput<TMsg>,
141        ) -> CmpResult
142        where
143            TMsg: MsgDataBound + 'static,
144        {
145            loop {
146                info!("External fn process");
147                sleep(Duration::from_secs(2)).await;
148            }
149        }
150
151        let _config = cmp_external_fn_process::Config {
152            fn_process: Box::new(fn_process_wrapper::<Custom>),
153        };
154    }
155}