AsyncMQTTclient - Trouble with subscribing to topic

Discuss garden automation systems and software here, including commercial products or Raspberry Pi and Arduino DIY setups.
Post Reply
Gromyjoe
LED Enthusiast
LED Enthusiast
Reactions:
Posts: 48
Joined: Fri Jul 03, 2020 2:28 pm

Hey everyone, I am back with another challenge over the winter. I found this example sketch that I thought would be fairly straight forward to use but I was wrong. I am trying to subscribe to a topic "CALIB1" and use the text to determine which calibration method I'll use. but no matter what I try it will not receive any messages. Publish works fine though.

If anyone has an easier way of doing this im all ears!

Code: Select all

/*
  Rui Santos
  Complete project details at https://RandomNerdTutorials.com/esp32-mqtt-publish-bme280-arduino/
  
  Permission is hereby granted, free of charge, to any person obtaining a copy
  of this software and associated documentation files.
  
  The above copyright notice and this permission notice shall be included in all
  copies or substantial portions of the Software.
*/

#include <Wire.h>
#include <Adafruit_Sensor.h>
#include <Adafruit_BME280.h>
#include <uFire_EC.h>
#include <uFire_pH.h>
#include <WiFi.h>
extern "C" {
  #include "freertos/FreeRTOS.h"
  #include "freertos/timers.h"
}
#include <AsyncMqttClient.h>

#define WIFI_SSID "..."
#define WIFI_PASSWORD "..."

// Raspberry Pi Mosquitto MQTT Broker
#define MQTT_HOST IPAddress(192, 168, 68, 100)
// For a cloud MQTT broker, type the domain name
//#define MQTT_HOST "example.com"
#define MQTT_PORT 1883

//calibration solutions used
#define PH_HIGH_SOLUTION_PH 7.0
#define PH_LOW_SOLUTION_PH  4.0
#define EC_HIGH_SOLUTION_EC 10.0
#define EC_LOW_SOLUTION_EC  1.0
#define CALIBRATION_TEMP    20.0

// Temperature MQTT Topics
#define CALIB_TOPIC    "CALIB1"
#define MQTT_PUB_TEMP "esp32/bme280/temperature"
#define MQTT_PUB_HUM "esp32/bme280/humidity"
#define MQTT_PUB_PRES "esp32/bme280/pressure"
#define MQTT_PUB_EC "esp32/bme280/ec"
#define MQTT_PUB_PH "esp32/bme280/ph"


String message;

// BME280 I2C
Adafruit_BME280 bme;
uFire_EC ec;
uFire_pH ph;
// Variables to hold sensor readings
float temp;
float hum;
float pres;
float uS;
float pH;

AsyncMqttClient mqttClient;
TimerHandle_t mqttReconnectTimer;
TimerHandle_t wifiReconnectTimer;

unsigned long previousMillis = 0;   // Stores last time temperature was published
const long interval = 10000;        // Interval at which to publish sensor readings

void connectToWifi() {
  Serial.println("Connecting to Wi-Fi...");
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
}

void connectToMqtt() {
  Serial.println("Connecting to MQTT...");
  mqttClient.connect();
}

void WiFiEvent(WiFiEvent_t event) {
  Serial.printf("[WiFi-event] event: %d\n", event);
  switch(event) {
    case SYSTEM_EVENT_STA_GOT_IP:
      Serial.println("WiFi connected");
      Serial.println("IP address: ");
      Serial.println(WiFi.localIP());
      connectToMqtt();
      break;
    case SYSTEM_EVENT_STA_DISCONNECTED:
      Serial.println("WiFi lost connection");
      xTimerStop(mqttReconnectTimer, 0); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
      xTimerStart(wifiReconnectTimer, 0);
      break;
  }
}

void onMqttConnect(bool sessionPresent) {
  Serial.println("Connected to MQTT.");
  Serial.print("Session present: ");
  Serial.println(sessionPresent);
  uint16_t packetIdSub =  mqttClient.subscribe("CALIB_TOPIC",1);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
  Serial.println("Disconnected from MQTT.");
  if (WiFi.isConnected()) {
    xTimerStart(mqttReconnectTimer, 0);
  }
}

void onMqttSubscribe(uint16_t packetIdSub, uint8_t qos) {
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetIdSub);
  Serial.print("  qos: ");
  Serial.println(qos);
}
/*void onMqttUnsubscribe(uint16_t packetId) {
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}*/

void onMqttPublish(uint16_t packetId) {
  Serial.print("Publish acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void setup() {
  Serial.begin(115200);
  Serial.println();
  Wire.begin();
  ec.begin();
  ph.begin();
  // Initialize BME280 sensor 
  if (!bme.begin(0x76)) {
    Serial.println("Could not find a valid BME280 sensor, check wiring!");
    while (1);
  }
  
  mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToMqtt));
  wifiReconnectTimer = xTimerCreate("wifiTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast<TimerCallbackFunction_t>(connectToWifi));

  WiFi.onEvent(WiFiEvent);

  mqttClient.onConnect(onMqttConnect);
  mqttClient.onDisconnect(onMqttDisconnect);
  mqttClient.onSubscribe(onMqttSubscribe);
  //mqttClient.onUnsubscribe(onMqttUnsubscribe);
  mqttClient.onPublish(onMqttPublish);
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);
  // If your broker requires authentication (username and password), set them below
  mqttClient.setCredentials("mqtt-user", "pw");
  connectToWifi();
}

void loop() {
  unsigned long currentMillis = millis();
  // Every X number of seconds (interval = 10 seconds) 
  // it publishes a new MQTT message
  if (currentMillis - previousMillis >= interval) {
    // Save the last time a new reading was published
    previousMillis = currentMillis;
    // New BME280 sensor readings
    temp = bme.readTemperature();
    //temp = 1.8*bme.readTemperature() + 32;
    hum = bme.readHumidity();
    pres = bme.readPressure()/100.0F;
    uS = ec.measureEC();
    pH = ph.measurepH();

    /* This is the part I am struggling with
   
     //String CALIB_TOPIC == String message;
    if (message == "EC1_HIGH") ec.calibrateProbeHigh(EC_HIGH_SOLUTION_EC, CALIBRATION_TEMP);
    if (message == "EC1_LOW")  ec.calibrateProbeLow(EC_LOW_SOLUTION_EC, CALIBRATION_TEMP);
    if (message == "PH1_HIGH") ph.calibrateProbeHigh(PH_HIGH_SOLUTION_PH);
    if (message == "PH1_LOW") ph.calibrateProbeLow(PH_LOW_SOLUTION_PH);
    

    uint16_t packetIdSub = mqttClient.subscribe(CALIB_TOPIC, 1);
    Serial.print("Subscribing at QoS 2, packetId: ");
    Serial.println(CALIB_TOPIC);
  */
  
    // Publish an MQTT message on topic esp32/BME2800/temperature
    uint16_t packetIdPub1 = mqttClient.publish(MQTT_PUB_TEMP, 1, true, String(temp).c_str());                            
    Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_TEMP, packetIdPub1);
    Serial.printf("Message: %.2f \n", temp);

    // Publish an MQTT message on topic esp32/BME2800/humidity
    uint16_t packetIdPub2 = mqttClient.publish(MQTT_PUB_HUM, 1, true, String(hum).c_str());                            
    Serial.printf("Publishing on topic %s at QoS 1, packetId %i: ", MQTT_PUB_HUM, packetIdPub2);
    Serial.printf("Message: %.2f \n", hum);

    // Publish an MQTT message on topic esp32/BME2800/pressure
    uint16_t packetIdPub3 = mqttClient.publish(MQTT_PUB_PRES, 1, true, String(pres).c_str());                            
    Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_PRES, packetIdPub3);
    Serial.printf("Message: %.3f \n", pres);

 // Publish an MQTT message on topic esp32/ufire/ec
    uint16_t packetIdPub4 = mqttClient.publish(MQTT_PUB_EC, 1, true, String(uS).c_str());                            
    Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_EC, packetIdPub4);
    Serial.printf("Message: %.3f \n", uS);    

 // Publish an MQTT message on topic esp32/ufire/ec
    uint16_t packetIdPub5 = mqttClient.publish(MQTT_PUB_PH, 1, true, String(pH).c_str());                            
    Serial.printf("Publishing on topic %s at QoS 1, packetId: %i", MQTT_PUB_PH, packetIdPub5);
    Serial.printf("Message: %.2f \n", pH);
  }
}
Last edited by Gromyjoe on Sat Dec 18, 2021 12:41 am, edited 1 time in total.
Shimbob
LED Wizard
LED Wizard
Reactions:
Posts: 642
Joined: Mon Nov 27, 2017 11:29 pm

While I'm not totally familiar with that particular MQTT library, a quick glance tells me there's no function defined to handle when you receive a message. From the example code at https://platformio.org/lib/show/346/AsyncMqttClient:

Code: Select all

void setup() {
...
  mqttClient.onMessage(onMqttMessage);
and:

Code: Select all

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
}
This tells the MQTT library that when it receives a message, it will call your onMqttMessage function and pass along the message. It's within this function that you would put your tests of if(message==etc) and perform the appropriate action.
Shimbob
LED Wizard
LED Wizard
Reactions:
Posts: 642
Joined: Mon Nov 27, 2017 11:29 pm

Something like this?

Code: Select all

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  if(0) {
    Serial.println("Publish received.");
    Serial.print("  topic: "); Serial.println(topic);
    Serial.print("  qos: "); Serial.println(properties.qos);
    Serial.print("  dup: "); Serial.println(properties.dup);
    Serial.print("  retain: "); Serial.println(properties.retain);
    Serial.print("  len: "); Serial.println(len);
    Serial.print("  index: "); Serial.println(index);
    Serial.print("  total: "); Serial.println(total);
  }
  
  if (0 == strcmp(topic, CALIB_TOPIC)) {
    switch (payload) {
      case "EC1_HIGH":
        ec.calibrateProbeHigh(EC_HIGH_SOLUTION_EC, CALIBRATION_TEMP);
        break;
      case "EC1_LOW":
        ec.calibrateProbeLow(EC_LOW_SOLUTION_EC, CALIBRATION_TEMP);
        break;
      case "PH1_HIGH":
        ph.calibrateProbeHigh(PH_HIGH_SOLUTION_PH);
        break;
      case "PH1_LOW":
        ph.calibrateProbeLow(PH_LOW_SOLUTION_PH);
        break;
      default:
        Serial.print("unhandled payload:"); Serial.println(payload);
    }
  }
}
sshscp
LED-Curious
LED-Curious
Reactions:
Posts: 14
Joined: Fri Dec 03, 2021 7:00 pm

After setServer you need to create a callback with the following:

Code: Select all

mqttClient.setCallback(callback);

mqttClient.subscribe("Example/topic/");
You will also need to create a function with the same name to parse the mqtt message. You can have 1 callback and use a single function to parse it but you will have to parse from passed topic variable for each one.

Code: Select all

void callback(char* topic, byte* payload, unsigned int length) {

    if (strcmp(topic, "Example/topic/") == 0) {

    // do work here
    }
}
That should get you going but you should think about building in a test to see if mqtt disconnects and reconnect if it does (it will happen often). Please let me know if you need a more complete example.
Shimbob
LED Wizard
LED Wizard
Reactions:
Posts: 642
Joined: Mon Nov 27, 2017 11:29 pm

He's using a different mqtt library, the setCallback() function is named onMessage() and the parameters passed to the callback function are different.

But yeah, needs a callback function defined.
Gromyjoe
LED Enthusiast
LED Enthusiast
Reactions:
Posts: 48
Joined: Fri Jul 03, 2020 2:28 pm

Here is what I ended up using. I can subscribe to topic esp32/calibrate "PH_CAL_LOW" or any function I added. even reset the saved data prior to calibrating. Im sure theres better ways of doing the job but i think its a start! And hopefully someone else comes across this looking for a starting point.


Code: Select all

#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <Adafruit_BME280.h>
#include <Adafruit_Sensor.h>
#include <uFire_EC.h>
#include <uFire_pH.h>
#include <uFire_ISE.h>

// Replace the next variables with your SSID/Password combination
const char* ssid = "...";
const char* password = "...";

// Add your MQTT Broker IP address, example:
//const char* mqtt_server = "192.168.1.144";
const char* mqtt_server = "192.168.68.100";
const int mqttPort = 1883;
const char* mqttUser = "mqtt-user";
const char* mqttPassword = "....";

WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0;
char msg[50];
int value = 0;

//uncomment the following lines if you're using SPI
/*#include <SPI.h>
#define BME_SCK 18
#define BME_MISO 19
#define BME_MOSI 23
#define BME_CS 5*/

Adafruit_BME280 bme; // I2C
//Adafruit_BME280 bme(BME_CS); // hardware SPI
//Adafruit_BME280 bme(BME_CS, BME_MOSI, BME_MISO, BME_SCK); // software SPI
float temperature = 0;
float humidity = 0;

//calibration solutions used
#define PH_CAL_SOLUTION_SINGLE  1.0
#define PH_CAL_SOLUTION_HIGH    7.0
#define PH_CAL_SOLUTION_LOW     4.0
#define EC_CAL_SOLUTION_SINGLE  1.0
#define EC_CAL_SOLUTION_HIGH   10.0
#define EC_CAL_SOLUTION_LOW    1.0
#define CALIBRATION_TEMP       20.0


uFire_EC ec;
uFire_pH ph;
uFire_ISE mv;

float uS;
float pH;

// LED Pin
const int ledPinEC = 26;
const int ledPinPH = 25;

void setup() {
  Serial.begin(115200);
  Wire.begin();
  ec.begin();
  ph.begin();
  // default settings
  // (you can also pass in a Wire library object like &Wire2)
  //status = bme.begin();  
  if (!bme.begin(0x76)) {
    Serial.println("Could not find a valid BME280 sensor, check wiring!");
    while (1);
  }
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);

  pinMode(ledPinEC, OUTPUT);
  pinMode(ledPinPH, OUTPUT);

  }

void setup_wifi() {
  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". Message: ");
  String messageTemp;
  
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
    messageTemp += (char)payload[i];
  }
  Serial.println();

  // Feel free to add more if statements to control more GPIOs with MQTT

  // If a message is received on the topic esp32/output, you check if the message is either "on" or "off". 
  // Changes the output state according to the message
  
  if (String(topic) == "esp32/calibrate") {
    //
    Serial.print("Probe Calibration ");
    if(messageTemp == "EC_CAL"){
      Serial.println("Calibrate EC Probe");
      ec.calibrateProbe(EC_CAL_SOLUTION_SINGLE, CALIBRATION_TEMP);
      Serial.println(ec.getCalibrateOffset());
      digitalWrite(ledPinEC, LOW);
    }
    else if(messageTemp == "EC_CAL_LOW"){
      Serial.println("Calibrate EC LOW");
      ec.calibrateProbeLow(EC_CAL_SOLUTION_LOW, CALIBRATION_TEMP);
      Serial.println(ec.getCalibrateLowReading());
      digitalWrite(ledPinEC, LOW);
    }
    else if(messageTemp == "EC_RESET"){
      Serial.println("RESET EC PROBE");
      ec.reset();
      digitalWrite(ledPinEC, HIGH);
    }
    else if(messageTemp == "EC_CAL_HIGH"){
      Serial.println("Calibrate EC HIGH");
      ec.calibrateProbeHigh(EC_CAL_SOLUTION_HIGH, CALIBRATION_TEMP);
      Serial.println(ec.getCalibrateHighReading());
      digitalWrite(ledPinEC, LOW);
    }
    else if(messageTemp == "PH_CAL"){
      Serial.println("Calibrate PH");
      ph.calibrateSingle(PH_CAL_SOLUTION_SINGLE);
      Serial.print("Single PT REF ");
      Serial.println(ph.getCalibrateOffset(), 2);
      digitalWrite(ledPinPH, LOW);
    }    
    else if(messageTemp == "PH_CAL_HIGH"){
      Serial.println("Calibrate PH HIGH");
      ph.calibrateProbeHigh(PH_CAL_SOLUTION_HIGH);
      Serial.print("high reference | read: ");
      Serial.print(ph.getCalibrateHighReference(), 2);
      Serial.print(" | ");
      Serial.println(ph.getCalibrateHighReading(), 2);
      digitalWrite(ledPinPH, LOW);
    }
    else if(messageTemp == "PH_CAL_LOW"){
      Serial.println("Calibrate PH LOW");
      ph.calibrateProbeLow(PH_CAL_SOLUTION_LOW);
      Serial.print("low reference | read: ");
      Serial.print(ph.getCalibrateLowReference(), 2);
      Serial.print(" | ");
      Serial.println(ph.getCalibrateLowReading(), 2);
      digitalWrite(ledPinPH, LOW);
    }
    else if(messageTemp == "PH_RESET"){
      Serial.println("RESET PH PROBE");
      ph.reset();
      digitalWrite(ledPinPH, HIGH);
    }    
  }
}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect("ESP32Client", mqttUser, mqttPassword )) {
      Serial.println("connected");
      // Subscribe
      client.subscribe("esp32/calibrate");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}
void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  long now = millis();
  if (now - lastMsg > 5000) {
    lastMsg = now;
    
    // Temperature in Celsius
    temperature = bme.readTemperature();   
    // Uncomment the next line to set temperature in Fahrenheit 
    // (and comment the previous temperature line)
    //temperature = 1.8 * bme.readTemperature() + 32; // Temperature in Fahrenheit
    
    // Convert the value to a char array
    char tempString[8];
    dtostrf(temperature, 1, 2, tempString);
    Serial.print("Temperature: ");
    Serial.println(tempString);
    client.publish("esp32/temp", tempString);

    humidity = bme.readHumidity();
    
    // Convert the value to a char array
    char humString[8];
    dtostrf(humidity, 1, 2, humString);
    Serial.print("Humidity: ");
    Serial.println(humString);
    client.publish("esp32/hum", humString);

    uS = ec.measureEC();

     // Convert the value to a char array
    char ecString[8];
    dtostrf(uS, 1, 2, ecString);
    Serial.print("ec: ");
    Serial.println(ecString);
    client.publish("esp32/ec", ecString);
    
    pH = ph.measurepH();
    
    // Convert the value to a char array
    char phString[8];
    dtostrf(pH, 1, 2, phString);
    Serial.print("PH: ");
    Serial.println(phString);
    client.publish("esp32/ph", phString);
  }
}
Post Reply