網關與云平臺之間的通信方式一般都是客戶指定的,就那么幾種(阿里云、華為云、騰訊云、亞馬遜AWS平臺)。一般都要求網關與云平臺之間處于長連接的狀態,這樣云端的各種指令就可以隨時發送到網關。

與云平臺之間的 MQTT 連接
目前的幾大物聯網云平臺,都提供了不同的接入方式。對于網關來說,應用最多的就是 MQTT 接入。
我們知道,MQTT 只是一個協議而已,不同的編程語言中都有實現,在 C 語言中也有好幾個實現。
在網關內部,運行著一個后臺 deamon: MQTT Broker,其實就是 mosquitto 這個可執行程序,它充當著消息總線的功能。這里請大家注意:因為這個消息總線是運行在嵌入式系統的內部,接入總線的客戶端就是需要相互通信的那些進程。這些進程的數量是有限的,即使是一個比較復雜的系統,最多十幾個進程也就差不多了。因此,mosquitto 這個實現是完全可以支撐系統負載的。
那么,如果在云端部署一個 MQTT Broker,理論上是可以直接使用 mosquitto 這個實現來作為消息總線的,但是你要評估接入的客戶端(也就是網關)在一個什么樣的數量級,考慮到并發的問題,一定要做壓力測試。
對于后臺開發,我的經驗不多,不敢(也不能)多言,誤導大家就罪過了。不過,對于一般的學習和測試來說,在云端直接部署 mosquitto 作為消息總線,是沒有問題的。
Proc_Bridge 進程:外部和內部消息總線之間的橋接器
下面這張圖,說明了 Proc_Bridge 進程在這個模型中的作用:

從云平臺消息總線接收到的消息,需要轉發到內部的消息總線;從內部消息總線接收到的消息,需要轉發到云平臺的消息總線;
如果用 mosquitto 來實現,應該如何來實現呢?
1. mosquitto 的 API 接口
mosquitto 這個實現是基于回調函數的機制來運行的,例如:
// 連接成功時的回調函數void my_connect_callback(struct mosquitto *mosq, void *obj, int rc){ // ...}
// 連接失敗時的回調函數void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result){ // ...}
// 接收到消息時的回調函數void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message){ // ..}
int main(){ // 其他代碼 // ... // 創建一個 mosquitto 對象 struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL); // 注冊回調函數 mosquitto_connect_callback_set(g_mosq, my_connect_callback); mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback); mosquitto_message_callback_set(g_mosq, my_message_callback); // 這里還有其他的回調函數設置 // 開始連接到消息總線 mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60); while(1) { int rc = mosquitto_loop(g_mosq, -1, 1); if (rc) { printf("mqtt_portal: mosquitto_loop rc = %d ", rc); sleep(1); mosquitto_reconnect(g_mosq); } } mosquitto_destroy(g_mosq); mosquitto_lib_cleanup(); return 0;}
以上代碼就是一個 mosquitto 客戶端的最簡代碼了,使用回調函數的機制,讓程序的開發非常簡單。
mosquitto 把底層的細節問題都幫助我們處理了,只要我們注冊的函數被調用了,就說明發生了我們感興趣的事件。
這樣的回調機制在各種開源軟件中使用的比較多,比如:glib 里的定時器、libevent通訊處理、libmodbus 里的數據處理、linux 內核中的驅動開發和定時器,都是這個套路,一通百通!
在網關中的每個進程,只需要添加上面這部分代碼,就可以掛載到消息總線上,從而可以與其它進程進行收發數據了。
2. 利用 UserData 指針,實現多個 MQTT 連接
上面的實例僅僅是連接到一個消息總線上,對于一個普通的進程來說,達到了通信的目的。
但是對于 Proc_Bridge 進程來說,還沒有達到目的,因為這個進程處于橋接的位置,需要同時連接到遠程和本地這兩個消息總線上。那么應該如何實現呢?
看一下 mosquitto_new 這個函數的簽名:
* obj - A user pointer that will be passed as an argument to any * callbacks that are specified.最后一個參數的作用是:可以設置一個用戶自己的數據(作為指針傳入),那么mosquitto 在回調我們的注冊的任何一個函數時,都會把這個指針傳入。
因此,我們可以利用這個參數來區分這個連接是遠程連接?還是本地連接。libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
所以,我們可以定義一個結構體變量,把一個 MQTT 連接的所有信息都記錄在這里,然后注冊給 mosquitto。當 mosquitto 回調函數時,把這個結構體變量的指針回傳給我們,這樣就拿到了這個連接的所有數據,在某種程度上來說,這也是一種面向對象的思想。
// 從來表示一個 MQTT 連接的結構體typedef struct{ char *id; char *name; char *pw; char *host; int port; pthread_t tHandle; struct mosquitto *mosq; int mqtt_num;}MQData;
完整的代碼已經放到網盤里了,為了讓你先從原理上看明白,我把關鍵幾個地方的代碼貼在這里:
// 分配結構體變量MQData userData = (MQData *)malloc(sizeof(MQData));
// 設置屬于這里連接的參數: id, name 等等
// 創建 mosquitto 對象時,傳入 userData。struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);
// 在回調函數中,把 obj 指針前轉成 MQData 指針static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message){ MQData *userData = (MQData *)obj; // 此時就可以根據 userData 指針中的內容分辨出這是哪一個鏈接了}
另外一個問題:不知道你是否注意到示例中的 mosquitto_loop() 這個函數?這個函數需要放在 while 死循環中不停的調用,才能出發 mosuiqtto 內部的事件。(其實在 mosuiqtto 中,還提供了另一個簡化的函數 mosquitto_loop_forever)。
也就是說:在每個連接中,需要持續的觸發 mosquitto 底層的事件,才能讓消息系統順利的收發。因此,在示例代碼中,使用兩個線程分別連接到云平臺的總線和內部的總線。