结合别人的文章,做RocketMQ的一点原理分析,结合源码(尽量)----未完待续
Broker
与Namesrv的关系
1.从namesrv获取配置信息
1 /** 2 * BrokerConfig类 3 * 4 * broker每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。 5 * 这里的“此时间无法更改”是别人的总结,还没搞懂为啥此时间不可更改,明明原版注释中写的是取值范围在10,000到60,000之间 6 * 7 * This configurable item defines interval of topics registration of broker to name server. Allowing values are 8 * between 10, 000 and 60, 000 milliseconds. 9 */ 10 private int registerNameServerPeriod = 1000 * 30;
2.namesrv检查broker的心跳
1 /** 2 * NamesrvController类 3 * Namesrv定时检查Broker心跳 4 * 每10秒检查一次,时间戳超过2分钟,则认为Broker失效,从brokerLiveTable中移除 5 */ 6 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 7 8 @Override 9 public void run() { 10 NamesrvController.this.routeInfoManager.scanNotActiveBroker(); 11 } 12 }, 5, 10, TimeUnit.SECONDS); 13 14 ... 15 16 /** 17 * RouteInfoManager类 18 * Namesrv查看不活跃|失效的Broker 19 */ 20 public void scanNotActiveBroker() { 21 Iterator> it = this.brokerLiveTable.entrySet().iterator(); 22 while (it.hasNext()) { 23 Entry next = it.next(); 24 long last = next.getValue().getLastUpdateTimestamp(); 25 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { 26 RemotingUtil.closeChannel(next.getValue().getChannel()); 27 it.remove(); 28 log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); 29 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); 30 } 31 } 32 }
3.Namesrv检查与Broker的长连接
1 /** 2 * BrokerHousekeepingService类 3 * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。 4 */ 5 @Override 6 public void onChannelClose(String remoteAddr, Channel channel) { 7 this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); 8 } 9 10 /** 11 * BrokerHousekeepingService类 12 * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。 13 */ 14 @Override 15 public void onChannelException(String remoteAddr, Channel channel) { 16 this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); 17 } 18 19 /** 20 * BrokerHousekeepingService类 21 * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。 22 */ 23 @Override 24 public void onChannelIdle(String remoteAddr, Channel channel) { 25 this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); 26 }
Consumer|Producer
与Namesrv的关系
1 /** 2 * ClientConfig类 3 * 4 * 默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况, 5 * 这意味着某个broker如果宕机,客户端最多要30秒才能感知。 6 * 7 * 默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况, 8 * 这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。 9 * 10 * Pulling topic information interval from the named server 11 */ 12 private int pollNameServerInterval = 1000 * 30;
与Broker的关系
1.客户端(生产者|消费者)向Broker发送心跳
1 /** 2 * ClientConfig类 3 * 4 * 默认情况下,消费者|生产者 每隔30秒向所有broker发送心跳 5 * 6 * Heartbeat interval in microseconds with message broker 7 */ 8 private int heartbeatBrokerInterval = 1000 * 30;
2.Broker定期检查客户端心跳检查
1 /** 2 * ClientHousekeepingService类 3 * 4 * Broker定时清理不活动的客户端(生产者|消费者)连接 5 * 每10秒检查一次,时间戳超过2分钟,则关闭客户端(生产者|消费者)连接 6 */ 7 public void start() { 8 9 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 10 @Override 11 public void run() { 12 try { 13 ClientHousekeepingService.this.scanExceptionChannel(); 14 } catch (Throwable e) { 15 log.error("Error occurred when scan not active client channels.", e); 16 } 17 } 18 }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); 19 } 20 21 ... 22 23 /** 24 * ClientHousekeepingService类 25 * 26 * Broker定时清理不活动的客户端(生产者|消费者)连接 27 * 每10秒检查一次,时间戳超过2分钟,则关闭客户端(生产者|消费者)连接 28 */ 29 private void scanExceptionChannel() { 30 this.brokerController.getProducerManager().scanNotActiveChannel(); 31 this.brokerController.getConsumerManager().scanNotActiveChannel(); 32 this.brokerController.getFilterServerManager().scanNotActiveChannel(); 33 }
3.Broker检查与客户端的长连接
1 /** 2 * ClientHousekeepingService类 3 * 4 * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接 5 */ 6 @Override 7 public void onChannelClose(String remoteAddr, Channel channel) { 8 this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); 9 this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); 10 this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); 11 } 12 13 /** 14 * ClientHousekeepingService类 15 * 16 * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接 17 */ 18 @Override 19 public void onChannelException(String remoteAddr, Channel channel) { 20 this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); 21 this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); 22 this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); 23 } 24 25 /** 26 * ClientHousekeepingService类 27 * 28 * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接 29 */ 30 @Override 31 public void onChannelIdle(String remoteAddr, Channel channel) { 32 this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel); 33 this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel); 34 this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel); 35 }