Newer
Older
IoTP / src / main / java / info / istlab / IoTP / MQTTClient.java
package info.istlab.IoTP;

import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * https://qiita.com/koichi_baseball/items/c5b982d2afdd4f6f0887
 */
public class MQTTClient {
    MqttThread mthread;

    public MQTTClient() {
        mthread = new MqttThread("mqtt.istlab.info", "office/locker/+");
        // // 1回だけpublish
        // MqttPublisher publisher = new MqttPublisher("10.104.91.19",
        // "office/locker/log");
        // publisher.publish("test from java");
        // if (mthread.isNew()) {
        // System.out.println(MqttThread.recieveData);
        // }
    }

    public static void main(String[] args) {
        new MQTTClient();
    }

}

class MqttThread implements Runnable {
    String broker = "";
    String topic = "";
    static MqttSubscriber subscriber;
    static String recieveData = "";
    Thread thread = null;

    // コンストラクタ
    public MqttThread(String brokerHostName, String subscribeTopic) {
        broker = brokerHostName;
        topic = subscribeTopic;
        subscriber = new MqttSubscriber(broker, "1883", topic, null);
        thread = new Thread(this);
        thread.start();
    }

    public void run() {
        while (thread != null) {
            try {
                subscriber.subscribe();
            } catch (MqttException me) {
                System.out.println("reason: " + me.getReasonCode());
                System.out.println("message: " + me.getMessage());
                System.out.println("localize: " + me.getLocalizedMessage());
                System.out.println("cause: " + me.getCause());
                System.out.println("exception: " + me);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public boolean isNew() {
        boolean flag = false;
        flag = subscriber.isNew();
        return flag;
    }
}

class MqttPublisher {
    String broker = "";
    String topic = "";

    /**
     * コンストラクタ
     * 
     * @param brokerHostName
     * @param publishTopic
     */
    public MqttPublisher(String brokerHostName, String publishTopic) {
        broker = "tcp://" + brokerHostName + ":1883";
        topic = publishTopic;
    }

    /**
     * 引数をpublishする.
     * 
     * @param publishMessage
     */
    public void publish(String publishMessage) {
        final int qos = 2;
        final String clientId = "Publisher";
        try {
            MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false);

            mqttClient.connect(connOpts);

            MqttMessage message = new MqttMessage(publishMessage.getBytes());
            message.setQos(qos);
            // System.out.println("publish message");
            // System.out.println("Topic : "+topic+", Message : "+message);
            mqttClient.publish(topic, message);

            mqttClient.disconnect();
            mqttClient.close();
        } catch (MqttException me) {
            System.out.println("reason: " + me.getReasonCode());
            System.out.println("message: " + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: " + me.getCause());
            System.out.println("exception: " + me);
        }
    }
}

class MqttSubscriber implements MqttCallback {
    Timestamp recieveTime;
    Timestamp lastTime;
    String broker = "";
    String topic = "";
    MqttCallback callback;

    /**
     * コンストラクタ
     * 
     * @param brokerHostName
     * @param subscribeTopic
     */
    public MqttSubscriber(String brokerHostName, String port, String subscribeTopic, MqttCallback _callback) {
        broker = "tcp://" + brokerHostName + ":" + port;
        topic = subscribeTopic;
        callback = _callback;
    }

    /**
     * MQTTブローカーとの接続を失った時に呼び出される.
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost");
        System.exit(1);
    }

    /**
     * メッセージを受信したときに呼び出される.
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        // System.out.println("Message arrived");
        System.out.print("Topic : "+ topic+"  ");
        System.out.println("Message: " + new String(message.getPayload()));

        // recieveTime = new Timestamp(System.currentTimeMillis());
        // MqttThread.recieveData = new String(message.getPayload());
    }

    /**
     * Subscribeしたか否かを判断する.
     * 
     * @return isNewフラグ
     */
    public boolean isNew() {
        boolean flag = false;
        if (recieveTime == lastTime)
            flag = false;
        else
            flag = true;
        lastTime = recieveTime;
        return flag;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            MqttSubscriber subscriber = new MqttSubscriber("10.104.91.19", "1883", "+", null);
            subscriber.subscribe();
        } catch (MqttException me) {
            System.out.println("reason: " + me.getReasonCode());
            System.out.println("message: " + me.getMessage());
            System.out.println("localize: " + me.getLocalizedMessage());
            System.out.println("cause: " + me.getCause());
            System.out.println("exception: " + me);
        }
    }

    /**
     * メッセージを受信する.
     * 標準入力があるまで接続し続ける.
     * 
     * @throws MqttException
     * @throws InterruptedException
     */
    MqttClient client;
    public void subscribe() throws MqttException, InterruptedException {
        // Subscribe設定
        final int qos = 2;
        final String clientId = "Subscribe";

        if (client != null) {
            client.disconnect();
            client.close();
        }
        client = new MqttClient(broker, clientId, new MemoryPersistence());
        if (callback == null) client.setCallback(this);
        else client.setCallback(callback);

        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        client.setTimeToWait(10000);

        System.out.println("Connecting to broker:" + broker);
        client.connect(connOpts);

        System.out.println("Subscribe topic :" + topic);
        client.subscribe(topic, qos);

        // BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        // try {
        //     // 標準入力を受け取るまで待ち続ける
        //     br.readLine();
        // } catch (IOException e) {
        //     System.exit(1);
        // }
        // client.disconnect();
        // client.close();
    }
    public void unsubscribe() throws MqttException, InterruptedException {
        if (client != null) {
            client.disconnect();
            client.close();
        }
        System.out.println("Disconnected");
    }

    /**
     * MqttCallbackに必要,subscribeからは呼び出されなさそう.
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {

    }
}