
Analysis of the HRegionServer Startup and Shutdown Process

November 15, 2017 ⁄ General

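What HRegionServer does

1. HRegionServer is responsible for serving user I/O requests and for reading and writing data in the underlying file system; it is the core module of HBase.
   An HRegionServer manages a set of HRegion objects in memory, and each HRegion corresponds to one Region of a table.
   An HRegion consists of multiple HStores, and each HStore holds the data of one column family of the table; that is, each HStore is one centralized storage unit.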
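
The containment relationship described above can be pictured with a small data-structure sketch. The class names below (RegionServerSketch, RegionSketch, StoreSketch) are hypothetical stand-ins, not the actual HBase classes, and the model deliberately ignores everything except which object owns which.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an HStore: holds the cells of one column family. */
class StoreSketch {
    final String columnFamily;
    final List<String> cells = new ArrayList<>();

    StoreSketch(String columnFamily) {
        this.columnFamily = columnFamily;
    }
}

/** Hypothetical stand-in for an HRegion: one store per column family. */
class RegionSketch {
    final String regionName;
    final Map<String, StoreSketch> stores = new LinkedHashMap<>();

    RegionSketch(String regionName, String... columnFamilies) {
        this.regionName = regionName;
        for (String cf : columnFamilies) {
            stores.put(cf, new StoreSketch(cf));
        }
    }

    /** Routes a write to the store that owns the given column family. */
    void put(String columnFamily, String cell) {
        StoreSketch store = stores.get(columnFamily);
        if (store == null) {
            throw new IllegalArgumentException("Unknown column family: " + columnFamily);
        }
        store.cells.add(cell);
    }
}

/** Hypothetical stand-in for an HRegionServer: tracks the regions it serves in memory. */
class RegionServerSketch {
    final Map<String, RegionSketch> onlineRegions = new LinkedHashMap<>();

    void addRegion(RegionSketch region) {
        onlineRegions.put(region.regionName, region);
    }
}
```

In this model a write addressed to a column family lands in exactly one store of one region, which is the sense in which each store is a single storage unit.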

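2. HRegionServer implements four interfaces: HRegionInterface, HBaseRPCErrorHandler, Runnable, and RegionServerServices.

   HRegionInterface: clients use this interface to interact with the HRegionServer, for example to fetch a region's meta information (getRegionInfo) or to perform get, put, and delete operations.
   HBaseRPCErrorHandler: handles errors raised during RPC calls, such as OOME (OutOfMemoryError).
   Runnable: the standard Java interface for code executed by a Thread; it declares a single run method.
   RegionServerServices: the service interface that the RegionServer provides.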
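
To illustrate the kind of check an OOME-aware error handler might perform, here is a minimal sketch. OomeCheckSketch and isOutOfMemory are hypothetical names; the real HRegionServer logic is not shown in this article and may well differ.

```java
/** Hypothetical helper, not HBase code: detects an OutOfMemoryError in a failure chain. */
class OomeCheckSketch {
    /** Returns true if the throwable or any of its causes is an OutOfMemoryError. */
    static boolean isOutOfMemory(Throwable t) {
        int depth = 0;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        while (t != null && depth++ < 32) {
            if (t instanceof OutOfMemoryError) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```

A caller would typically treat a true result as fatal and abort the server rather than keep serving requests with an exhausted heap.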

3. The HRegionServer entry point:

  /**
   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
   */
  public static void main(String[] args) throws Exception {
    VersionInfo.logVersion();
    Configuration conf = HBaseConfiguration.create();
    @SuppressWarnings("unchecked")
    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);

    new HRegionServerCommandLine(regionServerClass).doMain(args);
  }

  Startup begins in main, which calls the doMain method of the corresponding command-line class, HRegionServerCommandLine.
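
The main method above resolves the server class from configuration and falls back to a default. A stripped-down sketch of that pattern using only the JDK might look like the following. All names here (ServerLauncherSketch, IMPL_KEY, the two server classes) are hypothetical, java.util.Properties stands in for the HBase Configuration, and instantiating through a no-argument constructor is an assumption, since the article does not show how HRegionServerCommandLine creates the instance.

```java
import java.util.Properties;

/** Default implementation used when the configuration names no other class. */
class DefaultServerSketch {
    String describe() {
        return "default";
    }
}

/** An alternative implementation that a deployment could select through configuration. */
class CustomServerSketch extends DefaultServerSketch {
    @Override
    String describe() {
        return "custom";
    }
}

class ServerLauncherSketch {
    /** Hypothetical configuration key; the article's code uses HConstants.REGION_SERVER_IMPL. */
    static final String IMPL_KEY = "sketch.server.impl";

    /** Resolves the configured class, falling back to the default when the key is unset. */
    static Class<? extends DefaultServerSketch> resolve(Properties conf)
            throws ClassNotFoundException {
        String name = conf.getProperty(IMPL_KEY);
        if (name == null) {
            return DefaultServerSketch.class;
        }
        // asSubclass fails fast with ClassCastException if the class has the wrong type.
        return Class.forName(name).asSubclass(DefaultServerSketch.class);
    }

    /** Instantiates the resolved class through its no-argument constructor (an assumption). */
    static DefaultServerSketch create(Properties conf) throws ReflectiveOperationException {
        return resolve(conf).getDeclaredConstructor().newInstance();
    }
}
```

Using asSubclass keeps the type check in one place, so an unrelated class name in the configuration is rejected before any instance is constructed.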

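4. HRegionServer startup

The rest of this article walks through the HRegionServer startup process. Because HRegionServer implements the Runnable interface, the run method is the main thing to look at.

(1) The startRegionServer method: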

  /**
   * @param hrs
   * @param name
   * @return Thread the RegionServer is running in correctly named.
   * @throws IOException
   */
  public static Thread startRegionServer(final HRegionServer hrs,
      final String name) throws IOException {
    Thread t = new Thread(hrs);
    t.setName(name);
    t.start();
    // Install shutdown hook that will catch signals and run an orderly shutdown
    // of the hrs.
    ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
        .getConfiguration()), hrs, t);
    return t;
  }
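
The same shape (start a Runnable in a named thread, then register a hook that performs an orderly shutdown) can be sketched with plain JDK calls. ServiceSketch and ServiceStarterSketch are hypothetical names, and Runtime.addShutdownHook is used here in place of the HBase ShutdownHook class, whose internals this article does not show.

```java
/** Hypothetical long-running service; stands in for the HRegionServer Runnable. */
class ServiceSketch implements Runnable {
    private volatile boolean stopped = false;

    @Override
    public void run() {
        try {
            while (!stopped) {
                Thread.sleep(10); // placeholder for real work
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        stopped = true;
    }
}

class ServiceStarterSketch {
    /** Starts the service in a named thread and registers a JVM shutdown hook for it. */
    static Thread start(final ServiceSketch service, String name) {
        final Thread t = new Thread(service);
        t.setName(name);
        t.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            service.stop();
            try {
                t.join(); // wait for an orderly exit before the JVM halts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name + "-shutdown-hook"));
        return t;
    }
}
```

Naming the thread makes it easy to find in thread dumps and log lines, which is presumably why startRegionServer takes a name argument.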

(2) The run method:

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @SuppressWarnings("deprecation")
  public void run() {
    try {
      // Do pre-registration initializations; zookeeper, lease threads, etc.
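        /*
         *  initializeZooKeeper()
         *   1. Initialize ZooKeeper and create the connection to it.
         *   2. Create the MasterAddressManager (the master address manager), then block until a master comes online.
         *   3. Once a master is online, create the ClusterStatusTracker and wait for the cluster to start,
         *      i.e. wait for the master to set the cluster-up flag in ZooKeeper.
         *   4. Finally, create and start the CatalogTracker, which locates -ROOT- and .META.
         */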
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      // Try and register with the Master; tell it we are here.  Break if
      // server is stopped or the clusterup flag is down or hdfs went wacky.
        // Wait until the cluster is up
      while (keepLooping()) {
        MapWritable w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
          handleReportForDutyResponse(w);
          break;
        }
      }
      registerMBean();

      // We registered with the Master.  Go into run mode.
      long lastMsg = 0;
      long oldRequestCount = -1;
      // The main run loop.
      while (!this.stopped && isHealthy()) {
        if (!isClusterUp()) {
          if (isOnlineRegionsEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested);
          } else if (this.stopping) {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no requests since last time we went around the loop.
              // The remaining meta regions will be closed on our way out.
              if (oldRequestCount == this.requestCount.get()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = this.requestCount.get();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested);
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = System.currentTimeMillis();
        if ((now - lastMsg) >= msgInterval) {
          doMetrics();
          tryRegionServerReport();
          lastMsg = System.currentTimeMillis();
        }
        if (!this.stopped) this.sleeper.sleep();
      } // end of the main run loop
    } catch (Throwable t) {
      if (!checkOOME(t)) {
        abort("Unhandled exception: " + t.getMessage(), t);
      }
    }
    // Run shutdown.
    if (mxBean != null) {
      MBeanUtil.unregisterMBean(mxBean);
      mxBean = null;
    }
    this.leases.closeAfterLeasesExpire();
    this.rpcServer.stop();
    if (this.splitLogWorker != null) {
      splitLogWorker.stop();
    }
    if (this.infoServer != null) {
      LOG.info("Stopping infoServer");
      try {
        this.infoServer.stop();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    // Send cache a shutdown.
    if (cacheConfig.isBlockCacheEnabled()) {
      cacheConfig.getBlockCache().shutdown();
    }

    // Send interrupts to wake up threads if sleeping so they notice shutdown.
    // TODO: Should we check they are alive? If OOME could have exited already
    if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
    if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
    if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
    if (this.compactionChecker != null)
      this.compactionChecker.interrupt();

    if (this.killed) {
      // Just skip out w/o closing regions.  Used when testing.
    } else if (abortRequested) {
      if (this.fsOk) {
        closeAllRegions(abortRequested); // Don't leave any open file handles
      }
      LOG.info("aborting server " + this.serverNameFromMasterPOV);
    } else {
      closeAllRegions(abortRequested);
      closeAllScanners();
      LOG.info("stopping server " + this.serverNameFromMasterPOV);
    }
    // Interrupt catalog tracker here in case any regions being opened out in
    // handlers are stuck waiting on meta or root.
    if (this.catalogTracker != null) this.catalogTracker.stop();
    if (this.fsOk) {
      waitOnAllRegionsToClose(abortRequested);
      LOG.info("stopping server " + this.serverNameFromMasterPOV +
        "; all regions closed.");
    }

    //fsOk flag may be changed when closing regions throws exception.
    if (!this.killed && this.fsOk) {
      closeWAL(abortRequested ? false : true);
    }

    // Make sure the proxy is down.
    if (this.hbaseMaster != null) {
      HBaseRPC.stopProxy(this.hbaseMaster);
      this.hbaseMaster = null;
    }
    this.leases.close();
    try {
      deleteMyEphemeralNode();
    } catch (KeeperException e) {
      LOG.warn("Failed deleting my ephemeral node", e);
    }
    this.zooKeeper.close();
    LOG.info("stopping server " + this.serverNameFromMasterPOV +
      "; zookeeper connection closed.");

    if (!killed) {
    /*
     *  Shut down the compactionChecker, cacheFlusher, hlogRoller,
     *  compactSplitThread and service threads and wait for them to exit.
     */
      join();
    }
    LOG.info(Thread.currentThread().getName() + " exiting");
  }
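
The two loops at the heart of run (retry registration until it succeeds, then send a report whenever the interval has elapsed) can be isolated in a few lines. This is a simplified sketch with hypothetical names (DutyLoopSketch, registerWithRetry, runLoop); the maxReports parameter exists only so that the demonstration terminates and has no counterpart in the listing above.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of "retry registration, then report on an interval". */
class DutyLoopSketch {
    private volatile boolean stopped = false;
    int reportsSent = 0;

    void stop() {
        stopped = true;
    }

    /** Calls register until it returns non-null or the loop is stopped. */
    <T> T registerWithRetry(Supplier<T> register, long sleepMillis)
            throws InterruptedException {
        while (!stopped) {
            T response = register.get();
            if (response != null) {
                return response;
            }
            Thread.sleep(sleepMillis); // registration failed; sleep and retry
        }
        return null;
    }

    /** Sends a report whenever at least intervalMillis has passed since the last one. */
    void runLoop(long intervalMillis, long sleepMillis, int maxReports)
            throws InterruptedException {
        long lastMsg = 0;
        while (!stopped) {
            long now = System.currentTimeMillis();
            if (now - lastMsg >= intervalMillis) {
                reportsSent++; // stands in for doMetrics() and tryRegionServerReport()
                lastMsg = System.currentTimeMillis();
                if (reportsSent >= maxReports) {
                    stop(); // demo-only exit condition
                }
            }
            if (!stopped) {
                Thread.sleep(sleepMillis);
            }
        }
    }
}
```

Because lastMsg starts at 0, the first pass through the loop always reports immediately, which matches the listing above.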

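Summary:
     HRegionServer startup:
     1. Initialize ZooKeeper and create the connection to it.
     2. Create the MasterAddressManager (the master address manager), then block until a master comes online.
     3. Once a master is online, create the ClusterStatusTracker and wait for the cluster to start, i.e. wait for the master to set the cluster-up flag in ZooKeeper.
     4. Create and start the CatalogTracker, which locates -ROOT- and .META.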
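
The blocking order of these four steps can be modelled without any ZooKeeper client. In the sketch below, two CountDownLatch objects stand in for "a master is online" and "the cluster-up flag is set". All names are hypothetical, and the timeout is an assumption added so that the example cannot hang; the startup described above blocks until the conditions hold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the blocking startup order; no ZooKeeper client is involved. */
class StartupSequenceSketch {
    final CountDownLatch masterOnline = new CountDownLatch(1);
    final CountDownLatch clusterUp = new CountDownLatch(1);
    final List<String> steps = new ArrayList<>();

    /** Runs the four steps in order; returns false if either wait times out. */
    boolean initialize(long timeoutMillis) throws InterruptedException {
        steps.add("connect");                       // step 1
        if (!masterOnline.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        steps.add("master-online");                 // step 2
        if (!clusterUp.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
        steps.add("cluster-up");                    // step 3
        steps.add("catalog-tracker-started");       // step 4
        return true;
    }
}
```

Another thread plays the role of the master by calling countDown on each latch; until it does, initialize stays parked at the corresponding step.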
     Shutdown:
     Shut down the compactionChecker, cacheFlusher, hlogRoller, compactSplitThread, service, and replicationHandler threads.
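
A common way to stop a group of background threads like these is to interrupt each one so that it wakes from any sleep, then join them. The sketch below shows that pattern with plain threads; WorkerShutdownSketch and its methods are hypothetical names, and the workers do nothing but sleep.

```java
import java.util.List;

/** Hypothetical sketch of stopping background worker threads and waiting for them to exit. */
class WorkerShutdownSketch {
    /** Creates and starts a named worker that sleeps until it is interrupted. */
    static Thread sleepingWorker(String name) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(1000); // placeholder for periodic work
                }
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread end.
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Interrupts every worker, then joins each one; returns true if all have exited. */
    static boolean interruptAndJoin(List<Thread> workers, long joinMillis)
            throws InterruptedException {
        for (Thread t : workers) {
            t.interrupt();
        }
        boolean allExited = true;
        for (Thread t : workers) {
            t.join(joinMillis);
            allExited &= !t.isAlive();
        }
        return allExited;
    }
}
```

Interrupting all workers before joining any of them lets them wind down in parallel, so the total wait is bounded by the slowest worker rather than the sum of all of them.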
