rsiot/executor/
component_executor.rs1use std::time::Duration;
2use tokio::{
3 sync::{broadcast, mpsc},
4 task::JoinSet,
5};
6use tracing::{error, info};
7
8use crate::{
9 executor::Instant,
10 message::{AuthPermissions, Message, MsgDataBound},
11};
12
13#[cfg(feature = "log_tokio")]
14use super::task_runtime_metrics::TaskRuntimeMetrics;
15use super::{
16 MsgBusLinker, TokioRuntimeMetrics, component::IComponent, error::ComponentError,
17 join_set_spawn, task_internal::TaskInternal, types::FnAuth,
18};
19
20#[cfg(feature = "log_tokio")]
21const RUNTIME_METRICS_PERIOD: Duration = Duration::from_millis(100);
22
23pub type FnTokioMetrics<TMsg> = fn(TokioRuntimeMetrics) -> Option<TMsg>;
24
25pub struct ComponentExecutor<TMsg>
27where
28 TMsg: MsgDataBound,
29{
30 task_set: JoinSet<Result<(), ComponentError>>,
31 cmp_in_out: MsgBusLinker<TMsg>,
32 start_time: Instant,
33}
34
35pub struct ComponentExecutorConfig<TMsg> {
37 pub buffer_size: usize,
39
40 pub fn_auth: FnAuth<TMsg>,
56
57 pub delay_publish: Duration,
63
64 pub fn_tokio_metrics: FnTokioMetrics<TMsg>,
68}
69
70impl<TMsg> ComponentExecutor<TMsg>
71where
72 TMsg: MsgDataBound + 'static,
73{
74 pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
76 info!("ComponentExecutor start creation");
77 let (component_input_send, component_input) =
78 broadcast::channel::<Message<TMsg>>(config.buffer_size);
79 let (component_output, component_output_recv) =
80 mpsc::channel::<Message<TMsg>>(config.buffer_size);
81 let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
82
83 let task = TaskInternal {
86 output: component_output_recv,
87 input: component_input_send.clone(),
88 delay_publish: config.delay_publish,
89 max_capacity: config.buffer_size,
90 };
91 join_set_spawn(&mut task_set, "internal_task", task.spawn());
92
93 #[cfg(feature = "log_tokio")]
95 {
96 let task = TaskRuntimeMetrics::<TMsg> {
97 output: component_input_send,
98 period: RUNTIME_METRICS_PERIOD,
99 fn_tokio_metrics: config.fn_tokio_metrics,
100 };
101 join_set_spawn(&mut task_set, "tokio_metrics", task.spawn());
102 }
103
104 let cmp_in_out = MsgBusLinker::new(
105 component_input,
106 component_output,
107 AuthPermissions::default(),
108 config.fn_auth,
109 );
110
111 Self {
112 task_set,
113 cmp_in_out,
114 start_time: Instant::now(),
115 }
116 }
117
118 #[cfg(not(feature = "single-thread"))]
120 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
121 component.set_interface(self.cmp_in_out.clone());
122
123 self.task_set.spawn(async move { component.spawn().await });
124
125 self
126 }
127 #[cfg(feature = "single-thread")]
129 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
130 component.set_interface(self.cmp_in_out.clone());
131
132 self.task_set
133 .spawn_local(async move { component.spawn().await });
134 self
135 }
136
137 pub async fn wait_result(mut self) -> Result<(), ComponentError> {
140 drop(self.cmp_in_out);
142
143 let msg;
144 if let Some(result) = self.task_set.join_next().await {
145 info!("Program runtime: {:?}", self.start_time.elapsed());
146 match result {
147 Ok(Ok(_)) => {
148 msg = "Component has finished executing with Ok result".to_string();
149 info!(msg);
150 return Ok(());
151 }
152
153 Ok(Err(err)) => {
154 msg = format!("Component has finished executing with error: {err:?}");
155 }
156
157 Err(err) => {
158 msg = format!("Component has finished executing with error: {err:?}");
159 }
160 };
161 error!(msg);
162 return Err(ComponentError::Execution(msg));
163 }
164 Ok(())
165 }
166}