rsiot/components/cmp_external_fn_process/
mod.rs1use async_trait::async_trait;
12
13#[cfg(feature = "single-thread")]
14pub use futures::future::LocalBoxFuture;
15
16#[cfg(not(feature = "single-thread"))]
17pub use futures::future::BoxFuture;
18
19use crate::{
20 executor::{
21 CmpResult, Component, ComponentError, IComponentProcess, MsgBusInput, MsgBusLinker,
22 MsgBusOutput,
23 },
24 message::*,
25};
26
/// Name of this component; passed to `MsgBusLinker::init` when the component starts processing.
pub const COMPONENT_NAME: &str = "cmp_external_fn_process";
29
30#[cfg(feature = "single-thread")]
31type FnProcess<TMsg> =
32 Box<dyn Fn(MsgBusInput<TMsg>, MsgBusOutput<TMsg>) -> LocalBoxFuture<'static, CmpResult>>;
33
34#[cfg(not(feature = "single-thread"))]
35type FnProcess<TMsg> = Box<
36 dyn Fn(MsgBusInput<TMsg>, MsgBusOutput<TMsg>) -> BoxFuture<'static, CmpResult> + Send + Sync,
37>;
38
39pub struct Config<TMsg>
41where
42 TMsg: MsgDataBound,
43{
44 #[cfg(feature = "single-thread")]
48 pub fn_process: FnProcess<TMsg>,
49
50 #[cfg(not(feature = "single-thread"))]
54 pub fn_process: FnProcess<TMsg>,
55}
56
57#[cfg_attr(not(feature = "single-thread"), async_trait)]
58#[cfg_attr(feature = "single-thread", async_trait(?Send))]
59#[async_trait(?Send)]
60impl<TMsg> IComponentProcess<Config<TMsg>, TMsg> for Component<Config<TMsg>, TMsg>
61where
62 TMsg: MsgDataBound,
63{
64 async fn process(
65 &self,
66 config: Config<TMsg>,
67 msgbus_linker: MsgBusLinker<TMsg>,
68 ) -> Result<(), ComponentError> {
69 let (msgbus_input, msgbus_output) = msgbus_linker.init(COMPONENT_NAME).input_output();
70 (config.fn_process)(msgbus_input, msgbus_output).await
71 }
72}
73
/// Shorthand for [`Component`] parameterised with this module's [`Config`].
pub type Cmp<TMsg> = Component<Config<TMsg>, TMsg>;
76
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tracing::info;

    use crate::{
        components::cmp_external_fn_process,
        executor::{CmpResult, sleep},
        message::{example_message::*, *},
    };

    use super::*;

    /// Checks that a plain generic function returning a `LocalBoxFuture` can
    /// be stored in `Config::fn_process` in `single-thread` builds.
    ///
    /// The config is only constructed; the future is never polled.
    #[cfg(feature = "single-thread")]
    #[test]
    fn single_thread() {
        use futures::future::LocalBoxFuture;

        /// Example worker: logs a line every two seconds and never returns.
        async fn log_forever<TMsg>(
            _input: MsgBusInput<TMsg>,
            _output: MsgBusOutput<TMsg>,
        ) -> CmpResult
        where
            TMsg: MsgDataBound,
        {
            let period = Duration::from_secs(2);
            loop {
                info!("External fn process");
                sleep(period).await;
            }
        }

        /// Adapts the async worker to the boxed-future signature the config expects.
        fn boxed_process<TMsg>(
            input: MsgBusInput<TMsg>,
            output: MsgBusOutput<TMsg>,
        ) -> LocalBoxFuture<'static, CmpResult>
        where
            TMsg: MsgDataBound + 'static,
        {
            // Calling an `async fn` only builds the future; nothing runs until it is polled.
            Box::pin(log_forever(input, output))
        }

        let _config = cmp_external_fn_process::Config {
            fn_process: Box::new(boxed_process::<Custom>),
        };
    }

    /// Checks that a plain generic function returning a `BoxFuture` can be
    /// stored in `Config::fn_process` when `single-thread` is disabled.
    ///
    /// The config is only constructed; the future is never polled.
    #[cfg(not(feature = "single-thread"))]
    #[test]
    fn multi_thread() {
        use futures::future::BoxFuture;

        /// Example worker: logs a line every two seconds and never returns.
        async fn log_forever<TMsg>(
            _input: MsgBusInput<TMsg>,
            _output: MsgBusOutput<TMsg>,
        ) -> CmpResult
        where
            TMsg: MsgDataBound + 'static,
        {
            let period = Duration::from_secs(2);
            loop {
                info!("External fn process");
                sleep(period).await;
            }
        }

        /// Adapts the async worker to the boxed-future signature the config expects.
        fn boxed_process<TMsg>(
            input: MsgBusInput<TMsg>,
            output: MsgBusOutput<TMsg>,
        ) -> BoxFuture<'static, CmpResult>
        where
            TMsg: MsgDataBound + 'static,
        {
            // Calling an `async fn` only builds the future; nothing runs until it is polled.
            Box::pin(log_forever(input, output))
        }

        let _config = cmp_external_fn_process::Config {
            fn_process: Box::new(boxed_process::<Custom>),
        };
    }
}