DFSClient是分布式文件系统客户端,它能够连接到Hadoop文件系统执行指定任务,那么它要与Namenode与Datanode基于一定的协议来进行通信。这个通信过程中,涉及到不同进程之间的通信。在org.apache.hadoop.ipc包中,定义了进程间通信的Client端与Server端的抽象,也就是基于C/S模式进行通信。这里先对org.apache.hadoop.ipc包中有关类的源代码阅读分析。
首先看该包中类的继承关系,如下所示:
我阅读该包源程序的方法是,先从C/S通讯的两端Client类与Server类来阅读分析,然后再对实现的一个RPC类进行分析。
Client类
首先从Client客户端类的实现开始,该类定义了如下属性:
private SocketFactory socketFactory; // Socket工厂,用来创建Socket连接
private int refCount = 1;
final private static String PING_INTERVAL_NAME = "ipc.ping.interval"; // 通过配置文件读取ping间隔
final static int DEFAULT_PING_INTERVAL = 60000; // 默认ping间隔为1分钟
final static int PING_CALL_ID = -1;
从属性可以看出,一个Clinet主要处理的是与服务端进行连接的工作,包括连接的创建、监控等。为了能够了解到Client如何实现它所抽象的操作,先分别看一下该类定义的5个内部类:
- Client.Call内部类
该内部类,是客户端调用的一个抽象,主要定义了一次调用所需要的条件,以及修改Client客户端的一些全局统计变量,如下所示:
protected Call(Writable param) {
this.param = param;
synchronized (Client.this) {
this.id = counter++; // 互斥修改法:对多个连接的调用线程进行统计
}
}
/** 调用完成,设置标志,唤醒其它线程 */
protected synchronized void callComplete() {
this.done = true;
notify(); // 唤醒其它调用者
}
/**
* 调用出错,同样置调用完成标志,并设置出错信息
*/
public synchronized void setException(IOException error) {
this.error = error;
callComplete();
}
/**
* 调用完成,设置调用返回的值
*/
public synchronized void setValue(Writable value) {
this.value = value;
callComplete();
}
}
上面的Call内部类主要是对一次调用的实例进行监视与管理,即使获取调用返回值,如果出错则获取出错信息,同时修改Client全局统计变量。
- Client.ConnectionId内部类
该内部类是一个连接的实体类,标识了一个连接实例的Socket地址、用户信息UserGroupInformation、连接的协议类。每个连接都是通过一个该类的实例唯一标识。如下所示:
该类中有一个用来判断两个连接ConnectionId实例是否相等的equals方法:
只有当Socket地址、用户信息UserGroupInformation、连接的协议类这三个属性的值相等时,才被认为是同一个ConnectionId实例。
- Client.ParallelResults内部类
该内部类是用来收集在并行调用环境中结果的实体类,如下所示:
public ParallelResults(int size) {
this.values = new Writable[size];
this.size = size;
}
/** 收集并行调用返回值 */
public synchronized void callComplete(ParallelCall call) {
values[call.index] = call.value; // 存储返回值
count++; // 统计返回值个数
if (count == size) // 并行调用的多个调用完成
notify(); // 唤醒下一个实例
}
}
- Client.ParallelCall内部类
该内部类继承自上面的内部类Call,只是返回值使用上面定义的ParallelResults实体类来封装,如下所示:
public ParallelCall(Writable param, ParallelResults results, int index) {
super(param);
this.results = results;
this.index = index;
}
/** 收集并行调用返回结果值 */
protected void callComplete() {
results.callComplete(this);
}
}
- Client.Connection内部类
该类是一个连接管理内部线程类,该内部类是一个连接线程,继承自Thread类。它读取每一个Call调用实例执行后从服务端返回的响应信息,并通知其他调用实例。每一个连接具有一个连接到远程主机的Socket,该Socket能够实现多路复用,使得多个调用复用该Socket,客户端收到的调用得到的响应可能是无序的。
该类定义的属性如下所示:
private Socket socket = null; // 客户端已连接的Socket
private DataInputStream in;
private DataOutputStream out;
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); // 当前活跃的调用列表
private AtomicLong lastActivity = new AtomicLong();// 最后I/O活跃的时间
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // 连接是否关闭
private IOException closeException; // 连接关闭原因
上面使用到了java.util.concurrent.atomic包中的一些工具,像AtomicLong、AtomicBoolean,这些类能够以原子方式更新其值,支持在单个变量上解除锁二实现线程的安全。这些类能够使用get方法读取volatile变量的内存效果,set方法可以设置对应变量的内存值。通过后面的代码可以看到该类工具类的使用。例如:
上面定义的calls集合,是用来保存当前活跃的调用实例,以键值对的形式保存,键是一个Call的id,值是Call的实例,因此,该类一定提供了向该集合中添加新的调用实例、移除调用实例等等操作,分别将方法签名列表如下:
/*
* 等待某个调用线程唤醒自己,可能开始如下操作:
* 1、读取RPC响应数据
* 2、idle时间过长
* 3、被标记为应该关闭
* 4、客户端已经终止
*/
private synchronized boolean waitForWork();
/*
* 接收到响应(因为每次从DataInputStream in中读取响应信息只有一个,无需同步)
*/
private void receiveResponse();
/*
* 关闭连接,需要迭代calls集合,清除连接
*/
private synchronized void close();
可以看到,当每次调用touch方法的时候,都会将lastActivity原子变量设置为系统的当前时间,更新了变量的值。该操作是对多个线程进行互斥的,也就是每次修改lastActivity的值的时候,都会对该变量加锁,从内存中读取该变量的当前值,因此可能会出现阻塞的情况。
下面看一个Connection实例的构造实现:
UserGroupInformation ticket = remoteId.getTicket(); // 用户信息
Class<?> protocol = remoteId.getProtocol(); // 协议
header = new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket); // 连接头信息
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " + remoteId.getAddress().toString() + " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
this.setDaemon(true); // 并设置一个连接为后台线程
}
通过Collection实例的构造,可以看到,客户端所拥有的Connection实例,通过一个远程ConnectionId实例来建立到客户端到服务端的连接。接着看一下Connection线程类线程体代码:
while (waitForWork()) {// 等待某个连接实例空闲,如果存在则唤醒它执行一些任务
receiveResponse(); // 接收RPC响应
}
close(); // 关闭
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections " + connections.size());
}
客户端Client类提供的最基本的功能就是执行RPC调用,其中,提供了两种调用方式,一种就是串行单个调用,另一种就是并行调用,分别介绍如下。首先是串行单个调用的实现方法call,如下所示:
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
throw wrapException(addr, call.error);
}
} else {
return call.value; // 调用返回的响应值
}
}
}
然后,就是并行调用的实现call方法,如下所示:
return results.values; // 调用返回一组响应值
}
}
客户端可以根据服务端暴露的远程地址,来与服务器建立连接,并执行RPC调用,发送调用参数数据。
Server端有点复杂,后面单独分析其实现过程和机制。