存档四月 2017

Hadoop之HDFS原理及文件上传下载源码分析(下)

  上篇Hadoop之HDFS原理及文件上传下载源码分析(上)楼主主要介绍了hdfs原理及FileSystem的初始化源码解析, Client如何与NameNode建立RPC通信。本篇将继续介绍hdfs文件上传、下载源解析。

文件上传

  先上文件上传的方法调用过程时序图:

  

  

   其主要执行过程:

  1.    FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(楼主上篇已经介绍过了)
  2.    调用FileSystem的create()方法,由于实现类为DistributedFileSystem,所有是调用该类中的create()方法
  3.    DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的create()方法
  4.    DFSOutputStream提供的静态newStreamForCreate()方法中调用NameNodeRpcServer服务端的create()方法并创建DFSOutputStream输出流对象返回
  5.    通过hadoop提供的IOUtil工具类将输出流输出到本地

  下面我们来看下源码:

  首先初始化文件系统,建立与服务端的RPC通信

  

1 HDFSDemo.java
2 OutputStream os = fs.create(new Path("/test.log"));

 

  调用FileSystem的create()方法,由于FileSystem是一个抽象类,这里实际上是调用的该类的子类create()方法

  

1  //FileSystem.java
2 public abstract FSDataOutputStream create(Path f,
3       FsPermission permission,
4       boolean overwrite,
5       int bufferSize,
6       short replication,
7       long blockSize,
8       Progressable progress) throws IOException;

   前面我们已经说过FileSystem.get()返回的是DistributedFileSystem对象,所以这里我们直接进入DistributedFileSystem:

 

  

 1   //DistributedFileSystem.java
 2 @Override
 3   public FSDataOutputStream create(final Path f, final FsPermission permission,
 4     final EnumSet cflags, final int bufferSize,
 5     final short replication, final long blockSize, final Progressable progress,
 6     final ChecksumOpt checksumOpt) throws IOException {
 7     statistics.incrementWriteOps(1);
 8     Path absF = fixRelativePart(f);
 9     return new FileSystemLinkResolver() {
10       @Override
11       public FSDataOutputStream doCall(final Path p)
12           throws IOException, UnresolvedLinkException {
13         final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
14                 cflags, replication, blockSize, progress, bufferSize,
15                 checksumOpt);
16         //dfs为DistributedFileSystem所持有的DFSClient对象,这里调用DFSClient中的create()方法
17         return dfs.createWrappedOutputStream(dfsos, statistics);
18       }
19       @Override
20       public FSDataOutputStream next(final FileSystem fs, final Path p)
21           throws IOException {
22         return fs.create(p, permission, cflags, bufferSize,
23             replication, blockSize, progress, checksumOpt);
24       }
25     }.resolve(this, absF);
26   }

  DFSClient的create()返回一个DFSOutputStream对象:

  

 1  //DFSClient.java
 2 public DFSOutputStream create(String src, 
 3                              FsPermission permission,
 4                              EnumSet flag, 
 5                              boolean createParent,
 6                              short replication,
 7                              long blockSize,
 8                              Progressable progress,
 9                              int buffersize,
10                              ChecksumOpt checksumOpt,
11                              InetSocketAddress[] favoredNodes) throws IOException {
12     checkOpen();
13     if (permission == null) {
14       permission = FsPermission.getFileDefault();
15     }
16     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
17     if(LOG.isDebugEnabled()) {
18       LOG.debug(src + ": masked=" + masked);
19     }
20     //调用DFSOutputStream的静态方法newStreamForCreate,返回输出流
21     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
22         src, masked, flag, createParent, replication, blockSize, progress,
23         buffersize, dfsClientConf.createChecksum(checksumOpt),
24         getFavoredNodesStr(favoredNodes));
25     beginFileLease(result.getFileId(), result);
26     return result;
27   }

  我们继续看下newStreamForCreate()中的业务逻辑:

  

 1 //DFSOutputStream.java
 2  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
 3       FsPermission masked, EnumSet flag, boolean createParent,
 4       short replication, long blockSize, Progressable progress, int buffersize,
 5       DataChecksum checksum, String[] favoredNodes) throws IOException {
 6     TraceScope scope =
 7         dfsClient.getPathTraceScope("newStreamForCreate", src);
 8     try {
 9       HdfsFileStatus stat = null;
10       boolean shouldRetry = true;
11       int retryCount = CREATE_RETRY_COUNT;
12       while (shouldRetry) {
13         shouldRetry = false;
14         try {
15           //这里通过dfsClient的NameNode代理对象调用NameNodeRpcServer中实现的create()方法
16           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
17               new EnumSetWritable(flag), createParent, replication,
18               blockSize, SUPPORTED_CRYPTO_VERSIONS);
19           break;
20         } catch (RemoteException re) {
21           IOException e = re.unwrapRemoteException(
22               AccessControlException.class,
23               DSQuotaExceededException.class,
24               FileAlreadyExistsException.class,
25               FileNotFoundException.class,
26               ParentNotDirectoryException.class,
27               NSQuotaExceededException.class,
28               RetryStartFileException.class,
29               SafeModeException.class,
30               UnresolvedPathException.class,
31               SnapshotAccessControlException.class,
32               UnknownCryptoProtocolVersionException.class);
33           if (e instanceof RetryStartFileException) {
34             if (retryCount > 0) {
35               shouldRetry = true;
36               retryCount--;
37             } else {
38               throw new IOException("Too many retries because of encryption" +
39                   " zone operations", e);
40             }
41           } else {
42             throw e;
43           }
44         }
45       }
46       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
47      //new输出流对象
48       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
49           flag, progress, checksum, favoredNodes);
50       out.start();//调用内部类DataStreamer的start()方法,DataStreamer继承Thread,所以说这是一个线程,从NameNode中申请新的block信息;
                同时前面我们介绍hdfs原理的时候提到的流水线作业(Pipeline)也是在这里实现,有兴趣的同学可以去研究下,这里就不带大家看了
51 return out; 52 } finally { 53 scope.close(); 54 } 55 }

    

  到此,Client拿到了服务端的输出流对象,那么后面就容易了,都是一些简答的文件输出,输入流的操作(hadoop提供的IOUitl)。

文件下载

  文件上传的大致流程与文件下载类似,与上传一样,我们先上程序方法调用时序图:

  

  主要执行过程:  

  1.    FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(与前面一样)
  2.    调用FileSystem的open()方法,由于实现类为DistributedFileSystem,所有是调用该类中的open()方法
  3.    DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的open()方法
  4.    实例化DFSInputStream输入流
  5.    调用openinfo()方法
  6.    调用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并获取最后block长度
  7.        调用DFSClient中的getLocatedBlocks()方法,获取block信息
  8.    在callGetBlockLocations()方法中通过NameNode代理对象调用NameNodeRpcServer的getBlockLocations()方法
  9.        将block信息写入输出流
  10.        交给IOUtil,下载文件到本地

  接下来,我们开始看源码:

  首先任然是FileSystem的初始化,前面有,这里就不贴出来了,我们直接从DistributedFileSystem的open()开始看。

  

 1 //DistributedFifeSystem.java
 2 @Override
 3   public FSDataInputStream open(Path f, final int bufferSize)
 4       throws IOException {
 5     statistics.incrementReadOps(1);
 6     Path absF = fixRelativePart(f);
 7     return new FileSystemLinkResolver() {
 8       @Override
 9       public FSDataInputStream doCall(final Path p)
10           throws IOException, UnresolvedLinkException {
11         final DFSInputStream dfsis =
12           dfs.open(getPathName(p), bufferSize, verifyChecksum);
13         //dfs为DFSClient对象,调用open()返回输入流
14         return dfs.createWrappedInputStream(dfsis);
15       }
16       @Override
17       public FSDataInputStream next(final FileSystem fs, final Path p)
18           throws IOException {
19         return fs.open(p, bufferSize);
20       }
21     }.resolve(this, absF);
22   }

  DFSClient中并没有直接使用NameNode的代理对象,而是传给了DFSInputStream:

  

 1 //DFSClient.java
 2 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
 3       throws IOException, UnresolvedLinkException {
 4     checkOpen();   
 5     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
 6     try {
 7       //这里并没有直接通过NameNode的代理对象调用服务端的方法,直接new输入流并把当前对象作为参数传入
 8       return new DFSInputStream(this, src, verifyChecksum);
 9     } finally {
10       scope.close();
11     }
12   }

  那么在DFSInputStream必须持有DFSClient的引用:

  

 1 //DFSInputStream.java 构造
 2 DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
 3                  ) throws IOException, UnresolvedLinkException {
 4     this.dfsClient = dfsClient;//只有DFSClient的引用
 5     this.verifyChecksum = verifyChecksum;
 6     this.src = src;
 7     synchronized (infoLock) {
 8       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
 9     }
10     openInfo();//调openInfo()
11   }

  openInfo()用来抓取block信息:

 1 void openInfo() throws IOException, UnresolvedLinkException {
 2     synchronized(infoLock) {
 3       lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();//抓取block信息
 4       int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;//获取配置信息,尝试抓取的次数,楼主记得在2.6以前这里写的3;当然,现在的默认值也为3
 5       while (retriesForLastBlockLength > 0) {
 6         if (lastBlockBeingWrittenLength == -1) {
 7           DFSClient.LOG.warn("Last block locations not available. "
 8               + "Datanodes might not have reported blocks completely."
 9               + " Will retry for " + retriesForLastBlockLength + " times");
10           waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
11           lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
12         } else {
13           break;
14         }
15         retriesForLastBlockLength--;
16       }
17       if (retriesForLastBlockLength == 0) {
18         throw new IOException("Could not obtain the last block locations.");
19       }
20     }
21   }

 

  获取block信息:

 1 //DFSInputStream.java
 2 private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
 3     final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
 4     //回到DFSClient中来获取当前block信息
 5     if (DFSClient.LOG.isDebugEnabled()) {
 6       DFSClient.LOG.debug("newInfo = " + newInfo);
 7     }
 8     if (newInfo == null) {
 9       throw new IOException("Cannot open filename " + src);
10     }
11 
12     if (locatedBlocks != null) {
13       Iterator oldIter = locatedBlocks.getLocatedBlocks().iterator();
14       Iterator newIter = newInfo.getLocatedBlocks().iterator();
15       while (oldIter.hasNext() && newIter.hasNext()) {
16         if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
17           throw new IOException("Blocklist for " + src + " has changed!");
18         }
19       }
20     }
21     locatedBlocks = newInfo;
22     long lastBlockBeingWrittenLength = 0;
23     if (!locatedBlocks.isLastBlockComplete()) {
24       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
25       if (last != null) {
26         if (last.getLocations().length == 0) {
27           if (last.getBlockSize() == 0) {         
28             return 0;
29           }
30           return -1;
31         }
32         final long len = readBlockLength(last);
33         last.getBlock().setNumBytes(len);
34         lastBlockBeingWrittenLength = len; 
35       }
36     }
37 
38     fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
39     //返回block开始写的位置
40     return lastBlockBeingWrittenLength;
41   }

  回到DFSClient中:

  

 1 DFSClient.java
 2 @VisibleForTesting
 3   public LocatedBlocks getLocatedBlocks(String src, long start, long length)
 4       throws IOException {
 5     TraceScope scope = getPathTraceScope("getBlockLocations", src);
 6     try {
 7       //这里NameNode作为参数传递到callGetBlockLocations()中
 8       return callGetBlockLocations(namenode, src, start, length);
 9     } finally {
10       scope.close();
11     }
12   }

  调用服务端方法,返回block信息:

 1 //DFSClient.java
 2 static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
 3       String src, long start, long length) 
 4       throws IOException {
 5     try {
 6      //看到这里,不用做过多的解释了吧?
 7       return namenode.getBlockLocations(src, start, length);
 8     } catch(RemoteException re) {
 9       throw re.unwrapRemoteException(AccessControlException.class,
10                                      FileNotFoundException.class,
11                                      UnresolvedPathException.class);
12     }
13   }

  最终将文件block相关信息写入输入流,通过工具类IOUtil输出到本地文件。

  那关于hadoop之hdfs原理及文件上传下载源码解析就写到这里,下系列的文章,楼主会写一些关于mapreduce或者hive相关的文章分享给大家。

  示例代码地址:https://github.com/LJunChina/hadoop

  

  

Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理

  首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来。

  楼主的环境:

  •   操作系统:Ubuntu 15.10
  •   hadoop版本:2.7.3
  •   HA:否(随便搭了个伪分布式)

文件上传

下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程:

  首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯。

  NameNode与各DataNode使用心跳机制来获取DataNode信息。NameNode收到Client请求后,获取DataNode信息,并将可存储文件的节点信息返回给Client。

  Client收到NameNode返回的信息,与对应的DataNode节点取得联系,并向该节点写文件。

  文件写入到DataNode后,以流水线的方式复制到其他DataNode(当然,这里面也有DataNode向NameNode申请block,这里不详细介绍),至于复制多少份,与所配置的hdfs-default.xml中的dfs.replication相关。

  元数据存储

  先明确几个概念:

  fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
  edits:操作日志文件。
  fstime:保存最近一次checkpoint的时间

  checkpoint可在hdfs-default.xml中具体配置,默认为3600秒:

1 property>
2   name>dfs.namenode.checkpoint.periodname>
3   value>3600value>
4   description>The number of seconds between two periodic checkpoints.
5   description>
6 property>

 

  fsimage和edits文件在namenode目录可以看到:

NameNode中的元数据信息:

 

  

  test.log文件上传后,Namenode始终在内存中保存metedata,用于处理“读请求”。metedata主要存储了文件名称(FileName),副本数量(replicas),分多少block存储(block-ids),分别存储在哪个节点上(id2host)等。

  到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
  hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。此时Secondary namenode就派上用场了,合并fsimage和edits文件并更新NameNode的metedata。
  Secondary namenode工作流程:

  1. secondary通知namenode切换edits文件
  2. secondary通过http请求从namenode获得fsimage和edits文件
  3. secondary将fsimage载入内存,然后开始合并edits
  4. secondary将新的fsimage发回给namenode
  5. namenode用新的fsimage替换旧的fsimage

  通过一张图可以表示为:

 文件下载

  文件下载相对来说就简单一些了,如图所示,Client要从DataNode上,读取test.log文件。而test.log由block1和block2组成。

  

  文件下载的主要流程为:

  • client向namenode发送请求。
  • namenode查看Metadata信息,返回test.log的block的位置。     

    Block1: h0,h1,h3
    Block2: h0,h2,h4

  • 开始从h0节点下载block1,block2。

源码分析

  我们先简单使用hadoop提供的API来实现文件的上传下载(文件删除、改名等操作比较简单,这里不演示):

 

  

 1 package cn.jon.hadoop.hdfs;
 2 
 3 import java.io.FileInputStream;
 4 import java.io.FileOutputStream;
 5 import java.io.IOException;
 6 import java.io.InputStream;
 7 import java.io.OutputStream;
 8 import java.net.URI;
 9 import java.net.URISyntaxException;
10 
11 import org.apache.hadoop.conf.Configuration;
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14 import org.apache.hadoop.io.IOUtils;
15 import org.junit.Before;
16 import org.junit.Test;
17 
18 public class HDFSDemo {
19     FileSystem fs = null;    
20     @Before
21     public void init(){
22         try {
23             //初始化文件系统
24             fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
25         } catch (IOException e) {
26             e.printStackTrace();
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         } catch (URISyntaxException e) {
30             e.printStackTrace();
31         }
32     }
33     public static void main(String[] args) {
34         
35     }
36     @Test
37     /**
38      * 文件上传
39      */
40     public void testFileUpload(){
41         try {
42             OutputStream os = fs.create(new Path("/test.log"));
43             FileInputStream fis = new FileInputStream("I://test.log");
44             IOUtils.copyBytes(fis, os, 2048,true);
45             //可以使用hadoop提供的简单方式
46             fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
47         } catch (IllegalArgumentException | IOException e) {
48             e.printStackTrace();
49         }
50     }
51     @Test    
52     /**
53      * 文件下载
54      */
55     public void testFileDownload(){
56         try {
57             InputStream is = fs.open(new Path("/test.log"));
58             FileOutputStream fos = new FileOutputStream("E://test.log");            
59             IOUtils.copyBytes(is, fos, 2048);
60             //可以使用hadoop提供的简单方式
61             fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
62         } catch (IllegalArgumentException | IOException e) {
63             e.printStackTrace();
64         }
65     }
66 
67 }

  显而易见,只要是对hdfs上的文件进行操作,必须对FileSystem进行初始化,我们先来分析FileSystem的初始化:

  

1  public static FileSystem get(URI uri, Configuration conf) throws IOException {
2     return CACHE.get(uri, conf);//部分方法我只截取了部分代码,这里进入get()方法
3   }
1    FileSystem get(URI uri, Configuration conf) throws IOException{
2       Key key = new Key(uri, conf);
3       return getInternal(uri, conf, key);//调用getInternal()
4     }
 1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
 2      //使用单例模式创建FileSystem,这是由于FS的初始化需要大量的时间,使用单例保证只是第一次加载慢一些,返回FileSystem的子类实现DistributedFileSystem
 3       FileSystem fs;
 4       synchronized (this) {
 5         fs = map.get(key);
 6       }
 7       if (fs != null) {
 8         return fs;
 9       }
10 
11       fs = createFileSystem(uri, conf);
12       synchronized (this) { // refetch the lock again
13         FileSystem oldfs = map.get(key);
14         if (oldfs != null) { // a file system is created while lock is releasing
15           fs.close(); // close the new file system
16           return oldfs;  // return the old file system
17         }
18         
19         // now insert the new file system into the map
20         if (map.isEmpty()
21                 && !ShutdownHookManager.get().isShutdownInProgress()) {
22           ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
23         }
24         fs.key = key;
25         map.put(key, fs);
26         if (conf.getBoolean("fs.automatic.close", true)) {
27           toAutoClose.add(key);
28         }
29         return fs;
30       }
31     }

 

 1 public void initialize(URI uri, Configuration conf) throws IOException {
 2     super.initialize(uri, conf);
 3     setConf(conf);
 4 
 5     String host = uri.getHost();
 6     if (host == null) {
 7       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
 8     }
 9     homeDirPrefix = conf.get(
10         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
11         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
12     
13     this.dfs = new DFSClient(uri, conf, statistics);//实例化DFSClient,并将它作为DistributedFileSystem的引用,下面我们跟进去
14     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
15     this.workingDir = getHomeDirectory();
16   }

 

 1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
 2       Configuration conf, FileSystem.Statistics stats)
 3     throws IOException {
 4     //该构造太长,楼主只截取了重要部分给大家展示,有感兴趣的同学可以亲手进源码瞧瞧     
 5     NameNodeProxies.ProxyAndInfo proxyInfo = null;
 6     //这里声明了NameNode的代理对象,跟我们前面讨论的rpc就息息相关了
 7     if (proxyInfo != null) {
 8       this.dtService = proxyInfo.getDelegationTokenService();
 9       this.namenode = proxyInfo.getProxy();
10     } else if (rpcNamenode != null) {   
11       Preconditions.checkArgument(nameNodeUri == null);
12       this.namenode = rpcNamenode;
13       dtService = null;
14     } else {
15       Preconditions.checkArgument(nameNodeUri != null,
16           "null URI");
17       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
18           ClientProtocol.class, nnFallbackToSimpleAuth);
19       this.dtService = proxyInfo.getDelegationTokenService();
20       this.namenode = proxyInfo.getProxy();//获取NameNode代理对象引用并自己持有,this.namenode类型为ClientProtocol,它是一个接口,我们看下这个接口
21     }
22   }

 

1 public interface ClientProtocol{
2       public static final long versionID = 69L;
3       //还有很多对NameNode操作的方法申明,包括对文件上传,下载,删除等
4       //楼主特意把versionID贴出来了,这就跟我们写的RPCDemo中的MyBizable接口完全类似,所以说Client一旦拿到该接口实现类的代理对象(NameNodeRpcServer),Client就可以实现与NameNode的RPC通信,我们继续跟进
5 }

 

 1  public static  ProxyAndInfo createProxy(Configuration conf,
 2       URI nameNodeUri, Class xface, AtomicBoolean fallbackToSimpleAuth)
 3       throws IOException {
 4     AbstractNNFailoverProxyProvider failoverProxyProvider =
 5         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
 6           fallbackToSimpleAuth);  
 7     if (failoverProxyProvider == null) {
 8       // 如果不是HA的创建方式,楼主环境是伪分布式,所以走这里,我们跟进去
 9       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
10           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
11     } else {
12       // 如果有HA的创建方式
13       Conf config = new Conf(conf);
14       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
15           RetryPolicies.failoverOnNetworkException(
16               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
17               config.maxRetryAttempts, config.failoverSleepBaseMillis,
18               config.failoverSleepMaxMillis));
19       return new ProxyAndInfo(proxy, dtService,
20           NameNode.getAddress(nameNodeUri));
21     }
22   }

   最终返回的为ClientProtocol接口的子类代理对象,而NameNodeRpcServer类实现了ClientProtocol接口,因此返回的为NameNode的代理对象,当客户端拿到了NameNode的代理对象后,即与NameNode建立了RPC通信:

 1 private static ClientProtocol createNNProxyWithClientProtocol(
 2       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
 3       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
 4       throws IOException {
 5     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感觉越来越像我们前面说到的RPC
 6 
 7     final RetryPolicy defaultPolicy = 
 8         RetryUtils.getDefaultRetryPolicy(//加载默认策虐
 9             conf, 
10             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
11             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
12             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
13             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
14             SafeModeException.class);
15     
16     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
17     //看到versionId了吗?这下明白了rpc的使用中目标接口必须要有这个字段了吧
18     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
19         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
20         NetUtils.getDefaultSocketFactory(conf),
21         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
22         fallbackToSimpleAuth).getProxy();
23     //看到没?这里使用 RPC.getProtocolProxy()来创建ClientNamenodeProtocolPB对象,调试时可以清楚的看见,该对象引用的是一个代理对象,值为$Proxy12,由JDK的动态代理来实现。
24     //前面我们写RPCDemo程序时,用的是RPC.getProxy(),但是各位大家可以去看RPC源码,RPC.getProtocolProxy()最终还是调用的getProxy()
25     if (withRetries) {
26       Map methodNameToPolicyMap 
27                  = new HashMap();    
28       ClientProtocol translatorProxy =
29         new ClientNamenodeProtocolTranslatorPB(proxy);
30       return (ClientProtocol) RetryProxy.create(//这里再次使用代理模式对代理对象进行包装,也可以理解为装饰者模式
31           ClientProtocol.class,
32           new DefaultFailoverProxyProvider(
33               ClientProtocol.class, translatorProxy),
34           methodNameToPolicyMap,
35           defaultPolicy);
36     } else {
37       return new ClientNamenodeProtocolTranslatorPB(proxy);
38     }
39   }

  整个FileSystem的初始化用时序图表示为:

 

  到此,FileSystem的初始化就基本完成。由于文章篇幅过大的问题,所以楼主把HDFS原理及源码分析拆分成了两部分,上半部分主要是HDFS原理与FileSystem的初始化介绍,那在下半部分将会具体介绍HDFS文件上传、下载的源码解析。

  另外,文章用到的一些示例代码,将会在下半部分发布后,楼主一起上传到GitHub。

 

Hadoop之RPC简单使用(远程过程调用协议)

  一、RPC概述

  RPC是指远程过程调用,也就是说两台不同的服务器(不受操作系统限制),一个应用部署在Linux-A上,一个应用部署在Windows-B或Linux-B上,若A想要调用B上的某个方法method(),由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语意和传达调用的参数。

  楼主在接触RPC之前,用得最多的莫过于WebService。WebService可以说是在RPC发展的基础之上。RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service等,又比如现在阿里巴巴的Dubbo,Apache下的hadoop项目。该篇楼主主要以hadoop的RPC为例。

  hadoop为何要使用RPC?在HDFS中,我们通过jsp可查看到有DataNode,NameNode,SecondaryNameNode主要进程(楼主只启动了HDFS),我们客户端Client与NameNode通信,NameNode与DataNode的通信,都是在不同进程间,不同系统间的通信。

  

  二、RPC流程

 

  通过下图,我们简单分析RPC的执行流程:

  

 

  首先,要解决通讯的问题,主要是通过在Client和Server之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。

  第二,要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。

  第三,当Client上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到Server,由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize),通过寻址和传输将序列化的二进制发送给B服务器。

  第四,Server收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。

  三、hadoop—RPC的简单使用

  定义接口Bizable:

  

1 package cn.jon.hadoop.rpc;
2 
3 public interface MyBizable {
4     long versionID = 123456;//该字段必须要有,不然会报java.lang.NoSuchFieldException: versionID异常
5     public String doSomething(String str);
6 }

  服务端RPCServer实现MyBizable接口并绑定IP地址及端口号:

package cn.jon.hadoop.rpc;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class RPCServer implements MyBizable {

    @Override
    public String doSomething(String str) {
        return str;
    }
    /**
     * @param args
     * @throws Exception 
     * @throws  
     */
    public static void main(String[] args) throws  Exception {
        Server server = new RPC.Builder(new Configuration())        
        .setProtocol(MyBizable.class)
        .setInstance(new RPCServer())
        .setBindAddress("192.168.8.100")
        .setPort(8077)
        .build();
        server.start();
    }

}

  客户端RPCClient:

  

package cn.jon.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class RPCClient {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        MyBizable proxy = RPC.getProxy(MyBizable.class, 123456,new InetSocketAddress("192.168.8.100", 8077) , new Configuration());
        String result = proxy.doSomething("服务端");
        System.out.println(result);
        RPC.stopProxy(proxy);
    }

}

  楼主使用Linux作为客户端,Windows作为服务端,我们先把写好的程序打成jar,上传到Linux:

  

  然后,我们在windows端启动RPCServer:

  

  服务端启动好后,我们在Linux中执行RPCClient.jar:  

java -jar RPCClient.jar

  执行结果可以看到输出了“服务端”(楼主Linux时间没有调准确):

    

  相关代码地址:https://github.com/LJunChina/hadoop

  下篇楼主将对HDFS原理进行探讨,到时会更加详细的讨论RPC。