test with env file

This commit is contained in:
Romulus21
2024-08-19 23:33:46 +02:00
parent f7e9a99679
commit 09624ee69b
7 changed files with 84 additions and 148 deletions

View File

@@ -1,53 +1,64 @@
use async_std::task;
use dotenv;
use dotenv::dotenv;
use futures::{executor::block_on, stream::StreamExt};
use paho_mqtt as mqtt;
use std::time::Duration;
// use log::Level;
use std::{env, process, time::Duration};
extern crate mysql;
// The topics to which we subscribe.
const TOPICS: &str = "#";
const QOS: i32 = 1;
mod mqtt_sub;
//const TOPICS: &str = "#";
const TOPICS: &[&str] = &["#"];
const QOS: &[i32] = &[1, 1];
mod reccord;
//mod sql;
fn main() {
dotenv::dotenv().ok();
// Initialize the logger from the environment
// env_logger::init();
dotenv().ok();
// println!("\n");
// for (key, value) in env::vars_os() {
// println!("{:?}: {:?}", key, value);
// }
// println!("\n");
let mqtt_host = env::var("MQTT_HOST").expect("MQTT_HOST must be set");
let mqtt_port = env::var("MQTT_PORT").expect("MQTT_PORT must be set");
let mut cli = mqtt_sub::init_cli();
let url = format!("tcp://{}:{}", mqtt_host, mqtt_port);
let create_opts = mqtt::CreateOptionsBuilder::new_v3()
.server_uri(url.as_str())
.client_id("rust_async_subscribe")
.finalize();
// Create the client connection
let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
if let Err(err) = block_on(async {
// Get message stream before connecting.
let mut strm = cli.get_stream(25);
// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);
let lwt = mqtt::Message::new(
"test/lwt",
"[LWT] Async subscriber lost connection",
mqtt::QOS_1,
);
let conn_opts = mqtt::ConnectOptionsBuilder::new()
// Create the connect options, explicitly requesting MQTT v3.x
let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
.keep_alive_interval(Duration::from_secs(30))
.mqtt_version(mqtt::MQTT_VERSION_3_1_1)
.clean_session(false)
.will_message(lwt)
.finalize();
// Make the connection to the broker
println!("Connecting to the MQTT server...");
cli.connect(conn_opts).await?;
cli.subscribe(TOPICS, QOS).await?;
println!("Subscribing to topics: {:?}", TOPICS);
cli.subscribe_many(TOPICS, QOS).await?;
// Just loop on incoming messages.
println!("Waiting for messages...");
let mut rconn_attempt: usize = 0;
// Note that we're not providing a way to cleanly shut down and
// disconnect. Therefore, when you kill this app (with a ^C or
// whatever) the server will get an unexpected drop and then
@@ -56,14 +67,15 @@ fn main() {
while let Some(msg_opt) = strm.next().await {
if let Some(msg) = msg_opt {
reccord::message(msg);
//sql::send();
} else {
}
else {
// A "None" means we were disconnected. Try to reconnect...
println!("Lost connection. Attempting reconnect.");
println!("Lost connection. Attempting reconnect...");
while let Err(err) = cli.reconnect().await {
println!("Error reconnecting: {}", err);
rconn_attempt += 1;
println!("Error reconnecting #{}: {}", rconn_attempt, err);
// For tokio use: tokio::time::delay_for()
task::sleep(Duration::from_millis(1000)).await;
async_std::task::sleep(Duration::from_secs(1)).await;
}
}
}

View File

@@ -1,27 +0,0 @@
use paho_mqtt as mqtt;
use std::{
env,
process,
};
pub fn init_cli() -> mqtt::AsyncClient {
let host = env::args()
.nth(1)
.unwrap_or_else(|| env::var("MQTTSERVER").unwrap().to_string());
// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id("rust_async_subscribe")
.finalize();
// Create the client connection
let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
return cli;
}

View File

@@ -1,13 +1,9 @@
use mysql::prelude::*;
use mysql::*;
use std::env;
use dotenv::dotenv;
#[derive(Debug, PartialEq, Eq)]
struct Payment {
customer_id: i32,
amount: i32,
account_name: Option<String>,
}
struct MosquittoMessage {
service: String,
capteur: String,
@@ -16,32 +12,32 @@ struct MosquittoMessage {
}
pub fn message(message: paho_mqtt::Message) {
let split: Vec<&str> = message.topic().split("/").collect();
dotenv().ok();
let db_user = env::var("DB_USER").expect("DB_USER must be set");
let db_password = env::var("DB_PASSWORD").expect("DB_PASSWORD must be set");
let db_host = env::var("DB_HOST").expect("DB_HOST must be set");
let db_port: u16 = env::var("DB_PORT").expect("DB_PORT must be set").parse().unwrap();
let db_name = env::var("DB_NAME").expect("DB_NAME must be set");
let url_builder = format!("mysql://{}:{}@{}:{}/{}", db_user, db_password, db_host, db_port, db_name);
let url = Opts::from_url(url_builder.as_str()).unwrap();
let pool = Pool::new(url).unwrap();
let mut conn = pool.get_conn().unwrap();
let split: Vec<&str> = message.topic().split("/").collect();
let data = message.payload_str();
let message_receved = vec![MosquittoMessage {
let message_received = vec![MosquittoMessage {
service: split[0].to_string(),
capteur: split[1].to_string(),
r#type: split[2].to_string(),
donnee: data.to_string(),
}];
let url_builder = format!(
"mysql://{}:{}@localhost:3306/{}",
env::var("DBUSER").unwrap().to_string(),
env::var("DBPASSWORD").unwrap().to_string(),
env::var("DBNAME").unwrap().to_string()
);
let url = Opts::from_url(url_builder.as_str()).unwrap();
let pool = Pool::new(url).unwrap();
let mut conn = pool.get_conn().unwrap();
let _res = conn.exec_batch(
r"INSERT INTO donnees (service, capteur, type, donnee, date_donnee)
VALUES (:service, :capteur, :type, :donnee, NOW())",
message_receved.iter().map(|p| {
message_received.iter().map(|p| {
params! {
"service" => &p.service,
"capteur" => &p.capteur,
@@ -52,6 +48,4 @@ pub fn message(message: paho_mqtt::Message) {
);
println!("topic : {:?} data : {}", split, data.to_string());
return ();
}