AsyncMQTTclient - Trouble with subscribing to topic
Posted: Fri Dec 17, 2021 12:08 am
Hey everyone, I am back with another challenge over the winter. I found this example sketch that I thought would be fairly straightforward to use, but I was wrong. I am trying to subscribe to a topic "CALIB1" and use the message text to determine which calibration method I'll use, but no matter what I try it will not receive any messages. Publish works fine though.
If anyone has an easier way of doing this, I'm all ears!
Code: Select all
/*
Rui Santos
Complete project details at https://RandomNerdTutorials.com/esp32-mqtt-publish-bme280-arduino/
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files.
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
*/
#include <Wire.h>
#include <Adafruit_Sensor.h>
#include <Adafruit_BME280.h>
#include <uFire_EC.h>
#include <uFire_pH.h>
#include <WiFi.h>
extern "C" {
#include "freertos/FreeRTOS.h"
#include "freertos/timers.h"
}
#include <AsyncMqttClient.h>
// Wi-Fi credentials (placeholders; fill in before flashing).
#define WIFI_SSID "..."
#define WIFI_PASSWORD "..."
// Raspberry Pi Mosquitto MQTT Broker
#define MQTT_HOST IPAddress(192, 168, 68, 100)
// For a cloud MQTT broker, type the domain name
//#define MQTT_HOST "example.com"
#define MQTT_PORT 1883
// Reference values of the calibration solutions passed to the uFire
// calibrateProbeHigh()/calibrateProbeLow() calls.
// NOTE(review): units are not visible in this sketch (presumably pH units,
// mS/cm and degrees C) - confirm against the uFire library documentation.
#define PH_HIGH_SOLUTION_PH 7.0
#define PH_LOW_SOLUTION_PH 4.0
#define EC_HIGH_SOLUTION_EC 10.0
#define EC_LOW_SOLUTION_EC 1.0
#define CALIBRATION_TEMP 20.0
// MQTT topics: CALIB_TOPIC is the topic calibration commands are expected on;
// the MQTT_PUB_* topics are the ones loop() publishes sensor readings to.
// Note that the EC and pH readings are also published under "esp32/bme280/".
#define CALIB_TOPIC "CALIB1"
#define MQTT_PUB_TEMP "esp32/bme280/temperature"
#define MQTT_PUB_HUM "esp32/bme280/humidity"
#define MQTT_PUB_PRES "esp32/bme280/pressure"
#define MQTT_PUB_EC "esp32/bme280/ec"
#define MQTT_PUB_PH "esp32/bme280/ph"
// Calibration command text (e.g. "EC1_HIGH") that the calibration section
// in loop() compares against.
String message;
// BME280 I2C
Adafruit_BME280 bme;
// uFire EC and pH probe drivers.
uFire_EC ec;
uFire_pH ph;
// Variables to hold sensor readings
float temp;
float hum;
float pres;
float uS;
float pH;
// MQTT client plus the one-shot FreeRTOS timers (created in setup()) that
// retry the MQTT and Wi-Fi connections.
AsyncMqttClient mqttClient;
TimerHandle_t mqttReconnectTimer;
TimerHandle_t wifiReconnectTimer;
unsigned long previousMillis = 0; // millis() timestamp of the last publish cycle
const long interval = 10000; // Interval (ms) at which to publish sensor readings
// Starts a Wi-Fi station connection using the compile-time credentials.
// Also used as the action of the Wi-Fi reconnect timer.
void connectToWifi()
{
  Serial.println("Connecting to Wi-Fi...");
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}
// Asks the MQTT client to connect to the broker configured in setup().
// Also used as the action of the MQTT reconnect timer.
void connectToMqtt()
{
  Serial.println("Connecting to MQTT...");
  mqttClient.connect();
}
// Wi-Fi event handler registered via WiFi.onEvent().
//  - Got IP: logs the address and starts the MQTT connection.
//  - Disconnected: stops the MQTT retry timer and arms the Wi-Fi retry timer.
// Every event id is logged; all other events are otherwise ignored.
// NOTE(review): the SYSTEM_EVENT_* names depend on the installed ESP32 Arduino
// core version - confirm they exist in the core this sketch is built against.
void WiFiEvent(WiFiEvent_t event)
{
  Serial.printf("[WiFi-event] event: %d\n", event);

  if (event == SYSTEM_EVENT_STA_GOT_IP) {
    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
    connectToMqtt();
    return;
  }

  if (event == SYSTEM_EVENT_STA_DISCONNECTED) {
    Serial.println("WiFi lost connection");
    // Make sure no MQTT reconnect fires while Wi-Fi itself is reconnecting.
    xTimerStop(mqttReconnectTimer, 0);
    xTimerStart(wifiReconnectTimer, 0);
  }
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
uint16_t packetIdSub = mqttClient.subscribe("CALIB_TOPIC",1);
}
// Called by the MQTT client when the broker connection is lost.
// Arms the MQTT retry timer, but only while Wi-Fi is still up; the
// disconnect reason is not inspected.
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  Serial.println("Disconnected from MQTT.");

  if (!WiFi.isConnected()) {
    return;
  }
  xTimerStart(mqttReconnectTimer, 0);
}
// Called by the MQTT client when a subscription is acknowledged.
// Logs the packet id of the subscribe request and the QoS value reported.
void onMqttSubscribe(uint16_t packetIdSub, uint8_t qos)
{
  Serial.println("Subscribe acknowledged.");

  Serial.print(" packetId: ");
  Serial.println(packetIdSub);

  Serial.print(" qos: ");
  Serial.println(qos);
}
/*void onMqttUnsubscribe(uint16_t packetId) {
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}*/
// Called by the MQTT client when a publish is acknowledged.
// Logs the acknowledged packet id on a single line.
void onMqttPublish(uint16_t packetId)
{
  Serial.print("Publish acknowledged.");
  Serial.print(" packetId: ");
  Serial.println(packetId);
}
void setup() {
Serial.begin(115200);
Serial.println();
Wire.begin();
ec.begin();
ph.begin();
// Initialize BME280 sensor
if (!bme.begin(0x76)) {
Serial.println("Could not find a valid BME280 sensor, check wiring!");
while (1);
}
mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));
WiFi.onEvent(WiFiEvent);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
//mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
// If your broker requires authentication (username and password), set them below
mqttClient.setCredentials("mqtt-user", "pw");
connectToWifi();
}
void loop() {
unsigned long currentMillis = millis();
// Every X number of seconds (interval = 10 seconds)
// it publishes a new MQTT message
if (currentMillis - previousMillis >= interval) {
// Save the last time a new reading was published
previousMillis = currentMillis;
// New BME280 sensor readings
temp = bme.readTemperature();
//temp = 1.8*bme.readTemperature() + 32;
hum = bme.readHumidity();
pres = bme.readPressure()/100.0F;
uS = ec.measureEC();
pH = ph.measurepH();
/* This is the part I am struggling with
//String CALIB_TOPIC == String message;
if (message == "EC1_HIGH") ec.calibrateProbeHigh(EC_HIGH_SOLUTION_EC, CALIBRATION_TEMP);
if (message == "EC1_LOW") ec.calibrateProbeLow(EC_LOW_SOLUTION_EC, CALIBRATION_TEMP);
if (message == "PH1_HIGH") ph.calibrateProbeHigh(PH_HIGH_SOLUTION_PH);
if (message == "PH1_LOW") ph.calibrateProbeLow(PH_LOW_SOLUTION_PH);
uint16_t packetIdSub = mqttClient.subscribe(CALIB_TOPIC, 1);
Serial.print("Subscribing at QoS 2, packetId: ");
Serial.println(CALIB_TOPIC);
*/
// Publish an MQTT message on topic esp32/BME2800/temperature
uint16_t packetIdPub1 = mqttClient.publish(MQTT_PUB_TEMP, 1, true, String(temp).c_str());
Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_TEMP, packetIdPub1);
Serial.printf("Message: %.2f \n", temp);
// Publish an MQTT message on topic esp32/BME2800/humidity
uint16_t packetIdPub2 = mqttClient.publish(MQTT_PUB_HUM, 1, true, String(hum).c_str());
Serial.printf("Publishing on topic %s at QoS 1, packetId %i: ", MQTT_PUB_HUM, packetIdPub2);
Serial.printf("Message: %.2f \n", hum);
// Publish an MQTT message on topic esp32/BME2800/pressure
uint16_t packetIdPub3 = mqttClient.publish(MQTT_PUB_PRES, 1, true, String(pres).c_str());
Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_PRES, packetIdPub3);
Serial.printf("Message: %.3f \n", pres);
// Publish an MQTT message on topic esp32/ufire/ec
uint16_t packetIdPub4 = mqttClient.publish(MQTT_PUB_EC, 1, true, String(uS).c_str());
Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_EC, packetIdPub4);
Serial.printf("Message: %.3f \n", uS);
// Publish an MQTT message on topic esp32/ufire/ec
uint16_t packetIdPub5 = mqttClient.publish(MQTT_PUB_PH, 1, true, String(pH).c_str());
Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_PH, packetIdPub5);
Serial.printf("Message: %.2f \n", pH);
}
}