java结合WebSphere MQ实现接收队列文件功能
首先我们先来简单介绍下webspheremq以及安装使用简介
webspheremq :用于传输信息具有跨平台的功能。
1安装webspheremq并启动
2webspheremq建立queueManager(如:MQSI_SAMPLE_QM)
3建立queue类型选择Local类型的(如lq )
4 建立channels类型选择ServerConnection(如BridgeChannel)
接下来,我们来看实例代码:
MQFileReceiver.java
packagecom.mq.dpca.file;
importjava.io.File;
importjava.io.FileOutputStream;
importcom.ibm.mq.MQEnvironment;
importcom.ibm.mq.MQException;
importcom.ibm.mq.MQGetMessageOptions;
importcom.ibm.mq.MQMessage;
importcom.ibm.mq.MQQueue;
importcom.ibm.mq.MQQueueManager;
importcom.ibm.mq.constants.MQConstants;
importcom.mq.dpca.msg.MQConfig;
importcom.mq.dpca.util.ReadCmdLine;
importcom.mq.dpca.util.RenameUtil;
/**
*
*MQ分组接收文件功能
*主动轮询
*/
publicclassMQFileReceiver{
privateMQQueueManagerqmgr;//连接到队列管理器
privateMQQueueinQueue;//传输队列
privateStringqueueName="";//队列名称
privateStringhost="";//
privateintport=1414;//侦听器的端口号
privateStringchannel="";//通道名称
privateStringqmgrName="";//队列管理器
privateMQMessageinMsg;//创建消息缓冲
privateMQGetMessageOptionsgmo;//设置获取消息选项
privatestaticStringfileName=null;//接收队列上的消息并存入文件
privateintccsid=0;
privatestaticStringfile_dir=null;
/**
*程序的入口
*
*@paramargs
*/
publicstaticvoidmain(Stringargs[]){
MQFileReceivermfs=newMQFileReceiver();
//初始化连接
mfs.initproperty();
//接收文件
mfs.runGoupReceiver();
//获取shell脚本名
//Stringshellname=MQConfig.getValueByKey(fileName);
//if(shellname!=null&&!"".equals(shellname)){
////调用shell
//ReadCmdLine.callShell(shellname);
//}else{
//System.out.println("havenoshellname,Onlyreceivefiles.");
//}
}
publicvoidrunGoupReceiver(){
try{
init();
getGroupMessages();
qmgr.commit();
System.out.println("\nMessagessuccessfullyReceive");
}catch(MQExceptionmqe){
mqe.printStackTrace();
try{
System.out.println("\nBackingoutTransaction");
qmgr.backout();
System.exit(2);
}catch(Exceptione){
e.printStackTrace();
System.exit(2);
}
}catch(Exceptione){
e.printStackTrace();
System.exit(2);
}
}
/**
*初始化服务器连接信息
*
*@throwsException
*/
privatevoidinit()throwsException{
/*为客户机连接设置MQEnvironment属性*/
MQEnvironment.hostname=host;
MQEnvironment.channel=channel;
MQEnvironment.port=port;
/*连接到队列管理器*/
qmgr=newMQQueueManager(qmgrName);
/*设置队列打开选项以输*/
intopnOptn=MQConstants.MQOO_INPUT_AS_Q_DEF
|MQConstants.MQOO_FAIL_IF_QUIESCING;
/*打开队列以输*/
inQueue=qmgr.accessQueue(queueName,opnOptn,null,null,null);
}
/**
*接受文件的主函数
*
*@throwsException
*/
publicvoidgetGroupMessages(){
/*设置获取消息选项*/
gmo=newMQGetMessageOptions();
gmo.options=MQConstants.MQGMO_FAIL_IF_QUIESCING;
gmo.options=gmo.options+MQConstants.MQGMO_SYNCPOINT;
/*等待消息*/
gmo.options=gmo.options+MQConstants.MQGMO_WAIT;
/*设置等待时间限制*/
gmo.waitInterval=5000;
/*只获取消息*/
gmo.options=gmo.options+MQConstants.MQGMO_ALL_MSGS_AVAILABLE;
/*以辑顺序获取消息*/
gmo.options=gmo.options+MQConstants.MQGMO_LOGICAL_ORDER;
gmo.matchOptions=MQConstants.MQMO_MATCH_GROUP_ID;
/*创建消息缓冲*/
inMsg=newMQMessage();
try{
FileOutputStreamfos=null;
/*处理组消息*/
while(true){
try{
inQueue.get(inMsg,gmo);
if(fos==null){
try{
fileName=inMsg.getStringProperty("fileName");
StringfileName_full=null;
fileName_full=file_dir+RenameUtil.rename(fileName);
fos=newFileOutputStream(newFile(fileName_full));
intmsgLength=inMsg.getMessageLength();
byte[]buffer=newbyte[msgLength];
inMsg.readFully(buffer);
fos.write(buffer,0,msgLength);
/*查看是否是最后消息标识*/
charx=gmo.groupStatus;
if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){
System.out.println("LastMsginGroup");
break;
}
inMsg.clearMessage();
}catch(Exceptione){
System.out
.println("Receiverthemessagewithoutproperty,donothing!");
inMsg.clearMessage();
}
}else{
intmsgLength=inMsg.getMessageLength();
byte[]buffer=newbyte[msgLength];
inMsg.readFully(buffer);
fos.write(buffer,0,msgLength);
/*查看是否是最后消息标识*/
charx=gmo.groupStatus;
if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){
System.out.println("LastMsginGroup");
break;
}
inMsg.clearMessage();
}
}catch(Exceptione){
charx=gmo.groupStatus;
if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){
System.out.println("LastMsginGroup");
}
break;
}
}
if(fos!=null)
fos.close();
}catch(Exceptione){
System.out.println(e.getMessage());
}
}
publicvoidinitproperty(){
MQConfigconfig=newMQConfig().getInstance();
if(config.getMQ_MANAGER()!=null){
qmgrName=config.getMQ_MANAGER();
queueName=config.getMQ_QUEUE_NAME();
channel=config.getMQ_CHANNEL();
host=config.getMQ_HOST_NAME();
port=Integer.valueOf(config.getMQ_PROT());
ccsid=Integer.valueOf(config.getMQ_CCSID());
file_dir=config.getFILE_DIR();
}
}
}