rsiot/components/cmp_http_client_esp/
fn_process.rsuse std::time::Duration;
use embedded_svc::http::{client::Client as HttpClient, Method};
use esp_idf_svc::http::client::EspHttpConnection;
use tokio::{
task::JoinSet,
time::{sleep, Instant},
};
use tracing::{error, info, warn};
use url::Url;
use crate::{
executor::CmpInOut,
message::{Message, MsgDataBound, ServiceBound},
};
use super::{config, Error};
pub async fn fn_process<TMsg, TService>(
config: config::Config<TMsg>,
in_out: CmpInOut<TMsg, TService>,
) -> super::Result<()>
where
TMsg: MsgDataBound + 'static,
TService: ServiceBound + 'static,
{
sleep(Duration::from_secs(2)).await;
info!("Starting http-client, configuration: {:?}", config);
loop {
let res = task_main(in_out.clone(), config.clone()).await;
match res {
Ok(_) => (),
Err(err) => {
error!("Error in http-client: {:?}", err);
}
}
info!("Restarting...");
sleep(Duration::from_secs(2)).await;
}
}
async fn task_main<TMsg, TService>(
in_out: CmpInOut<TMsg, TService>,
config: config::Config<TMsg>,
) -> super::Result<()>
where
TMsg: MsgDataBound + 'static,
TService: ServiceBound + 'static,
{
let mut set = JoinSet::<super::Result<()>>::new();
let url = Url::parse(&config.connection_config.base_url);
let url = match url {
Ok(val) => val,
Err(err) => {
let err = err.to_string();
let err = format!("Cannot parse url: {}", err);
return Err(Error::Configuration(err));
}
};
for req in config.requests_periodic {
let future = task_periodic_request(in_out.clone(), req, url.clone());
set.spawn_local(future);
}
while let Some(res) = set.join_next().await {
res??
}
Ok(())
}
async fn task_periodic_request<TMsg, TService>(
in_out: CmpInOut<TMsg, TService>,
config: config::RequestPeriodic<TMsg>,
url: Url,
) -> super::Result<()>
where
TMsg: MsgDataBound,
TService: ServiceBound,
{
loop {
let begin = Instant::now();
let msgs = process_request_and_response(
&url,
&config.http_param,
config.on_success,
config.on_failure,
)
.await?;
for msg in msgs {
in_out.send_output(msg).await?;
}
let elapsed = begin.elapsed();
let sleep_time = if config.period <= elapsed {
Duration::from_millis(10)
} else {
config.period - elapsed
};
sleep(sleep_time).await;
}
}
async fn process_request_and_response<TMsg>(
url: &Url,
request_param: &config::HttpParam,
on_success: config::CbkOnSuccess<TMsg>,
on_failure: config::CbkOnFailure<TMsg>,
) -> super::Result<Vec<Message<TMsg>>> {
info!("Call http client");
let response = send_request(url.clone(), request_param).await;
let msgs = vec![];
Ok(msgs)
}
async fn send_request(url: Url, req: &config::HttpParam) -> super::Result<()> {
let endpoint = match req {
config::HttpParam::Get { endpoint } => endpoint,
config::HttpParam::Put { endpoint, body: _ } => endpoint,
config::HttpParam::Post { endpoint, body: _ } => endpoint,
};
let url = url.join(endpoint).map_err(|err| {
let err = err.to_string();
Error::Configuration(err)
})?;
let url = url.to_string();
info!("Url: {}", url);
let mut client = HttpClient::wrap(EspHttpConnection::new(&Default::default()).unwrap());
let headers = [("accept", "text/plain")];
let response = match req {
config::HttpParam::Get { endpoint: _ } => {
let request = client.request(Method::Get, url.as_ref(), &headers);
let request = match request {
Ok(val) => val,
Err(err) => {
let err = err.to_string();
warn!("{}", err);
return Ok(());
}
};
request.submit().unwrap()
}
_ => todo!(),
};
let status = response.status();
info!("<- {}", status);
Ok(())
}