博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[RabbitMQ] AutorecoveringConnection在连接恢复后才调用ShutdownListener 此博文包含图片
阅读量:4106 次
发布时间:2019-05-25

本文共 4354 字,大约阅读时间需要 14 分钟。

想玩一玩RabbitMQ中的ShutdownListener和RecoveryListener,又不想写自己的重连接逻辑,所以使用了ConnectionFactory类的setAutomaticRecoveryEnabled方法让其自动恢复连接。代码如下:

package com.yqu.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class AutoRecoveryRecv {  private final static String QUEUE_NAME = "hello";  public static void main(String[] argv) throws Exception {    try {      ConnectionFactory factory = new ConnectionFactory();      factory.setHost(ConnectionFactoryConfiguration.HOST);      factory.setUsername(ConnectionFactoryConfiguration.USERNAME);      factory.setPassword(ConnectionFactoryConfiguration.PASSWORD);      factory.setAutomaticRecoveryEnabled(true);      factory.setNetworkRecoveryInterval(10000);      Connection connection = factory.newConnection();      connection.addShutdownListener(new ShutdownListener() {        @Override        public void shutdownCompleted(ShutdownSignalException cause)        {          String hardError = "";          String applInit = "";          if (cause.isHardError()) {            hardError = "connection";          } else {            hardError = "channel";          }          if (cause.isInitiatedByApplication()) {            applInit = "application";          } else {            applInit = "broker";          }          System.out.println("Connectivity to MQ has failed.  It was caused by "                  + applInit + " at the " + hardError                  + " level.  Reason received " + cause.getReason());        }      });      ((Recoverable)connection).addRecoveryListener(new RecoveryListener() {        @Override        public void handleRecovery(Recoverable recoverable) {          if( recoverable instanceof Connection ) {            System.out.println("Connection was recovered.");          } else if ( recoverable instanceof Channel ) {            int channelNumber = ((Channel) recoverable).getChannelNumber();            System.out.println( "Connection to channel #"                    + channelNumber + " was recovered." );          }        }      });      Channel channel = connection.createChannel();      channel.queueDeclare(QUEUE_NAME, false, false, false, null);      System.out.println(" [*] Waiting for messages. To exit press CTRL+C");      Consumer consumer = new DefaultConsumer(channel) {        @Override        public void handleDelivery(String consumerTag, Envelope envelope,                                   AMQP.BasicProperties properties, byte[] body)                throws IOException {          String message = new String(body, "UTF-8");          System.out.println(" [x] Received '" + message + "'");        }      };      channel.basicConsume(QUEUE_NAME, true, consumer);    }    catch(Throwable t) {      t.printStackTrace();    }  }}

重启了RabbitMQ所在的服务器,然后获得下列日志,显示连接已修复,然后才调用ShutdownListener显示连接关闭信息:

 

为什么会这样?下面就开始我的解密之旅!

首先看一下ShutdownNotifier接口的层次图,Connection下共有三个子孙类,尚不确定在我的测试里是哪一个?

 

看一下com.rabbitmq.client.ConnectionFactory类newConnection方法实现,豁然开朗!对于ConnectionFactory类,如果automaticRecoveryEnabled被设置则创建AutorecoveringConnection,否则创建AMQConnection。而RecoveryAwareAMQConnectionFactory才会创建RecoveryAwareAMQConnection。

AutorecoveringConnection类init方法会调用addAutomaticRecoveryListener方法,addAutomaticRecoveryListener方法会添加一个ShutdownListener类实例automaticRecoveryListener到列表shutdownListeners,而我创建的ShutdownListener类实例是列表shutdownListeners中的第二个元素。

参看下面堆栈信息,ShutdownNotifierComponent类的notifyListeners会遍历所有shutdownListeners列表所有监听器,第一个关闭监听器实例automaticRecoveryListener会进行连接修复并通知RecoveryListener,然后才到我的关闭监听器打印关闭信息。

at AutoRecoveryRecv$2.handleRecovery(AutoRecoveryRecv.at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.notifyRecoveryListeners(AutorecoveringConnection.at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.- locked <0x30d> (a com.rabbitmq.client.impl.recovery.AutorecoveringConnection)at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.at com.rabbitmq.client.impl.recovery.AutorecoveringConnection$1.shutdownCompleted(AutorecoveringConnection.at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.at

项目在https://github.com/rabbitmq/rabbitmq-java-client/pull/136修正了这个错误,先通知所有ShutdownListener再尝试自动修复连接。但是在我所用的amqp-client-3.6.5.jar还没有包含这一修改。

分享:

转载地址:http://wansi.baihongyu.com/

你可能感兴趣的文章
warning:Instance variable used while 'self' is not set to the result of '[(super or self) init…]'
查看>>
unity3d中的lua脚本异常捕获
查看>>
iOS开发:"此证书的签发者无效"解决方法
查看>>
一步一步实现iOS微信自动抢红包(非越狱)
查看>>
微信读书 iOS 性能优化总结
查看>>
OpenUDID冲突的问题
查看>>
MAC下SVN 命令的使用
查看>>
iOS Device Types(设备型号:iPhone Model)
查看>>
Swift Name Mangling - Swift语言的名字重整技术
查看>>
CocoaPods error:The dependency '' is not used in any concrete target
查看>>
'-ObjC'的替代解决方法
查看>>
Unity3D 学习 - 通过C#脚本创建简单的按钮、响应事件
查看>>
Unity编译iOS工程的自动化配置(XUPorter)
查看>>
苹果App Store新规:6月1日后所有应用必须支持IPv6-only网络
查看>>
使用Proguard混淆java源代码
查看>>
提高iOS开发质量的一些事
查看>>
XCode 开发去除 UserInterfaceState.xcuserstate 文件为版本控制带来的困扰
查看>>
iOS Crash文件的解析(一)
查看>>
iOS Crash文件的符号化(二)
查看>>
C++符号的还原(demangling)
查看>>