rsiot/components/cmp_timescaledb/
fn_process.rs

1use sqlx::{postgres::PgPoolOptions, query, Pool, Postgres};
2use tokio::time::{sleep, Duration};
3use tokio_util::task::TaskTracker;
4use tracing::{debug, error, info};
5use url::Url;
6
7use crate::{
8    executor::{CmpInOut, ComponentError},
9    message::MsgDataBound,
10};
11
12use super::{config::Config, error::Error, model::Row};
13
14pub async fn fn_process<TMsg>(
15    mut input: CmpInOut<TMsg>,
16    config: Config<TMsg>,
17) -> Result<(), ComponentError>
18where
19    TMsg: MsgDataBound,
20{
21    info!("Start cmp_timescaledb");
22
23    loop {
24        let result = task_main(&mut input, &config).await;
25        match result {
26            Ok(_) => (),
27            Err(err) => error!("{:?}", err),
28        }
29        sleep(Duration::from_secs(2)).await;
30        info!("Restarting...")
31    }
32}
33
34async fn task_main<TMsg>(input: &mut CmpInOut<TMsg>, config: &Config<TMsg>) -> Result<(), Error>
35where
36    TMsg: MsgDataBound,
37{
38    let connection_string = Url::parse(&config.connection_string)?;
39
40    let pool = PgPoolOptions::new()
41        .max_connections(config.max_connections)
42        .connect(connection_string.as_str())
43        .await?;
44
45    let task_tracker = TaskTracker::new();
46
47    while let Ok(msg) = input.recv_input().await {
48        let Some(msg) = msg.get_custom_data() else {
49            continue;
50        };
51
52        let rows = (config.fn_input)(&msg);
53        let Some(rows) = rows else { continue };
54
55        for row in rows {
56            let task = save_row_in_db(row, pool.clone());
57            task_tracker.spawn(task);
58        }
59    }
60
61    Ok(())
62}
63
64async fn save_row_in_db(row: Row, pool: Pool<Postgres>) -> Result<(), Error> {
65    debug!("Save row in database: {:?}", row);
66    query(
67        r#"
68INSERT INTO raw
69VALUES ($1, $2, $3, $4, $5, $6, $7)
70ON CONFLICT (time, entity, attr, agg) DO UPDATE
71    SET value = excluded.value,
72         aggts = excluded.aggts,
73         aggnext = excluded.aggnext;"#,
74    )
75    .bind(row.time)
76    .bind(&row.entity)
77    .bind(&row.attr)
78    .bind(row.value)
79    .bind(&row.agg)
80    .bind(row.aggts)
81    .bind(&row.aggnext)
82    .execute(&pool)
83    .await?;
84    Ok(())
85}