现在的位置: 首页 > 综合 > 正文

Cassandra启动过程详解

2013年09月07日 ⁄ 综合 ⁄ 共 4312字 ⁄ 字号 评论关闭

Cassandra启动过程详解

这里的分析从CassandraDaemon.java文件开始。

一、配置文件storage-config.xml的读取和log4j的配置文件log4j.property的设置。

配置文件的读取和解析都是在org.apache.cassandra.config.DatabaseDescriptor类中完成的,这个类的作用非常简单,就是读取配置文件中各个配置项所定义的值,经过简单的验证,符合条件就将其值赋给DatabaseDescriptor的私有静态常量。值得注意的是关于Keyspace的解析,按照ColumnFamily的配置信息构建成org.apache.cassandra.config.CFMetaData对象,最后把这些所有ColumnFamily放入KeyspaceHashMap对象org.apache.cassandra.config.KSMetaData中,每个Keyspace就是一个Table。这些信息都是作为基本的元信息,可以通过DatabaseDescriptor类直接获取。

二、Keyspace的初始化。

这里主要调用Table.open(tableName)方法创建每个Table的实例。创建Table的实例将完成:1)获取该Table的元信息TableMatedate2)创建改Table下每个ColumnFamily的存储操作对象ColumnFamilyStore3)启动定时程序,检查该ColumnFamilyMemtable设置的MemtableFlushAfterMinutes是否已经过期,过期立即写到磁盘。详细过程可参见我前面关于该方法的详细代码跟踪分析。

一个Keyspace对应一个Table,一个Table持有多个ColumnFamilyStore,而一个ColumnFamily对应一个ColumnFamilyStoreTable并没有直接持有ColumnFamily的引用而是持有ColumnFamilyStore,这是因为ColumnFamilyStore类中不仅定义了对ColumnFamily的各种操作而且它还持有ColumnFamily在各种状态下数据对象的引用,所以持有了ColumnFamilyStore就可以操作任何与ColumnFamily相关的数据了。

三、Commitlog日志文件的恢复。

这里调用CmmitLog.recover()方法主要完成这几个操作,发现是否有没有被写到磁盘的数据,恢复这个数据,构建新的日志文件。CommitLog日志文件的恢复策略是,在头文件中发现没有被序列化的最新的ColumnFamilyId,然后取出这个这个被序列化RowMutation对象的起始地址,反序列化成为RowMutation对象,后面的操作和新添一条数据的流程是一样的,如果这个RowMutation对象中的数据被成功写到磁盘中,那么会在CommitLog去掉已经被持久化的ColumnFamilyId

四、检查数据文件是否需要压缩

调用CompactionManager.instance.checkAllColumnFamilies()检查CF对应的数据文件是否需要压缩。将相似大小的SStable放到一个bucket中,然后调用submitMinorIfNeeded(cfs)

五、启动存储服务

这是启动过程最重要的一步,需要启动很多服务。具体步骤有:

5.1)创建StorageMetadata

调用方法SystemTable.initMetadata()创建StorageMetadata。元数据只创建一次,如果元数据已经存在,则直接返回。StorageMetadata将包含三个关键信息:本节点的Token、当前generation以及ClusterName。这三个信息被存在StorageService类的属性metadata中(metadataStorageMetadata类型的对象),以便后面随时调用。

Cassandra判断如果是第一次启动,Cassandra将会创建三列分别存储这些信息并将它们存在在系统表的LocationInfoColumnFamily中,key是“L”。这里的Token判断用户是否指定,如果指定了则使用用户指定的,否则随机生成一个Token,但是这个Token有可能在后面被修改;generation=System.currentTimeMillis()/
1000
ClusterName为读取配置文件得到的值

如果不是第一次启动将会更新这三个值:读取数据文件中的Token信息,generation信息以及ClusterName信息后设置Token值和ClusterName的值,更新generation的值为max(当前时间秒数,old_generation+1)。这里有点要注意的是,如果在后续的过程中更改了配置文件中ClusterName的名字,这会跟数据文件中存储的信息不一致,最终会导致Cassandra无法启动。

5.2)创建所有目录

调用方法DatabaseDescriptor.createAllDirectories()创建所有的目录。包括数据文件目录data/以及日志文件目录commitlog/。同时还为keyspaces创建了数据文件目录的子目录data/systemdata/keyspace...keyspace为用户定义的keyspace)。当然这个方法早在Table.open()已经调用过了,在这里再次调用可能是为了某些测试需要。

5.3)启动GCInspector.instance.start服务

主要是统计统计当前系统中资源的使用情况(主要就是内存使用和回收情况),将这个信息记录到日志文件中,这个可以作为系统的监控日志使用。

5.4)启动消息监听服务

这个消息监听服务就是监听整个集群中其它节点发送到本节点的所有消息,Cassandra会根据每个消息的类型,做出相应的反应。消息监听代码如下:

public void listen(InetAddress localEp) throws IOException {
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		final ServerSocket ss = serverChannel.socket();
		ss.setReuseAddress(true);
		ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
		socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
		socketThread.start();
}

  1. 这里用到了nio里面的异步IO与连网的部分。监听端口默认为7000。创建一个线程SocketThread用于监听消息。

  2. 每接收到一个消息,就创建一个新的线程newIncomingTcpConnection(socket).start()进行消息响应;该线程run方法中主要是对消息进行魔数的验证,以及读取消息头部和消息体等内容,然后将消息内容反序列化的任务MessageDeserializationTask递交到相应的消息反序列化线程池messageDeserializerExecutor_

  3. MessageDeserializationTask反序列化消息内容后调用MessagingService.receive()处理消息

  4. receive()方法中创建MessageDeliveryTask任务对象,根据消息类型得到相应的stage的线程池对象,如果没有对应的线程池,则使用messageDeserializerExecutor_

  5. stage线程池执行MessageDeliveryTask任务,该任务主要是根据消息中的Verb,调用相应的VerbHandler.doVerb()方法来完成消息的处理。比如GossipDigestAckVerbHandler.doVerb()用来处理Gossip阶段的ACK消息。

5.5)启动StorageLoadBalancer.instance.startBroadcasting服务

调用方法loadTimer_.schedule(newLoadDisseminator(),
2 * Gossiper.
intervalInMillis_,BROADCAST_INTERVAL)
,定时得到节点负载信息,2Gossiper心跳后开始,间隔时间为60s。该任务得到节点数据总量(包括所有Data文件、FIlter文件以及Index文件),并将其更新到ApplicationState中,然后就可以通过这个state来和其它节点交换信息。这个load信息在数据的存储和新节点加入的时候,会有参考价值。

5.6)启动Gossiper服务

在启动Gossiper服务之前,将StorageService注册为观察者,一旦节点的某些状态发生变化,而这些状态是StorageService感兴趣的,StorageServiceonChange方法就会触发。Gossiper服务就是一个定时程序,它会创建一个EndPointState对象。EndPointState对象持有HeartBeatState的引用和ApplicationState的一个引用集Map<String,ApplicationState>
applicationState_ =
newHashtable<String,ApplicationState>()
。对于每个Application对象,EndPointState只保存一个最新的值,所以新值会覆盖旧值。

HeartBeatState对象记录了当前心跳的generationversion,这个generation和前面的StorageMetadata存储的generation是一致的,在节点每次启动的时候更新;而version是从0开始的,每次更新加1;每个节点有一个HeartBeatState对象与之关联。

ApplicationState的一个引用集Map<String,ApplicationState>
applicationState_
则是记录一些状态信息,比如前面startBroadcasting()过程中记录节点负载情况。

抱歉!评论已关闭.