分类目录后端开发

print2flash文档在线预览应用(java,.net)

一、背景  


 

  前段时间,LZ的boss突然给了出了这样一个需求:将原项目中的所有文章关联的附件TXT、PDF、office相关文件全部以flash的形式在网页上进行展示,便于预览。看似简单的需求,整个研发小组都懵逼的。LZ也没办法只好Google一把,找了好几套解决方案,基本上是调第三方软件的API,像什么flashpaper、print2flash等等。LZ先试了flashpaper,反正是各种问题,不支持64位系统什么的;print2flash在版本4以前也是很多问题,dll调用失败,dll缺失。最终,LZ在官网下载了最新的版本(商业软件,收费的),用起来还是相对来说比较稳定,至今没出现问题。提供的API还是比较丰富的,本篇LZ主要介绍java和.NET的操作。

二、print2flash安装


 

  这里LZ为了简单实验主要介绍破解版的安装(费了好大的力才找到的),如果商用,还是给钱找别人买吧。

  1)下载print2flash4.如果想使用服务器模式,大概需要600刀,也不是很贵。官网地址http://print2flash.com/

  2)运行print2flashsetup.exe一直下一步就OK。

  3)下载补丁文件。共3个文件p2fServer.exe,print2f4.exe,Print2FlashPrinter4.dll;如果之前已设置了windows服务方式启动,则先需要停止Print2Flash4服务;拷贝p2fServer.exe、print2f4.exe到安装目标文件夹替换原文件,拷贝Print2FlashPrinter4.dll到以下文件夹替换原始文件:C:\Windows\System32\spool\drivers\x64\以及C:\Windows\System32\spool\drivers\x64\3\

  4)注册print2flash4服务。运行print2flash,options->print2flash service configuration,输入Windows密码,没有的话就不用输,勾选Grant access and launch pemmissions to Everyone给EveryOne用户授予权限。点击OK,如果提示successful,Current status显示Installed表示安装成功。

  5)开启print2flash4服务。安装成功后,在windows服务中可以找到print2flash服务。启动该服务。

三、java操作API


 

  开始前我们需要将print2flash提供的SDK中的P2FConst.java引入:

  1 public class P2FConst
  2 {
  3     // 枚举常量
  4     public static final int MSEXCEL = 0x00000001;
  5     public static final int MSWORD = 0x00000002;
  6     public static final int MSPOWERPOINT = 0x00000004;
  7     public static final int ACROBAT = 0x00000008;
  8 
  9     
 10     public static final int JPEG = 1;
 11     public static final int PNG = 2;
 12 
 13     
 14     public static final int STRETCH = 1;
 15     public static final int TILE = 2;
 16 
 17     
 18     public static final int INTLOGO = 0x00000001;
 19     public static final int INTDRAG = 0x00000002;
 20     public static final int INTSELTEXT = 0x00000004;
 21     public static final int INTZOOMSLIDER = 0x00000008;
 22     public static final int INTZOOMBOX = 0x00000010;
 23     public static final int INTFITWIDTH = 0x00000020;
 24     public static final int INTFITPAGE = 0x00000040;
 25     public static final int INTPREVPAGE = 0x00000080;
 26     public static final int INTGOTOPAGE = 0x00000100;
 27     public static final int INTNEXTPAGE = 0x00000200;
 28     public static final int INTSEARCHBOX = 0x00000400;
 29     public static final int INTSEARCHBUT = 0x00000800;
 30     public static final int INTROTATE = 0x00001000;
 31     public static final int INTPRINT = 0x00002000;
 32     public static final int INTNEWWIND = 0x00004000;
 33     public static final int INTHELP = 0x00008000;
 34     public static final int INTBACKBUTTON = 0x00030000;
 35     public static final int INTBACKBUTTONAUTO = 0x00010000;
 36     public static final int INTFORWARDBUTTON = 0x000C0000;
 37     public static final int INTFORWARDBUTTONAUTO = 0x00040000;
 38     public static final int INTFULLSCREEN = 0x00300000;
 39     public static final int INTFULLSCREENAUTO = 0x00100000;
 40 
 41 
 42     // METADATAPORMAT enumeration constants
 43     public static final int XML = 1;
 44     public static final int TEXT = 2;
 45 
 46     // OUTPUTFORMAT enumeration constants
 47     public static final int SINGLEFILE = 1;
 48     public static final int SINGLEFILEPERPAGE = 2;
 49     public static final int EXTVIEWER = 4;
 50 
 51     // PAPER_ORIENTATION enumeration constants
 52     public static final int ORIENT_PORTRAIT = 1;
 53     public static final int ORIENT_LANDSCAPE = 2;
 54 
 55     // PROTECTION_OPTION enumeration constants
 56     public static final int PROTDISPRINT = 0x00000001;
 57     public static final int PROTDISTEXTCOPY = 0x00000002;
 58     public static final int PROTENAPI = 0x00000004;
 59 
 60     // ThreeStateFlag enumeration constants
 61     public static final int TSF_NO = 0;
 62     public static final int TSF_YES = 1;
 63     public static final int TSF_AUTO = 2;
 64 
 65     // TOOLBARIMAGE enumeration constants
 66     public static final int IMGLOGO = 1;
 67     public static final int IMGDRAG = 2;
 68     public static final int IMGSELTEXT = 3;
 69     public static final int IMGZOOMRULER = 4;
 70     public static final int IMGZOOMFOCUSNADLE = 5;
 71     public static final int IMGZOOMNADLE = 6;
 72     public static final int IMGFITWIDTH = 7;
 73     public static final int IMGFITPAGE = 8;
 74     public static final int IMGPREVPAGE = 9;
 75     public static final int IMGNEXTPAGE = 10;
 76     public static final int IMGSEARCHBUT = 11;
 77     public static final int IMGROTATE = 12;
 78     public static final int IMGPRINT = 13;
 79     public static final int IMGNEWWIND = 14;
 80     public static final int IMGHELP = 15;
 81     public static final int IMGMORE = 16;
 82     public static final int IMGTOOLBARBGR = 17;
 83     public static final int IMGBACK = 18;
 84     public static final int IMGFORWARD = 19;
 85     public static final int IMGFULLSCREEN = 20;
 86     public static final int IMGEXITFULLSCREEN = 21;
 87 
 88     // TEMPLATETYPE enumeration constants
 89     public static final int TEMPLATE_CUSTOM = 1;
 90     public static final int TEMPLATE_ACTIONSCRIPT2 = 2;
 91     public static final int TEMPLATE_ACTIONSCRIPT3 = 3;
 92 
 93     // WATERMARKANCHOR enumeration constants
 94     public static final int CENTER = 0;
 95     public static final int LEFTCENTER = 1;
 96     public static final int RIGHTCENTER = 2;
 97     public static final int TOPCENTER = 16;
 98     public static final int BOTTOMCENTER = 32;
 99     public static final int LEFTTOP = 17;
100     public static final int RIGHTTOP = 18;
101     public static final int LEFTBOTTOM = 33;
102     public static final int RIGHTBOTTOM = 34;
103 
104     // COMPRESSION_METHOD enumeration constants
105     public static final int COMPRESSION_METHOD_ZLIB = 0;
106     public static final int COMPRESSION_METHOD_LZMA = 1;
107 
108     // DOCUMENT_TYPE Enumeration constants
109     public static final int FLASH = 1;
110     public static final int HTML5 = 2;
111     
112     // BROWSER_TYPE Enumeration public static final intants
113     public static final int INTERNET_EXPLORER =   1;
114     public static final int FIREFOX =             2;
115     public static final int CHROME =              4;
116     public static final int OPERA =               8;
117     public static final int SAFARI =              16;
118     
119     // POWERPOINT_PRINTOUTPUT Enumeration constants
120     public static final int POWERPOINT_PRINTOUTPUT_AUTO =            0;
121     public static final int POWERPOINT_PRINTOUTPUT_Slides =            1;
122     public static final int POWERPOINT_PRINTOUTPUT_TwoSlideHandouts =    2;
123     public static final int POWERPOINT_PRINTOUTPUT_ThreeSlideHandouts =    3;
124     public static final int POWERPOINT_PRINTOUTPUT_SixSlideHandouts    =    4;
125     public static final int POWERPOINT_PRINTOUTPUT_NotesPages =         5;
126     public static final int POWERPOINT_PRINTOUTPUT_Outline =        6;
127     public static final int POWERPOINT_PRINTOUTPUT_BuildSlides =        7;
128     public static final int POWERPOINT_PRINTOUTPUT_FourSlideHandouts =    8;
129     public static final int POWERPOINT_PRINTOUTPUT_NineSlideHandouts =    9;
130     public static final int POWERPOINT_PRINTOUTPUT_OneSlideHandouts    =    10;
131 }

  开始转换Test.java类:

 1 package print2flash;
 2 
 3 import java.io.IOException;
 4 
 5 import com.jacob.activeX.*;
 6 import com.jacob.com.*;
 7 
 8 public class Test {
 9 
10     public static void main(String[] args) throws IOException {
11         try {
12             ComThread.InitSTA();//com组件管理,用来初始化com线程,释放线程
13             ActiveXComponent p2f = new ActiveXComponent("Print2Flash4.Server");//创建print2flash的一个应用,调用print2flash服务
14             //设置属性值,可根据自己需求修改
15             ActiveXComponent defProfile = new ActiveXComponent(p2f.getProperty("DefaultProfile").toDispatch());
16             defProfile.setProperty("InterfaceOptions", P2FConst.INTLOGO | P2FConst.INTZOOMSLIDER | P2FConst.INTPREVPAGE
17                     | P2FConst.INTGOTOPAGE | P2FConst.INTNEXTPAGE);
18             defProfile.setProperty("ProtectionOptions", P2FConst.PROTDISPRINT | P2FConst.PROTENAPI);
19             defProfile.setProperty("DocumentType", P2FConst.FLASH | P2FConst.HTML5);
20             p2f.invoke("ConvertFile", "E:/print2flashTest/test.doc");//方法调用
21             System.out.println("转换成功!");
22         } catch (Exception e) {
23             System.out.println("转换异常: " + e.toString());
24         } finally {
25             ComThread.Release();
26         }
27     }
28 }

  注意:这里涉及到java应用调用windows的COM组件,需要用到jacob,没有的同学,可以用LZ提供的链接地址下载(文章最后会一起附上)。

  1)将jacob.jar build path。

  2)将jacob-1.16-M1-x64.dll(根据自己系统情况选择)复制到jre/bin目录(如果没有该dll,会抛出找不到ComThread类异常)

  接下来LZ准备一个test.doc,来进行测试:

  

 

   test.docx.swf即为生成的flash文件,可嵌入到你需要的网页中。

 四、.NET操作API


 

  .NET的操作相对来说就简单得多了,LZ这里也是只做一个简单的win32控制台演示:

  1)首先新建win32控制台应用程序,增加程序集Interop.Print2Flash4.dll。

  2)在Proogram.cs中写具体业务逻辑:  

 1 using System;
 2 using System.Collections.Generic;
 3 using System.IO;
 4 using System.Linq;
 5 using System.Text;
 6 
 7 namespace Test
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13             string fs_filename = System.AppDomain.CurrentDomain.BaseDirectory()+"/print2flashTest/test.docx";
14             string fs_convertedfilename = System.AppDomain.CurrentDomain.BaseDirectory()+"/print2flashTest/swf/test.swf";
15             Print2Flash4.Server2 p2fServer = new Print2Flash4.Server2();//创建print2flash对象
16             //设置参数值,可以参考官网SDK文档          
17             p2fServer.ConvertFile(fs_filename, fs_convertedfilename, null, null, null);
18         }
19     }
20 }

  运行结果与java一样。官网SDK还提供了ASP、PHP等脚本语言的操作API,有情趣的同学可以自己尝试一下。

  

五、总结


  print2flash这款软件还是不错的,转换后的效果比较清晰。运用范围差不多就是内容管理系统、文章管理等附件的预览,当然也可以用来控制游客用户不允许文字复制等等。最后附上本文LZ所用到的源代码,补丁,附件的下载地址:

  源代码:https://github.com/LJunChina/JavaResource/tree/master/print2flash

  print2flash4补丁:http://download.csdn.net/detail/qq503665965/9860438

  jacob:http://download.csdn.net/detail/qq503665965/9860441

  SDK:http://download.csdn.net/detail/qq503665965/9860435

  

 

分布式队列ZooKeeper的实现

一、背景

  有一些时候,多个团队需要共同完成一个任务,比如,A团队将Hadoop集群计算的结果交给B团队继续计算,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下

去,直到最后一部分完成。在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB(企业服务股总线)服务器上部署自己的服务,然后通过消息中间件完成调度任务。对亍分步式的多个

Hadoop集群系统的协作,同样可以用这种架构来做只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

  本文楼主将使用zookeeper做为分步式消息中间件构造一个大型超市的部分数据计算模型来完成各个区域利润计算的业务需求。

  由于采购和销售分别是由不同厂商进行的软件开发和维护,而且业务往来也在不同的城市和地区。 所以在每月底结算时,工作量都特别大。 比如,计算利润表: 当月利润 = 当月销售金额 – 当月采购

额 – 当月其他支出(楼主只是粗略计算)。如果采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统, 如何能让多个系统,配合起来完成该需求?

二、系统构思

  楼主基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。排除了ESB的部分,只保留zookeeper进行实现。

  1.   采购数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  2.   销售数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  3.   其他费用支出:为少量数据,基于文件或数据库存储和分析

  我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售 (sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润, 幵创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次 。

  

 

  Hadoop mapreduce1,Hadoop mapreduce2 是2个独立的Hadoop集群应用。 Java App 是2个独立的Java应用 。ZooKeeper集群的有3个节点 。

  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop mapreduce1提交,用于统计采购金额
  • /queue/sell,是znode队列中,2号排对者,由Hadoop mapreduce2提交,用于统计销售金额
  • /queue/other,是znode队列中,3号排对者,由Java App提交,用于统计其他费用支出金额
  • /queue/profit,当znode队列中满了,触发创建利润节点。

  当/qeueu/profit被创建后,利润java app被启动,所有zookeeper的连接通知同步程序(红色线),队列已完成,所有程序结束。

三、环境准备

  1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)zookeeper集群。至少三个节点。安装参考楼主这篇文章(http://www.cnblogs.com/qq503665965/p/6790580.html

  3)java开发环境。

四、mapreduce及java app程序

  计算采购金额:

  1 package zkqueue;
  2 import java.io.IOException;
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 import java.util.regex.Pattern;
  6 
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapred.JobConf;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 
 20 
 21 /**
 22  * 采购金额计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Purchase {
 27 
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30 
 31     public static class PurchaseMapper extends Mapper {
 32 
 33         private String month = "2017-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37 
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());//拆分源数据
 41             if (tokens[3].startsWith(month)) {// 过滤1月份数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//计算
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48 
 49     public static class PurchaseReducer extends Reducer {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52 
 53         @Override
 54         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62 
 63     }
 64 
 65     public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("purchase");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70 
 71         
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76 
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Purchase.class);
 79 
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82 
 83         job.setMapperClass(PurchaseMapper.class);
 84         job.setReducerClass(PurchaseReducer.class);
 85 
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88 
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91 
 92         job.waitForCompletion(true);
 93     }
 94 
 95     public static JobConf config() {
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104     
105     public static Map path(){
106         Map path = new HashMap();
107         path.put("purchase", Purchase.class.getClassLoader().getResource("logfile/biz/purchase.csv").getPath());// 源文件数据
108         path.put("input", HDFS + "/user/hdfs/biz/purchase");//hdfs存储路径
109         path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); //hdfs输出路径
110         return path;
111     }
112 
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116 
117 }

  销售数据计算:

  1 package zkqueue;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 import java.util.regex.Pattern;
  7 
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapred.JobConf;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 
 21 /**
 22  * 销售数据计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Sell {
 27 
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30 
 31     public static class SellMapper extends Mapper {
 32 
 33         private String month = "2013-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37 
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());
 41             if (tokens[3].startsWith(month)) {// 1月的数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48 
 49     public static class SellReducer extends Reducer {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52 
 53         @Override
 54         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62 
 63     }
 64 
 65     public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("sell");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70 
 71         // 初始化sell
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76 
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Sell.class);
 79 
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82 
 83         job.setMapperClass(SellMapper.class);
 84         job.setReducerClass(SellReducer.class);
 85 
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88 
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91 
 92         job.waitForCompletion(true);
 93     }
 94 
 95     public static JobConf config() {// Hadoop集群的远程配置信息
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104     
105     public static Map path(){
106         Map path = new HashMap();
107         path.put("sell", Sell.class.getClassLoader().getResource("logfile/biz/sell.csv").getPath());// 本地的数据文件
108         path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
109         path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
110         return path;
111     }
112 
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116 
117 }

  其他金额计算:

 1 package zkqueue;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.File;
 5 import java.io.FileReader;
 6 import java.io.IOException;
 7 import java.util.regex.Pattern;
 8 
 9 public class Other {
10 
11     public static String file = "/logfile/biz/other.csv";
12     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
13     private static String month = "2017-01";
14 
15     public static void main(String[] args) throws IOException {
16         calcOther(file);
17     }
18 
19     public static int calcOther(String file) throws IOException {
20         int money = 0;
21         BufferedReader br = new BufferedReader(new FileReader(new File(file)));
22 
23         String s = null;
24         while ((s = br.readLine()) != null) {
25             String[] tokens = DELIMITER.split(s);
26             if (tokens[0].startsWith(month)) {// 1月的数据
27                 money += Integer.parseInt(tokens[1]);
28             }
29         }
30         br.close();
31         System.out.println("Output:" + month + "," + money);
32         return money;
33     }
34 }

  计算利润:

  

 1 package zkqueue;
 2 
 3 import java.io.IOException;
 4 
 5 
 6 /**
 7  * 利润计算
 8  * @author Jon_China
 9  *
10  */
11 public class Profit {
12 
13     public static void main(String[] args) throws Exception {
14         profit();
15     }
16 
17     public static void profit() throws Exception {
18         int sell = getSell();
19         int purchase = getPurchase();
20         int other = getOther();
21         int profit = sell - purchase - other;
22         System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
23     }
24 
25     public static int getPurchase() throws Exception {
26         HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
27         return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
28     }
29 
30     public static int getSell() throws Exception {
31         HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
32         return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
33     }
34 
35     public static int getOther() throws IOException {
36         return Other.calcOther(Other.file);
37     }
38 
39 }

  zookeeper任务调度:

  1 package zkqueue;
  2 
  3 import java.io.IOException;
  4 import java.util.List;
  5 
  6 import org.apache.zookeeper.CreateMode;
  7 import org.apache.zookeeper.KeeperException;
  8 import org.apache.zookeeper.WatchedEvent;
  9 import org.apache.zookeeper.Watcher;
 10 import org.apache.zookeeper.ZooDefs.Ids;
 11 import org.apache.zookeeper.ZooKeeper;
 12 /**
 13  * 分布式队列zookeeper调度
 14  * @author Jon_China
 15  *
 16  */
 17 public class QueueZookeeper {
 18     //设置队列目录树
 19     final public static String QUEUE = "/queue";
 20     final public static String PROFIT = "/queue/profit";
 21     final public static String PURCHASE = "/queue/purchase";
 22     final public static String SELL = "/queue/sell";
 23     final public static String OTHER = "/queue/other";
 24 
 25     public static void main(String[] args) throws Exception {
 26         if (args.length == 0) {
 27             System.out.println("Please start a task:");
 28         } else {
 29             doAction(Integer.parseInt(args[0]));
 30         }
 31     }
 32     public static void doAction(int client) throws Exception {
 33         //zookeeper地址
 34         String host1 = "192.168.8.104:2181";
 35         String host2 = "192.168.8.105:2181";
 36         String host3 = "192.168.8.106:2181";
 37 
 38         ZooKeeper zk = null;
 39         switch (client) {//1,2,3分别将不同任务加入队列
 40         case 1:
 41             zk = connection(host1);
 42             initQueue(zk);
 43             doPurchase(zk);
 44             break;
 45         case 2:
 46             zk = connection(host2);
 47             initQueue(zk);
 48             doSell(zk);
 49             break;
 50         case 3:
 51             zk = connection(host3);
 52             initQueue(zk);
 53             doOther(zk);
 54             break;
 55         }
 56     }
 57 
 58     // 创建一个与服务器的连接
 59     public static ZooKeeper connection(String host) throws IOException {
 60         ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
 61             // 监控所有被触发的事件
 62             public void process(WatchedEvent event) {
 63                 if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
 64                     System.out.println("Queue has Completed!!!");
 65                 }
 66             }
 67         });
 68         return zk;
 69     }
 70     /**
 71      * 初始化队列
 72      * @param zk
 73      * @throws KeeperException
 74      * @throws InterruptedException
 75      */
 76     public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
 77         System.out.println("WATCH => " + PROFIT);
 78         zk.exists(PROFIT, true);
 79 
 80         if (zk.exists(QUEUE, false) == null) {
 81             System.out.println("create " + QUEUE);
 82             zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 83         } else {
 84             System.out.println(QUEUE + " is exist!");
 85         }
 86     }
 87     /**
 88      * 采购任务
 89      * @param zk
 90      * @throws Exception
 91      */
 92     public static void doPurchase(ZooKeeper zk) throws Exception {
 93         if (zk.exists(PURCHASE, false) == null) {
 94             
 95             Purchase.run(Purchase.path());
 96             
 97             System.out.println("create " + PURCHASE);
 98             zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 99         } else {
100             System.out.println(PURCHASE + " is exist!");
101         }
102         isCompleted(zk);
103     }
104     /**
105      * 销售任务
106      * @param zk
107      * @throws Exception
108      */
109     public static void doSell(ZooKeeper zk) throws Exception {
110         if (zk.exists(SELL, false) == null) {
111             
112             Sell.run(Sell.path());
113             
114             System.out.println("create " + SELL);
115             zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
116         } else {
117             System.out.println(SELL + " is exist!");
118         }
119         isCompleted(zk);
120     }
121     /**
122      * 其他计算任务
123      * @param zk
124      * @throws Exception
125      */
126     public static void doOther(ZooKeeper zk) throws Exception {
127         if (zk.exists(OTHER, false) == null) {
128             
129             Other.calcOther(Other.file);
130             
131             System.out.println("create " + OTHER);
132             zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
133         } else {
134             System.out.println(OTHER + " is exist!");
135         }
136         isCompleted(zk);
137     }
138     /**
139      * 检测完成情况
140      * @param zk
141      * @throws Exception
142      */
143     public static void isCompleted(ZooKeeper zk) throws Exception {
144         int size = 3;
145         List children = zk.getChildren(QUEUE, true);
146         int length = children.size();
147 
148         System.out.println("Queue Complete:" + length + "/" + size);
149         if (length >= size) {
150             System.out.println("create " + PROFIT);
151             Profit.profit();
152             zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
153 
154             for (String child : children) {// 清空节点
155                 zk.delete(QUEUE + "/" + child, -1);
156             }
157 
158         }
159     }
160 }

四、运行结果

  在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求 。然后,通过同步的分步式队列自动启动了计算利润的程序,幵在日志中打印了2017 年1月的利润为-6693765。

  示例代码地址:https://github.com/LJunChina/hadoop/tree/master/distributed_mq

hive网站日志数据分析

  一、说在前面的话

  上一篇,楼主介绍了使用flume集群来模拟网站产生的日志数据收集到hdfs。但我们所采集的日志数据是不规则的,同时也包含了许多无用的日志。当需要分析一些核心指标来满足系统业务决策的时候,对日志的数据清洗在所难免,楼主本篇将介绍如何使用mapreduce程序对日志数据进行清洗,将清洗后的结构化数据存储到hive,并进行相关指标的提取。

  先明白几个概念:

  1)PV(Page View)。页面浏览量即为PV,是指所有用户浏览页面的总和,一个独立用户每打开一个页面就被记录1 次。计算方式为:记录计数

  2)注册用户数。对注册页面访问的次数。计算方式:对访问member.php?mod=register的url,计数

  3)IP数。一天之内,访问网站的不同独立IP 个数加和。其中同一IP无论访问了几个页面,独立IP 数均为1。这是我们最熟悉的一个概念,无论同一个IP上有多少台主机,或者其他用户,从某种程度上来说,独立IP的多少,是衡量网站推广活动好坏最直接的数据。计算方式:对不同ip,计数

  4)跳出率。只浏览了一个页面便离开了网站的访问次数占总的访问次数的百分比,即只浏览了一个页面的访问次数 / 全部的访问次数汇总。跳出率是非常重要的访客黏性指标,它显示了访客对网站的兴趣程度。跳出率越低说明流量质量越好,访客对网站的内容越感兴趣,这些访客越可能是网站的有效用户、忠实用户。该指标也可以衡量网络营销的效果,指出有多少访客被网络营销吸引到宣传产品页或网站上之后,又流失掉了,可以说就是煮熟的鸭子飞了。比如,网站在某媒体上打广告推广,分析从这个推广来源进入的访客指标,其跳出率可以反映出选择这个媒体是否合适,广告语的撰写是否优秀,以及网站入口页的设计是否用户体验良好。
   计算方式:(1)统计一天内只出现一条记录的ip,称为跳出数
                   (2)跳出数/PV
  本次楼主只做以上几项简单指标的分析,各个网站的作用领域不一样,所涉及的分析指标也有很大差别,各位同学可以根据自己的需求尽情拓展。废话不多说,上干货。

  二、环境准备  

  1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)hive。用于对各项核心指标进行分析(安装楼主不再介绍了)

  3)mysql。存储分析后的数据指标。

  4)sqoop。从hive到mysql的数据导入。

  三、数据清洗

  我们先看看从flume收集到hdfs中的源日志数据格式:  

1 27.19.74.143 - - [30/4/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
2 211.97.15.179 - - [30/4/2017:17:38:22 +0800] "GET /home.php?mod=misc&ac=sendmail&rand=1369906181 HTTP/1.1" 200 -

  上面包含条个静态资源日志和一条正常链接日志(楼主这里不做静态资源日志的分析),需要将以 /static 开头的日志文件过滤掉;时间格式需要转换为时间戳;去掉IP与时间之间的无用符号;过滤掉请求方式;“/”分隔符、http协议、请求状态及当次流量。效果如下:  

1 211.97.15.179   20170430173820  home.php?mod=misc&ac=sendmail&rand=1369906181

  先写个日志解析类,测试是否能解析成功,我们再写mapreduce程序:

  

 1 package mapreduce;
 2 
 3 import java.text.ParseException;
 4 import java.text.SimpleDateFormat;
 5 import java.util.Date;
 6 import java.util.Locale;
 7 
 8 
 9 public class LogParser {
10     public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MM/yyyy:HH:mm:ss", Locale.ENGLISH);
11     public static final SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
12     public static void main(String[] args) throws ParseException {
13         final String S1 = "27.19.74.143 - - [30/04/2017:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
14         LogParser parser = new LogParser();
15         final String[] array = parser.parse(S1);
16         System.out.println("源数据: "+S1);
17         System.out.format("清洗结果数据:  ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2], array[3], array[4]);
18     }
19     /**
20      * 解析英文时间字符串
21      * @param string
22      * @return
23      * @throws ParseException
24      */
25     private Date parseDateFormat(String string){
26         Date parse = null;
27         try {
28             parse = FORMAT.parse(string);
29         } catch (ParseException e) {
30             e.printStackTrace();
31         }
32         return parse;
33     }
34     /**
35      * 解析日志的行记录
36      * @param line
37      * @return 数组含有5个元素,分别是ip、时间、url、状态、流量
38      */
39     public String[] parse(String line){
40         String ip = parseIP(line);
41         String time = parseTime(line);
42         String url = parseURL(line);
43         String status = parseStatus(line);
44         String traffic = parseTraffic(line);
45         
46         return new String[]{ip, time ,url, status, traffic};
47     }
48     
49     private String parseTraffic(String line) {
50         final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
51         String traffic = trim.split(" ")[1];
52         return traffic;
53     }
54     private String parseStatus(String line) {
55         final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
56         String status = trim.split(" ")[0];
57         return status;
58     }
59     private String parseURL(String line) {
60         final int first = line.indexOf("\"");
61         final int last = line.lastIndexOf("\"");
62         String url = line.substring(first+1, last);
63         return url;
64     }
65     private String parseTime(String line) {
66         final int first = line.indexOf("[");
67         final int last = line.indexOf("+0800]");
68         String time = line.substring(first+1,last).trim();
69         Date date = parseDateFormat(time);
70         return dateformat1.format(date);
71     }
72     private String parseIP(String line) {
73         String ip = line.split("- -")[0].trim();
74         return ip;
75     }
76 }

  输出结果:  

1 源数据: 27.19.74.143 - - [30/04/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
2 清洗结果数据:  ip=27.19.74.143, time=20170430173820, url=GET /static/image/common/faq.gif HTTP/1.1, status=200, traffic=1127

  再看mapreduce业务逻辑,在map中,我们需要拿出ip、time、url这三个属性的值,同时过滤掉静态资源日志。map的k1用默认的LongWritable就OK,v1不用说Text,k2、v2与k1、v1类型对应就行:  

 1 static class MyMapper extends Mapper{
 2         LogParser logParser = new LogParser();
 3         Text v2 = new Text();
 4         @Override
 5         protected void map(LongWritable key, Text value, Mapper.Context context)
 6                 throws IOException, InterruptedException {
 7             final String[] parsed = logParser.parse(value.toString());
 8             
 9             //过滤掉静态信息
10             if(parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")){
11                 return;
12             }            
13             //过掉开头的特定格式字符串
14             if(parsed[2].startsWith("GET /")){
15                 parsed[2] = parsed[2].substring("GET /".length());
16             }
17             else if(parsed[2].startsWith("POST /")){
18                 parsed[2] = parsed[2].substring("POST /".length());
19             }            
20             //过滤结尾的特定格式字符串
21             if(parsed[2].endsWith(" HTTP/1.1")){
22                 parsed[2] = parsed[2].substring(0, parsed[2].length()-" HTTP/1.1".length());
23             }            
24             v2.set(parsed[0]+"\t"+parsed[1]+"\t"+parsed[2]);
25             context.write(key, v2);
26         }

  reduce相对来说就比较简单了,我们只需再讲map的输出写到一个文件中就OK:  

1 static class MyReducer extends Reducer{
2         @Override
3         protected void reduce(LongWritable arg0, Iterable arg1,
4                 Reducer.Context context) throws IOException, InterruptedException {
5             for (Text v2 : arg1) {                
6                 context.write(v2, NullWritable.get());
7             }
8         }
9     }

  最后,组装JOB:  

 1 public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
 2         Job job = Job.getInstance(new Configuration());
 3         job.setJarByClass(LogParser.class);        
 4         job.setMapperClass(MyMapper.class);
 5         job.setMapOutputKeyClass(LongWritable.class);
 6         job.setMapOutputValueClass(Text.class);
 7         FileInputFormat.setInputPaths(job, new Path("/logs/20170430.log"));        
 8         job.setReducerClass(MyReducer.class);
 9         job.setOutputKeyClass(Text.class);
10         job.setOutputValueClass(NullWritable.class);
11         FileOutputFormat.setOutputPath(job, new Path("/20170430"));
12         job.waitForCompletion(true);
13     }

  mapreduce完成后就是运行job了:

  1)打包,mapreduce程序为loger.jar

  2)上传jar包。运行loger.jar hadoop jar loger.jar 

  运行结果:

  

  hdfs多了20170430目录:

  

  我们下载下来看看清洗后的数据是否符合要求:

  

  日志数据的清洗到此就完成了,接下来我们要在此之上使用hive提取核心指标数据。

  四、核心指标分析

  1)构建一个外部分区表,sql脚本如下:  

1 CREATE EXTERNAL TABLE sitelog(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/20170430';

  2)增加分区,sql脚本如下:  

ALTER TABLE sitelog ADD PARTITION(logdate='20170430') LOCATION '/sitelog_cleaned/20170430';

  3)统计每日PV,sql脚本如下:  

1 CREATE TABLE sitelog_pv_20170430 AS SELECT COUNT(1) AS PV FROM sitelog WHERE logdate='20170430';

  4)统计每日注册用户数,sql脚本如下:  

1 CREATE TABLE sitelog_reguser_20170430 AS SELECT COUNT(1) AS REGUSER FROM sitelog WHERE logdate=20170430' AND INSTR(url,'member.php?mod=register')>0;

  5)统计每日独立IP,sql脚本如下:

1 CREATE TABLE site_ip_20170430 AS SELECT COUNT(DISTINCT ip) AS IP FROM sitelog WHERE logdate='20170430';

  6)统计每日跳出的用户数,sql脚本如下:

CREATE TABLE sitelog_jumper_20170430 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM sitelog WHERE logdate='20170430' GROUP BY ip HAVING times=1) e;

  7)把每天统计的数据放入一张表中,sql脚本如下:

1 CREATE TABLE sitelog_20170430 AS SELECT '20170430', a.pv, b.reguser, c.ip, d.jumper FROM sitelog_pv_20170430 a JOIN sitelog_reguser_20170430 b ON 1=1 JOIN sitelog_ip_20170430 c ON 1=1 JOIN sitelog_jumper_20170430 d ON 1=1 ;

  8)使用sqoop把数据导出到mysql中:

sqoop export --connect jdbc:mysql://hadoop02:3306/sitelog --username root --password root --table sitelog-result --fields-terminated-by '\001' --export-dir '/user/hive/warehouse/sitelog_20170430'

   结果如下:

  2017年4月30日日志分析结果:PV数为:169857;当日注册用户数:28;独立IP数:10411;跳出数:3749.

  到此,一个简单的网站日志分析楼主就介绍完了,后面可视化的展示楼主就不写了,比较简单。相关代码地址:https://github.com/LJunChina/hadoop

flume集群日志收集

一、Flume简介

  Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。其核心为agent,agent是一个java进程,运行在日志收集节点。

agent里面包含3个核心组件:source、channel、sink。
   source组件是专用于收集日志的,可以处理各种类型各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义,同时 source组件把数据收集

以后,临时存放在channel中。

  channel组件是在agent中专用于临时存储数据的,可以存放在memory、jdbc、file、自定义等。channel中的数据只有在sink发送成功之后才会被删除。

  sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

  在整个数据传输过程中,流动的是event。事务保证是在event级别。flume可以支持多级flume的agent,支持扇入(fan-in)、扇出(fan-out)。

 

二、环境准备

  1)hadoop集群(楼主用的版本2.7.3,共6个节点,可参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)flume集群规划:

HOST

作用

方式

路径

hadoop01

agent

spooldir

/home/hadoop/logs

hadoop05

collector

HDFS

/logs

hadoop06

collector

HDFS

/logs

  其基本结构官网给出了更加具体的说明,这里楼主就直接copy过来了:

三、集群配置

  1)系统环境变量配置  

1 export FLUME_HOME=/home/hadoop/apache-flume-1.7.0-bin
2 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLUME_HOME/bin

  记得 source /etc/profile

  2)flume JDK环境  

1 mv flume-env.sh.templete flume-env.sh
2 vim flume-env.sh
3 export JAVA_HOME=/usr/jdk1.7.0_60//增加JDK安装路径即可

  3)hadoop01中flume配置

  在conf目录增加配置文件 flume-client ,内容为:  

 1 #agent1名称
 2 agent1.channels = c1
 3 agent1.sources = r1
 4 agent1.sinks = k1 k2
 5 
 6 #sink组名称
 7 agent1.sinkgroups = g1 
 8 
 9 #set channel
10 agent1.channels.c1.type = memory
11 agent1.channels.c1.capacity = 1000
12 agent1.channels.c1.transactionCapacity = 100
13 
14 agent1.sources.r1.channels = c1
15 agent1.sources.r1.type = spooldir
16 #日志源
17 agent1.sources.r1.spoolDir =/home/hadoop/logs
18 
19 agent1.sources.r1.interceptors = i1 i2
20 agent1.sources.r1.interceptors.i1.type = static
21 agent1.sources.r1.interceptors.i1.key = Type
22 agent1.sources.r1.interceptors.i1.value = LOGIN
23 agent1.sources.r1.interceptors.i2.type = timestamp
24 
25 # 设置sink1
26 agent1.sinks.k1.channel = c1
27 agent1.sinks.k1.type = avro
28 #sink1所在主机
29 agent1.sinks.k1.hostname = hadoop05
30 agent1.sinks.k1.port = 52020
31 
32 #设置sink2
33 agent1.sinks.k2.channel = c1
34 agent1.sinks.k2.type = avro
35 #sink2所在主机
36 agent1.sinks.k2.hostname = hadoop06
37 agent1.sinks.k2.port = 52020
38 
39 #设置sink组包含sink1,sink2
40 agent1.sinkgroups.g1.sinks = k1 k2
41 
42 #高可靠性
43 agent1.sinkgroups.g1.processor.type = failover
44 #设置优先级
45 agent1.sinkgroups.g1.processor.priority.k1 = 10
46 agent1.sinkgroups.g1.processor.priority.k2 = 1
47 agent1.sinkgroups.g1.processor.maxpenalty = 10000

  4)hadoop05配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channlels
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop05
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop05
20 a1.sources.r1.channels = c1
21 
22 #sink的hdfs地址
23 a1.sinks.k1.type=hdfs
24 a1.sinks.k1.hdfs.path=/logs
25 a1.sinks.k1.hdfs.fileType=DataStream
26 a1.sinks.k1.hdfs.writeFormat=TEXT
27 #没1K产生文件
28 a1.sinks.k1.hdfs.rollInterval=1
29 a1.sinks.k1.channel=c1
30 #文件后缀
31 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

  5)hadoop06配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channel
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop06
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop06
20 a1.sources.r1.channels = c1
21 #设置sink的hdfs地址目录
22 a1.sinks.k1.type=hdfs
23 a1.sinks.k1.hdfs.path=/logs
24 a1.sinks.k1.hdfs.fileType=DataStream
25 a1.sinks.k1.hdfs.writeFormat=TEXT
26 a1.sinks.k1.hdfs.rollInterval=1
27 a1.sinks.k1.channel=c1
28 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

四、启动flume集群

  1)启动collector,即hadoop05,hadoop06 

1 flume-ng agent -n a1 -c conf -f flume-server -Dflume.root.logger=DEBUG,console

  2)启动agent,即hadoop01

flume-ng agent -n a1 -c conf -f flume-client -Dflume.root.logger=DEBUG,console

  agent启动后,hadoop05,hadoop06的控制台可看到如下打印信息:

  

五、日志收集测试

  1)启动zookeeper集群(未搭建zookeeper的同学可以忽略)

  2)启动hdfs start-dfs.sh 

  3)模拟网站日志,楼主这里随便弄的测试数据

 

  4)上传到/hadoop/home/logs

  hadoop01输出:

 

   hadoop05输出:

 

  由于hadoop05设置的优先级高于hadoop06,因此hadoop06无日志写入。

  我们再看hdfs上,是否成功上传了日志文件:

  

六、高可用性测试

  由于楼主设置的hadoop05的优先级要高于hadoop06,这也是上面的日志收集hadoop05输出而不是hadoop06输出的原因。现在我们干掉优先级高的hadoop05,看hadoop06是否能正常进行日志采集工作。

  

  我们向日志源添加一个测试日志文件:

  

  hadoop06输出:

 

   查看hdfs:

  

  好了!flume集群配置及日志收集就介绍到这里,下次楼主楼主会具体介绍利用mapreduce对日志进行清洗,并存储到hbase相关的内容。

  

 

hadoop高可靠性HA集群

概述


 

简单hdfs高可用架构图

 

  在hadoop2.x中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。
    hadoop2.x官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。这里楼主使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode(我配了3个)。
    这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby NameNode为standby状态。hadoop2.4以前的版本中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,2.4以后解决了这个问题,有两个ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调。yarn的HA配置楼主会给出配置文件,受环境影响,这里就不搭建yarn的高可用性了。

主要步骤


 

  1. 备6台Linux机器
  2. 安装JDK、配置主机名、修改IP地址、关闭防火墙
  3. 配置SSH免登陆
  4. 安装zookeeper集群
  5. zookeeper、hadoop环境变量配置
  6. 核心配置文件修改
  7. 启动zookeeper集群
  8. 启动journalnode
  9. 格式化文件系统、格式化zk
  10. 启动hdfs、启动yarn

前期准备


集群规划

  

 主机名 IP 安装软件 进程
hadoop01 192.168.8.101 jdk、hadoop NameNode、DFSZKFailoverController(zkfc)
hadoop02 192.168.8.102 jdk、hadoop NameNode、DFSZKFailoverController(zkfc)
hadoop03 192.168.8.103 jdk、hadoop ResourceManager
hadoop04 192.168.8.104 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
hadoop05 192.168.8.105 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
hadoop06 192.168.8.106 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain

 

Linux环境  

  1.由于楼主机器硬件环境的限制,这里只准备了6台centos7的系统。


  2.修改IP。如果跟楼主一样使用VM搭集群,请使用only-host模式。

vim /etc/sysconfig/network-scripts/ifcfg-ens3

  

TYPE="Ethernet"
BOOTPROTO="static"
DEFROUTE="yes"
PEERDNS="yes"
PEERROUTES="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_PEERDNS="yes"
IPV6_PEERROUTES="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="7f13c30b-0943-49e9-b25d-8aa8cab95e20"
DEVICE="ens33"
ONBOOT="yes"
IPADDR="192.168.8.101"
NETMASK="255.255.255.0"
GATEWAY="192.168.8.1"

  3.修改主机名和IP的映射关系 

1  vim /etc/host
2 
3 192.168.8.101 hadoop01
4 192.168.8.102 hadoop02
5 192.168.8.103 hadoop03
6 192.168.8.104 hadoop04
7 192.168.8.105 hadoop05
8 192.168.8.106 hadoop06

  4.关闭防火墙

1 systemctl stop firewalld.service //停止firewall
2 systemctl disable firewalld.service //禁止firewall开机启动

  5.修改主机名

1 hostnamectl set-hostname hadoop01
2 hostnamectl set-hostname hadoop02
3 hostnamectl set-hostname hadoop03
4 hostnamectl set-hostname hadoop04
5 hostnamectl set-hostname hadoop05
6 hostnamectl set-hostname hadoop06

  6.ssh免登陆

  生成公钥、私钥

  

ssh-keygen -t rsa //一直回车

  将公钥发送到其他机器

ssh-coyp-id hadoop01
ssh-coyp-id hadoop02
ssh-coyp-id hadoop03
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06

  7.安装JDK,配置环境变量

  hadoop01,hadoop02,hadoop03

1 export JAVA_HOME=/usr/jdk1.7.0_60
2 export HADOOP_HOME=/home/hadoop/hadoop-2.7.3
3 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

  hadoop04,hadoop05,hadoop06(包含zookeeper)

1 export JAVA_HOME=/usr/jdk1.7.0_60
2 export HADOOP_HOME=/home/hadoop/hadoop-2.7.3
3 export ZOOKEEPER_HOME=/home/hadoop/zookeeper-3.4.10
4 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

zookeeper集群安装


 

  1.上传zk安装包
  上传到/home/hadoop
  2.解压  

tar -zxvf zookeeper-3.4.10.tar.gz

  3.配置(先在一台节点上配置)
    在conf目录,更改zoo_sample.cfg文件为zoo.cfg
  

 mv zoo_sample.cfg zoo.cfg

  
    修改配置文件(zoo.cfg)
       

1 dataDir=/home/hadoop/zookeeper-3.4.10/data       
2 server.1=hadoop04:2888:3888
3 server.2=hadoop05:2888:3888
4 server.3=hadoop06:2888:3888

   
    在(dataDir=/home/hadoop/zookeeper-3.4.10/data)创建一个myid文件,里面内容是server.N中的N(server.2里面内容为2)
      

1  echo "5" > myid    

    4.将配置好的zk拷贝到其他节点
      

1 scp -r /home/hadoop/zookeeper-3.4.5/ hadoop05:/home/hadoop
2 scp -r /home/hadoop/zookeeper-3.4.5/ hadoop06:/home/hadoop

  
    注意:在其他节点上一定要修改myid的内容
        在hadoop05应该将myid的内容改为2 (echo “6” > myid)
        在hadoop06应该将myid的内容改为3 (echo “7” > myid)

 5.启动集群
    分别启动hadoop04,hadoop05,hadoop06上的zookeeper      

1 zkServer.sh start

hadoop2.7.3集群安装


 

  1.解压         

1  tar -zxvf hadoop-2.7.3.tar.gz 

      2.配置core-site.xml          

 1 configuration>
 2     
 3     property>
 4         name>fs.defaultFSname>
 5         value>hdfs://ns1value>
 6     property>
 7     
 8     property>
 9         name>hadoop.tmp.dirname>
10         value>/home/hadoop/hadoop-2.7.3/tmpvalue>
11     property>
12     
13     property>
14         name>ha.zookeeper.quorumname>
15         value>hadoop04:2181,hadoop05:2181,hadoop06:2181value>
16     property>
17 configuration>

   3.配置hdf-site.xml

 1 configuration>
 2 
 3     property>
 4         name>dfs.nameservicesname>
 5         value>ns1value>
 6     property>
 7     
 8     property>
 9         name>dfs.ha.namenodes.ns1name>
10         value>nn1,nn2value>
11     property>
12     
13     property>
14         name>dfs.namenode.rpc-address.ns1.nn1name>
15         value>hadoop01:9000value>
16     property>
17     
18     property>
19         name>dfs.namenode.http-address.ns1.nn1name>
20         value>hadoop01:50070value>
21     property>
22     
23     property>
24         name>dfs.namenode.rpc-address.ns1.nn2name>
25         value>hadoop02:9000value>
26     property>
27     
28     property>
29         name>dfs.namenode.http-address.ns1.nn2name>
30         value>hadoop02:50070value>
31     property>
32     
33     property>
34         name>dfs.namenode.shared.edits.dirname>
35         value>qjournal://hadoop04:8485;hadoop05:8485;hadoop06:8485/ns1value>
36     property>
37     
38     property>
39         name>dfs.journalnode.edits.dirname>
40         value>file:/home/hadoop/hadoop-2.7.3/journalvalue>
41     property>
42     
43     property>
44         name>dfs.ha.automatic-failover.enabledname>
45         value>truevalue>
46     property>
47     
48     property>
49         name>dfs.client.failover.proxy.provider.ns1name>
50         value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvidervalue>
51     property>
52     
53     property>
54         name>dfs.ha.fencing.methodsname>
55         value>
56             sshfence
57             shell(/bin/true)
58         value>
59     property>
60     
61     property>
62         name>dfs.ha.fencing.ssh.private-key-filesname>
63         value>/home/hadoop/.ssh/id_rsavalue>
64     property>
65     
66     property>
67         name>dfs.ha.fencing.ssh.connect-timeoutname>
68         value>30000value>
69     property>
70 configuration>

  4.配置mapred-site.xml

1 configuration>
2 
3     property>
4         name>mapreduce.framework.namename>
5         value>yarnvalue>
6     property>
7 configuration>

  5.配置yarn-site.xml

 

 1 configuration>
 2 
 3 
 4     
 5     property>
 6        name>yarn.resourcemanager.hostname.rm1name>
 7        value>hadoop03value>
 8     property>        
 9     property>
10        name>yarn.nodemanager.aux-servicesname>
11        value>mapreduce_shufflevalue>
12     property>
13 configuration>

  6.配置slaves

1 hadoop04
2 hadoop05
3 hadoop06

  7.将配置好的hadoop拷贝到其他节点

scp -r /home/hadoop/hadoop-2.7.3 hadoop02:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop03:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop04:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop05:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop06:/home/hadoop

启动


 

  1.启动zookeeper集群(分别在hadoop04hadoop05hadoop06上启动zookeeper)
           

1 zkServer.sh start

  2.查看zookeeper状态

1 zkServer.sh status

  包含一个leader,二个follower

  

  3.启动journalnode(分别在hadoop04,hadoop05,hadoop06上执行)          

hadoop-daemon.sh start journalnode

        运行jps命令检验,hadoop04,hadoop05,hadoop06上多了JournalNode进程

  4.格式化HDFS

     在hadoop01上执行命令:          

1 hdfs namenode -format

  检查是否成功看终端知否打印:

  

       格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里楼主配置的是/home/hadoop/hadoop-2.7.3/tmp,然后将/home/hadoop/hadoop-2.7.3/tmp拷贝到ihadoop02的/home/hadoop/hadoop-2.7.3/下。
           

1 scp -r tmp/ hadoop02:/hadoop/hadoop-2.7.3/

  5.格式化ZK(在hadoop01上执行即可)
          

1  hdfs zkfc -formatZK

  效果如下(前面有点多截不下来,只截取了后面一部分):

  6.启动HDFS(在hadoop01上执行)

1 start-dfs.sh

      7.启动YARN(在hadoop03上执行)          

1 start-yarn.sh

验证


 

  到此,hadoop-2.7.3集群全部配置完毕,下面我们来验证:
  

  浏览器访问http://192.168.8.101:50070    NameNode ‘hadoop01:9000’ (active)
             http://192.168.8.102:50070   NameNode ‘hadoop02:9000’ (standby)

 

 

  浏览器访问resourceManager:http://192.168.8.103:8088

    

 

    我们可以模拟NameNode(active)宕机,来验证HDFS的HA是否有效,NameNode(active)宕机后,NameNode(standby)会转为active状态,这里楼主不在演示。

结语


 

  官网给出的文档还是比较详细的,楼主也是提取了官网的QJM解决方案来进行搭建。另外,yarn的HA搭建官网也给出了具体配置,有兴趣的同学可以试一试。

 

Hadoop之HDFS原理及文件上传下载源码分析(下)

  上篇Hadoop之HDFS原理及文件上传下载源码分析(上)楼主主要介绍了hdfs原理及FileSystem的初始化源码解析, Client如何与NameNode建立RPC通信。本篇将继续介绍hdfs文件上传、下载源解析。

文件上传

  先上文件上传的方法调用过程时序图:

  

  

   其主要执行过程:

  1.    FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(楼主上篇已经介绍过了)
  2.    调用FileSystem的create()方法,由于实现类为DistributedFileSystem,所有是调用该类中的create()方法
  3.    DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的create()方法
  4.    DFSOutputStream提供的静态newStreamForCreate()方法中调用NameNodeRpcServer服务端的create()方法并创建DFSOutputStream输出流对象返回
  5.    通过hadoop提供的IOUtil工具类将输出流输出到本地

  下面我们来看下源码:

  首先初始化文件系统,建立与服务端的RPC通信

  

1 HDFSDemo.java
2 OutputStream os = fs.create(new Path("/test.log"));

 

  调用FileSystem的create()方法,由于FileSystem是一个抽象类,这里实际上是调用的该类的子类create()方法

  

1  //FileSystem.java
2 public abstract FSDataOutputStream create(Path f,
3       FsPermission permission,
4       boolean overwrite,
5       int bufferSize,
6       short replication,
7       long blockSize,
8       Progressable progress) throws IOException;

   前面我们已经说过FileSystem.get()返回的是DistributedFileSystem对象,所以这里我们直接进入DistributedFileSystem:

 

  

 1   //DistributedFileSystem.java
 2 @Override
 3   public FSDataOutputStream create(final Path f, final FsPermission permission,
 4     final EnumSet cflags, final int bufferSize,
 5     final short replication, final long blockSize, final Progressable progress,
 6     final ChecksumOpt checksumOpt) throws IOException {
 7     statistics.incrementWriteOps(1);
 8     Path absF = fixRelativePart(f);
 9     return new FileSystemLinkResolver() {
10       @Override
11       public FSDataOutputStream doCall(final Path p)
12           throws IOException, UnresolvedLinkException {
13         final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
14                 cflags, replication, blockSize, progress, bufferSize,
15                 checksumOpt);
16         //dfs为DistributedFileSystem所持有的DFSClient对象,这里调用DFSClient中的create()方法
17         return dfs.createWrappedOutputStream(dfsos, statistics);
18       }
19       @Override
20       public FSDataOutputStream next(final FileSystem fs, final Path p)
21           throws IOException {
22         return fs.create(p, permission, cflags, bufferSize,
23             replication, blockSize, progress, checksumOpt);
24       }
25     }.resolve(this, absF);
26   }

  DFSClient的create()返回一个DFSOutputStream对象:

  

 1  //DFSClient.java
 2 public DFSOutputStream create(String src, 
 3                              FsPermission permission,
 4                              EnumSet flag, 
 5                              boolean createParent,
 6                              short replication,
 7                              long blockSize,
 8                              Progressable progress,
 9                              int buffersize,
10                              ChecksumOpt checksumOpt,
11                              InetSocketAddress[] favoredNodes) throws IOException {
12     checkOpen();
13     if (permission == null) {
14       permission = FsPermission.getFileDefault();
15     }
16     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
17     if(LOG.isDebugEnabled()) {
18       LOG.debug(src + ": masked=" + masked);
19     }
20     //调用DFSOutputStream的静态方法newStreamForCreate,返回输出流
21     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
22         src, masked, flag, createParent, replication, blockSize, progress,
23         buffersize, dfsClientConf.createChecksum(checksumOpt),
24         getFavoredNodesStr(favoredNodes));
25     beginFileLease(result.getFileId(), result);
26     return result;
27   }

  我们继续看下newStreamForCreate()中的业务逻辑:

  

 1 //DFSOutputStream.java
 2  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
 3       FsPermission masked, EnumSet flag, boolean createParent,
 4       short replication, long blockSize, Progressable progress, int buffersize,
 5       DataChecksum checksum, String[] favoredNodes) throws IOException {
 6     TraceScope scope =
 7         dfsClient.getPathTraceScope("newStreamForCreate", src);
 8     try {
 9       HdfsFileStatus stat = null;
10       boolean shouldRetry = true;
11       int retryCount = CREATE_RETRY_COUNT;
12       while (shouldRetry) {
13         shouldRetry = false;
14         try {
15           //这里通过dfsClient的NameNode代理对象调用NameNodeRpcServer中实现的create()方法
16           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
17               new EnumSetWritable(flag), createParent, replication,
18               blockSize, SUPPORTED_CRYPTO_VERSIONS);
19           break;
20         } catch (RemoteException re) {
21           IOException e = re.unwrapRemoteException(
22               AccessControlException.class,
23               DSQuotaExceededException.class,
24               FileAlreadyExistsException.class,
25               FileNotFoundException.class,
26               ParentNotDirectoryException.class,
27               NSQuotaExceededException.class,
28               RetryStartFileException.class,
29               SafeModeException.class,
30               UnresolvedPathException.class,
31               SnapshotAccessControlException.class,
32               UnknownCryptoProtocolVersionException.class);
33           if (e instanceof RetryStartFileException) {
34             if (retryCount > 0) {
35               shouldRetry = true;
36               retryCount--;
37             } else {
38               throw new IOException("Too many retries because of encryption" +
39                   " zone operations", e);
40             }
41           } else {
42             throw e;
43           }
44         }
45       }
46       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
47      //new输出流对象
48       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
49           flag, progress, checksum, favoredNodes);
50       out.start();//调用内部类DataStreamer的start()方法,DataStreamer继承Thread,所以说这是一个线程,从NameNode中申请新的block信息;
                同时前面我们介绍hdfs原理的时候提到的流水线作业(Pipeline)也是在这里实现,有兴趣的同学可以去研究下,这里就不带大家看了
51 return out; 52 } finally { 53 scope.close(); 54 } 55 }

    

  到此,Client拿到了服务端的输出流对象,那么后面就容易了,都是一些简答的文件输出,输入流的操作(hadoop提供的IOUitl)。

文件下载

  文件上传的大致流程与文件下载类似,与上传一样,我们先上程序方法调用时序图:

  

  主要执行过程:  

  1.    FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(与前面一样)
  2.    调用FileSystem的open()方法,由于实现类为DistributedFileSystem,所有是调用该类中的open()方法
  3.    DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的open()方法
  4.    实例化DFSInputStream输入流
  5.    调用openinfo()方法
  6.    调用fetchLocatedBlocksAndGetLastBlockLength()方法,抓取block信息并获取最后block长度
  7.        调用DFSClient中的getLocatedBlocks()方法,获取block信息
  8.    在callGetBlockLocations()方法中通过NameNode代理对象调用NameNodeRpcServer的getBlockLocations()方法
  9.        将block信息写入输出流
  10.        交给IOUtil,下载文件到本地

  接下来,我们开始看源码:

  首先任然是FileSystem的初始化,前面有,这里就不贴出来了,我们直接从DistributedFileSystem的open()开始看。

  

 1 //DistributedFifeSystem.java
 2 @Override
 3   public FSDataInputStream open(Path f, final int bufferSize)
 4       throws IOException {
 5     statistics.incrementReadOps(1);
 6     Path absF = fixRelativePart(f);
 7     return new FileSystemLinkResolver() {
 8       @Override
 9       public FSDataInputStream doCall(final Path p)
10           throws IOException, UnresolvedLinkException {
11         final DFSInputStream dfsis =
12           dfs.open(getPathName(p), bufferSize, verifyChecksum);
13         //dfs为DFSClient对象,调用open()返回输入流
14         return dfs.createWrappedInputStream(dfsis);
15       }
16       @Override
17       public FSDataInputStream next(final FileSystem fs, final Path p)
18           throws IOException {
19         return fs.open(p, bufferSize);
20       }
21     }.resolve(this, absF);
22   }

  DFSClient中并没有直接使用NameNode的代理对象,而是传给了DFSInputStream:

  

 1 //DFSClient.java
 2 public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
 3       throws IOException, UnresolvedLinkException {
 4     checkOpen();   
 5     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
 6     try {
 7       //这里并没有直接通过NameNode的代理对象调用服务端的方法,直接new输入流并把当前对象作为参数传入
 8       return new DFSInputStream(this, src, verifyChecksum);
 9     } finally {
10       scope.close();
11     }
12   }

  那么在DFSInputStream必须持有DFSClient的引用:

  

 1 //DFSInputStream.java 构造
 2 DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
 3                  ) throws IOException, UnresolvedLinkException {
 4     this.dfsClient = dfsClient;//只有DFSClient的引用
 5     this.verifyChecksum = verifyChecksum;
 6     this.src = src;
 7     synchronized (infoLock) {
 8       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
 9     }
10     openInfo();//调openInfo()
11   }

  openInfo()用来抓取block信息:

 1 void openInfo() throws IOException, UnresolvedLinkException {
 2     synchronized(infoLock) {
 3       lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();//抓取block信息
 4       int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;//获取配置信息,尝试抓取的次数,楼主记得在2.6以前这里写的3;当然,现在的默认值也为3
 5       while (retriesForLastBlockLength > 0) {
 6         if (lastBlockBeingWrittenLength == -1) {
 7           DFSClient.LOG.warn("Last block locations not available. "
 8               + "Datanodes might not have reported blocks completely."
 9               + " Will retry for " + retriesForLastBlockLength + " times");
10           waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
11           lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
12         } else {
13           break;
14         }
15         retriesForLastBlockLength--;
16       }
17       if (retriesForLastBlockLength == 0) {
18         throw new IOException("Could not obtain the last block locations.");
19       }
20     }
21   }

 

  获取block信息:

 1 //DFSInputStream.java
 2 private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
 3     final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
 4     //回到DFSClient中来获取当前block信息
 5     if (DFSClient.LOG.isDebugEnabled()) {
 6       DFSClient.LOG.debug("newInfo = " + newInfo);
 7     }
 8     if (newInfo == null) {
 9       throw new IOException("Cannot open filename " + src);
10     }
11 
12     if (locatedBlocks != null) {
13       Iterator oldIter = locatedBlocks.getLocatedBlocks().iterator();
14       Iterator newIter = newInfo.getLocatedBlocks().iterator();
15       while (oldIter.hasNext() && newIter.hasNext()) {
16         if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
17           throw new IOException("Blocklist for " + src + " has changed!");
18         }
19       }
20     }
21     locatedBlocks = newInfo;
22     long lastBlockBeingWrittenLength = 0;
23     if (!locatedBlocks.isLastBlockComplete()) {
24       final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
25       if (last != null) {
26         if (last.getLocations().length == 0) {
27           if (last.getBlockSize() == 0) {         
28             return 0;
29           }
30           return -1;
31         }
32         final long len = readBlockLength(last);
33         last.getBlock().setNumBytes(len);
34         lastBlockBeingWrittenLength = len; 
35       }
36     }
37 
38     fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
39     //返回block开始写的位置
40     return lastBlockBeingWrittenLength;
41   }

  回到DFSClient中:

  

 1 DFSClient.java
 2 @VisibleForTesting
 3   public LocatedBlocks getLocatedBlocks(String src, long start, long length)
 4       throws IOException {
 5     TraceScope scope = getPathTraceScope("getBlockLocations", src);
 6     try {
 7       //这里NameNode作为参数传递到callGetBlockLocations()中
 8       return callGetBlockLocations(namenode, src, start, length);
 9     } finally {
10       scope.close();
11     }
12   }

  调用服务端方法,返回block信息:

 1 //DFSClient.java
 2 static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
 3       String src, long start, long length) 
 4       throws IOException {
 5     try {
 6      //看到这里,不用做过多的解释了吧?
 7       return namenode.getBlockLocations(src, start, length);
 8     } catch(RemoteException re) {
 9       throw re.unwrapRemoteException(AccessControlException.class,
10                                      FileNotFoundException.class,
11                                      UnresolvedPathException.class);
12     }
13   }

  最终将文件block相关信息写入输入流,通过工具类IOUtil输出到本地文件。

  那关于hadoop之hdfs原理及文件上传下载源码解析就写到这里,下系列的文章,楼主会写一些关于mapreduce或者hive相关的文章分享给大家。

  示例代码地址:https://github.com/LJunChina/hadoop

  

  

Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理

  首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来。

  楼主的环境:

  •   操作系统:Ubuntu 15.10
  •   hadoop版本:2.7.3
  •   HA:否(随便搭了个伪分布式)

文件上传

下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程:

  首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯。

  NameNode与各DataNode使用心跳机制来获取DataNode信息。NameNode收到Client请求后,获取DataNode信息,并将可存储文件的节点信息返回给Client。

  Client收到NameNode返回的信息,与对应的DataNode节点取得联系,并向该节点写文件。

  文件写入到DataNode后,以流水线的方式复制到其他DataNode(当然,这里面也有DataNode向NameNode申请block,这里不详细介绍),至于复制多少份,与所配置的hdfs-default.xml中的dfs.replication相关。

  元数据存储

  先明确几个概念:

  fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
  edits:操作日志文件。
  fstime:保存最近一次checkpoint的时间

  checkpoint可在hdfs-default.xml中具体配置,默认为3600秒:

1 property>
2   name>dfs.namenode.checkpoint.periodname>
3   value>3600value>
4   description>The number of seconds between two periodic checkpoints.
5   description>
6 property>

 

  fsimage和edits文件在namenode目录可以看到:

NameNode中的元数据信息:

 

  

  test.log文件上传后,Namenode始终在内存中保存metedata,用于处理“读请求”。metedata主要存储了文件名称(FileName),副本数量(replicas),分多少block存储(block-ids),分别存储在哪个节点上(id2host)等。

  到有“写请求”到来时,namenode会首先写editlog到磁盘,即向edits文件中写日志,成功返回后,才会修改内存,并且向客户端返回
  hadoop会维护一个fsimage文件,也就是namenode中metedata的镜像,但是fsimage不会随时与namenode内存中的metedata保持一致,而是每隔一段时间通过合并edits文件来更新内容。此时Secondary namenode就派上用场了,合并fsimage和edits文件并更新NameNode的metedata。
  Secondary namenode工作流程:

  1. secondary通知namenode切换edits文件
  2. secondary通过http请求从namenode获得fsimage和edits文件
  3. secondary将fsimage载入内存,然后开始合并edits
  4. secondary将新的fsimage发回给namenode
  5. namenode用新的fsimage替换旧的fsimage

  通过一张图可以表示为:

 文件下载

  文件下载相对来说就简单一些了,如图所示,Client要从DataNode上,读取test.log文件。而test.log由block1和block2组成。

  

  文件下载的主要流程为:

  • client向namenode发送请求。
  • namenode查看Metadata信息,返回test.log的block的位置。     

    Block1: h0,h1,h3
    Block2: h0,h2,h4

  • 开始从h0节点下载block1,block2。

源码分析

  我们先简单使用hadoop提供的API来实现文件的上传下载(文件删除、改名等操作比较简单,这里不演示):

 

  

 1 package cn.jon.hadoop.hdfs;
 2 
 3 import java.io.FileInputStream;
 4 import java.io.FileOutputStream;
 5 import java.io.IOException;
 6 import java.io.InputStream;
 7 import java.io.OutputStream;
 8 import java.net.URI;
 9 import java.net.URISyntaxException;
10 
11 import org.apache.hadoop.conf.Configuration;
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14 import org.apache.hadoop.io.IOUtils;
15 import org.junit.Before;
16 import org.junit.Test;
17 
18 public class HDFSDemo {
19     FileSystem fs = null;    
20     @Before
21     public void init(){
22         try {
23             //初始化文件系统
24             fs = FileSystem.get(new URI("hdfs://hadoopmaster:9000"), new Configuration(), "root");
25         } catch (IOException e) {
26             e.printStackTrace();
27         } catch (InterruptedException e) {
28             e.printStackTrace();
29         } catch (URISyntaxException e) {
30             e.printStackTrace();
31         }
32     }
33     public static void main(String[] args) {
34         
35     }
36     @Test
37     /**
38      * 文件上传
39      */
40     public void testFileUpload(){
41         try {
42             OutputStream os = fs.create(new Path("/test.log"));
43             FileInputStream fis = new FileInputStream("I://test.log");
44             IOUtils.copyBytes(fis, os, 2048,true);
45             //可以使用hadoop提供的简单方式
46             fs.copyFromLocalFile(new Path("I://test.log"), new Path("/test.log"));
47         } catch (IllegalArgumentException | IOException e) {
48             e.printStackTrace();
49         }
50     }
51     @Test    
52     /**
53      * 文件下载
54      */
55     public void testFileDownload(){
56         try {
57             InputStream is = fs.open(new Path("/test.log"));
58             FileOutputStream fos = new FileOutputStream("E://test.log");            
59             IOUtils.copyBytes(is, fos, 2048);
60             //可以使用hadoop提供的简单方式
61             fs.copyToLocalFile(new Path("/test.log"), new Path("E://test.log"));
62         } catch (IllegalArgumentException | IOException e) {
63             e.printStackTrace();
64         }
65     }
66 
67 }

  显而易见,只要是对hdfs上的文件进行操作,必须对FileSystem进行初始化,我们先来分析FileSystem的初始化:

  

1  public static FileSystem get(URI uri, Configuration conf) throws IOException {
2     return CACHE.get(uri, conf);//部分方法我只截取了部分代码,这里进入get()方法
3   }
1    FileSystem get(URI uri, Configuration conf) throws IOException{
2       Key key = new Key(uri, conf);
3       return getInternal(uri, conf, key);//调用getInternal()
4     }
 1 private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
 2      //使用单例模式创建FileSystem,这是由于FS的初始化需要大量的时间,使用单例保证只是第一次加载慢一些,返回FileSystem的子类实现DistributedFileSystem
 3       FileSystem fs;
 4       synchronized (this) {
 5         fs = map.get(key);
 6       }
 7       if (fs != null) {
 8         return fs;
 9       }
10 
11       fs = createFileSystem(uri, conf);
12       synchronized (this) { // refetch the lock again
13         FileSystem oldfs = map.get(key);
14         if (oldfs != null) { // a file system is created while lock is releasing
15           fs.close(); // close the new file system
16           return oldfs;  // return the old file system
17         }
18         
19         // now insert the new file system into the map
20         if (map.isEmpty()
21                 && !ShutdownHookManager.get().isShutdownInProgress()) {
22           ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
23         }
24         fs.key = key;
25         map.put(key, fs);
26         if (conf.getBoolean("fs.automatic.close", true)) {
27           toAutoClose.add(key);
28         }
29         return fs;
30       }
31     }

 

 1 public void initialize(URI uri, Configuration conf) throws IOException {
 2     super.initialize(uri, conf);
 3     setConf(conf);
 4 
 5     String host = uri.getHost();
 6     if (host == null) {
 7       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
 8     }
 9     homeDirPrefix = conf.get(
10         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
11         DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
12     
13     this.dfs = new DFSClient(uri, conf, statistics);//实例化DFSClient,并将它作为DistributedFileSystem的引用,下面我们跟进去
14     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
15     this.workingDir = getHomeDirectory();
16   }

 

 1 public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
 2       Configuration conf, FileSystem.Statistics stats)
 3     throws IOException {
 4     //该构造太长,楼主只截取了重要部分给大家展示,有感兴趣的同学可以亲手进源码瞧瞧     
 5     NameNodeProxies.ProxyAndInfo proxyInfo = null;
 6     //这里声明了NameNode的代理对象,跟我们前面讨论的rpc就息息相关了
 7     if (proxyInfo != null) {
 8       this.dtService = proxyInfo.getDelegationTokenService();
 9       this.namenode = proxyInfo.getProxy();
10     } else if (rpcNamenode != null) {   
11       Preconditions.checkArgument(nameNodeUri == null);
12       this.namenode = rpcNamenode;
13       dtService = null;
14     } else {
15       Preconditions.checkArgument(nameNodeUri != null,
16           "null URI");
17       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
18           ClientProtocol.class, nnFallbackToSimpleAuth);
19       this.dtService = proxyInfo.getDelegationTokenService();
20       this.namenode = proxyInfo.getProxy();//获取NameNode代理对象引用并自己持有,this.namenode类型为ClientProtocol,它是一个接口,我们看下这个接口
21     }
22   }

 

1 public interface ClientProtocol{
2       public static final long versionID = 69L;
3       //还有很多对NameNode操作的方法申明,包括对文件上传,下载,删除等
4       //楼主特意把versionID贴出来了,这就跟我们写的RPCDemo中的MyBizable接口完全类似,所以说Client一旦拿到该接口实现类的代理对象(NameNodeRpcServer),Client就可以实现与NameNode的RPC通信,我们继续跟进
5 }

 

 1  public static  ProxyAndInfo createProxy(Configuration conf,
 2       URI nameNodeUri, Class xface, AtomicBoolean fallbackToSimpleAuth)
 3       throws IOException {
 4     AbstractNNFailoverProxyProvider failoverProxyProvider =
 5         createFailoverProxyProvider(conf, nameNodeUri, xface, true,
 6           fallbackToSimpleAuth);  
 7     if (failoverProxyProvider == null) {
 8       // 如果不是HA的创建方式,楼主环境是伪分布式,所以走这里,我们跟进去
 9       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
10           UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
11     } else {
12       // 如果有HA的创建方式
13       Conf config = new Conf(conf);
14       T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
15           RetryPolicies.failoverOnNetworkException(
16               RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts,
17               config.maxRetryAttempts, config.failoverSleepBaseMillis,
18               config.failoverSleepMaxMillis));
19       return new ProxyAndInfo(proxy, dtService,
20           NameNode.getAddress(nameNodeUri));
21     }
22   }

   最终返回的为ClientProtocol接口的子类代理对象,而NameNodeRpcServer类实现了ClientProtocol接口,因此返回的为NameNode的代理对象,当客户端拿到了NameNode的代理对象后,即与NameNode建立了RPC通信:

 1 private static ClientProtocol createNNProxyWithClientProtocol(
 2       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
 3       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
 4       throws IOException {
 5     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);//是不是感觉越来越像我们前面说到的RPC
 6 
 7     final RetryPolicy defaultPolicy = 
 8         RetryUtils.getDefaultRetryPolicy(//加载默认策虐
 9             conf, 
10             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
11             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
12             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
13             DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
14             SafeModeException.class);
15     
16     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
17     //看到versionId了吗?这下明白了rpc的使用中目标接口必须要有这个字段了吧
18     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
19         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
20         NetUtils.getDefaultSocketFactory(conf),
21         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
22         fallbackToSimpleAuth).getProxy();
23     //看到没?这里使用 RPC.getProtocolProxy()来创建ClientNamenodeProtocolPB对象,调试时可以清楚的看见,该对象引用的是一个代理对象,值为$Proxy12,由JDK的动态代理来实现。
24     //前面我们写RPCDemo程序时,用的是RPC.getProxy(),但是各位大家可以去看RPC源码,RPC.getProtocolProxy()最终还是调用的getProxy()
25     if (withRetries) {
26       Map methodNameToPolicyMap 
27                  = new HashMap();    
28       ClientProtocol translatorProxy =
29         new ClientNamenodeProtocolTranslatorPB(proxy);
30       return (ClientProtocol) RetryProxy.create(//这里再次使用代理模式对代理对象进行包装,也可以理解为装饰者模式
31           ClientProtocol.class,
32           new DefaultFailoverProxyProvider(
33               ClientProtocol.class, translatorProxy),
34           methodNameToPolicyMap,
35           defaultPolicy);
36     } else {
37       return new ClientNamenodeProtocolTranslatorPB(proxy);
38     }
39   }

  整个FileSystem的初始化用时序图表示为:

 

  到此,FileSystem的初始化就基本完成。由于文章篇幅过大的问题,所以楼主把HDFS原理及源码分析拆分成了两部分,上半部分主要是HDFS原理与FileSystem的初始化介绍,那在下半部分将会具体介绍HDFS文件上传、下载的源码解析。

  另外,文章用到的一些示例代码,将会在下半部分发布后,楼主一起上传到GitHub。

 

Hadoop之RPC简单使用(远程过程调用协议)

  一、RPC概述

  RPC是指远程过程调用,也就是说两台不同的服务器(不受操作系统限制),一个应用部署在Linux-A上,一个应用部署在Windows-B或Linux-B上,若A想要调用B上的某个方法method(),由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语意和传达调用的参数。

  楼主在接触RPC之前,用得最多的莫过于WebService。WebService可以说是在RPC发展的基础之上。RPC的协议有很多,比如最早的CORBA,Java RMI,Web Service等,又比如现在阿里巴巴的Dubbo,Apache下的hadoop项目。该篇楼主主要以hadoop的RPC为例。

  hadoop为何要使用RPC?在HDFS中,我们通过jsp可查看到有DataNode,NameNode,SecondaryNameNode主要进程(楼主只启动了HDFS),我们客户端Client与NameNode通信,NameNode与DataNode的通信,都是在不同进程间,不同系统间的通信。

  

  二、RPC流程

 

  通过下图,我们简单分析RPC的执行流程:

  

 

  首先,要解决通讯的问题,主要是通过在Client和Server之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。

  第二,要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。

  第三,当Client上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到Server,由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize),通过寻址和传输将序列化的二进制发送给B服务器。

  第四,Server收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。

  三、hadoop—RPC的简单使用

  定义接口Bizable:

  

1 package cn.jon.hadoop.rpc;
2 
3 public interface MyBizable {
4     long versionID = 123456;//该字段必须要有,不然会报java.lang.NoSuchFieldException: versionID异常
5     public String doSomething(String str);
6 }

  服务端RPCServer实现MyBizable接口并绑定IP地址及端口号:

package cn.jon.hadoop.rpc;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class RPCServer implements MyBizable {

    @Override
    public String doSomething(String str) {
        return str;
    }
    /**
     * @param args
     * @throws Exception 
     * @throws  
     */
    public static void main(String[] args) throws  Exception {
        Server server = new RPC.Builder(new Configuration())        
        .setProtocol(MyBizable.class)
        .setInstance(new RPCServer())
        .setBindAddress("192.168.8.100")
        .setPort(8077)
        .build();
        server.start();
    }

}

  客户端RPCClient:

  

package cn.jon.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class RPCClient {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        MyBizable proxy = RPC.getProxy(MyBizable.class, 123456,new InetSocketAddress("192.168.8.100", 8077) , new Configuration());
        String result = proxy.doSomething("服务端");
        System.out.println(result);
        RPC.stopProxy(proxy);
    }

}

  楼主使用Linux作为客户端,Windows作为服务端,我们先把写好的程序打成jar,上传到Linux:

  

  然后,我们在windows端启动RPCServer:

  

  服务端启动好后,我们在Linux中执行RPCClient.jar:  

java -jar RPCClient.jar

  执行结果可以看到输出了“服务端”(楼主Linux时间没有调准确):

    

  相关代码地址:https://github.com/LJunChina/hadoop

  下篇楼主将对HDFS原理进行探讨,到时会更加详细的讨论RPC。

JavaScript数据结构——链表的实现

  前面楼主分别讨论了数据结构栈与队列的实现,当时所用的数据结构都是用的数组来进行实现,但是数组有的时候并不是最佳的数据结构,比如在数组中新增删除元素的时候需要将其他元素进行移动,而在javascript中使用spit()方法不需要访问其他元素。如果你在使用数组的时候发现很慢,就可以考虑使用链表。

 

  链表的概念

  链表是一种常见的数据结构。它是动态地进行存储分配的一种结构。链表有一个“头指针”变量,以head表示,它存放一个地址,指向一个元素。每个结点都使用一个对象的引用指标它的后继,指向另一个结点的引用叫做链。

  

 

  数组元素依靠下标(位置)来进行引用,而链表元素则是靠相互之间的关系来进行引用。因此链表的插入效率很高,下图演示了链表结点d的插入过程:  

  删除过程:

  

  基于对象的链表

  我们定义2个类,Node类与LinkedList类,Node为结点数据,LinkedList保存操作链表的方法。

  首先看Node类:  

1 function Node(element){
2     this.element = element;
3     this.next = null;
4 }

  element用来保存结点上的数据,next用来保存指向一下结点的的链接。  

  LinkedList类:

 1 function LinkedList(){
 2 
 3          this.head = new Node('head');
 4 
 5          this.find = find;
 6 
 7          this.insert = insert;
 8 
 9          this.remove = remove;
10 
11          this.show = show;
12 
13 }

  find()方法,从头结点开始,沿着链表结点一直查找,直到找到与item内容相等的element则返回该结点,没找到则返回空。

1 function find(item){
2          var currentNode = this.head;//从头结点开始
3          while(currentNode.element!=item){
4                   currentNode =  currentNode.next;
5          }
6          return currentNode;//找到返回结点数据,没找到返回null
7 }

  Insert方法。通过前面元素插入的演示可以看出,实现插入简单四步:

1、创建结点

2、找到目标结点

3、修改目标结点的next指向链接

4、将目标结点的next值赋值给要插入的结点的next

1 function insert(newElement,item){
2          var newNode = new Node(newElement);
3          var currentNode = this.find(item);
4          newNode.next = currentNode.next;
5          currentNode.next = newNode;
6 }

  Remove()方法。删除某一节点需要先找到被删除结点的前结点,为此我们定义方法frontNode():

1 function frontNode(item){
2          var currentNode = this.head;
3          while(currentNode.next.element!=item&&currentNode.next!=null){
4                   currentNode = currentNode.next;
5          }      
6          return currentNode;
7 }

  简答三步:

1、创建结点

2、找到目标结点的前结点

3、修改前结点的next指向被删除结点的n后一个结点

1 function remove(item){
2          var frontNode = this.frontNode(item);
3          //console.log(frontNode.element);
4          frontNode.next = frontNode.next.next;
5 }

  Show()方法:

1 function show(){
2          var currentNode = this.head,result;
3          while(currentNode.next!=null){
4                   result += currentNode.next.element;//为了不显示head结点
5                   currentNode = currentNode.next;
6          }
7 }

  测试程序:

1 var list = new LinkedList();
2 list.insert("a","head");
3 list.insert("b","a");
4 list.insert("c","b");
5 console.log(list.show());
6 list.remove("b");
7 console.log(list.show());

  输出:

 

  双向链表

  从链表的头节点遍历到尾节点很简单,但有的时候,我们需要从后向前遍。此时我们可以通过给 Node 对象增加一个属性,该属性存储指向前驱节点的链接。楼主用下图来双向链表的工作原理。

 

  首先我们先给Node类增加front属性:  

1 function Node(element){
2     this.element = element;
3     this.next = null;
4     this.front = null;
5 }

  当然,对应的insert()方法和remove()方法我们也需要做相应的修改:  

 1 function insert(newElement,item){
 2     var newNode = new Node(newElement);
 3     var currentNode = this.find(item);
 4     newNode.next = currentNode.next;
 5     newNode.front = currentNode;//增加front指向前驱结点
 6     currentNode.next = newNode;
 7 }
 8 
 9 
10 function remove(item){   
11     var currentNode = this.find(item);//找到需要删除的节点
12     if (currentNode.next != null) {
13         currentNode.front.next = currentNode.next;//让前驱节点指向需要删除的节点的下一个节点
14         currentNode.next.front = currentNode.front;//让后继节点指向需要删除的节点的上一个节点
15         currentNode.next = null;//并设置前驱与后继的指向为空
16         currentNode.front = null;       
17     }    
18 }

  反序显示链表:

  需要给双向链表增加一个方法,用来查找最后的节点。 findLast() 方法找出了链表中的最后一个节点,可以免除从前往后遍历链。

1 function findLast() {//查找链表的最后一个节点
2     var currentNode = this.head;
3     while (currentNode.next != null) {
4         currentNode = currentNode.next;
5     }
6     return currentNode;
7 }

  实现反序输出:

1 function showReverse() {
2     var currentNode = this.head, result = "";
3     currentNode = this.findLast();  
4     while(currentNode.front!=null){
5         result += currentNode.element + " ";
6         currentNode = currentNode.front;
7     }
8     return result;
9 }

  测试程序:

1 var list = new LinkedList();
2 list.insert("a","head");
3 list.insert("b","a");
4 list.insert("c","b");
5 console.log(list);
6 list.remove("b");
7 console.log(list.show());
8 console.log(list.showReverse());

  输出:

 

 

  循环链表

 

  循环链表是另一种形式的链式存贮结构。它的特点是表中最后一个结点的指针域指向头结点,整个链表形成一个环。循环链表和单向链表相似,节点类型都是一样的。唯一的区别是,在创建循环链表时,让其头节点的 next 属性指向它本身,即:

1 head.next = head

  这种行为会传导至链表中的每个节点,使得每个节点的 next 属性都指向链表的头节点。楼主用下图来表示循环链表:

  

  

  修改构造方法:

1 function LinkedList(){
2     this.head = new Node('head');//初始化
3     this.head.next = this.head;//直接将头节点的next指向头节点形成循环链表
4     this.find = find;
5     this.frontNode = frontNode;
6     this.insert = insert;
7     this.remove = remove;
8     this.show = show;  
9 }

  这时需要注意链表的输出方法show()与find()方法,原来的方式在循环链表里会陷入死循环,while循环的循环条件需要修改为当循环到头节点时退出循环。

function find(item){
    var currentNode = this.head;//从头结点开始
    while(currentNode.element!=item&&currentNode.next.element!='head'){
        currentNode =  currentNode.next;
    }
    return currentNode;//找到返回结点数据,没找到返回null
}
function show(){
    var currentNode = this.head,result = "";
    while (currentNode.next != null && currentNode.next.element != "head") {      
        result += currentNode.next.element + " ";
        currentNode = currentNode.next;
    }
    return result;
}

  测试程序:

var list = new LinkedList();
list.insert("a","head");
list.insert("b","a");
list.insert("c","b");
console.log(list.show());
list.remove("b");
console.log(list.show());

  测试结果:

 

  本文用到的示例代码地址:https://github.com/LJunChina/JavaScript