rsiot/executor/
component_executor.rs1use std::time::Duration;
2use tokio::{
3 sync::{broadcast, mpsc},
4 task::JoinSet,
5};
6use tracing::{debug, error, info, trace, warn};
7use uuid::Uuid;
8
9use crate::message::{system_messages::*, *};
10
11use super::{
12 component::IComponent, error::ComponentError, join_set_spawn, sleep, types::FnAuth, Cache,
13 CmpInOut,
14};
15
16const UPDATE_TTL_PERIOD: Duration = Duration::from_millis(200);
17
18pub struct ComponentExecutor<TMsg>
20where
21 TMsg: MsgDataBound,
22{
23 task_set: JoinSet<Result<(), ComponentError>>,
24 cmp_in_out: CmpInOut<TMsg>,
25}
26
27pub struct ComponentExecutorConfig<TMsg> {
29 pub buffer_size: usize,
31
32 pub fn_auth: FnAuth<TMsg>,
48
49 pub delay_publish: Duration,
55}
56
57impl<TMsg> ComponentExecutor<TMsg>
58where
59 TMsg: MsgDataBound + 'static,
60{
61 pub fn new(config: ComponentExecutorConfig<TMsg>) -> Self {
63 info!("ComponentExecutor start creation");
64 let id = MsgTrace::generate_uuid();
65 let (component_input_send, component_input) =
66 broadcast::channel::<Message<TMsg>>(config.buffer_size);
67 let (component_output, component_output_recv) =
68 mpsc::channel::<Message<TMsg>>(config.buffer_size);
69 let cache: Cache<TMsg> = Cache::new();
70 let mut task_set: JoinSet<Result<(), ComponentError>> = JoinSet::new();
71
72 let task_internal_handle = task_internal(
74 component_output_recv,
75 component_input_send.clone(),
76 cache.clone(),
77 id,
78 config.delay_publish,
79 );
80 join_set_spawn(&mut task_set, task_internal_handle);
81
82 let task_update_ttl_in_cache_handle = task_update_ttl_in_cache(cache.clone());
84 join_set_spawn(&mut task_set, task_update_ttl_in_cache_handle);
85
86 let cmp_in_out = CmpInOut::new(
87 component_input,
88 component_output,
89 cache.clone(),
90 "Trace name (maybe delete?)",
91 id,
92 AuthPermissions::default(),
93 config.fn_auth,
94 );
95
96 Self {
97 task_set,
98 cmp_in_out,
99 }
100 }
101
102 #[cfg(not(feature = "single-thread"))]
104 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + Send + 'static) -> Self {
105 component.set_interface(self.cmp_in_out.clone());
106
107 self.task_set.spawn(async move { component.spawn().await });
108
109 self
110 }
111 #[cfg(feature = "single-thread")]
113 pub fn add_cmp(mut self, mut component: impl IComponent<TMsg> + 'static) -> Self {
114 component.set_interface(self.cmp_in_out.clone());
115
116 self.task_set
117 .spawn_local(async move { component.spawn().await });
118 self
119 }
120
121 pub async fn wait_result(&mut self) -> Result<(), ComponentError> {
126 let msg;
127 if let Some(result) = self.task_set.join_next().await {
128 match result {
129 Ok(result) => match result {
130 Ok(_) => msg = "Component has finished executing".to_string(),
131 Err(err) => {
132 msg = format!("Component has finished executing with error: {:?}", err);
133 }
134 },
135 Err(err) => {
136 msg = format!("Component has finished executing with error: {:?}", err);
137 }
138 };
139 error!(msg);
140 return Err(ComponentError::Execution(msg));
141 }
142 Ok(())
143 }
144}
145
146async fn task_internal<TMsg>(
147 mut input: mpsc::Receiver<Message<TMsg>>,
148 output: broadcast::Sender<Message<TMsg>>,
149 cache: Cache<TMsg>,
150 executor_id: Uuid,
151 delay_publish: Duration,
152) -> Result<(), ComponentError>
153where
154 TMsg: MsgDataBound,
155{
156 debug!("Internal task of ComponentExecutor: starting");
157
158 sleep(delay_publish).await;
160
161 while let Some(mut msg) = input.recv().await {
162 trace!("ComponentExecutor: new message: {:?}", msg);
164 msg.add_trace_item(&executor_id);
166 let msg = save_msg_in_cache(msg, &cache).await;
167 let Some(msg) = msg else { continue };
168 output.send(msg).map_err(|err| {
169 let err = format!(
170 "Internal task of ComponentExecutor: send to channel error, {:?}",
171 err
172 );
173 ComponentError::Initialization(err)
174 })?;
175 }
176 warn!("Internal task: stop");
177 Ok(())
178}
179
180async fn task_update_ttl_in_cache<TMsg>(cache: Cache<TMsg>) -> Result<(), ComponentError>
182where
183 TMsg: MsgDataBound,
184{
185 loop {
186 sleep(UPDATE_TTL_PERIOD).await;
187 let mut cache = cache.write().await;
188 let mut keys_for_delete = vec![];
189 for (key, msg) in cache.iter_mut() {
190 msg.update_time_to_live(UPDATE_TTL_PERIOD);
191 if !msg.is_alive() {
192 keys_for_delete.push(key.clone());
193 }
194 }
195 for key in keys_for_delete {
196 let remove_result = cache.remove(&key);
197 if remove_result.is_none() {
198 let err = format!("Message with key {} not found in cache", key);
199 return Err(ComponentError::Execution(err));
200 }
201 }
202 }
203}
204
205async fn save_msg_in_cache<TMsg>(msg: Message<TMsg>, cache: &Cache<TMsg>) -> Option<Message<TMsg>>
211where
212 TMsg: MsgDataBound,
213{
214 if let MsgData::System(data) = &msg.data {
216 match data {
217 System::AuthRequestByLogin(_) => return Some(msg),
218 System::AuthRequestByToken(_) => return Some(msg),
219 System::AuthResponseErr(_) => return Some(msg),
220 System::AuthResponseOk(_) => return Some(msg),
221 System::EspWifiConnected => return Some(msg),
222 System::Ping(_) => return None,
223 System::Pong(_) => return None,
224 }
225 }
226 if !msg.is_alive() {
228 return Some(msg);
229 }
230 let key = msg.key.clone();
231 let value = msg.clone();
232 {
233 let mut lock = cache.write().await;
234 let value_from_cache = lock.get(&key);
235 if let Some(_value_from_cache) = value_from_cache {
236 }
241 lock.insert(key, value);
242 }
243 Some(msg)
244}