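What HRegionServer does
1. HRegionServer is mainly responsible for serving user I/O requests and for reading and writing data in the underlying file system. It is the most central module in HBase.
HRegionServer manages a set of HRegion objects in memory; each HRegion corresponds to one Region of a table.
An HRegion is made up of multiple HStores; each HStore holds the storage for one column family of the table, so an HStore is a single, centralized storage unit.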
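The containment described above (server holds regions, region holds one store per column family) can be pictured with a small sketch. The classes ToyRegionServer, ToyRegion and ToyStore are hypothetical stand-ins invented for illustration; they are not the real HBase types and carry none of their behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of the hierarchy; NOT the real HBase classes.
class ToyRegionServer {
    // Regions currently held in memory by this server, keyed by region name.
    final Map<String, ToyRegion> onlineRegions = new LinkedHashMap<>();

    void addRegion(ToyRegion region) {
        onlineRegions.put(region.regionName, region);
    }

    // Find the store that would serve a request for (region, column family).
    // Returns null when either the region or the column family is unknown.
    ToyStore locate(String regionName, String columnFamily) {
        ToyRegion region = onlineRegions.get(regionName);
        return region == null ? null : region.stores.get(columnFamily);
    }

    public static void main(String[] args) {
        ToyRegionServer server = new ToyRegionServer();
        server.addRegion(new ToyRegion("users,,1", "info", "stats"));
        System.out.println(server.locate("users,,1", "info").columnFamily); // prints "info"
    }
}

class ToyRegion {
    final String regionName;
    // One store per column family of the table.
    final Map<String, ToyStore> stores = new LinkedHashMap<>();

    ToyRegion(String regionName, String... columnFamilies) {
        this.regionName = regionName;
        for (String cf : columnFamilies) {
            stores.put(cf, new ToyStore(cf));
        }
    }
}

class ToyStore {
    final String columnFamily;

    ToyStore(String columnFamily) {
        this.columnFamily = columnFamily;
    }
}
```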
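2. HRegionServer implements four interfaces: HRegionInterface, HBaseRPCErrorHandler, Runnable, RegionServerServices
HRegionInterface: the client talks to HRegionServer through this interface, for example to fetch a region's meta information (getRegionInfo) or to perform get, put, and delete operations.
HBaseRPCErrorHandler: handles errors raised during RPC calls, such as OOME (OutOfMemoryError).
Runnable: the standard Java interface for a task executed by a Thread; it declares a single method, run.
RegionServerServices: the service interface exposed by the RegionServer.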
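To show how one class can be both a Runnable and an error handler, here is a minimal sketch. ToyServer and ToyErrorHandler are hypothetical names; only the method name checkOOME is borrowed from the run method quoted later in this post, and the bookkeeping fields are assumptions made so the behavior can be observed.

```java
// Hypothetical sketch; ToyServer and ToyErrorHandler are not HBase classes.
class ToyServer implements Runnable, ToyErrorHandler {
    private final Runnable work;
    boolean oomeSeen = false;     // set when an OutOfMemoryError was recognized
    String abortReason = null;    // set when any other throwable escaped the work

    ToyServer(Runnable work) {
        this.work = work;
    }

    @Override
    public boolean checkOOME(Throwable t) {
        if (t instanceof OutOfMemoryError) {
            oomeSeen = true;
            return true;
        }
        return false;
    }

    @Override
    public void run() {
        try {
            work.run();
        } catch (Throwable t) {
            // Same shape as the catch block in HRegionServer.run():
            // anything that is not an OOME becomes an abort.
            if (!checkOOME(t)) {
                abortReason = "Unhandled exception: " + t.getMessage();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyServer server = new ToyServer(() -> {
            throw new IllegalStateException("boom");
        });
        Thread t = new Thread(server);
        t.start();
        t.join();
        System.out.println(server.abortReason); // prints "Unhandled exception: boom"
    }
}

// Hypothetical stand-in for an RPC error-handling interface.
interface ToyErrorHandler {
    // Returns true if the throwable was an OutOfMemoryError and was handled.
    boolean checkOOME(Throwable t);
}
```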
3. The HRegionServer entry point:
/**
 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
 */
public static void main(String[] args) throws Exception {
  VersionInfo.logVersion();
  Configuration conf = HBaseConfiguration.create();
  @SuppressWarnings("unchecked")
  Class<? extends HRegionServer> regionServerClass =
      (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
  new HRegionServerCommandLine(regionServerClass).doMain(args);
}
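Startup begins in main, which reads the region server implementation class from the configuration (defaulting to HRegionServer) and then calls the doMain method of the matching command-line class, HRegionServerCommandLine.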
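The pattern in main, looking up an implementation class in the configuration with a default and handing the Class object on, can be sketched without HBase. Everything here is hypothetical: the key "toy.server.impl", the classes DefaultServer and CustomServer, and the assumption that the class is later instantiated reflectively through a no-argument constructor.

```java
import java.util.Properties;

// Hypothetical sketch of "pluggable implementation class with a default".
class PluggableLauncher {
    // Made-up configuration key, used only for this illustration.
    static final String IMPL_KEY = "toy.server.impl";

    static class DefaultServer implements Runnable {
        @Override
        public void run() {
            System.out.println("running " + getClass().getSimpleName());
        }
    }

    static class CustomServer extends DefaultServer {
    }

    // Return the configured class, or DefaultServer when the key is absent.
    static Class<? extends DefaultServer> resolve(Properties conf) {
        String name = conf.getProperty(IMPL_KEY);
        if (name == null) {
            return DefaultServer.class;
        }
        try {
            // asSubclass rejects classes that do not extend DefaultServer.
            return Class.forName(name).asSubclass(DefaultServer.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown implementation: " + name, e);
        }
    }

    // Assumption: the implementation has an accessible no-argument constructor.
    static DefaultServer instantiate(Class<? extends DefaultServer> cls) {
        try {
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(IMPL_KEY, CustomServer.class.getName());
        instantiate(resolve(conf)).run(); // prints "running CustomServer"
    }
}
```

The benefit of this indirection is that a subclass can be swapped in through configuration alone, without touching the launcher.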
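4. HRegionServer startup
Next, let us walk through how HRegionServer starts. HRegionServer implements the Runnable interface, so the part to focus on is the implementation of the run method.
(1) The startRegionServer method: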
/**
 * @param hrs
 * @param name
 * @return Thread the RegionServer is running in correctly named.
 * @throws IOException
 */
public static Thread startRegionServer(final HRegionServer hrs, final String name)
    throws IOException {
  Thread t = new Thread(hrs);
  t.setName(name);
  t.start();
  // Install shutdown hook that will catch signals and run an orderly shutdown
  // of the hrs.
  ShutdownHook.install(hrs.getConfiguration(),
      FileSystem.get(hrs.getConfiguration()), hrs, t);
  return t;
}
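The same idea (run the server on a named thread, then register a hook for orderly shutdown) can be sketched with the JDK alone. This is only an approximation under assumptions: it uses Runtime.addShutdownHook rather than HBase's ShutdownHook class, and StoppableServer is a hypothetical stand-in for the region server.

```java
// Hypothetical sketch: named server thread plus a plain JVM shutdown hook.
class NamedServerThread {
    // Stand-in for a server that loops until asked to stop.
    static class StoppableServer implements Runnable {
        private volatile boolean stopped = false;

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Start the server on a thread with the given name, then install the hook.
    static Thread start(StoppableServer server, String name) {
        Thread t = new Thread(server);
        t.setName(name);
        t.start();
        // On JVM shutdown (for example after SIGTERM), ask the server to stop
        // and give its thread a moment to finish.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            server.stop();
            awaitExit(t, 1000);
        }, name + "-shutdown-hook"));
        return t;
    }

    // Wait up to timeoutMs for the thread to die; true if it has exited.
    static boolean awaitExit(Thread t, long timeoutMs) {
        try {
            t.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        StoppableServer server = new StoppableServer();
        Thread t = start(server, "toy-regionserver");
        System.out.println(t.getName());          // prints "toy-regionserver"
        server.stop();                            // stop explicitly so the JVM can exit
        System.out.println(awaitExit(t, 1000));
    }
}
```

Note that the server thread is not a daemon thread, so the JVM only exits once the server has stopped; the hook covers the case where shutdown is triggered from outside.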
(2) The run method:
/**
 * The HRegionServer sticks in this loop until closed.
 */
@SuppressWarnings("deprecation")
public void run() {
  try {
    // Do pre-registration initializations; zookeeper, lease threads, etc.
    /*
     * initializeZooKeeper()
     * 1. Initialize ZooKeeper and create the connection to ZooKeeper.
     * 2. Create the MasterAddressManager (the master address manager), then block until the master is online.
     * 3. Once the master is up, create the ClusterStatusTracker and wait for the cluster to start,
     *    that is, wait for the master to set the cluster-up flag in ZooKeeper.
     * 4. Finally, create the CatalogTracker and start it to obtain the locations of -ROOT- and .META.
     */
    preRegistrationInitialization();
  } catch (Throwable e) {
    abort("Fatal exception during initialization", e);
  }

  try {
    // Try and register with the Master; tell it we are here. Break if
    // server is stopped or the clusterup flag is down or hdfs went wacky.
    // Wait for the cluster to come up.
    while (keepLooping()) {
      MapWritable w = reportForDuty();
      if (w == null) {
        LOG.warn("reportForDuty failed; sleeping and then retrying.");
        this.sleeper.sleep();
      } else {
        handleReportForDutyResponse(w);
        break;
      }
    }
    registerMBean();

    // We registered with the Master. Go into run mode.
    long lastMsg = 0;
    long oldRequestCount = -1;
    // The main run loop.
    while (!this.stopped && isHealthy()) {
      if (!isClusterUp()) {
        if (isOnlineRegionsEmpty()) {
          stop("Exiting; cluster shutdown set and not carrying any regions");
        } else if (!this.stopping) {
          this.stopping = true;
          LOG.info("Closing user regions");
          closeUserRegions(this.abortRequested);
        } else if (this.stopping) {
          boolean allUserRegionsOffline = areAllUserRegionsOffline();
          if (allUserRegionsOffline) {
            // Set stopped if no requests since last time we went around the loop.
            // The remaining meta regions will be closed on our way out.
            if (oldRequestCount == this.requestCount.get()) {
              stop("Stopped; only catalog regions remaining online");
              break;
            }
            oldRequestCount = this.requestCount.get();
          } else {
            // Make sure all regions have been closed -- some regions may
            // have not got it because we were splitting at the time of
            // the call to closeUserRegions.
            closeUserRegions(this.abortRequested);
          }
          LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
        }
      }
      long now = System.currentTimeMillis();
      if ((now - lastMsg) >= msgInterval) {
        doMetrics();
        tryRegionServerReport();
        lastMsg = System.currentTimeMillis();
      }
      if (!this.stopped) this.sleeper.sleep();
    } // for
  } catch (Throwable t) {
    if (!checkOOME(t)) {
      abort("Unhandled exception: " + t.getMessage(), t);
    }
  }

  // Run shutdown.
  if (mxBean != null) {
    MBeanUtil.unregisterMBean(mxBean);
    mxBean = null;
  }
  this.leases.closeAfterLeasesExpire();
  this.rpcServer.stop();
  if (this.splitLogWorker != null) {
    splitLogWorker.stop();
  }
  if (this.infoServer != null) {
    LOG.info("Stopping infoServer");
    try {
      this.infoServer.stop();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  // Send cache a shutdown.
  if (cacheConfig.isBlockCacheEnabled()) {
    cacheConfig.getBlockCache().shutdown();
  }

  // Send interrupts to wake up threads if sleeping so they notice shutdown.
  // TODO: Should we check they are alive? If OOME could have exited already
  if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
  if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
  if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
  if (this.compactionChecker != null) this.compactionChecker.interrupt();

  if (this.killed) {
    // Just skip out w/o closing regions. Used when testing.
  } else if (abortRequested) {
    if (this.fsOk) {
      closeAllRegions(abortRequested); // Don't leave any open file handles
    }
    LOG.info("aborting server " + this.serverNameFromMasterPOV);
  } else {
    closeAllRegions(abortRequested);
    closeAllScanners();
    LOG.info("stopping server " + this.serverNameFromMasterPOV);
  }
  // Interrupt catalog tracker here in case any regions being opened out in
  // handlers are stuck waiting on meta or root.
  if (this.catalogTracker != null) this.catalogTracker.stop();
  if (this.fsOk) {
    waitOnAllRegionsToClose(abortRequested);
    LOG.info("stopping server " + this.serverNameFromMasterPOV + "; all regions closed.");
  }

  // fsOk flag may be changed when closing regions throws exception.
  if (!this.killed && this.fsOk) {
    closeWAL(abortRequested ? false : true);
  }

  // Make sure the proxy is down.
  if (this.hbaseMaster != null) {
    HBaseRPC.stopProxy(this.hbaseMaster);
    this.hbaseMaster = null;
  }
  this.leases.close();

  try {
    deleteMyEphemeralNode();
  } catch (KeeperException e) {
    LOG.warn("Failed deleting my ephemeral node", e);
  }
  this.zooKeeper.close();
  LOG.info("stopping server " + this.serverNameFromMasterPOV + "; zookeeper connection closed.");

  if (!killed) {
    /*
     * Shut down compactionChecker, cacheFlusher, hlogRoller,
     * compactSplitThread, service,
     */
    join();
  }
  LOG.info(Thread.currentThread().getName() + " exiting");
}
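Two control-flow patterns stand out in run: retrying the registration until the master answers, and a main loop that sends a report only once msgInterval has elapsed. The sketch below isolates just those two patterns. RunLoopSketch and its parameters are hypothetical; the clock and the sleeper are passed in as functions (an assumption made here so the loop can be driven deterministically), and none of the region-closing logic is reproduced.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the two loops in run(); not HBase code.
class RunLoopSketch {
    // Keep calling reportForDuty until it returns a non-null response or
    // keepLooping says to give up. Returns the response, or null if we gave up.
    static <T> T registerWithRetry(Supplier<T> reportForDuty,
                                   BooleanSupplier keepLooping,
                                   Runnable sleeper) {
        while (keepLooping.getAsBoolean()) {
            T response = reportForDuty.get();
            if (response != null) {
                return response;
            }
            sleeper.run(); // registration failed; sleep, then retry
        }
        return null;
    }

    // Run until shouldRun turns false, sending a report whenever at least
    // msgIntervalMs has passed since the last one. Returns the report count.
    static int mainLoop(BooleanSupplier shouldRun, LongSupplier clock,
                        long msgIntervalMs, Runnable report, Runnable sleeper) {
        long lastMsg = 0;
        int reports = 0;
        while (shouldRun.getAsBoolean()) {
            long now = clock.getAsLong();
            if (now - lastMsg >= msgIntervalMs) {
                report.run();
                reports++;
                lastMsg = clock.getAsLong();
            }
            sleeper.run();
        }
        return reports;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String response = RunLoopSketch.<String>registerWithRetry(
                () -> ++attempts[0] < 3 ? null : "registered",
                () -> true,
                () -> { });
        System.out.println(response + " after " + attempts[0] + " attempts");
        // prints "registered after 3 attempts"
    }
}
```

Because the first report is compared against lastMsg = 0, it goes out on the first pass through the loop, just as in the original code.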
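Summary:
HRegionServer startup
1. Initialize ZooKeeper and create the connection to ZooKeeper.
2. Create the MasterAddressManager (the master address manager), then block until the master is online.
3. Once the master is up, create the ClusterStatusTracker and wait for the cluster to start, that is, wait for the master to set the cluster-up flag in ZooKeeper.
4. Create the CatalogTracker and start it to obtain the locations of -ROOT- and .META.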
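Steps 2 and 3 are both "block until something else happens" steps. In HBase these waits are driven by ZooKeeper; the sketch below only imitates the ordering with in-process latches. StartupGate and its method names are hypothetical, and the timeout is an addition for the sake of a runnable example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: latches stand in for the ZooKeeper-based waits.
class StartupGate {
    private final CountDownLatch masterOnline = new CountDownLatch(1);
    private final CountDownLatch clusterUp = new CountDownLatch(1);

    // Called by whoever observes the master coming online.
    void masterCameOnline() {
        masterOnline.countDown();
    }

    // Called by whoever observes the cluster-up flag being set.
    void clusterMarkedUp() {
        clusterUp.countDown();
    }

    // Block until the master is online and then until the cluster is up.
    // Returns false if the overall timeout expires or the thread is interrupted.
    boolean awaitReady(long timeoutMs) {
        try {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            if (!masterOnline.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false;
            }
            long remainingNanos = Math.max(0, deadline - System.nanoTime());
            return clusterUp.await(remainingNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StartupGate gate = new StartupGate();
        // Simulate the master appearing and marking the cluster up a little later.
        new Thread(() -> {
            gate.masterCameOnline();
            gate.clusterMarkedUp();
        }).start();
        System.out.println(gate.awaitReady(1000));
    }
}
```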
Shutdown:
Shut down the compactionChecker, cacheFlusher, hlogRoller, compactSplitThread, service, and replicationHandler threads.
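The shutdown sequence in run first interrupts the background threads so they notice the shutdown, and only later waits for them in join. A minimal sketch of that interrupt-then-join pattern follows. WorkerShutdown and its workers are hypothetical; the worker names merely echo the thread names listed above, and the per-thread timeout is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "interrupt the workers, then join them".
class WorkerShutdown {
    // Start a worker that sleeps in a loop until it is interrupted.
    static Thread startWorker(String name) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread end.
            }
        }, name);
        t.start();
        return t;
    }

    // Interrupt every worker first, then wait for each one to finish.
    // Returns true only if all workers have exited.
    static boolean interruptAndJoin(List<Thread> workers, long timeoutMsPerThread) {
        for (Thread t : workers) {
            t.interrupt();
        }
        boolean allStopped = true;
        for (Thread t : workers) {
            try {
                t.join(timeoutMsPerThread);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            allStopped &= !t.isAlive();
        }
        return allStopped;
    }

    public static void main(String[] args) {
        List<Thread> workers = Arrays.asList(
                startWorker("cacheFlusher"),
                startWorker("hlogRoller"),
                startWorker("compactionChecker"));
        System.out.println(interruptAndJoin(workers, 2000));
    }
}
```

Interrupting all workers before joining any of them lets them wind down in parallel, so the total wait is bounded by the slowest worker rather than the sum of all of them.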