c++ mqtt库(paho)

mqtt c++ 库

从github上下载mqtt库:https://github.com/eclipse/paho.mqtt.cpp

linux

安装依赖

  • cmake 和 gcc,如果已经安装就不需要安装

    sudo apt-get install build-essential gcc make cmake cmake-gui cmake-curses-gui
    
  • apt-get install libssl-dev 
    apt-get install doxygen graphviz
    apt-get install libcppunit-dev
    

编译

编译安装mqtt c库

因为c++mqtt的库必须需要c语言的mqtt库支持,所以先编译mqtt c的库。

$ git clone https://github.com/eclipse/paho.mqtt.c.git
$ cd paho.mqtt.c
$ git checkout v1.3.1

$ cmake -Bbuild -H. -DPAHO_WITH_SSL=ON -DPAHO_ENABLE_TESTING=OFF
$ sudo cmake --build build/ --target install
$ sudo ldconfig

编译安装mqtt cpp库

$ git clone https://github.com/eclipse/paho.mqtt.cpp
$ cd paho.mqtt.cpp
$ git checkout v1.1
$ cmake -Bbuild -H. -DPAHO_BUILD_DOCUMENTATION=TRUE -DPAHO_BUILD_SAMPLES=TRUE
$ sudo cmake --build build/ --target install
$ sudo ldconfig

新建mqtt 类

现金mqtt.h

#ifndef __mqtt_h__
#define __mqtt_h__

#include <iostream>
#include <cstdlib>
#include <string>
#include <thread>	// For sleep
#include <atomic>
#include <chrono>
#include <cstring>
#include "mqtt/async_client.h"

using namespace std;

class Mqtt{

public:
	Mqtt(string addr, string clientid);
	~Mqtt();
	void set_callback(mqtt::callback& cb);
	bool connect();
	bool disConnect();
	bool publish(const string& topic,const void* payload, size_t n,int qos, bool retained);
	bool subscribe(const string& topic,
						int qos, mqtt::iaction_listener& cb);
	bool unSubscribe(const string& topic);
private:
	mqtt::async_client* client;
	

};
#endif

新建mqtt.cpp

#include "mqtt.h"
const auto TIMEOUT = std::chrono::seconds(10);
const int MAX_BUFFERED_MSGS = 120;	// 120 * 5sec => 10min off-line buffering
const int PERIOD = 5;

Mqtt::Mqtt(string addr, string clientid){
	//参数1: 服务器地址 参数2:客户端id
	client = new mqtt::async_client(addr, clientid);		
}
Mqtt::~Mqtt(){
	std::cout << "Delete client" << std::endl;
	delete client;
}
void Mqtt::set_callback(mqtt::callback& cb){
	client->set_callback(cb);
}
bool Mqtt::connect(){
	try {	
		mqtt::connect_options connOpts;
		connOpts.set_keep_alive_interval(MAX_BUFFERED_MSGS * PERIOD);
		connOpts.set_clean_session(true);
		connOpts.set_automatic_reconnect(true);
		std::cout << "Connecting to the MQTT server..." << std::flush;

        client->connect(connOpts)->wait();
		std::cout << "OK" << std::endl;
		return true;
	}catch (const mqtt::exception& exc) {
        std::cerr << exc.what() << std::endl;
        return false;
    } 
}
bool Mqtt::disConnect(){
	try {
		std::cout << "\nDisconnecting from the MQTT server..." << std::flush;
		client->disconnect()->wait();
		std::cout << "OK" << std::endl;
		return true;
	}catch (const mqtt::exception& exc) {
        std::cerr << exc.what() << std::endl;
        return false;
    } 
}
bool Mqtt::publish(const string& topic,const void* payload, size_t n,int qos, bool retained){

	client->publish(topic, payload,n, qos, retained);

	return true;
}
bool Mqtt::subscribe(const string& topic,
						int qos,
						mqtt::iaction_listener& cb){
	client->subscribe(topic,qos,nullptr,cb);
	return true;
}

bool Mqtt::unSubscribe(const string& topic){
	client->unsubscribe(topic)->wait(); 
	return true;
}

新建测试demo

新建mqtt_client.cpp

#include "mqtt.h"

const char * PAYLOAD = { "Hello World!" };


int main(int argc, char* argv[]) {

	Mqtt mqtt_client("tcp://192.168.100.100:1883", "async_publish");

	mqtt_client.connect();

	std::cout << "Publish to MQTT server..." << std::endl;
	mqtt_client.publish("xzw", PAYLOAD, strlen(PAYLOAD), 0, false);
	std::cout << "OK" << std::endl;
	mqtt_client.disConnect();
}

编写CMakeLists.txt

# CMake 最低版本号要求
cmake_minimum_required (VERSION 2.8)

# 项目信息
project (mqttDemo)

include_directories(./mqtt)

aux_source_directory(./ CURRENT_SRCS)
aux_source_directory(./mqtt MQTT_SRCS)

set(SRCS ${MQTT_SRCS} ${CURRENT_SRCS})
LINK_DIRECTORIES("/usr/local/lib")

# 指定生成目标 
add_executable(${PROJECT_NAME} ${SRCS})

target_link_libraries(${PROJECT_NAME} paho-mqtt3a paho-mqtt3as paho-mqtt3c paho-mqtt3cs paho-mqttpp3)

编译运行

root@default:/share/paho_mqtt_cpp# ls
CMakeLists.txt  mqtt  mqtt_client.cpp
root@default:/share/paho_mqtt_cpp# ls mqtt
mqtt.cpp  mqtt.h
root@default:/share/paho_mqtt_cpp# mkdir build
root@default:/share/paho_mqtt_cpp# cd build
root@default:/share/paho_mqtt_cpp/build# cmake ..
-- The C compiler identification is GNU 8.4.0
。。。
-- Configuring done
-- Generating done
-- Build files have been written to: /share/paho_mqtt_cpp/build
root@default:/share/paho_mqtt_cpp/build# make
Scanning dependencies of target mqttDemo
[ 33%] Building CXX object CMakeFiles/mqttDemo.dir/mqtt/mqtt.cpp.o
。。。
[100%] Built target mqttDemo

35

添加订阅

修改mqtt_client.cpp

#include "mqtt.h"
#include <unistd.h>


const char * PAYLOAD = { "Hello World!" };


class action_listener : public virtual mqtt::iaction_listener
{
	std::string name_;

	void on_failure(const mqtt::token& tok) override {
		std::cout << name_ << " failure";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		std::cout << std::endl;
	}

	void on_success(const mqtt::token& tok) override {
		std::cout << name_ << " success";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		auto top = tok.get_topics();
		if (top && !top->empty())
			std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
		std::cout << std::endl;
	}

public:
	action_listener(const std::string& name) : name_(name) {}
};


int main(int argc, char* argv[]) {

	Mqtt mqtt_client("tcp://192.168.100.100:1883", "async_publish");

	mqtt_client.connect();

	action_listener subListener_("xzw_listerner");
	mqtt_client.subscribe("xzw", 0,  subListener_);
	
	std::cout << "Publish to MQTT server..." << std::endl;
	mqtt_client.publish("xzw", PAYLOAD, strlen(PAYLOAD), 0, false);
	std::cout << "OK" << std::endl;
	
	sleep(1);
	mqtt_client.unSubscribe("xzw");
	mqtt_client.disConnect();

}

编译运行

root@default:/share/paho_mqtt_cpp/build# ./mqttDemo 
Connecting to the MQTT server...OK
Publish to MQTT server...
OK
xzw_listerner success for token: [1]
        token topic: 'xzw', ...


Disconnecting from the MQTT server...OK
Delete client

添加回调函数

修改mqtt_client.cpp

#include "mqtt.h"
#include <unistd.h>


const char * PAYLOAD = { "Hello World!" };


class action_listener : public virtual mqtt::iaction_listener
{
	std::string name_;

	void on_failure(const mqtt::token& tok) override {
		std::cout << name_ << " failure";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		std::cout << std::endl;
	}

	void on_success(const mqtt::token& tok) override {
		std::cout << name_ << " success";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		auto top = tok.get_topics();
		if (top && !top->empty())
			std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
		std::cout << std::endl;
	}

public:
	action_listener(const std::string& name) : name_(name) {}
};

class callback : public virtual mqtt::callback,
					public virtual mqtt::iaction_listener

{
	void reconnect() {
	}

	// Re-connection failure
	void on_failure(const mqtt::token& tok) override {
	}

	// (Re)connection success
	// Either this or connected() can be used for callbacks.
	void on_success(const mqtt::token& tok) override {}

	// (Re)connection success
	void connected(const std::string& cause) override {
	}

	// Callback for when the connection is lost.
	// This will initiate the attempt to manually reconnect.
	void connection_lost(const std::string& cause) override {
	}

	// Callback for when a message arrives.
	void message_arrived(mqtt::const_message_ptr msg) override {
		std::cout << "Message arrived" << std::endl;
		std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
		std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr token) override {}

public:
	callback() {}
};

int main(int argc, char* argv[]) {

	Mqtt mqtt_client("tcp://192.168.100.100:1883", "async_publish");
	callback cb;
	mqtt_client.set_callback(cb);
	
	mqtt_client.connect();

	action_listener subListener_("xzw_listerner");
	mqtt_client.subscribe("xzw", 0,  subListener_);
	
	std::cout << "Publish to MQTT server..." << std::endl;
	mqtt_client.publish("xzw", PAYLOAD, strlen(PAYLOAD), 0, false);
	std::cout << "OK" << std::endl;
	
	sleep(1);
	mqtt_client.unSubscribe("xzw");
	mqtt_client.disConnect();

}

编译运行

root@default:/share/paho_mqtt_cpp/build# ./mqttDemo 
Connecting to the MQTT server...OK
Publish to MQTT server...
OK
xzw_listerner success for token: [1]
        token topic: 'xzw', ...

Message arrived
        topic: 'xzw'
        payload: 'Hello World!'


Disconnecting from the MQTT server...OK
Delete client

添加断开重连

修改mqtt_client.cpp

#include "mqtt.h"
#include <unistd.h>

bool mqtt_state;
const char * PAYLOAD = { "Hello World!" };
Mqtt* mqtt_client;


class action_listener : public virtual mqtt::iaction_listener
{
	std::string name_;

	void on_failure(const mqtt::token& tok) override {
		std::cout << name_ << " failure";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		std::cout << std::endl;
	}

	void on_success(const mqtt::token& tok) override {
		std::cout << name_ << " success";
		if (tok.get_message_id() != 0)
			std::cout << " for token: [" << tok.get_message_id() << "]" << std::endl;
		auto top = tok.get_topics();
		if (top && !top->empty())
			std::cout << "\ttoken topic: '" << (*top)[0] << "', ..." << std::endl;
		std::cout << std::endl;
	}

public:
	action_listener(const std::string& name) : name_(name) {}
};
action_listener* subListener_;


class callback : public virtual mqtt::callback,
					public virtual mqtt::iaction_listener

{
	void reconnect() {
		std::cout << "reconnect !" << std::endl;
	}

	// Re-connection failure
	void on_failure(const mqtt::token& tok) override {
		std::cout << "on_failure" << std::endl;
	}

	// (Re)connection success
	// Either this or connected() can be used for callbacks.
	void on_success(const mqtt::token& tok) override {
		std::cout << "on_success" << std::endl;
	}

	// (Re)connection success
	void connected(const std::string& cause) override {
		std::cout << "connected" << std::endl;
		mqtt_state = true;
		mqtt_client->subscribe("xzw", 0,  *subListener_);
	}

	// Callback for when the connection is lost.
	// This will initiate the attempt to manually reconnect.
	void connection_lost(const std::string& cause) override {
		std::cout << "connection_lost" << std::endl;
		mqtt_state = false;
	}

	// Callback for when a message arrives.
	void message_arrived(mqtt::const_message_ptr msg) override {
		std::cout << "Message arrived" << std::endl;
		std::cout << "\ttopic: '" << msg->get_topic() << "'" << std::endl;
		std::cout << "\tpayload: '" << msg->to_string() << "'\n" << std::endl;
	}

	void delivery_complete(mqtt::delivery_token_ptr token) override {}

public:
	callback() {}
};

int main(int argc, char* argv[]) {
	mqtt_client = new Mqtt("tcp://192.168.16.166:1883", "async_publish");
	subListener_ = new action_listener("xzw_listerner");
	callback cb;
	mqtt_client->set_callback(cb);
	
	mqtt_client->connect();
	
	

	
	std::cout << "Publish to MQTT server..." << std::endl;
	mqtt_client->publish("xzw", PAYLOAD, strlen(PAYLOAD), 0, false);
	std::cout << "OK" << std::endl;

	while (true){
		if(!mqtt_state){
			mqtt_client->connect();
		}

	}
	

	mqtt_client->unSubscribe("xzw");
	mqtt_client->disConnect();

}

编译运行

中途关闭mqtt或断开网络,模拟重新连接

root@default:/share/paho_mqtt_cpp/build# ./mqttDemo
Connecting to the MQTT server...OK
Publish to MQTT server...
OK
Connecting to the MQTT server...connected
xzw_listerner success for token: [1]
        token topic: 'xzw', ...

Message arrived
        topic: 'xzw'
        payload: 'Hello xiazhongwei'

connection_lost
MQTT error [-1]: TCP/TLS connect failure
Connecting to the MQTT server...MQTT error [-1]: TCP/TLS connect failure
Connecting to the MQTT server...MQTT error [-1]: TCP/TLS connect failure
Connecting to the MQTT server...OK
Connecting to the MQTT server...connected
xzw_listerner success for token: [1]
        token topic: 'xzw', ...

Message arrived
        topic: 'xzw'
        payload: 'Hello xiazhongwei'

windows

安装依赖

安装openssl,这个库必须安装否则有问腿

地址:http://slproweb.com/products/Win32OpenSSL.html,我下载的是

Win64OpenSSL-1_1_1g.exe

编译

编译安装mqtt c库

因为c++mqtt的库必须需要c语言的mqtt库支持,所以先编译mqtt c的库。

git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c
git checkout v1.3.1

配置cmake

47

  • 要选择x64否则调用64为ssl会有问题

点击configure,然后勾选上PAHO_WITH_SSL,然后重新configure,将openssl路径填进去,再次configure并generate

48

用vs打开Eclipse Paho C.sln,点击右键all build,

编译安装mqtt cpp库

git clone https://github.com/eclipse/paho.mqtt.cpp
cd paho.mqtt.cpp
git checkout v1.1

同样使用cmake配置mqtt cpp,平台也选择x64,修改配置项

50

创建vs项目

新建vs文件夹,并在该文件夹下创建vs项目

51

在vs目录下新建paho-c /paho-cpp文件夹,并在这两个文件夹下分别新建include 和 lib目录

52

配置好预处理器 “WIN64”,不然会报一堆错误

55

53

54

windows版没有unistd库,需要注释掉这部分代码,编译成功后把dll文件放在可执行文件相同目录下即可使用

问题

error C2589: “(”: “::”右边的非法标记 error C2059: 语法错误 : “::”

函数模板max与Visual C++中的全局的宏max冲突。 解决项目属性   ——> C/C++ ——> 预处理器 ——> 预处理器定义 (此处添加预定义编译开关   NOMINMAX)
error C4996: 'sprintf': This function or variable may be unsafe.

这种微软的警告,主要因为那些C库的函数,很多函数内部是不进行参数检测的(包括越界类的),微软担心使用这些会造成内存异常,所以就改写了同样功能的函数,改写了的函数进行了参数的检测,使用这些新的函数会更安全和便捷。
解决打开项目----项目属性---配置属性----C/C++ ----预处理器----预处理定义,
添加_CRT_SECURE_NO_DEPRECATE和_SCL_SECURE_NO_DEPRECATE这两个宏。

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×