rsiot/components/cmp_timescaledb/
fn_process.rs1use sqlx::{postgres::PgPoolOptions, query, Pool, Postgres};
2use tokio::time::{sleep, Duration};
3use tokio_util::task::TaskTracker;
4use tracing::{debug, error, info};
5use url::Url;
6
7use crate::{
8 executor::{CmpInOut, ComponentError},
9 message::MsgDataBound,
10};
11
12use super::{config::Config, error::Error, model::Row};
13
14pub async fn fn_process<TMsg>(
15 mut input: CmpInOut<TMsg>,
16 config: Config<TMsg>,
17) -> Result<(), ComponentError>
18where
19 TMsg: MsgDataBound,
20{
21 info!("Start cmp_timescaledb");
22
23 loop {
24 let result = task_main(&mut input, &config).await;
25 match result {
26 Ok(_) => (),
27 Err(err) => error!("{:?}", err),
28 }
29 sleep(Duration::from_secs(2)).await;
30 info!("Restarting...")
31 }
32}
33
34async fn task_main<TMsg>(input: &mut CmpInOut<TMsg>, config: &Config<TMsg>) -> Result<(), Error>
35where
36 TMsg: MsgDataBound,
37{
38 let connection_string = Url::parse(&config.connection_string)?;
39
40 let pool = PgPoolOptions::new()
41 .max_connections(config.max_connections)
42 .connect(connection_string.as_str())
43 .await?;
44
45 let task_tracker = TaskTracker::new();
46
47 while let Ok(msg) = input.recv_input().await {
48 let Some(msg) = msg.get_custom_data() else {
49 continue;
50 };
51
52 let rows = (config.fn_input)(&msg);
53 let Some(rows) = rows else { continue };
54
55 for row in rows {
56 let task = save_row_in_db(row, pool.clone());
57 task_tracker.spawn(task);
58 }
59 }
60
61 Ok(())
62}
63
64async fn save_row_in_db(row: Row, pool: Pool<Postgres>) -> Result<(), Error> {
65 debug!("Save row in database: {:?}", row);
66 query(
67 r#"
68INSERT INTO raw
69VALUES ($1, $2, $3, $4, $5, $6, $7)
70ON CONFLICT (time, entity, attr, agg) DO UPDATE
71 SET value = excluded.value,
72 aggts = excluded.aggts,
73 aggnext = excluded.aggnext;"#,
74 )
75 .bind(row.time)
76 .bind(&row.entity)
77 .bind(&row.attr)
78 .bind(row.value)
79 .bind(&row.agg)
80 .bind(row.aggts)
81 .bind(&row.aggnext)
82 .execute(&pool)
83 .await?;
84 Ok(())
85}