C#实现同Active MQ通讯的方法
本文实例讲述了C#实现同ActiveMQ通讯的方法。分享给大家供大家参考,具体如下:
内容概要:
主要以源码的形式介绍如何用C#实现同ActiveMQ的通讯。本文假设你已经正确安装JDK1.6.x,了解ActiveMQ并有一定的编程基础。
正文:
JMS程序的最终目的是生产和消费的消息能被其他程序使用,JMS的Message是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS程序格式的消息。
Message由消息头,属性和消息体三部份组成。
ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。
示例代码:
usingSystem; usingSystem.Collections.Generic; usingSystem.Linq; usingSystem.Text; usingSystem.Windows; usingSystem.Windows.Controls; usingSystem.Windows.Data; usingSystem.Windows.Documents; usingSystem.Windows.Input; usingSystem.Windows.Media; usingSystem.Windows.Media.Imaging; usingSystem.Windows.Navigation; usingSystem.Windows.Shapes; usingApache.NMS; usingSystem.Diagnostics; usingApache.NMS.Util; usingSystem.Windows.Threading; /* *功能描述:C#使用ActiveMQ示例 *修改次数:2 *最后更新:byKagula,2012-07-31 * *前提条件: *[1]apache-activemq-5.4.2 *[2]Apache.NMS.ActiveMQ-1.5.6-bin *[3]WinXPSP3 *[4]VS2008SP1 *[5]WPF工程With.NETFramework3.5 * *启动 * *不带安全控制方式启动 *[你的解压路径]\apache-activemq-5.4.2\bin\activemq.bat * *安全方式启动 *添加环境变量:ACTIVEMQ_ENCRYPTION_PASSWORD=activemq *[你的解压路径]\apache-activemq-5.4.2\bin>activemqxbean:file:../conf/activemq-security.xml * *ActiveMQ管理地址 *http://127.0.0.1:8161/admin/ *添加访问"http://127.0.0.1:8161/admin/"的限制 * *第一步:添加访问限制 *修改D:\apache\apache-activemq-5.4.2\conf\jetty.xml文件 *下面这行编码,原 *<propertyname="authenticate"value="true"/> *修改为 *<propertyname="authenticate"value="false"/> * *第二步:修改登录用户名密码,缺省分别为admin,admin *D:\apache\apache-activemq-5.4.2\conf\jetty-realm.properties * *用户管理(前提:以安全方式启动ActiveMQ) * *在[你的解压路径]\apache-activemq-5.4.2\conf\credentials.properties文件中修改默认的用户名密码 *在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中可以添加新的用户名 *e.g.添加oa用户,密码同用户名。 *<authenticationUserusername="oa"password="oa"groups="users,admins"/> * *在[你的解压路径]\apache-activemq-5.4.2\conf\activemq-security.xml文件中你还可以设置指定的Topic或Queue *只能被哪些用户组read或write。 * * *配置C#withWPF项目 *项目的[Application]->[TargetFramework]属性设置为[.NETFramework3.5](这是VS2008WPF工程的默认设置) *添加[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\lib\Apache.NMS\net-3.5\Apache.NMS.dll的引用 *Apache.NMS.dll相当于接口 * *如果是以Debug方式调试 *把[你的解压路径]\Apache.NMS.ActiveMQ-1.5.6-bin\build\net-3.5\debug\目录下的 *Apache.NMS.ActiveMQ.dll文件复制到你项目的Debug目录下 *Apache.NMS.ActiveMQ.dll相当于实现 * *如果是以Release方式调试 *参考上文,去取Apache.NMS,Release目录下相应的DLL文件,并复制到你项目的Release目录下。 * * *参考资料 *[1]《C#调用ActiveMQ官方示例》http://activemq.apache.org/nms/examples.html *[2]《ActiveMQNMS下载地址》http://activemq.apache.org/nms/activemq-downloads.html *[3]《ActiveMQ在C#中的应用示例》https://www.nhooo.com/article/87956.htm *[4]《NMSAPIReference》http://activemq.apache.org/nms/nms-api.html */ namespacetestActiveMQSubscriber { ///<summary> ///InteractionlogicforWindow1.xaml ///</summary> publicpartialclassWindow1:Window { privatestaticIConnectionFactoryconnFac; privatestaticIConnectionconnection; privatestaticISessionsession; privatestaticIDestinationdestination; privatestaticIMessageProducerproducer; privatestaticIMessageConsumerconsumer; protectedstaticITextMessagemessage=null; publicWindow1() { InitializeComponent(); initAMQ("MyFirstTopic"); } privatevoidinitAMQ(StringstrTopicName) { try { connFac=newNMSConnectionFactory(newUri("activemq:failover:(tcp://localhost:61616)")); //新建连接 //connection=connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码 //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息! connection.ClientId="testinglistener"; connection=connFac.CreateConnection();//如果你是缺省方式启动ActiveMQ服务,则不需填用户名、密码 //创建Session session=connection.CreateSession(); //发布/订阅模式,适合一对多的情况 destination=SessionUtil.GetDestination(session,"topic://"+strTopicName); //新建生产者对象 producer=session.CreateProducer(destination); producer.DeliveryMode=MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留 //新建消费者对象:普通“订阅”模式 //consumer=session.CreateConsumer(destination);//不需要持久“订阅” //新建消费者对象:持久"订阅"模式: //持久“订阅”后,如果你的程序被停止工作后,恢复运行, //从第一次持久订阅开始,没收到的消息还可以继续收 consumer=session.CreateDurableConsumer( session.GetTopic(strTopicName) ,connection.ClientId,null,false); //设置消息接收事件 consumer.Listener+=newMessageListener(OnMessage); //启动来自ActiveMQ的消息侦听 connection.Start(); } catch(Exceptione) { //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息! Debug.WriteLine(e.Message); } } privatevoidSendMsg2Topic_Click(objectsender,RoutedEventArgse) { //发送消息 ITextMessagerequest=session.CreateTextMessage(DateTime.Now.ToLocalTime()+""+tbMsg.Text); producer.Send(request); } protectedvoidOnMessage(IMessagereceivedMsg) { //接收消息 message=receivedMsgasITextMessage; //UI线程,显示收到的消息 Dispatcher.Invoke(DispatcherPriority.Normal,newAction(()=> { DateTimedt=newDateTime(); ListBoxItemlbi=newListBoxItem(); lbi.Content=DateTime.Now.ToLocalTime()+""+message.Text; lbR.Items.Add(lbi); })); } } }
队列通讯方式,消费者例子
usingSystem; usingSystem.Collections.Generic; usingSystem.Linq; usingSystem.Text; usingApache.NMS; usingSystem.Diagnostics; usinglog4net; usingApache.NMS.Util; usingSystem.Collections; namespaceCat8637AutoCallServer { publicclassSMTask { publicStringCallee{get;set;} publicStringCheckNumber{get;set;} publicintDeadline{get;set;} publicoverrideStringToString() { returnString.Format("Callee={0},CheckNumber={1},Deadline={2}", Callee,CheckNumber,Deadline); } } /* *负责接收任务,并把任务放在任务等待队列中。 */ publicclassMQClient { privatestaticreadonlyILoglogger=LogManager.GetLogger(typeof(MQClient)); privatestaticIConnectionconnection=null; privatestaticISessionsession=null; Queue_voiceSMTasks=newQueue(); publicMQClient() { try { IConnectionFactoryfactory=newNMSConnectionFactory(newUri("activemq:failover:(tcp://localhost:61616)")); //新建连接 //connection=connFac.CreateConnection("oa","oa");//设置连接要用的用户名、密码 connection=factory.CreateConnection(); session=connection.CreateSession(); IMessageConsumerconsumer=session.CreateConsumer(session.GetQueue("TaskIssue_VoiceSM")); consumer.Listener+=newMessageListener(OnMessage); connection.Start(); } catch(Exceptionex) { Debug.WriteLine(ex.Message); } } protectedvoidOnMessage(IMessagereceivedMsg) { IMessagemessage=receivedMsgasITextMessage; SMTasksmTask=newSMTask(); smTask.Callee=message.Properties["Callee"]asString; smTask.CheckNumber=message.Properties["Message"]asString; smTask.Deadline=Convert.ToInt32(message.Properties["deadline"]asString); logger.Info("Received:"+smTask.ToString()); lock(_voiceSMTasks) { _voiceSMTasks.Enqueue(smTask); } } publicSMTaskGetVoiceSMTask() { SMTaskresult=null; lock(_voiceSMTasks) { if(_voiceSMTasks.Count>0) { result=_voiceSMTasks.Dequeue()asSMTask; } } returnresult; } } }
队列通讯方式,生产者例子
privatevoidSend_Click(objectsender,RoutedEventArgse) { try { IDestinationdestination=SessionUtil.GetDestination(session,"queue://TaskIssue_VoiceSM"); //新建生产者对象 IMessageProducerproducer=session.CreateProducer(destination); producer.DeliveryMode=MsgDeliveryMode.NonPersistent;//ActiveMQ服务器停止工作后,消息不再保留 ITextMessagerequest=session.CreateTextMessage(); request.NMSCorrelationID="TestVoiceSM";//这里我填了应用程序的名称。 request.Properties["Callee"]=tbCallee.Text; request.Properties["Message"]=tbCheckNumber.Text; request.Properties["deadline"]=tbValidDuration.Text; producer.Send(request); } catch(Exceptionex) { //初始化ActiveMQ连接失败,往VS2008的Output窗口写入出错信息! Debug.WriteLine(ex.Message); } } privatevoidWindow_Closed(objectsender,EventArgse) { try { if(session==null) return; //if(connection==null) //return; session.Close(); //connection.Close(); } catch(Exceptionex) { Debug.WriteLine(ex.Message); } }
更多关于C#相关内容感兴趣的读者可查看本站专题:《C#窗体操作技巧汇总》、《C#常见控件用法教程》、《WinForm控件用法总结》、《C#程序设计之线程使用技巧总结》、《C#操作Excel技巧总结》、《C#中XML文件操作技巧汇总》、《C#数据结构与算法教程》、《C#数组操作技巧总结》及《C#面向对象程序设计入门教程》
希望本文所述对大家C#程序设计有所帮助。