From 09624ee69bd6015fcab172ec1207e1e9deabe9c6 Mon Sep 17 00:00:00 2001 From: Romulus21 Date: Mon, 19 Aug 2024 23:33:46 +0200 Subject: [PATCH] test with env file --- .env.example | 7 +++++ Cargo.lock | 73 ++++++------------------------------------------- Cargo.toml | 9 +----- README.md | 14 ++++++++++ src/main.rs | 66 ++++++++++++++++++++++++++------------------ src/mqtt_sub.rs | 27 ------------------ src/reccord.rs | 36 ++++++++++-------------- 7 files changed, 84 insertions(+), 148 deletions(-) create mode 100644 .env.example create mode 100644 README.md delete mode 100644 src/mqtt_sub.rs diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..1657509 --- /dev/null +++ b/.env.example @@ -0,0 +1,7 @@ +DB_USER=user +DB_PASSWORD=password +DB_HOST=localhost +DB_PORT=3306 +DB_NAME=test_rust_mosquitto +MQTT_HOST=localhost +MQTT_PORT=1883 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 77d70bb..3e49920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,17 +151,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -309,9 +298,9 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.48" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" dependencies = [ "cc", ] @@ -474,19 +463,6 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" -[[package]] -name = "env_logger" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" -dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", -] - [[package]] name = "event-listener" version = "2.5.2" @@ -777,15 +753,6 @@ dependencies = [ "libc", ] -[[package]] -name = "humantime" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" -dependencies = [ - "quick-error", -] - [[package]] name = "idna" version = "0.2.3" @@ -1215,10 +1182,12 @@ dependencies = [ [[package]] name = "paho-mqtt" -version = "0.9.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82fea0990fe54e75d575bbd9bc2ee5919fd10cc0b4a95f1967528083129fc4b" +checksum = "19e405de34b835fb6457d8b0169eda21949f855472b3e346556af9e29fac6eb2" dependencies = [ + "async-channel", + "crossbeam-channel", "futures", "futures-timer", "libc", @@ -1229,9 +1198,9 @@ dependencies = [ [[package]] name = "paho-mqtt-sys" -version = "0.5.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad9ac6a77a7e7c70cd51262b94ab666c9e4c38fb0f4201dba8d7f8589aa8ce4" +checksum = "5e482419d847af4ec43c07eed70f5f94f87dc712d267aecc91ab940944ab6bf4" dependencies = [ "cmake", "openssl-sys", @@ -1316,12 +1285,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "1.0.15" @@ -1419,9 +1382,7 @@ version = "0.1.0" dependencies = [ "async-std", "dotenv", - "env_logger", "futures", - "log", "mysql", "paho-mqtt", ] @@ -1597,15 +1558,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "termcolor" -version = "1.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.30" @@ -1842,15 +1794,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index effb352..dbdf7ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,14 +8,7 @@ edition = "2018" [dependencies] dotenv = "0.15" -paho-mqtt = { version = "0.12.1", features = ["vendored-ssl"] } +paho-mqtt = "0.12" futures = "0.3" async-std = "1" -log = "0.4" mysql = "*" - -[dev-dependencies] -env_logger = "0.7" - -[target.armv7-unknown-linux-gnueabihf] -linker = "arm-linux-gnueabihf-gcc" diff --git a/README.md b/README.md new file mode 100644 index 0000000..dbf0826 --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +example of systemd config +``` +[Unit] +Description=Running rust script +After=multi-user.target + +[Service] +WorkingDirectory=/home/pi/Scripts/rust_agregator/target/release +ExecStart=/home/pi/Scripts/rust_mosquitto/target/release/rust_mosquitto +Type=simple + +[Install] +WantedBy=multi-user.target +``` \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index d37024b..45c24b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,53 +1,64 @@ -use async_std::task; -use dotenv; +use dotenv::dotenv; use futures::{executor::block_on, stream::StreamExt}; use paho_mqtt as mqtt; -use std::time::Duration; -// use log::Level; +use std::{env, process, time::Duration}; extern crate mysql; // The topics to which we subscribe. -const TOPICS: &str = "#"; -const QOS: i32 = 1; - -mod mqtt_sub; +//const TOPICS: &str = "#"; +const TOPICS: &[&str] = &["#"]; +const QOS: &[i32] = &[1, 1]; mod reccord; //mod sql; fn main() { - dotenv::dotenv().ok(); - // Initialize the logger from the environment - // env_logger::init(); + dotenv().ok(); - // println!("\n"); - // for (key, value) in env::vars_os() { - // println!("{:?}: {:?}", key, value); - // } - // println!("\n"); + let mqtt_host = env::var("MQTT_HOST").expect("MQTT_HOST must be set"); + let mqtt_port = env::var("MQTT_PORT").expect("MQTT_PORT must be set"); - let mut cli = mqtt_sub::init_cli(); + let url = format!("tcp://{}:{}", mqtt_host, mqtt_port); + let create_opts = mqtt::CreateOptionsBuilder::new_v3() + .server_uri(url.as_str()) + .client_id("rust_async_subscribe") + .finalize(); + + // Create the client connection + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); if let Err(err) = block_on(async { // Get message stream before connecting. let mut strm = cli.get_stream(25); // Define the set of options for the connection - let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1); + let lwt = mqtt::Message::new( + "test/lwt", + "[LWT] Async subscriber lost connection", + mqtt::QOS_1, + ); - let conn_opts = mqtt::ConnectOptionsBuilder::new() + // Create the connect options, explicitly requesting MQTT v3.x + let conn_opts = mqtt::ConnectOptionsBuilder::new_v3() .keep_alive_interval(Duration::from_secs(30)) - .mqtt_version(mqtt::MQTT_VERSION_3_1_1) .clean_session(false) .will_message(lwt) .finalize(); // Make the connection to the broker - println!("Connecting to the MQTT server..."); cli.connect(conn_opts).await?; - cli.subscribe(TOPICS, QOS).await?; + + println!("Subscribing to topics: {:?}", TOPICS); + cli.subscribe_many(TOPICS, QOS).await?; + + // Just loop on incoming messages. println!("Waiting for messages..."); + let mut rconn_attempt: usize = 0; + // Note that we're not providing a way to cleanly shut down and // disconnect. Therefore, when you kill this app (with a ^C or // whatever) the server will get an unexpected drop and then @@ -56,14 +67,15 @@ fn main() { while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { reccord::message(msg); - //sql::send(); - } else { + } + else { // A "None" means we were disconnected. Try to reconnect... - println!("Lost connection. Attempting reconnect."); + println!("Lost connection. Attempting reconnect..."); while let Err(err) = cli.reconnect().await { - println!("Error reconnecting: {}", err); + rconn_attempt += 1; + println!("Error reconnecting #{}: {}", rconn_attempt, err); // For tokio use: tokio::time::delay_for() - task::sleep(Duration::from_millis(1000)).await; + async_std::task::sleep(Duration::from_secs(1)).await; } } } diff --git a/src/mqtt_sub.rs b/src/mqtt_sub.rs deleted file mode 100644 index 3858f6b..0000000 --- a/src/mqtt_sub.rs +++ /dev/null @@ -1,27 +0,0 @@ -use paho_mqtt as mqtt; -use std::{ - env, - process, -}; - - -pub fn init_cli() -> mqtt::AsyncClient { - let host = env::args() - .nth(1) - .unwrap_or_else(|| env::var("MQTTSERVER").unwrap().to_string()); - - // Create the client. Use an ID for a persistent session. - // A real system should try harder to use a unique ID. - let create_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(host) - .client_id("rust_async_subscribe") - .finalize(); - - // Create the client connection - let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); - process::exit(1); - }); - - return cli; -} diff --git a/src/reccord.rs b/src/reccord.rs index 68e9b33..f2386ed 100644 --- a/src/reccord.rs +++ b/src/reccord.rs @@ -1,13 +1,9 @@ use mysql::prelude::*; use mysql::*; use std::env; +use dotenv::dotenv; #[derive(Debug, PartialEq, Eq)] -struct Payment { - customer_id: i32, - amount: i32, - account_name: Option, -} struct MosquittoMessage { service: String, capteur: String, @@ -16,32 +12,32 @@ struct MosquittoMessage { } pub fn message(message: paho_mqtt::Message) { - let split: Vec<&str> = message.topic().split("/").collect(); + dotenv().ok(); + let db_user = env::var("DB_USER").expect("DB_USER must be set"); + let db_password = env::var("DB_PASSWORD").expect("DB_PASSWORD must be set"); + let db_host = env::var("DB_HOST").expect("DB_HOST must be set"); + let db_port: u16 = env::var("DB_PORT").expect("DB_PORT must be set").parse().unwrap(); + let db_name = env::var("DB_NAME").expect("DB_NAME must be set"); + let url_builder = format!("mysql://{}:{}@{}:{}/{}", db_user, db_password, db_host, db_port, db_name); + let url = Opts::from_url(url_builder.as_str()).unwrap(); + let pool = Pool::new(url).unwrap(); + let mut conn = pool.get_conn().unwrap(); + + let split: Vec<&str> = message.topic().split("/").collect(); let data = message.payload_str(); - let message_receved = vec![MosquittoMessage { + let message_received = vec![MosquittoMessage { service: split[0].to_string(), capteur: split[1].to_string(), r#type: split[2].to_string(), donnee: data.to_string(), }]; - let url_builder = format!( - "mysql://{}:{}@localhost:3306/{}", - env::var("DBUSER").unwrap().to_string(), - env::var("DBPASSWORD").unwrap().to_string(), - env::var("DBNAME").unwrap().to_string() - ); - let url = Opts::from_url(url_builder.as_str()).unwrap(); - let pool = Pool::new(url).unwrap(); - - let mut conn = pool.get_conn().unwrap(); - let _res = conn.exec_batch( r"INSERT INTO donnees (service, capteur, type, donnee, date_donnee) VALUES (:service, :capteur, :type, :donnee, NOW())", - message_receved.iter().map(|p| { + message_received.iter().map(|p| { params! { "service" => &p.service, "capteur" => &p.capteur, @@ -52,6 +48,4 @@ pub fn message(message: paho_mqtt::Message) { ); println!("topic : {:?} data : {}", split, data.to_string()); - - return (); }