package info.istlab.IoTP; import java.sql.Timestamp; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * https://qiita.com/koichi_baseball/items/c5b982d2afdd4f6f0887 */ public class MQTTClient { MqttThread mthread; public MQTTClient() { mthread = new MqttThread("mqtt.istlab.info", "office/locker/+"); // // 1回だけpublish // MqttPublisher publisher = new MqttPublisher("10.104.91.19", // "office/locker/log"); // publisher.publish("test from java"); // if (mthread.isNew()) { // System.out.println(MqttThread.recieveData); // } } public static void main(String[] args) { new MQTTClient(); } } class MqttThread implements Runnable { String broker = ""; String topic = ""; static MqttSubscriber subscriber; static String recieveData = ""; Thread thread = null; // コンストラクタ public MqttThread(String brokerHostName, String subscribeTopic) { broker = brokerHostName; topic = subscribeTopic; subscriber = new MqttSubscriber(broker, "1883", topic, null); thread = new Thread(this); thread.start(); } public void run() { while (thread != null) { try { subscriber.subscribe(); } catch (MqttException me) { System.out.println("reason: " + me.getReasonCode()); System.out.println("message: " + me.getMessage()); System.out.println("localize: " + me.getLocalizedMessage()); System.out.println("cause: " + me.getCause()); System.out.println("exception: " + me); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } public boolean isNew() { boolean flag = false; flag = subscriber.isNew(); return flag; } } class MqttPublisher { String broker = ""; String topic = ""; /** * コンストラクタ * * @param brokerHostName * @param publishTopic */ public MqttPublisher(String brokerHostName, String publishTopic) { broker = "tcp://" + brokerHostName + ":1883"; topic = publishTopic; } /** * 引数をpublishする. * * @param publishMessage */ public void publish(String publishMessage) { final int qos = 2; final String clientId = "Publisher"; try { MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence()); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); mqttClient.connect(connOpts); MqttMessage message = new MqttMessage(publishMessage.getBytes()); message.setQos(qos); // System.out.println("publish message"); // System.out.println("Topic : "+topic+", Message : "+message); mqttClient.publish(topic, message); mqttClient.disconnect(); mqttClient.close(); } catch (MqttException me) { System.out.println("reason: " + me.getReasonCode()); System.out.println("message: " + me.getMessage()); System.out.println("localize: " + me.getLocalizedMessage()); System.out.println("cause: " + me.getCause()); System.out.println("exception: " + me); } } } class MqttSubscriber implements MqttCallback { Timestamp recieveTime; Timestamp lastTime; String broker = ""; String topic = ""; MqttCallback callback; /** * コンストラクタ * * @param brokerHostName * @param subscribeTopic */ public MqttSubscriber(String brokerHostName, String port, String subscribeTopic, MqttCallback _callback) { broker = "tcp://" + brokerHostName + ":" + port; topic = subscribeTopic; callback = _callback; } /** * MQTTブローカーとの接続を失った時に呼び出される. */ @Override public void connectionLost(Throwable cause) { System.out.println("Connection lost"); System.exit(1); } /** * メッセージを受信したときに呼び出される. */ @Override public void messageArrived(String topic, MqttMessage message) throws MqttException { // System.out.println("Message arrived"); System.out.print("Topic : "+ topic+" "); System.out.println("Message: " + new String(message.getPayload())); // recieveTime = new Timestamp(System.currentTimeMillis()); // MqttThread.recieveData = new String(message.getPayload()); } /** * Subscribeしたか否かを判断する. * * @return isNewフラグ */ public boolean isNew() { boolean flag = false; if (recieveTime == lastTime) flag = false; else flag = true; lastTime = recieveTime; return flag; } public static void main(String[] args) throws InterruptedException { try { MqttSubscriber subscriber = new MqttSubscriber("10.104.91.19", "1883", "+", null); subscriber.subscribe(); } catch (MqttException me) { System.out.println("reason: " + me.getReasonCode()); System.out.println("message: " + me.getMessage()); System.out.println("localize: " + me.getLocalizedMessage()); System.out.println("cause: " + me.getCause()); System.out.println("exception: " + me); } } /** * メッセージを受信する. * 標準入力があるまで接続し続ける. * * @throws MqttException * @throws InterruptedException */ MqttClient client; public void subscribe() throws MqttException, InterruptedException { // Subscribe設定 final int qos = 2; final String clientId = "Subscribe"; if (client != null) { client.disconnect(); client.close(); } client = new MqttClient(broker, clientId, new MemoryPersistence()); if (callback == null) client.setCallback(this); else client.setCallback(callback); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); client.setTimeToWait(10000); System.out.println("Connecting to broker:" + broker); client.connect(connOpts); System.out.println("Subscribe topic :" + topic); client.subscribe(topic, qos); // BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); // try { // // 標準入力を受け取るまで待ち続ける // br.readLine(); // } catch (IOException e) { // System.exit(1); // } // client.disconnect(); // client.close(); } public void unsubscribe() throws MqttException, InterruptedException { if (client != null) { client.disconnect(); client.close(); } System.out.println("Disconnected"); } /** * MqttCallbackに必要,subscribeからは呼び出されなさそう. */ @Override public void deliveryComplete(IMqttDeliveryToken arg0) { } }