001package org.opengion.fukurou.queue;
002
003import java.util.ArrayList;
004import java.util.List;
005
006import javax.jms.JMSException;
007// import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.Queue;
010import javax.jms.QueueConnection;
011import javax.jms.QueueConnectionFactory;
012import javax.jms.QueueReceiver;
013import javax.jms.QueueSession;
014import javax.jms.TextMessage;
015import javax.naming.Context;
016import javax.naming.InitialContext;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020/**
021 * MQメッセージ受信用クラス。
022 *
023 * @og.group メッセージ連携
024 *
025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
026 *
027 * @version 5
028 * @author oota
029 * @since JDK7
030 */
031public class QueueReceive_MQ implements QueueReceive{
032
033        private QueueConnection connection ;
034        private QueueSession session ;
035        private QueueReceiver receiver ;
036//      List<QueueReceiver> listReceiver ;
037        private List<QueueReceiver> listReceiver ;              // 7.2.9.4 (2020/11/20)
038        private boolean batch ;
039
040        /**
041         * 接続処理
042         * メッセージキューサーバに接続します。
043         *
044         *  @param jmsServer jsmサーバ
045         *  @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません)
046         *  @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません)
047         */
048        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
049                connect(jmsServer);
050        }
051
052        /**
053         * 接続処理
054         * jmsServerに接続します。
055         * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。
056         * SQSの場合は最大受信件数の10件の処理を行います。
057         *
058         * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL
059         */
060        private void connect(final String jmsServer) {
061                try {
062                        if(batch) {
063                                // バッチ用
064                                final String mqUserId = System.getProperty("mqUserId");
065                                final String mqPassword = System.getProperty("mqPassword");
066                                final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer);
067                                connection = factory.createQueueConnection(mqUserId,  mqPassword);
068                        }else {
069                                // jndi接続用
070                                final Context ctx = new InitialContext();
071                                final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer);
072                                connection = factory.createQueueConnection();
073                        }
074
075                        connection.start();
076
077                        // Receiveの作成
078                        session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
079
080                        // 初期化
081                        listReceiver = new ArrayList<QueueReceiver>();
082                }catch(Exception e) {
083                        throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage());
084                }
085        }
086
087        /**
088         * 受信処理
089         * メッセージキューの受信の処理を行います。
090         *
091         * @param queueName キューの名前
092         * @return キュー情報格納クラス
093         */
094        @Override
095        public QueueInfo receive(final String queueName) {
096                QueueInfo queueInfo = null;
097
098                try {
099                        final Queue queue = session.createQueue(queueName);
100                        receiver = session.createReceiver(queue);
101
102                        final TextMessage msg = (TextMessage)receiver.receive(1000);
103
104                        if(msg != null) {
105                                // メッセージ受信の確認応答
106                                msg.acknowledge();
107
108                                // メッセージの設定
109                                queueInfo = new QueueInfo();
110                                queueInfo.setMessage(msg.getText());
111                        }
112                }catch(Exception e) {
113                        throw new RuntimeException(e.getMessage());
114                }finally {
115                        try {
116                                receiver.close();
117                        }catch(Exception e) { ; }
118                }
119
120                return queueInfo;
121        }
122
123        /**
124         * リスナーの起動
125         * 指定したキュー名に対して、
126         * MessageListenerのリスナーを設定します。
127         *
128         * @param queueName キュー名
129         * @param listener MessageListerを実装したクラス
130         */
131        @Override
132        public void setListener(final String queueName, final MessageListener listener) {
133                QueueReceiver receiver = null;
134                try {
135                        final Queue queue = session.createQueue(queueName);
136                        receiver = session.createReceiver(queue);
137                        receiver.setMessageListener(listener);
138
139                        // リスナーの起動
140                        listReceiver.add(receiver);
141                }catch(JMSException e) {
142                        throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage());
143                }
144        }
145
146        /**
147         * クローズリスナー
148         * レシーバーをクローズすることで、
149         * リスナーの処理を終了します。
150         */
151        public void closeListener() {
152                for(final QueueReceiver receiver: listReceiver) {
153                        try {
154                                receiver.close();
155                        }catch(Exception e) { ; }
156                }
157
158                // 初期化
159                listReceiver = null;
160                listReceiver = new ArrayList<QueueReceiver>();
161        }
162
163        /**
164         * クローズ処理
165         * クローズ処理を行います。
166         */
167        @Override
168        public void close() {
169                if(receiver != null) {
170                        try {
171                                receiver.close();
172                        }catch(Exception e) { ; }
173                }
174                if(session != null) {
175                        try {
176                                session.close();
177                        }catch(Exception e) { ; }
178                }
179                if(connection != null) {
180                        try {
181                                connection.close();
182                        }catch(Exception e) { ; }
183                }
184        }
185
186        /**
187         * バッチ処理判定フラグを設定します。
188         *
189         * @param batchFlg バッチ処理判定フラグ
190         */
191        public void setBatchFlg(final Boolean batchFlg) {
192                batch = batchFlg;
193        }
194
195        /**
196         * 検証用メソッド
197         * テスト用のメソッドです。
198         *
199         * @param args 引数
200         */
201        public static void main(final String[] args) {
202                final QueueReceive receive = new QueueReceive_MQ();
203                final String jmsServer = "tcp://localhost:61616";
204
205                // バッチフラグにtrueを設定
206                // 未設定の場合は、tomcatのjndi接続処理が実行されます。
207                receive.setBatchFlg(true);
208
209                // 認証情報の設定
210                System.setProperty("mqUserId", "admin");
211                System.setProperty("mqPassword", "admin");
212
213                // 接続
214                receive.connect(jmsServer, null, null);
215
216                // 処理対象のキュー名
217                final String queueName = "queue01";
218
219
220                // ** 1件受信する場合
221                final QueueInfo queueInfo = receive.receive(queueName);
222                if(queueInfo != null) {
223                        System.out.println("message:" + queueInfo.getMessage());
224                }else {
225                        System.out.println("キューが登録されていません。");
226                }
227
228//              // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ)
229//              // MessageListerを実装した、QueueReceiveListenerクラスを作成します。
230//              MessageListener listener = new QueueReceiveListener();
231//              receive.setListener(queueName, listener);
232//              // 複数のキューにリスナーを設定することも可能です。
233//              receive.setListener("queue02", listener);
234//
235//              try {
236//                      // 1分間リスナーを起動しておく場合の、プロセス待機処理
237//                      Thread.sleep(60 * 1000);
238//              }catch(InterruptedException e) {
239//                      throw new RuntimeException(e.getMessage());
240//              }
241
242                // リスナー利用時は、closeListenerを実行して、解放してください。
243                receive.closeListener();
244
245                // 終了処理
246                receive.close();
247        }
248
249//      /**
250//       * QueueReceiveリスナークラス
251//       * リスナー用のクラスです。
252//       * MQに設定することで、メッセージが受信されると、
253//       * 自動的にonMessageメソッドが実行されます。
254//       *
255//       */
256//      static class QueueReceiveListener implements MessageListener {
257//              /**
258//               * メッセージ受信処理
259//               * MQサーバにメッセージが受信されると、
260//               * メソッドの処理が行われます。
261//               *
262//               * @param message 受信メッセージ
263//               */
264//              @Override
265//              public void onMessage(final Message message) {
266//
267//                      // メッセージ受信
268//                      TextMessage msg = (TextMessage) message;
269//                      String msgText = "";
270//
271//                      try {
272//                              // キューサーバのメッセージを取得
273//                              msgText = msg.getText();
274//                              // メーッセージの受信応答を返します。
275//                              msg.acknowledge();
276//
277//                              System.out.println("message:" + msgText);
278//
279//                      } catch (JMSException e) {
280//                              throw new RuntimeException(e.getMessage());
281//                      }
282//              }
283//      }
284
285}