diff --git a/pom.xml b/pom.xml index b620d46..2039be4 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ info.istlab.IoTP IoTP jar - 0.30 + 0.31 IoTP http://maven.apache.org @@ -36,6 +36,13 @@ autocomplete 3.2.0 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + diff --git a/src/main/java/info/istlab/IoTP/Launcher.java b/src/main/java/info/istlab/IoTP/Launcher.java index 4e2bdf2..06644d6 100644 --- a/src/main/java/info/istlab/IoTP/Launcher.java +++ b/src/main/java/info/istlab/IoTP/Launcher.java @@ -26,7 +26,7 @@ public class Launcher extends JFrame implements MouseInputListener, KeyListener, Runnable { public static Launcher theapp; - public static String version = "0.30"; + public static String version = "0.31"; static int reboot_msec = 2000; // JPanel mainP; File root; @@ -143,6 +143,10 @@ mi.addActionListener(ae -> openURL("https://cit.istlab.info/m5stickcplus/index.html")); menu.add(mi); + mi = new JMenuItem(isEnglish ? "MQTT Client" : "MQTTクライアント"); + mi.addActionListener(ae -> MqttWindow.create() ); + menu.add(mi); + setJMenuBar(menuBar); // getContentPane().add(mainP, BorderLayout.WEST); diff --git a/src/main/java/info/istlab/IoTP/MQTTClient.java b/src/main/java/info/istlab/IoTP/MQTTClient.java new file mode 100644 index 0000000..95c7603 --- /dev/null +++ b/src/main/java/info/istlab/IoTP/MQTTClient.java @@ -0,0 +1,252 @@ +package info.istlab.IoTP; + +import java.sql.Timestamp; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +/** + * https://qiita.com/koichi_baseball/items/c5b982d2afdd4f6f0887 + */ +public class MQTTClient { + MqttThread mthread; + + public MQTTClient() { + mthread = new MqttThread("mqtt.istlab.info", "office/locker/+"); + // // 1回だけpublish + // MqttPublisher publisher = new MqttPublisher("10.104.91.19", + // "office/locker/log"); + // publisher.publish("test from java"); + // if (mthread.isNew()) { + // System.out.println(MqttThread.recieveData); + // } + } + + public static void main(String[] args) { + new MQTTClient(); + } + +} + +class MqttThread implements Runnable { + String broker = ""; + String topic = ""; + static MqttSubscriber subscriber; + static String recieveData = ""; + Thread thread = null; + + // コンストラクタ + public MqttThread(String brokerHostName, String subscribeTopic) { + broker = brokerHostName; + topic = subscribeTopic; + subscriber = new MqttSubscriber(broker, "1883", topic, null); + thread = new Thread(this); + thread.start(); + } + + public void run() { + while (thread != null) { + try { + subscriber.subscribe(); + } catch (MqttException me) { + System.out.println("reason: " + me.getReasonCode()); + System.out.println("message: " + me.getMessage()); + System.out.println("localize: " + me.getLocalizedMessage()); + System.out.println("cause: " + me.getCause()); + System.out.println("exception: " + me); + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public boolean isNew() { + boolean flag = false; + flag = subscriber.isNew(); + return flag; + } +} + +class MqttPublisher { + String broker = ""; + String topic = ""; + + /** + * コンストラクタ + * + * @param brokerHostName + * @param publishTopic + */ + public MqttPublisher(String brokerHostName, String publishTopic) { + broker = "tcp://" + brokerHostName + ":1883"; + topic = publishTopic; + } + + /** + * 引数をpublishする. + * + * @param publishMessage + */ + public void publish(String publishMessage) { + final int qos = 2; + final String clientId = "Publisher"; + try { + MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(false); + + mqttClient.connect(connOpts); + + MqttMessage message = new MqttMessage(publishMessage.getBytes()); + message.setQos(qos); + // System.out.println("publish message"); + // System.out.println("Topic : "+topic+", Message : "+message); + mqttClient.publish(topic, message); + + mqttClient.disconnect(); + mqttClient.close(); + } catch (MqttException me) { + System.out.println("reason: " + me.getReasonCode()); + System.out.println("message: " + me.getMessage()); + System.out.println("localize: " + me.getLocalizedMessage()); + System.out.println("cause: " + me.getCause()); + System.out.println("exception: " + me); + } + } +} + +class MqttSubscriber implements MqttCallback { + Timestamp recieveTime; + Timestamp lastTime; + String broker = ""; + String topic = ""; + MqttCallback callback; + + /** + * コンストラクタ + * + * @param brokerHostName + * @param subscribeTopic + */ + public MqttSubscriber(String brokerHostName, String port, String subscribeTopic, MqttCallback _callback) { + broker = "tcp://" + brokerHostName + ":" + port; + topic = subscribeTopic; + callback = _callback; + } + + /** + * MQTTブローカーとの接続を失った時に呼び出される. + */ + @Override + public void connectionLost(Throwable cause) { + System.out.println("Connection lost"); + System.exit(1); + } + + /** + * メッセージを受信したときに呼び出される. + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws MqttException { + // System.out.println("Message arrived"); + System.out.print("Topic : "+ topic+" "); + System.out.println("Message: " + new String(message.getPayload())); + + // recieveTime = new Timestamp(System.currentTimeMillis()); + // MqttThread.recieveData = new String(message.getPayload()); + } + + /** + * Subscribeしたか否かを判断する. + * + * @return isNewフラグ + */ + public boolean isNew() { + boolean flag = false; + if (recieveTime == lastTime) + flag = false; + else + flag = true; + lastTime = recieveTime; + return flag; + } + + public static void main(String[] args) throws InterruptedException { + try { + MqttSubscriber subscriber = new MqttSubscriber("10.104.91.19", "1883", "+", null); + subscriber.subscribe(); + } catch (MqttException me) { + System.out.println("reason: " + me.getReasonCode()); + System.out.println("message: " + me.getMessage()); + System.out.println("localize: " + me.getLocalizedMessage()); + System.out.println("cause: " + me.getCause()); + System.out.println("exception: " + me); + } + } + + /** + * メッセージを受信する. + * 標準入力があるまで接続し続ける. + * + * @throws MqttException + * @throws InterruptedException + */ + MqttClient client; + public void subscribe() throws MqttException, InterruptedException { + // Subscribe設定 + final int qos = 2; + final String clientId = "Subscribe"; + + if (client != null) { + client.disconnect(); + client.close(); + } + client = new MqttClient(broker, clientId, new MemoryPersistence()); + if (callback == null) client.setCallback(this); + else client.setCallback(callback); + + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(false); + + System.out.println("Connecting to broker:" + broker); + client.connect(connOpts); + + System.out.println("Subscribe topic :" + topic); + client.subscribe(topic, qos); + + // BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + // try { + // // 標準入力を受け取るまで待ち続ける + // br.readLine(); + // } catch (IOException e) { + // System.exit(1); + // } + // client.disconnect(); + // client.close(); + } + public void unsubscribe() throws MqttException, InterruptedException { + if (client != null) { + client.disconnect(); + client.close(); + } + System.out.println("Disconnected"); + } + + /** + * MqttCallbackに必要,subscribeからは呼び出されなさそう. + */ + @Override + public void deliveryComplete(IMqttDeliveryToken arg0) { + + } +} diff --git a/src/main/java/info/istlab/IoTP/MqttWindow.java b/src/main/java/info/istlab/IoTP/MqttWindow.java new file mode 100644 index 0000000..ab8c607 --- /dev/null +++ b/src/main/java/info/istlab/IoTP/MqttWindow.java @@ -0,0 +1,296 @@ +package info.istlab.IoTP; + +import java.awt.BorderLayout; +import java.awt.Color; +import java.awt.Dimension; +import java.awt.Point; +import java.awt.Toolkit; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import java.awt.event.FocusEvent; +import java.awt.event.FocusListener; +import java.awt.event.WindowEvent; +import java.awt.event.WindowListener; + +import javax.swing.BorderFactory; +import javax.swing.BoxLayout; +import javax.swing.JButton; +import javax.swing.JFrame; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTextArea; +import javax.swing.JTextField; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class MqttWindow extends JFrame implements ActionListener, WindowListener, MqttCallback { + static MqttWindow theMqttWindow; + JTextField jtf; + JTextArea jta; + Thread thread; + + JTextField brokerjtf; + JTextField brokerportjtf; + JTextField topicjtf; + JButton connectB; + MqttSubscriber subscriber; + + public static MqttWindow create(){ + if (theMqttWindow != null) return theMqttWindow; + else theMqttWindow = new MqttWindow(); + return theMqttWindow; + } + + public static void main(String[] args) { + new MqttWindow(); + } + + public MqttWindow() { + super("MQTT TestApp"); + + brokerjtf = new JTextField("192.168.11.11", 30); + // brokerjtf = new JTextField("10.104.91.19", 30); + brokerportjtf = new JTextField("1883", 10); + topicjtf = new JTextField("+"); + connectB = new JButton("Connect"); + connectB.addActionListener(e -> connect(e)); + + JPanel settingP = new JPanel(); + settingP.setLayout(new BoxLayout(settingP, BoxLayout.Y_AXIS)); + JPanel v01p = new JPanel(new BorderLayout()); + v01p.setBorder(BorderFactory.createTitledBorder("Broker ( ex. 192.168.11.11 | mqtt.istlab.info ) and port ( ex. 1883 )")); + v01p.add(brokerjtf, BorderLayout.CENTER); + v01p.add(brokerportjtf, BorderLayout.EAST); + settingP.add(v01p); + JPanel v02p = new JPanel(new BorderLayout()); + v02p.setBorder(BorderFactory.createTitledBorder("Topic ( ex. jikken | jikken/A1han | jikken/+ )")); + v02p.add(topicjtf); + v02p.add(connectB, BorderLayout.EAST); + settingP.add(v02p); + + jtf = new JTextField("(ここをクリックして、入力して、Enterを押すと、publishします)"); + jtf.setForeground(Color.GRAY); + jtf.addFocusListener(new FocusListener() { + @Override + public void focusGained(FocusEvent e) { + if (jtf.getText().equals("(ここをクリックして、入力して、Enterを押すと、publishします)")) { + jtf.setText(""); + jtf.setForeground(Color.BLACK); + } + } + + @Override + public void focusLost(FocusEvent e) { + if (jtf.getText().isEmpty()) { + jtf.setForeground(Color.GRAY); + jtf.setText("(ここをクリックして、入力して、Enterを押すと、publishします)"); + } + } + }); + jtf.setBackground(new Color(210, 255, 255)); + // settingP.add(jtf); + + getContentPane().add(settingP, BorderLayout.NORTH); + jta = new JTextArea(); + jta.setBackground(Color.lightGray); + getContentPane().add(new JScrollPane(jta), BorderLayout.CENTER); + + JPanel v03p = new JPanel(new BorderLayout()); + v03p.setBorder(BorderFactory.createTitledBorder("Publish to the topic (未接続でもTopicが指定されていれば送信できます)")); + v03p.add(jtf); + + getContentPane().add(v03p, BorderLayout.SOUTH); + // compileB = new JButton("compile"); + // compileB.addActionListener(e -> { + // compile(e.getActionCommand()); + // }); + // getContentPane().add(compileB, BorderLayout.SOUTH); + + setSize(600, 500); + setLocation(centerOfScreen(getSize())); + setVisible(true); + setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); + jtf.addActionListener(this); + addWindowListener(this); // Xをおしたらserial closeする + + // フォーカスをあえて外す + // jta.requestFocus(); + appendToJTA("まだ接続していません。Topicを設定してConnectをおしてください。\n"); + } + + private void connect(ActionEvent e) { + if (e.getActionCommand().equals("Connect")) { + subscriber = new MqttSubscriber(brokerjtf.getText(), brokerportjtf.getText(), topicjtf.getText(), this); + try { + subscriber.subscribe(); + } catch (MqttException | InterruptedException e1) { + e1.printStackTrace(); + } + appendToJTA("Connected.\n"); + jta.setBackground(Color.white); + topicjtf.setEditable(false); + connectB.setText("Disconnect"); + } else if (e.getActionCommand().equals("Disconnect")) { + try { + subscriber.unsubscribe(); + } catch (MqttException | InterruptedException e1) { + e1.printStackTrace(); + } finally { + subscriber = null; + appendToJTA("Disconnected.\n"); + jta.setBackground(Color.gray); + topicjtf.setEditable(true); + } + connectB.setText("Connect"); + } + } + + void appendToJTA(String s) { + if (jta == null) + return; + jta.append(s); + int len = jta.getDocument().getLength(); + if (jta.getSelectedText() == null) + jta.setCaretPosition(len); + } + + // ここで、画面中心にウィンドウ表示するため、いろいろ計算する + public static Point centerOfScreen(Dimension winSize) { + Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize(); + return new Point((screenSize.width - winSize.width) / 2, (screenSize.height - winSize.height) / 2); + } + + // ここで、画面中心にウィンドウ表示するため、いろいろ計算する + public static Point leftOfScreen(Dimension winSize) { + Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize(); + return new Point(10, (screenSize.height - winSize.height) / 2); + } + + /** + * 送信(publish)する + */ + @Override + public void actionPerformed(ActionEvent e) { + String message = e.getActionCommand(); + // System.out.println(action); + try { + if (message.length() > 0) { + // Topic name check + String topic = topicjtf.getText(); + if ( topic.contains("#") || topic.contains("+") ){ + appendToJTA("Publishするとき、Topic にワイルドカード文字 #+ を含めることはできません。"); + return; + } + MqttPublisher publisher = new MqttPublisher(brokerjtf.getText(), + topicjtf.getText()); + publisher.publish(message); + } + + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + + // /** + // * 再接続用(つかっていない) + // */ + // @Override + // public void run() { + // appendToJTA("(Waiting to reconnect.)\n"); + // try { + // Thread.sleep(10000); + // } catch (InterruptedException e) { + // } + // while (thread != null) { + // if (!sp.openPort()) { + // System.out.println("Unable to open the port."); + // return; + // } else { + // thread = null; + // appendToJTA("Reconnected.\n"); + // } + // try { + // Thread.sleep(2000); + // } catch (InterruptedException e) { + // } + + // } + // } + + private void closeWin() { + if (subscriber == null) + return; + try { + subscriber.unsubscribe(); + } catch (MqttException | InterruptedException e1) { + e1.printStackTrace(); + } finally { + subscriber = null; + appendToJTA("Disconnected.\n"); + jta.setBackground(Color.gray); + } + connectB.setText("Connect"); + if (Launcher.theapp == null) + System.exit(0); + else + dispose(); + } + + @Override + public void windowOpened(WindowEvent e) { + } + + @Override + public void windowClosing(WindowEvent e) { + closeWin(); + } + + @Override + public void windowClosed(WindowEvent e) { + closeWin(); + } + + @Override + public void windowIconified(WindowEvent e) { + } + + @Override + public void windowDeiconified(WindowEvent e) { + } + + @Override + public void windowActivated(WindowEvent e) { + } + + @Override + public void windowDeactivated(WindowEvent e) { + } + + /** + * MQTTブローカーとの接続を失った時に呼び出される. + */ + @Override + public void connectionLost(Throwable cause) { + System.out.println("Connection lost"); + System.exit(1); + } + + /** + * メッセージを受信したときに呼び出される. + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws MqttException { + // System.out.println("Message arrived"); + appendToJTA("Topic : " + topic + " Message: " + new String(message.getPayload()) + "\n"); + // recieveTime = new Timestamp(System.currentTimeMillis()); + // MqttThread.recieveData = new String(message.getPayload()); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + throw new UnsupportedOperationException("Unimplemented method 'deliveryComplete'"); + } +}