// rsiot/executor/msgbus_output.rs
use tokio::sync::mpsc;
use tracing::trace;
use uuid::Uuid;

use crate::message::{Message, MsgDataBound};

use super::ComponentError;

8
9pub struct MsgBusOutput<TMsg>
11where
12 TMsg: MsgDataBound,
13{
14 output: mpsc::Sender<Message<TMsg>>,
15 id: Uuid,
16}
17
18impl<TMsg> MsgBusOutput<TMsg>
19where
20 TMsg: MsgDataBound,
21{
22 pub(crate) fn new(output: mpsc::Sender<Message<TMsg>>, id: Uuid) -> Self {
23 Self { output, id }
24 }
25
26 pub async fn send(&self, mut msg: Message<TMsg>) -> Result<(), ComponentError> {
28 trace!("Start send to output: {msg:?}");
29
30 msg.set_cmp_source(&self.id);
31 self.output
32 .send(msg)
33 .await
34 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
35 }
36
37 pub fn try_send(&self, mut msg: Message<TMsg>) -> Result<(), ComponentError> {
39 trace!("Start send to output: {msg:?}");
40
41 msg.set_cmp_source(&self.id);
42 self.output
43 .try_send(msg)
44 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
45 }
46
47 pub fn send_blocking(&self, mut msg: Message<TMsg>) -> Result<(), ComponentError> {
49 trace!("Start send to output: {msg:?}");
50
51 msg.set_cmp_source(&self.id);
52
53 self.output
54 .blocking_send(msg)
55 .map_err(|e| ComponentError::CmpOutput(e.to_string()))
56 }
57
58 pub fn max_capacity(&self) -> usize {
60 self.output.max_capacity()
61 }
62}
63
64impl<TMsg> Clone for MsgBusOutput<TMsg>
65where
66 TMsg: MsgDataBound,
67{
68 fn clone(&self) -> Self {
69 Self {
70 output: self.output.clone(),
71 id: self.id,
72 }
73 }
74}