001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004import java.util.Locale; // 7.2.9.4 (2020/11/20) 005 006import javax.jms.QueueSession; 007 008// import org.hsqldb.lib.StringUtil; 009import org.opengion.fukurou.util.StringUtil; // 7.0.6.0 (2019/10/07) 010import org.opengion.fukurou.queue.QueueInfo; 011import org.opengion.fukurou.queue.QueueSend; 012import org.opengion.fukurou.queue.QueueSendFactory; 013import org.opengion.fukurou.util.HybsTimerTask; 014import org.opengion.hayabusa.common.HybsSystem; 015import org.opengion.hayabusa.queue.DBAccessQueue; 016 017/** 018 * メッセージキュー送信 019 * メッセージキュー送信テーブルを監視して、 020 * 送信処理を行います。 021 * 022 * @og.group メッセージ連携 023 * 024 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 025 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 026 * 027 * @version 5.0 028 * @author oota 029 * @since JDK7 030 * 031 */ 032public class Daemon_QueueSend extends HybsTimerTask { 033 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 034 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 035 private static final int LOOP_COUNTER = 24; 036 037// private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 038 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 039 private final String USER_ID = "CYYYYY"; 040 private final String PG_ID = "DMN_QueSnd"; 041 private final String DMN_NAME = "QueueReceiveDMN"; 042 private final DBAccessQueue dbAccessQueue; 043 044 private int loopCnt ; 045 private QueueSend queueSend; 046 047 /** 048 * コンストラクター 049 * 初期処理を行います。 050 */ 051 public Daemon_QueueSend(){ 052 super(); // 7.2.9.4 (2020/11/20) PMD:It is a good practice to call super() in a constructor 053 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 054 } 055 /** 056 * 開始処理 057 * タイマータスクのデーモン処理の開始ポイントです。 058 * 059 * @og.rev 7.2.9.4 (2020/11/20) spotbugs:呼び出したメソッドの Locale パラメータの使用を検討する 060 */ 061 @Override 062 protected void startDaemon() { 063 if (loopCnt % LOOP_COUNTER == 0) { 064 loopCnt = 1; 065 System.out.println(); 066// System.out.println(toString() + " " + new Date() + ""); 067 System.out.println(toString() + " " + new Date() ); // 7.2.9.4 (2020/11/20) PMD:Do not add empty strings 068 } else { 069 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 070 final String[][] vals = dbAccessQueue.selectGE65(); 071 072 // 取得データ分の繰り返し処理を実行する 073 for(int i = 0; i < vals.length; i++) { 074 final String[] record = vals[i]; 075 076 // GE65から取得した値を変数に格納 077 final String ykno = record[0]; 078 final String queueId = record[1]; 079 final String message = record[2]; 080 final String dedupliId = record[3]; 081 final String queSyu = record[4]; 082 final String jmsUrl = record[5]; 083 084// final String queueType = queSyu.toUpperCase(); 085 final String queueType = queSyu.toUpperCase( Locale.JAPAN ); // 7.2.9.4 (2020/11/20) 086 queueSend = QueueSendFactory.newQueueSend(queueType); 087 088 // 接続処理 089 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 090 091 // メッセージ送信管理テーブルから取得したデータを送信実装予定 092 final QueueInfo queueInfo = new QueueInfo(); 093 094 // 応答確認種別 095 if("MQ".equals(queueType)){ 096 // MQメッセージサーバ指定時 097 queueInfo.setMqTransacted(false); 098 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 099 // キュー名 100 queueInfo.setMqQueueName(queueId); 101 }else if("SQS".equals(queueType)){ 102 // SQSメッセージサーバ指定時 103 // グループID 104 queueInfo.setSqsFifoGroupId(queueId); 105 if(!StringUtil.isEmpty(dedupliId)) { 106 // 重複排除ID 107 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 108 queueInfo.setSqsFifoDedupliId(dedupliId); 109 } 110 } 111 112 // メッセージ 113 queueInfo.setMessage(message); 114 115 // 完了フラグを処理中:2に更新 116 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 117 118 // メッセージ送信処理 119 try{ 120 queueSend.sendMessage(queueInfo); 121 122 // 完了フラグを完了:3に更新 123 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 124 125 }catch(Exception e) { 126 // 完了フラグをエラー:4に更新して、エラー情報を登録 127 dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 128 } 129 } 130 131 // クローズ処理 132 queueSend.close(); 133 134 loopCnt++; 135 } 136 } 137}