first commit
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
/target
|
||||||
|
.env
|
||||||
|
/src/example.py
|
||||||
1867
Cargo.lock
generated
Normal file
1867
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
18
Cargo.toml
Normal file
18
Cargo.toml
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
[package]
|
||||||
|
name = "rust_mosquitto"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Romulus21 <romain@delanoe.pro>"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
dotenv = "0.15"
|
||||||
|
paho-mqtt = "0.9"
|
||||||
|
futures = "0.3"
|
||||||
|
async-std = "1"
|
||||||
|
log = "0.4"
|
||||||
|
mysql = "*"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
env_logger = "0.7"
|
||||||
76
src/main.rs
Normal file
76
src/main.rs
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
use async_std::task;
|
||||||
|
use dotenv;
|
||||||
|
use futures::{executor::block_on, stream::StreamExt};
|
||||||
|
use paho_mqtt as mqtt;
|
||||||
|
use std::time::Duration;
|
||||||
|
// use log::Level;
|
||||||
|
|
||||||
|
extern crate mysql;
|
||||||
|
|
||||||
|
// The topics to which we subscribe.
|
||||||
|
const TOPICS: &str = "#";
|
||||||
|
const QOS: i32 = 1;
|
||||||
|
|
||||||
|
mod mqtt_sub;
|
||||||
|
mod reccord;
|
||||||
|
//mod sql;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
dotenv::dotenv().ok();
|
||||||
|
// Initialize the logger from the environment
|
||||||
|
// env_logger::init();
|
||||||
|
|
||||||
|
// println!("\n");
|
||||||
|
// for (key, value) in env::vars_os() {
|
||||||
|
// println!("{:?}: {:?}", key, value);
|
||||||
|
// }
|
||||||
|
// println!("\n");
|
||||||
|
|
||||||
|
let mut cli = mqtt_sub::init_cli();
|
||||||
|
|
||||||
|
if let Err(err) = block_on(async {
|
||||||
|
// Get message stream before connecting.
|
||||||
|
let mut strm = cli.get_stream(25);
|
||||||
|
|
||||||
|
// Define the set of options for the connection
|
||||||
|
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);
|
||||||
|
|
||||||
|
let conn_opts = mqtt::ConnectOptionsBuilder::new()
|
||||||
|
.keep_alive_interval(Duration::from_secs(30))
|
||||||
|
.mqtt_version(mqtt::MQTT_VERSION_3_1_1)
|
||||||
|
.clean_session(false)
|
||||||
|
.will_message(lwt)
|
||||||
|
.finalize();
|
||||||
|
|
||||||
|
// Make the connection to the broker
|
||||||
|
println!("Connecting to the MQTT server...");
|
||||||
|
cli.connect(conn_opts).await?;
|
||||||
|
cli.subscribe(TOPICS, QOS).await?;
|
||||||
|
println!("Waiting for messages...");
|
||||||
|
|
||||||
|
// Note that we're not providing a way to cleanly shut down and
|
||||||
|
// disconnect. Therefore, when you kill this app (with a ^C or
|
||||||
|
// whatever) the server will get an unexpected drop and then
|
||||||
|
// should emit the LWT message.
|
||||||
|
|
||||||
|
while let Some(msg_opt) = strm.next().await {
|
||||||
|
if let Some(msg) = msg_opt {
|
||||||
|
reccord::message(msg);
|
||||||
|
//sql::send();
|
||||||
|
} else {
|
||||||
|
// A "None" means we were disconnected. Try to reconnect...
|
||||||
|
println!("Lost connection. Attempting reconnect.");
|
||||||
|
while let Err(err) = cli.reconnect().await {
|
||||||
|
println!("Error reconnecting: {}", err);
|
||||||
|
// For tokio use: tokio::time::delay_for()
|
||||||
|
task::sleep(Duration::from_millis(1000)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Explicit return type for the async block
|
||||||
|
Ok::<(), mqtt::Error>(())
|
||||||
|
}) {
|
||||||
|
eprintln!("{}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
27
src/mqtt_sub.rs
Normal file
27
src/mqtt_sub.rs
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
use paho_mqtt as mqtt;
|
||||||
|
use std::{
|
||||||
|
env,
|
||||||
|
process,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
pub fn init_cli() -> mqtt::AsyncClient {
|
||||||
|
let host = env::args()
|
||||||
|
.nth(1)
|
||||||
|
.unwrap_or_else(|| env::var("MQTTSERVER").unwrap().to_string());
|
||||||
|
|
||||||
|
// Create the client. Use an ID for a persistent session.
|
||||||
|
// A real system should try harder to use a unique ID.
|
||||||
|
let create_opts = mqtt::CreateOptionsBuilder::new()
|
||||||
|
.server_uri(host)
|
||||||
|
.client_id("rust_async_subscribe")
|
||||||
|
.finalize();
|
||||||
|
|
||||||
|
// Create the client connection
|
||||||
|
let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
|
||||||
|
println!("Error creating the client: {:?}", e);
|
||||||
|
process::exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
return cli;
|
||||||
|
}
|
||||||
57
src/reccord.rs
Normal file
57
src/reccord.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use mysql::prelude::*;
|
||||||
|
use mysql::*;
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
struct Payment {
|
||||||
|
customer_id: i32,
|
||||||
|
amount: i32,
|
||||||
|
account_name: Option<String>,
|
||||||
|
}
|
||||||
|
struct MosquittoMessage {
|
||||||
|
service: String,
|
||||||
|
capteur: String,
|
||||||
|
r#type: String,
|
||||||
|
donnee: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn message(message: paho_mqtt::Message) {
|
||||||
|
let split: Vec<&str> = message.topic().split("/").collect();
|
||||||
|
|
||||||
|
let data = message.payload_str();
|
||||||
|
|
||||||
|
let message_receved = vec![MosquittoMessage {
|
||||||
|
service: split[0].to_string(),
|
||||||
|
capteur: split[1].to_string(),
|
||||||
|
r#type: split[2].to_string(),
|
||||||
|
donnee: data.to_string(),
|
||||||
|
}];
|
||||||
|
|
||||||
|
let url_builder = format!(
|
||||||
|
"mysql://{}:{}@localhost:3306/{}",
|
||||||
|
env::var("DBUSER").unwrap().to_string(),
|
||||||
|
env::var("DBPASSWORD").unwrap().to_string(),
|
||||||
|
env::var("DBNAME").unwrap().to_string()
|
||||||
|
);
|
||||||
|
let url = Opts::from_url(url_builder.as_str()).unwrap();
|
||||||
|
let pool = Pool::new(url).unwrap();
|
||||||
|
|
||||||
|
let mut conn = pool.get_conn().unwrap();
|
||||||
|
|
||||||
|
let _res = conn.exec_batch(
|
||||||
|
r"INSERT INTO donnees (service, capteur, type, donnee, date_donnee)
|
||||||
|
VALUES (:service, :capteur, :type, :donnee, NOW())",
|
||||||
|
message_receved.iter().map(|p| {
|
||||||
|
params! {
|
||||||
|
"service" => &p.service,
|
||||||
|
"capteur" => &p.capteur,
|
||||||
|
"type" => &p.r#type,
|
||||||
|
"donnee" => &p.donnee,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("topic : {:?} data : {}", split, data.to_string());
|
||||||
|
|
||||||
|
return ();
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user