MQTTを使いたい。2

今回も MQTT です。
前回はとりあえず動かしたいサンプルだったので、今回は少し実利用を想定してみます。

IOT での実利用とは定期的な通知だと思っているので、
今回は実行時にメッセージを発行ではなく、
起動中に定期的にメッセージを送ってみます。

目次

Broker の実装

今回の参考は、npm で mosca をインストールした際パッケージの中です。node_modules\mosca\examples\kafka\server.jsを参考にしていきます。

mosca.serverで作成されたオブジェクトに、onというメソッドがあるので、第一引数に文字列で指定して、さまざま実行の状況でログ表示などができます。
第一引数に指定できる文字列は、node_modules\mosca\examples\kafka\server.jsを参考にして確認できた範囲では以下の通り、
コメントは読み取れた?各イベントの状況

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//サーバーの準備ができた。
server.on("ready", () => {});
//サーバーの処理でエラーが発生した
server.on("error", (err) => {});
//クライアントが接続された
server.on("clientConnected", (client) => {});
//クライアントが切断しています?
server.on("clientDisconnecting", (client) => {});
//クライアントが切断した
server.on("clientDisconnected", (client) => {});
//メッセージが発行された
server.on("published", (packet, client) => {});
//メッセージの購読を宣言した
server.on("subscribed", (topic, client) => {});
//メッセージの購読の解除を宣言した
server.on("unsubscribed", (topic, client) => {});

ただし、node_modules\mosca\examples\kafka\server.jsで見ると、
第二引数にclientが入っているけど、その中のプロパティがいざ使うとundifinedだったなんてことがあるので注意。

これを使って、以下のようなサンプルを作ってみた。

mqtt_sample_bro.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const mosca = require("mosca");

const server = new mosca.Server({
port: 1883,
});

server.on("ready", () => {
console.log("Server is ready!");
});

server.on("error", (err) => {
console.log(err);
});

server.on("clientConnected", (client) => {
console.log(`Clinet Conected : ${client.id}`);
});

server.on("clientDisconnecting", (client) => {
console.log(`ClinetDisconecting`);
console.log(` ClinetDisconecting Client Id: ${client.id}`);
});

server.on("clientDisconnected", (client) => {
console.log(`ClinetDisconected`);
console.log(` ClientDisconnected Client Id: ${client.id}`);
});

server.on("published", (packet, client) => {
console.log(`Published`);
console.log(` Published Topic : ${packet.topic}`);
console.log(` Published MessageID: ${packet.messageId}`);
console.log(` Published Payload : ${packet.payload.toString()}`);
});

server.on("subscribed", (topic, client) => {
console.log(`Subscribed`);
console.log(` Subscribed Topic : ${topic}`);
console.log(` Subscribed Clinet Id: ${client.id}`);
});

server.on("unsubscribed", (topic, client) => {
console.log(`Unsubscribed`);
console.log(` Unsubscribed Topic : ${topic}`);
console.log(` Unsubscribed Clinet Id: ${client.id}`);
});

mqtt_sample_bro.jsをサーバーに前回記載した以下のmqtt_simple_pub.jsをクライアントにして実行してみる。

mqtt_simple_pub.js
1
2
3
4
5
6
7
8
const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://localhost");

client.on("connect", () => {
client.publish("msg", "message!");
client.end();
});

実行すると以下のように表示される。

mqtt_sample_bro.js実行結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Server is ready!
Clinet Conected : mqttjs_e1ad9970
Published
Published Topic : $SYS/vReb6J6/new/clients
Published MessageID: 0TcpZsD
Published Payload : mqttjs_e1ad9970
ClinetDisconected
ClientDisconnected Client Id: mqttjs_e1ad9970
Published
Published Topic : msg
Published MessageID: 0KEgcdD
Published Payload : message!
Published
Published Topic : $SYS/vReb6J6/disconnect/clients
Published MessageID: w7Aurpx
Published Payload : mqttjs_e1ad9970

クライアントが、mqttjs_e1ad9970という ID で接続していることがわかる。
実行ログを見ると、 最初の Published の部分で、送っていないメッセージが送られている。
どうやら各端末に新しいクライアントが参加したと伝える通知らしい。
最後のところでも、端末が切断した通知も送られている。
mosca を使ったサンプルで見かけるコードには、端末の接続と切断時の topic に含まれるnewdisconnectを調べて console.log()に出さないのがよくある。

クライアントで送信直後に、client.end()しているせいなのかmessage!を送信した履歴の記載の順番がおかしい。
クライアントで、client.end()をしないでおくと以下のように、
接続して、メッセージを発行しただけになるので素直。

mqtt_sample_bro.js実行結果2
1
2
3
4
5
6
7
8
Published
Published Topic : $SYS/JZ_OYvp/new/clients
Published MessageID: 7ouwYu1
Published Payload : mqttjs_03845c3d
Published
Published Topic : msg
Published MessageID: p_CKa~O
Published Payload : message!

Client の実装

Publisher

以前作成したものは、実行直後にメッセージ送って終わってたので、
今回は 1 秒ごとにメッセージを送ってみる。
加えて、勝手に生成されてたクライアント ID を自分で割り当てしてみる。
mqtt.connect()の第二引数でオプションを割り当てる。
送信側は以下の通り

mqtt_sample_pub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://localhost", {
clientId: "Pub1234",
});

const topic = "msg";
const messagetag = `message`;

client.on("connect", () => {
setInterval(() => {
let message = `${messagetag} ${new Date()}`;
console.log(`Message Publish`);
console.log(` Publish Topic :${topic}`);
console.log(` Publish Message:${message}`);
client.publish(topic, message);
}, 1000);
});

Subscriber

受信側は、ほとんど前回変更は無いけどクライアント ID をこちらも自分で割り当ててみる。

mqtt_sample_sub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://localhost", {
clientId: "Sub1234",
});

client.on("connect", () => {
client.subscribe("msg");
});

client.on("message", (topic, message) => {
console.log(`Message Receive`);
console.log(` Recive Topic :${topic}`);
console.log(` Recive Message:${message.toString()}`);
});

実行はそれぞれ別のコンソールで以下の順番で実行する。

  1. node mqtt_sample_bro.js
  2. node mqtt_sample_sub.js
  3. node mqtt_sample_pub.js

それぞれの実行結果は次のようになる。

mqtt_sample_bro.js 実行画面

mqtt_sample_bro.js実行画面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

Server is ready!
Clinet Conected : Sub1234
Published
Published Topic : $SYS/kh8vpXY/new/clients
Published MessageID: lRSsCxP
Published Payload : Sub1234
Subscribed
Subscribed Topic : msg
Subscribed Clinet Id: Sub1234
Published
Published Topic : $SYS/kh8vpXY/new/subscribes
Published MessageID: WCvDJNN
Published Payload : {"clientId":"Sub1234","topic":"msg"}
Clinet Conected : Pub1234
Published
Published Topic : $SYS/kh8vpXY/new/clients
Published MessageID: mZuQr~5
Published Payload : Pub1234
Published
Published Topic : msg
Published MessageID: ouThwDu
Published Payload : message Sun Dec 23 2018 16:58:38 GMT+0900 (東京 (標
準時))
Published
Published Topic : msg
Published MessageID: ojEiyIA
Published Payload : message Sun Dec 23 2018 16:58:39 GMT+0900 (東京 (標
準時))
ClinetDisconected
ClientDisconnected Client Id: Pub1234
Published
Published Topic : $SYS/kh8vpXY/disconnect/clients
Published MessageID: iv_tgUA
Published Payload : Pub1234

mqtt_sample_sub.js 実行画面

mqtt_sample_sub.js実行画面
1
2
3
4
5
6
Message Receive
Recive Topic :msg
Recive Message:message Sun Dec 23 2018 16:58:38 GMT+0900 (東京 (標準時))
Message Receive
Recive Topic :msg
Recive Message:message Sun Dec 23 2018 16:58:39 GMT+0900 (東京 (標準時))

mqtt_sample_pub.js 実行画面

mqtt_sample_pub.js実行画面
1
2
3
4
5
6
Message Publish
Publish Topic :msg
Publish Message:message Sun Dec 23 2018 16:58:38 GMT+0900 (東京 (標準時))
Message Publish
Publish Topic :msg
Publish Message:message Sun Dec 23 2018 16:58:39 GMT+0900 (東京 (標準時))

これでメッセージ定期実行で送らせて、モニタできました。

次は、そろそろ m5stack か obniz の操作から、サーバーにメッセージを送ってみたいと思います。

ではでは。