rsiot/executor/
cmp_in_out.rs1use std::{cmp::max, fmt::Debug};
2
3use tracing::{info, trace};
4use uuid::Uuid;
5
6use crate::message::{system_messages::*, *};
7
8use super::{
9 types::{CmpInput, CmpOutput, FnAuth},
10 Cache, ComponentError,
11};
12
13#[derive(Debug)]
15pub struct CmpInOut<TMsg>
16where
17 TMsg: MsgDataBound,
18{
19 input: CmpInput<TMsg>,
20 output: CmpOutput<TMsg>,
21 pub cache: Cache<TMsg>,
24 name: String,
25 id: Uuid,
26 auth_perm: AuthPermissions,
27 fn_auth: FnAuth<TMsg>,
28}
29
30impl<TMsg> CmpInOut<TMsg>
31where
32 TMsg: MsgDataBound,
33{
34 pub fn new(
36 input: CmpInput<TMsg>,
37 output: CmpOutput<TMsg>,
38 cache: Cache<TMsg>,
39 name: &str,
40 id: Uuid,
41 auth_perm: AuthPermissions,
42 fn_auth: FnAuth<TMsg>,
43 ) -> Self {
44 info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
45 Self {
46 input,
47 output,
48 cache,
49 id,
50 name: name.into(),
51 auth_perm,
52 fn_auth,
53 }
54 }
55
56 pub fn clone_with_new_id(&self, name: &str, auth_perm: AuthPermissions) -> Self {
61 let name = format!("{}::{}", self.name, name);
62 let id = MsgTrace::generate_uuid();
63 info!("Start: {}, id: {}, auth_perm: {:?}", name, id, auth_perm);
64 Self {
65 input: self.input.resubscribe(),
66 output: self.output.clone(),
67 cache: self.cache.clone(),
68 name,
69 id,
70 auth_perm,
71 fn_auth: self.fn_auth,
72 }
73 }
74
75 pub async fn recv_input(&mut self) -> Result<Message<TMsg>, ComponentError> {
77 loop {
78 let msg = self
79 .input
80 .recv()
81 .await
82 .map_err(|e| ComponentError::CmpInput(e.to_string()))?;
83
84 if let MsgData::System(System::AuthResponseOk(value)) = &msg.data {
87 if !value.trace_ids.contains(&self.id) {
88 continue;
89 }
90 self.auth_perm = max(self.auth_perm, value.perm);
91 }
92 if let MsgData::System(System::AuthResponseErr(value)) = &msg.data {
93 if !value.trace_ids.contains(&self.id) {
94 continue;
95 }
96 }
97
98 if msg.contains_trace_item(&self.id) {
100 continue;
101 }
102
103 let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
105 continue;
106 };
107
108 msg.add_trace_item(&self.id);
109 return Ok(msg);
110 }
111 }
112
113 pub async fn recv_cache_all(&self) -> Vec<Message<TMsg>> {
115 let lock = self.cache.read().await;
116 lock.values()
117 .cloned()
118 .filter_map(|m| (self.fn_auth)(m, &self.auth_perm))
119 .collect()
120 }
121
122 pub async fn recv_cache_msg(&self, key: &str) -> Option<Message<TMsg>> {
124 let cache = self.cache.read().await;
125 cache.get(key).map(|m| m.to_owned())
126 }
127
128 pub async fn send_output(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
130 trace!("Start send to output: {msg:?}");
131 let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
133 trace!("No authorization. Auth: {:?}", self.auth_perm);
134 return Ok(());
135 };
136
137 msg.add_trace_item(&self.id);
138 self.output
139 .send(msg)
140 .await
141 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
142 }
143
144 pub fn send_output_blocking(&self, msg: Message<TMsg>) -> Result<(), ComponentError> {
146 trace!("Start send to output: {msg:?}");
147 let Some(mut msg) = (self.fn_auth)(msg, &self.auth_perm) else {
149 trace!("No authorization. Auth: {:?}", self.auth_perm);
150 return Ok(());
151 };
152
153 msg.add_trace_item(&self.id);
154 self.output
155 .blocking_send(msg)
156 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
157 }
158
159 pub fn max_capacity(&self) -> usize {
161 self.output.max_capacity()
162 }
163}
164
165impl<TMsg> Clone for CmpInOut<TMsg>
166where
167 TMsg: MsgDataBound,
168{
169 fn clone(&self) -> Self {
170 Self {
171 input: self.input.resubscribe(),
172 output: self.output.clone(),
173 cache: self.cache.clone(),
174 id: self.id,
175 name: self.name.clone(),
176 auth_perm: self.auth_perm,
177 fn_auth: self.fn_auth,
178 }
179 }
180}