rsiot/message/message.rs
1#![allow(clippy::module_inception)]
2
3use std::{fmt::Debug, time::Duration};
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use super::{MsgData, MsgDataBound, MsgTrace, TimeToLiveValue, Timestamp};
9
10/// Сообщение
11#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
12pub struct Message<TCustom> {
13 /// Данные
14 pub data: MsgData<TCustom>,
15 /// Ключ
16 pub key: String,
17 /// Метка времени
18 pub ts: Timestamp,
19 /// Путь, по котором передавалось сообщение
20 pub trace: MsgTrace,
21 /// Время жизни сообщения
22 ttl: TimeToLiveValue,
23}
24
25impl<TCustom> Message<TCustom>
26where
27 TCustom: MsgDataBound,
28{
29 /// Создать новое сообщение
30 pub fn new(data: MsgData<TCustom>) -> Self {
31 // let key = define_key(&data);
32 // let key = super::define_key::define_key(&data);
33 let key = data.key();
34 let ttl = data.define_time_to_live();
35 Self {
36 data,
37 key,
38 ts: Default::default(),
39 trace: MsgTrace::default(),
40 ttl,
41 }
42 }
43
44 /// Создать новое сообщение типа `MsgData::Custom`
45 pub fn new_custom(custom_data: TCustom) -> Self {
46 let data = MsgData::Custom(custom_data);
47 // let key = define_key(&data);
48 // let key = super::define_key::define_key(&data);
49 let key = data.key();
50 let ttl = data.define_time_to_live();
51 Self {
52 data,
53 key,
54 ts: Default::default(),
55 trace: MsgTrace::default(),
56 ttl,
57 }
58 }
59
60 /// Возвращает данные сообщения, если тип сообщения `MsgData::Custom`
61 pub fn get_custom_data(&self) -> Option<TCustom> {
62 match &self.data {
63 MsgData::System(_) => None,
64 MsgData::Custom(data) => Some(data.clone()),
65 }
66 }
67
68 /// Добавить запись пути
69 pub fn add_trace_item(&mut self, id: &Uuid) {
70 self.trace.add_trace_item(*id)
71 }
72
73 /// Проверяем, что в трейсе сообщения присутсвует компонент с заданным id.
74 ///
75 /// Полезно для предотварщения зацикливания сообщений, чтобы данный компонент не обрабатывал
76 /// сообщения, которые он же и сгенерировал
77 pub fn contains_trace_item(&self, id: &Uuid) -> bool {
78 self.trace.contains_trace_item(id)
79 }
80
81 /// Обновить время жизни сообщения
82 pub fn update_time_to_live(&mut self, time_step: Duration) {
83 match self.ttl {
84 TimeToLiveValue::Infinite => (),
85 TimeToLiveValue::Duration(duration) => {
86 let ttl_new = duration.checked_sub(time_step);
87 match ttl_new {
88 Some(ttl_new) => self.ttl = TimeToLiveValue::Duration(ttl_new),
89 None => self.ttl = TimeToLiveValue::Duration(Duration::new(0, 0)),
90 }
91 }
92 TimeToLiveValue::DisableCaching => (),
93 }
94 }
95
96 /// Возвращает false, если время жизни сообщения истекло
97 pub fn is_alive(&self) -> bool {
98 match self.ttl {
99 TimeToLiveValue::Infinite => true,
100 TimeToLiveValue::Duration(duration) => !duration.is_zero(),
101 TimeToLiveValue::DisableCaching => false,
102 }
103 }
104
105 // Разрешен ли марштур данного сообщения
106 // pub fn is_route_enabled(&self, src: &TCustom::TService, dst: &TCustom::TService) -> bool {
107 // let route = match &self.data {
108 // MsgData::System(data) => return data.define_enabled_routes(),
109 // MsgData::Custom(data) => data.define_enabled_routes(),
110 // };
111 // match route {
112 // MsgRoute::SrcToAny(route_src) => *src == route_src,
113 // MsgRoute::SrcToDst(route_src, route_dst) => *src == route_src && *dst == route_dst,
114 // MsgRoute::AnyToAny => true,
115 // MsgRoute::None => false,
116 // MsgRoute::SrcToDstSeveral(routes) => {
117 // for (route_src, route_dst) in routes {
118 // if *src == route_src && *dst == route_dst {
119 // return true;
120 // }
121 // }
122 // false
123 // }
124 // }
125 // }
126}