存档五月 2017

分布式队列ZooKeeper的实现

一、背景

  有一些时候,多个团队需要共同完成一个任务,比如,A团队将Hadoop集群计算的结果交给B团队继续计算,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下

去,直到最后一部分完成。在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB(企业服务股总线)服务器上部署自己的服务,然后通过消息中间件完成调度任务。对亍分步式的多个

Hadoop集群系统的协作,同样可以用这种架构来做只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

  本文楼主将使用zookeeper做为分步式消息中间件构造一个大型超市的部分数据计算模型来完成各个区域利润计算的业务需求。

  由于采购和销售分别是由不同厂商进行的软件开发和维护,而且业务往来也在不同的城市和地区。 所以在每月底结算时,工作量都特别大。 比如,计算利润表: 当月利润 = 当月销售金额 – 当月采购

额 – 当月其他支出(楼主只是粗略计算)。如果采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统, 如何能让多个系统,配合起来完成该需求?

二、系统构思

  楼主基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。排除了ESB的部分,只保留zookeeper进行实现。

  1.   采购数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  2.   销售数据:海量数据,基于Hadoop存储和分析(楼主环境有限,只使用了很少的数据)
  3.   其他费用支出:为少量数据,基于文件或数据库存储和分析

  我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售 (sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润, 幵创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次 。

  

 

  Hadoop mapreduce1,Hadoop mapreduce2 是2个独立的Hadoop集群应用。 Java App 是2个独立的Java应用 。ZooKeeper集群的有3个节点 。

  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop mapreduce1提交,用于统计采购金额
  • /queue/sell,是znode队列中,2号排对者,由Hadoop mapreduce2提交,用于统计销售金额
  • /queue/other,是znode队列中,3号排对者,由Java App提交,用于统计其他费用支出金额
  • /queue/profit,当znode队列中满了,触发创建利润节点。

  当/qeueu/profit被创建后,利润java app被启动,所有zookeeper的连接通知同步程序(红色线),队列已完成,所有程序结束。

三、环境准备

  1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)zookeeper集群。至少三个节点。安装参考楼主这篇文章(http://www.cnblogs.com/qq503665965/p/6790580.html

  3)java开发环境。

四、mapreduce及java app程序

  计算采购金额:

  1 package zkqueue;
  2 import java.io.IOException;
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 import java.util.regex.Pattern;
  6 
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.IntWritable;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapred.JobConf;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Reducer;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 
 20 
 21 /**
 22  * 采购金额计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Purchase {
 27 
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30 
 31     public static class PurchaseMapper extends Mapper {
 32 
 33         private String month = "2017-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37 
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());//拆分源数据
 41             if (tokens[3].startsWith(month)) {// 过滤1月份数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//计算
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48 
 49     public static class PurchaseReducer extends Reducer {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52 
 53         @Override
 54         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62 
 63     }
 64 
 65     public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("purchase");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70 
 71         
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76 
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Purchase.class);
 79 
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82 
 83         job.setMapperClass(PurchaseMapper.class);
 84         job.setReducerClass(PurchaseReducer.class);
 85 
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88 
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91 
 92         job.waitForCompletion(true);
 93     }
 94 
 95     public static JobConf config() {
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104     
105     public static Map path(){
106         Map path = new HashMap();
107         path.put("purchase", Purchase.class.getClassLoader().getResource("logfile/biz/purchase.csv").getPath());// 源文件数据
108         path.put("input", HDFS + "/user/hdfs/biz/purchase");//hdfs存储路径
109         path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); //hdfs输出路径
110         return path;
111     }
112 
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116 
117 }

  销售数据计算:

  1 package zkqueue;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;
  6 import java.util.regex.Pattern;
  7 
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapred.JobConf;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.Mapper;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 20 
 21 /**
 22  * 销售数据计算
 23  * @author Jon_China
 24  *
 25  */
 26 public class Sell {
 27 
 28     public static final String HDFS = "hdfs://192.168.8.101:9000";
 29     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
 30 
 31     public static class SellMapper extends Mapper {
 32 
 33         private String month = "2013-01";
 34         private Text k = new Text(month);
 35         private IntWritable v = new IntWritable();
 36         private int money = 0;
 37 
 38         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
 39             System.out.println(values.toString());
 40             String[] tokens = DELIMITER.split(values.toString());
 41             if (tokens[3].startsWith(month)) {// 1月的数据
 42                 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
 43                 v.set(money);
 44                 context.write(k, v);
 45             }
 46         }
 47     }
 48 
 49     public static class SellReducer extends Reducer {
 50         private IntWritable v = new IntWritable();
 51         private int money = 0;
 52 
 53         @Override
 54         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
 55             for (IntWritable line : values) {
 56                 money += line.get();
 57             }
 58             v.set(money);
 59             context.write(null, v);
 60             System.out.println("Output:" + key + "," + money);
 61         }
 62 
 63     }
 64 
 65     public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
 66         JobConf conf = config();
 67         String local_data = path.get("sell");
 68         String input = path.get("input");
 69         String output = path.get("output");
 70 
 71         // 初始化sell
 72         HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
 73         hdfs.rmr(input);
 74         hdfs.mkdirs(input);
 75         hdfs.copyFile(local_data, input);
 76 
 77         Job job = Job.getInstance(conf);
 78         job.setJarByClass(Sell.class);
 79 
 80         job.setOutputKeyClass(Text.class);
 81         job.setOutputValueClass(IntWritable.class);
 82 
 83         job.setMapperClass(SellMapper.class);
 84         job.setReducerClass(SellReducer.class);
 85 
 86         job.setInputFormatClass(TextInputFormat.class);
 87         job.setOutputFormatClass(TextOutputFormat.class);
 88 
 89         FileInputFormat.setInputPaths(job, new Path(input));
 90         FileOutputFormat.setOutputPath(job, new Path(output));
 91 
 92         job.waitForCompletion(true);
 93     }
 94 
 95     public static JobConf config() {// Hadoop集群的远程配置信息
 96         JobConf conf = new JobConf(Purchase.class);
 97         conf.setJobName("purchase");
 98         conf.addResource("classpath:/hadoop/core-site.xml");
 99         conf.addResource("classpath:/hadoop/hdfs-site.xml");
100         conf.addResource("classpath:/hadoop/mapred-site.xml");
101         conf.addResource("classpath:/hadoop/yarn-site.xml");
102         return conf;
103     }
104     
105     public static Map path(){
106         Map path = new HashMap();
107         path.put("sell", Sell.class.getClassLoader().getResource("logfile/biz/sell.csv").getPath());// 本地的数据文件
108         path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
109         path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
110         return path;
111     }
112 
113     public static void main(String[] args) throws Exception {
114         run(path());
115     }
116 
117 }

  其他金额计算:

 1 package zkqueue;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.File;
 5 import java.io.FileReader;
 6 import java.io.IOException;
 7 import java.util.regex.Pattern;
 8 
 9 public class Other {
10 
11     public static String file = "/logfile/biz/other.csv";
12     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
13     private static String month = "2017-01";
14 
15     public static void main(String[] args) throws IOException {
16         calcOther(file);
17     }
18 
19     public static int calcOther(String file) throws IOException {
20         int money = 0;
21         BufferedReader br = new BufferedReader(new FileReader(new File(file)));
22 
23         String s = null;
24         while ((s = br.readLine()) != null) {
25             String[] tokens = DELIMITER.split(s);
26             if (tokens[0].startsWith(month)) {// 1月的数据
27                 money += Integer.parseInt(tokens[1]);
28             }
29         }
30         br.close();
31         System.out.println("Output:" + month + "," + money);
32         return money;
33     }
34 }

  计算利润:

  

 1 package zkqueue;
 2 
 3 import java.io.IOException;
 4 
 5 
 6 /**
 7  * 利润计算
 8  * @author Jon_China
 9  *
10  */
11 public class Profit {
12 
13     public static void main(String[] args) throws Exception {
14         profit();
15     }
16 
17     public static void profit() throws Exception {
18         int sell = getSell();
19         int purchase = getPurchase();
20         int other = getOther();
21         int profit = sell - purchase - other;
22         System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
23     }
24 
25     public static int getPurchase() throws Exception {
26         HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
27         return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
28     }
29 
30     public static int getSell() throws Exception {
31         HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
32         return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
33     }
34 
35     public static int getOther() throws IOException {
36         return Other.calcOther(Other.file);
37     }
38 
39 }

  zookeeper任务调度:

  1 package zkqueue;
  2 
  3 import java.io.IOException;
  4 import java.util.List;
  5 
  6 import org.apache.zookeeper.CreateMode;
  7 import org.apache.zookeeper.KeeperException;
  8 import org.apache.zookeeper.WatchedEvent;
  9 import org.apache.zookeeper.Watcher;
 10 import org.apache.zookeeper.ZooDefs.Ids;
 11 import org.apache.zookeeper.ZooKeeper;
 12 /**
 13  * 分布式队列zookeeper调度
 14  * @author Jon_China
 15  *
 16  */
 17 public class QueueZookeeper {
 18     //设置队列目录树
 19     final public static String QUEUE = "/queue";
 20     final public static String PROFIT = "/queue/profit";
 21     final public static String PURCHASE = "/queue/purchase";
 22     final public static String SELL = "/queue/sell";
 23     final public static String OTHER = "/queue/other";
 24 
 25     public static void main(String[] args) throws Exception {
 26         if (args.length == 0) {
 27             System.out.println("Please start a task:");
 28         } else {
 29             doAction(Integer.parseInt(args[0]));
 30         }
 31     }
 32     public static void doAction(int client) throws Exception {
 33         //zookeeper地址
 34         String host1 = "192.168.8.104:2181";
 35         String host2 = "192.168.8.105:2181";
 36         String host3 = "192.168.8.106:2181";
 37 
 38         ZooKeeper zk = null;
 39         switch (client) {//1,2,3分别将不同任务加入队列
 40         case 1:
 41             zk = connection(host1);
 42             initQueue(zk);
 43             doPurchase(zk);
 44             break;
 45         case 2:
 46             zk = connection(host2);
 47             initQueue(zk);
 48             doSell(zk);
 49             break;
 50         case 3:
 51             zk = connection(host3);
 52             initQueue(zk);
 53             doOther(zk);
 54             break;
 55         }
 56     }
 57 
 58     // 创建一个与服务器的连接
 59     public static ZooKeeper connection(String host) throws IOException {
 60         ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
 61             // 监控所有被触发的事件
 62             public void process(WatchedEvent event) {
 63                 if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
 64                     System.out.println("Queue has Completed!!!");
 65                 }
 66             }
 67         });
 68         return zk;
 69     }
 70     /**
 71      * 初始化队列
 72      * @param zk
 73      * @throws KeeperException
 74      * @throws InterruptedException
 75      */
 76     public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
 77         System.out.println("WATCH => " + PROFIT);
 78         zk.exists(PROFIT, true);
 79 
 80         if (zk.exists(QUEUE, false) == null) {
 81             System.out.println("create " + QUEUE);
 82             zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 83         } else {
 84             System.out.println(QUEUE + " is exist!");
 85         }
 86     }
 87     /**
 88      * 采购任务
 89      * @param zk
 90      * @throws Exception
 91      */
 92     public static void doPurchase(ZooKeeper zk) throws Exception {
 93         if (zk.exists(PURCHASE, false) == null) {
 94             
 95             Purchase.run(Purchase.path());
 96             
 97             System.out.println("create " + PURCHASE);
 98             zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 99         } else {
100             System.out.println(PURCHASE + " is exist!");
101         }
102         isCompleted(zk);
103     }
104     /**
105      * 销售任务
106      * @param zk
107      * @throws Exception
108      */
109     public static void doSell(ZooKeeper zk) throws Exception {
110         if (zk.exists(SELL, false) == null) {
111             
112             Sell.run(Sell.path());
113             
114             System.out.println("create " + SELL);
115             zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
116         } else {
117             System.out.println(SELL + " is exist!");
118         }
119         isCompleted(zk);
120     }
121     /**
122      * 其他计算任务
123      * @param zk
124      * @throws Exception
125      */
126     public static void doOther(ZooKeeper zk) throws Exception {
127         if (zk.exists(OTHER, false) == null) {
128             
129             Other.calcOther(Other.file);
130             
131             System.out.println("create " + OTHER);
132             zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
133         } else {
134             System.out.println(OTHER + " is exist!");
135         }
136         isCompleted(zk);
137     }
138     /**
139      * 检测完成情况
140      * @param zk
141      * @throws Exception
142      */
143     public static void isCompleted(ZooKeeper zk) throws Exception {
144         int size = 3;
145         List children = zk.getChildren(QUEUE, true);
146         int length = children.size();
147 
148         System.out.println("Queue Complete:" + length + "/" + size);
149         if (length >= size) {
150             System.out.println("create " + PROFIT);
151             Profit.profit();
152             zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
153 
154             for (String child : children) {// 清空节点
155                 zk.delete(QUEUE + "/" + child, -1);
156             }
157 
158         }
159     }
160 }

四、运行结果

  在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求 。然后,通过同步的分步式队列自动启动了计算利润的程序,幵在日志中打印了2017 年1月的利润为-6693765。

  示例代码地址:https://github.com/LJunChina/hadoop/tree/master/distributed_mq

hive网站日志数据分析

  一、说在前面的话

  上一篇,楼主介绍了使用flume集群来模拟网站产生的日志数据收集到hdfs。但我们所采集的日志数据是不规则的,同时也包含了许多无用的日志。当需要分析一些核心指标来满足系统业务决策的时候,对日志的数据清洗在所难免,楼主本篇将介绍如何使用mapreduce程序对日志数据进行清洗,将清洗后的结构化数据存储到hive,并进行相关指标的提取。

  先明白几个概念:

  1)PV(Page View)。页面浏览量即为PV,是指所有用户浏览页面的总和,一个独立用户每打开一个页面就被记录1 次。计算方式为:记录计数

  2)注册用户数。对注册页面访问的次数。计算方式:对访问member.php?mod=register的url,计数

  3)IP数。一天之内,访问网站的不同独立IP 个数加和。其中同一IP无论访问了几个页面,独立IP 数均为1。这是我们最熟悉的一个概念,无论同一个IP上有多少台主机,或者其他用户,从某种程度上来说,独立IP的多少,是衡量网站推广活动好坏最直接的数据。计算方式:对不同ip,计数

  4)跳出率。只浏览了一个页面便离开了网站的访问次数占总的访问次数的百分比,即只浏览了一个页面的访问次数 / 全部的访问次数汇总。跳出率是非常重要的访客黏性指标,它显示了访客对网站的兴趣程度。跳出率越低说明流量质量越好,访客对网站的内容越感兴趣,这些访客越可能是网站的有效用户、忠实用户。该指标也可以衡量网络营销的效果,指出有多少访客被网络营销吸引到宣传产品页或网站上之后,又流失掉了,可以说就是煮熟的鸭子飞了。比如,网站在某媒体上打广告推广,分析从这个推广来源进入的访客指标,其跳出率可以反映出选择这个媒体是否合适,广告语的撰写是否优秀,以及网站入口页的设计是否用户体验良好。
   计算方式:(1)统计一天内只出现一条记录的ip,称为跳出数
                   (2)跳出数/PV
  本次楼主只做以上几项简单指标的分析,各个网站的作用领域不一样,所涉及的分析指标也有很大差别,各位同学可以根据自己的需求尽情拓展。废话不多说,上干货。

  二、环境准备  

  1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)hive。用于对各项核心指标进行分析(安装楼主不再介绍了)

  3)mysql。存储分析后的数据指标。

  4)sqoop。从hive到mysql的数据导入。

  三、数据清洗

  我们先看看从flume收集到hdfs中的源日志数据格式:  

1 27.19.74.143 - - [30/4/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
2 211.97.15.179 - - [30/4/2017:17:38:22 +0800] "GET /home.php?mod=misc&ac=sendmail&rand=1369906181 HTTP/1.1" 200 -

  上面包含条个静态资源日志和一条正常链接日志(楼主这里不做静态资源日志的分析),需要将以 /static 开头的日志文件过滤掉;时间格式需要转换为时间戳;去掉IP与时间之间的无用符号;过滤掉请求方式;“/”分隔符、http协议、请求状态及当次流量。效果如下:  

1 211.97.15.179   20170430173820  home.php?mod=misc&ac=sendmail&rand=1369906181

  先写个日志解析类,测试是否能解析成功,我们再写mapreduce程序:

  

 1 package mapreduce;
 2 
 3 import java.text.ParseException;
 4 import java.text.SimpleDateFormat;
 5 import java.util.Date;
 6 import java.util.Locale;
 7 
 8 
 9 public class LogParser {
10     public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MM/yyyy:HH:mm:ss", Locale.ENGLISH);
11     public static final SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
12     public static void main(String[] args) throws ParseException {
13         final String S1 = "27.19.74.143 - - [30/04/2017:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
14         LogParser parser = new LogParser();
15         final String[] array = parser.parse(S1);
16         System.out.println("源数据: "+S1);
17         System.out.format("清洗结果数据:  ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2], array[3], array[4]);
18     }
19     /**
20      * 解析英文时间字符串
21      * @param string
22      * @return
23      * @throws ParseException
24      */
25     private Date parseDateFormat(String string){
26         Date parse = null;
27         try {
28             parse = FORMAT.parse(string);
29         } catch (ParseException e) {
30             e.printStackTrace();
31         }
32         return parse;
33     }
34     /**
35      * 解析日志的行记录
36      * @param line
37      * @return 数组含有5个元素,分别是ip、时间、url、状态、流量
38      */
39     public String[] parse(String line){
40         String ip = parseIP(line);
41         String time = parseTime(line);
42         String url = parseURL(line);
43         String status = parseStatus(line);
44         String traffic = parseTraffic(line);
45         
46         return new String[]{ip, time ,url, status, traffic};
47     }
48     
49     private String parseTraffic(String line) {
50         final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
51         String traffic = trim.split(" ")[1];
52         return traffic;
53     }
54     private String parseStatus(String line) {
55         final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
56         String status = trim.split(" ")[0];
57         return status;
58     }
59     private String parseURL(String line) {
60         final int first = line.indexOf("\"");
61         final int last = line.lastIndexOf("\"");
62         String url = line.substring(first+1, last);
63         return url;
64     }
65     private String parseTime(String line) {
66         final int first = line.indexOf("[");
67         final int last = line.indexOf("+0800]");
68         String time = line.substring(first+1,last).trim();
69         Date date = parseDateFormat(time);
70         return dateformat1.format(date);
71     }
72     private String parseIP(String line) {
73         String ip = line.split("- -")[0].trim();
74         return ip;
75     }
76 }

  输出结果:  

1 源数据: 27.19.74.143 - - [30/04/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
2 清洗结果数据:  ip=27.19.74.143, time=20170430173820, url=GET /static/image/common/faq.gif HTTP/1.1, status=200, traffic=1127

  再看mapreduce业务逻辑,在map中,我们需要拿出ip、time、url这三个属性的值,同时过滤掉静态资源日志。map的k1用默认的LongWritable就OK,v1不用说Text,k2、v2与k1、v1类型对应就行:  

 1 static class MyMapper extends Mapper{
 2         LogParser logParser = new LogParser();
 3         Text v2 = new Text();
 4         @Override
 5         protected void map(LongWritable key, Text value, Mapper.Context context)
 6                 throws IOException, InterruptedException {
 7             final String[] parsed = logParser.parse(value.toString());
 8             
 9             //过滤掉静态信息
10             if(parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")){
11                 return;
12             }            
13             //过掉开头的特定格式字符串
14             if(parsed[2].startsWith("GET /")){
15                 parsed[2] = parsed[2].substring("GET /".length());
16             }
17             else if(parsed[2].startsWith("POST /")){
18                 parsed[2] = parsed[2].substring("POST /".length());
19             }            
20             //过滤结尾的特定格式字符串
21             if(parsed[2].endsWith(" HTTP/1.1")){
22                 parsed[2] = parsed[2].substring(0, parsed[2].length()-" HTTP/1.1".length());
23             }            
24             v2.set(parsed[0]+"\t"+parsed[1]+"\t"+parsed[2]);
25             context.write(key, v2);
26         }

  reduce相对来说就比较简单了,我们只需再讲map的输出写到一个文件中就OK:  

1 static class MyReducer extends Reducer{
2         @Override
3         protected void reduce(LongWritable arg0, Iterable arg1,
4                 Reducer.Context context) throws IOException, InterruptedException {
5             for (Text v2 : arg1) {                
6                 context.write(v2, NullWritable.get());
7             }
8         }
9     }

  最后,组装JOB:  

 1 public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
 2         Job job = Job.getInstance(new Configuration());
 3         job.setJarByClass(LogParser.class);        
 4         job.setMapperClass(MyMapper.class);
 5         job.setMapOutputKeyClass(LongWritable.class);
 6         job.setMapOutputValueClass(Text.class);
 7         FileInputFormat.setInputPaths(job, new Path("/logs/20170430.log"));        
 8         job.setReducerClass(MyReducer.class);
 9         job.setOutputKeyClass(Text.class);
10         job.setOutputValueClass(NullWritable.class);
11         FileOutputFormat.setOutputPath(job, new Path("/20170430"));
12         job.waitForCompletion(true);
13     }

  mapreduce完成后就是运行job了:

  1)打包,mapreduce程序为loger.jar

  2)上传jar包。运行loger.jar hadoop jar loger.jar 

  运行结果:

  

  hdfs多了20170430目录:

  

  我们下载下来看看清洗后的数据是否符合要求:

  

  日志数据的清洗到此就完成了,接下来我们要在此之上使用hive提取核心指标数据。

  四、核心指标分析

  1)构建一个外部分区表,sql脚本如下:  

1 CREATE EXTERNAL TABLE sitelog(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/20170430';

  2)增加分区,sql脚本如下:  

ALTER TABLE sitelog ADD PARTITION(logdate='20170430') LOCATION '/sitelog_cleaned/20170430';

  3)统计每日PV,sql脚本如下:  

1 CREATE TABLE sitelog_pv_20170430 AS SELECT COUNT(1) AS PV FROM sitelog WHERE logdate='20170430';

  4)统计每日注册用户数,sql脚本如下:  

1 CREATE TABLE sitelog_reguser_20170430 AS SELECT COUNT(1) AS REGUSER FROM sitelog WHERE logdate=20170430' AND INSTR(url,'member.php?mod=register')>0;

  5)统计每日独立IP,sql脚本如下:

1 CREATE TABLE site_ip_20170430 AS SELECT COUNT(DISTINCT ip) AS IP FROM sitelog WHERE logdate='20170430';

  6)统计每日跳出的用户数,sql脚本如下:

CREATE TABLE sitelog_jumper_20170430 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM sitelog WHERE logdate='20170430' GROUP BY ip HAVING times=1) e;

  7)把每天统计的数据放入一张表中,sql脚本如下:

1 CREATE TABLE sitelog_20170430 AS SELECT '20170430', a.pv, b.reguser, c.ip, d.jumper FROM sitelog_pv_20170430 a JOIN sitelog_reguser_20170430 b ON 1=1 JOIN sitelog_ip_20170430 c ON 1=1 JOIN sitelog_jumper_20170430 d ON 1=1 ;

  8)使用sqoop把数据导出到mysql中:

sqoop export --connect jdbc:mysql://hadoop02:3306/sitelog --username root --password root --table sitelog-result --fields-terminated-by '\001' --export-dir '/user/hive/warehouse/sitelog_20170430'

   结果如下:

  2017年4月30日日志分析结果:PV数为:169857;当日注册用户数:28;独立IP数:10411;跳出数:3749.

  到此,一个简单的网站日志分析楼主就介绍完了,后面可视化的展示楼主就不写了,比较简单。相关代码地址:https://github.com/LJunChina/hadoop

flume集群日志收集

一、Flume简介

  Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。其核心为agent,agent是一个java进程,运行在日志收集节点。

agent里面包含3个核心组件:source、channel、sink。
   source组件是专用于收集日志的,可以处理各种类型各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义,同时 source组件把数据收集

以后,临时存放在channel中。

  channel组件是在agent中专用于临时存储数据的,可以存放在memory、jdbc、file、自定义等。channel中的数据只有在sink发送成功之后才会被删除。

  sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

  在整个数据传输过程中,流动的是event。事务保证是在event级别。flume可以支持多级flume的agent,支持扇入(fan-in)、扇出(fan-out)。

 

二、环境准备

  1)hadoop集群(楼主用的版本2.7.3,共6个节点,可参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)flume集群规划:

HOST

作用

方式

路径

hadoop01

agent

spooldir

/home/hadoop/logs

hadoop05

collector

HDFS

/logs

hadoop06

collector

HDFS

/logs

  其基本结构官网给出了更加具体的说明,这里楼主就直接copy过来了:

三、集群配置

  1)系统环境变量配置  

1 export FLUME_HOME=/home/hadoop/apache-flume-1.7.0-bin
2 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLUME_HOME/bin

  记得 source /etc/profile

  2)flume JDK环境  

1 mv flume-env.sh.templete flume-env.sh
2 vim flume-env.sh
3 export JAVA_HOME=/usr/jdk1.7.0_60//增加JDK安装路径即可

  3)hadoop01中flume配置

  在conf目录增加配置文件 flume-client ,内容为:  

 1 #agent1名称
 2 agent1.channels = c1
 3 agent1.sources = r1
 4 agent1.sinks = k1 k2
 5 
 6 #sink组名称
 7 agent1.sinkgroups = g1 
 8 
 9 #set channel
10 agent1.channels.c1.type = memory
11 agent1.channels.c1.capacity = 1000
12 agent1.channels.c1.transactionCapacity = 100
13 
14 agent1.sources.r1.channels = c1
15 agent1.sources.r1.type = spooldir
16 #日志源
17 agent1.sources.r1.spoolDir =/home/hadoop/logs
18 
19 agent1.sources.r1.interceptors = i1 i2
20 agent1.sources.r1.interceptors.i1.type = static
21 agent1.sources.r1.interceptors.i1.key = Type
22 agent1.sources.r1.interceptors.i1.value = LOGIN
23 agent1.sources.r1.interceptors.i2.type = timestamp
24 
25 # 设置sink1
26 agent1.sinks.k1.channel = c1
27 agent1.sinks.k1.type = avro
28 #sink1所在主机
29 agent1.sinks.k1.hostname = hadoop05
30 agent1.sinks.k1.port = 52020
31 
32 #设置sink2
33 agent1.sinks.k2.channel = c1
34 agent1.sinks.k2.type = avro
35 #sink2所在主机
36 agent1.sinks.k2.hostname = hadoop06
37 agent1.sinks.k2.port = 52020
38 
39 #设置sink组包含sink1,sink2
40 agent1.sinkgroups.g1.sinks = k1 k2
41 
42 #高可靠性
43 agent1.sinkgroups.g1.processor.type = failover
44 #设置优先级
45 agent1.sinkgroups.g1.processor.priority.k1 = 10
46 agent1.sinkgroups.g1.processor.priority.k2 = 1
47 agent1.sinkgroups.g1.processor.maxpenalty = 10000

  4)hadoop05配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channlels
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop05
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop05
20 a1.sources.r1.channels = c1
21 
22 #sink的hdfs地址
23 a1.sinks.k1.type=hdfs
24 a1.sinks.k1.hdfs.path=/logs
25 a1.sinks.k1.hdfs.fileType=DataStream
26 a1.sinks.k1.hdfs.writeFormat=TEXT
27 #没1K产生文件
28 a1.sinks.k1.hdfs.rollInterval=1
29 a1.sinks.k1.channel=c1
30 #文件后缀
31 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

  5)hadoop06配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channel
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop06
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop06
20 a1.sources.r1.channels = c1
21 #设置sink的hdfs地址目录
22 a1.sinks.k1.type=hdfs
23 a1.sinks.k1.hdfs.path=/logs
24 a1.sinks.k1.hdfs.fileType=DataStream
25 a1.sinks.k1.hdfs.writeFormat=TEXT
26 a1.sinks.k1.hdfs.rollInterval=1
27 a1.sinks.k1.channel=c1
28 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

四、启动flume集群

  1)启动collector,即hadoop05,hadoop06 

1 flume-ng agent -n a1 -c conf -f flume-server -Dflume.root.logger=DEBUG,console

  2)启动agent,即hadoop01

flume-ng agent -n a1 -c conf -f flume-client -Dflume.root.logger=DEBUG,console

  agent启动后,hadoop05,hadoop06的控制台可看到如下打印信息:

  

五、日志收集测试

  1)启动zookeeper集群(未搭建zookeeper的同学可以忽略)

  2)启动hdfs start-dfs.sh 

  3)模拟网站日志,楼主这里随便弄的测试数据

 

  4)上传到/hadoop/home/logs

  hadoop01输出:

 

   hadoop05输出:

 

  由于hadoop05设置的优先级高于hadoop06,因此hadoop06无日志写入。

  我们再看hdfs上,是否成功上传了日志文件:

  

六、高可用性测试

  由于楼主设置的hadoop05的优先级要高于hadoop06,这也是上面的日志收集hadoop05输出而不是hadoop06输出的原因。现在我们干掉优先级高的hadoop05,看hadoop06是否能正常进行日志采集工作。

  

  我们向日志源添加一个测试日志文件:

  

  hadoop06输出:

 

   查看hdfs:

  

  好了!flume集群配置及日志收集就介绍到这里,下次楼主楼主会具体介绍利用mapreduce对日志进行清洗,并存储到hbase相关的内容。

  

 

hadoop高可靠性HA集群

概述


 

简单hdfs高可用架构图

 

  在hadoop2.x中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。
    hadoop2.x官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。这里楼主使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成功。通常配置奇数个JournalNode(我配了3个)。
    这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby NameNode为standby状态。hadoop2.4以前的版本中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,2.4以后解决了这个问题,有两个ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调。yarn的HA配置楼主会给出配置文件,受环境影响,这里就不搭建yarn的高可用性了。

主要步骤


 

  1. 备6台Linux机器
  2. 安装JDK、配置主机名、修改IP地址、关闭防火墙
  3. 配置SSH免登陆
  4. 安装zookeeper集群
  5. zookeeper、hadoop环境变量配置
  6. 核心配置文件修改
  7. 启动zookeeper集群
  8. 启动journalnode
  9. 格式化文件系统、格式化zk
  10. 启动hdfs、启动yarn

前期准备


集群规划

  

 主机名 IP 安装软件 进程
hadoop01 192.168.8.101 jdk、hadoop NameNode、DFSZKFailoverController(zkfc)
hadoop02 192.168.8.102 jdk、hadoop NameNode、DFSZKFailoverController(zkfc)
hadoop03 192.168.8.103 jdk、hadoop ResourceManager
hadoop04 192.168.8.104 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
hadoop05 192.168.8.105 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain
hadoop06 192.168.8.106 jdk、hadoop、zookeeper DataNode、NodeManager、JournalNode、QuorumPeerMain

 

Linux环境  

  1.由于楼主机器硬件环境的限制,这里只准备了6台centos7的系统。


  2.修改IP。如果跟楼主一样使用VM搭集群,请使用only-host模式。

vim /etc/sysconfig/network-scripts/ifcfg-ens3

  

TYPE="Ethernet"
BOOTPROTO="static"
DEFROUTE="yes"
PEERDNS="yes"
PEERROUTES="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_PEERDNS="yes"
IPV6_PEERROUTES="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="7f13c30b-0943-49e9-b25d-8aa8cab95e20"
DEVICE="ens33"
ONBOOT="yes"
IPADDR="192.168.8.101"
NETMASK="255.255.255.0"
GATEWAY="192.168.8.1"

  3.修改主机名和IP的映射关系 

1  vim /etc/host
2 
3 192.168.8.101 hadoop01
4 192.168.8.102 hadoop02
5 192.168.8.103 hadoop03
6 192.168.8.104 hadoop04
7 192.168.8.105 hadoop05
8 192.168.8.106 hadoop06

  4.关闭防火墙

1 systemctl stop firewalld.service //停止firewall
2 systemctl disable firewalld.service //禁止firewall开机启动

  5.修改主机名

1 hostnamectl set-hostname hadoop01
2 hostnamectl set-hostname hadoop02
3 hostnamectl set-hostname hadoop03
4 hostnamectl set-hostname hadoop04
5 hostnamectl set-hostname hadoop05
6 hostnamectl set-hostname hadoop06

  6.ssh免登陆

  生成公钥、私钥

  

ssh-keygen -t rsa //一直回车

  将公钥发送到其他机器

ssh-coyp-id hadoop01
ssh-coyp-id hadoop02
ssh-coyp-id hadoop03
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06

  7.安装JDK,配置环境变量

  hadoop01,hadoop02,hadoop03

1 export JAVA_HOME=/usr/jdk1.7.0_60
2 export HADOOP_HOME=/home/hadoop/hadoop-2.7.3
3 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

  hadoop04,hadoop05,hadoop06(包含zookeeper)

1 export JAVA_HOME=/usr/jdk1.7.0_60
2 export HADOOP_HOME=/home/hadoop/hadoop-2.7.3
3 export ZOOKEEPER_HOME=/home/hadoop/zookeeper-3.4.10
4 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

zookeeper集群安装


 

  1.上传zk安装包
  上传到/home/hadoop
  2.解压  

tar -zxvf zookeeper-3.4.10.tar.gz

  3.配置(先在一台节点上配置)
    在conf目录,更改zoo_sample.cfg文件为zoo.cfg
  

 mv zoo_sample.cfg zoo.cfg

  
    修改配置文件(zoo.cfg)
       

1 dataDir=/home/hadoop/zookeeper-3.4.10/data       
2 server.1=hadoop04:2888:3888
3 server.2=hadoop05:2888:3888
4 server.3=hadoop06:2888:3888

   
    在(dataDir=/home/hadoop/zookeeper-3.4.10/data)创建一个myid文件,里面内容是server.N中的N(server.2里面内容为2)
      

1  echo "5" > myid    

    4.将配置好的zk拷贝到其他节点
      

1 scp -r /home/hadoop/zookeeper-3.4.5/ hadoop05:/home/hadoop
2 scp -r /home/hadoop/zookeeper-3.4.5/ hadoop06:/home/hadoop

  
    注意:在其他节点上一定要修改myid的内容
        在hadoop05应该将myid的内容改为2 (echo “6” > myid)
        在hadoop06应该将myid的内容改为3 (echo “7” > myid)

 5.启动集群
    分别启动hadoop04,hadoop05,hadoop06上的zookeeper      

1 zkServer.sh start

hadoop2.7.3集群安装


 

  1.解压         

1  tar -zxvf hadoop-2.7.3.tar.gz 

      2.配置core-site.xml          

 1 configuration>
 2     
 3     property>
 4         name>fs.defaultFSname>
 5         value>hdfs://ns1value>
 6     property>
 7     
 8     property>
 9         name>hadoop.tmp.dirname>
10         value>/home/hadoop/hadoop-2.7.3/tmpvalue>
11     property>
12     
13     property>
14         name>ha.zookeeper.quorumname>
15         value>hadoop04:2181,hadoop05:2181,hadoop06:2181value>
16     property>
17 configuration>

   3.配置hdf-site.xml

 1 configuration>
 2 
 3     property>
 4         name>dfs.nameservicesname>
 5         value>ns1value>
 6     property>
 7     
 8     property>
 9         name>dfs.ha.namenodes.ns1name>
10         value>nn1,nn2value>
11     property>
12     
13     property>
14         name>dfs.namenode.rpc-address.ns1.nn1name>
15         value>hadoop01:9000value>
16     property>
17     
18     property>
19         name>dfs.namenode.http-address.ns1.nn1name>
20         value>hadoop01:50070value>
21     property>
22     
23     property>
24         name>dfs.namenode.rpc-address.ns1.nn2name>
25         value>hadoop02:9000value>
26     property>
27     
28     property>
29         name>dfs.namenode.http-address.ns1.nn2name>
30         value>hadoop02:50070value>
31     property>
32     
33     property>
34         name>dfs.namenode.shared.edits.dirname>
35         value>qjournal://hadoop04:8485;hadoop05:8485;hadoop06:8485/ns1value>
36     property>
37     
38     property>
39         name>dfs.journalnode.edits.dirname>
40         value>file:/home/hadoop/hadoop-2.7.3/journalvalue>
41     property>
42     
43     property>
44         name>dfs.ha.automatic-failover.enabledname>
45         value>truevalue>
46     property>
47     
48     property>
49         name>dfs.client.failover.proxy.provider.ns1name>
50         value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvidervalue>
51     property>
52     
53     property>
54         name>dfs.ha.fencing.methodsname>
55         value>
56             sshfence
57             shell(/bin/true)
58         value>
59     property>
60     
61     property>
62         name>dfs.ha.fencing.ssh.private-key-filesname>
63         value>/home/hadoop/.ssh/id_rsavalue>
64     property>
65     
66     property>
67         name>dfs.ha.fencing.ssh.connect-timeoutname>
68         value>30000value>
69     property>
70 configuration>

  4.配置mapred-site.xml

1 configuration>
2 
3     property>
4         name>mapreduce.framework.namename>
5         value>yarnvalue>
6     property>
7 configuration>

  5.配置yarn-site.xml

 

 1 configuration>
 2 
 3 
 4     
 5     property>
 6        name>yarn.resourcemanager.hostname.rm1name>
 7        value>hadoop03value>
 8     property>        
 9     property>
10        name>yarn.nodemanager.aux-servicesname>
11        value>mapreduce_shufflevalue>
12     property>
13 configuration>

  6.配置slaves

1 hadoop04
2 hadoop05
3 hadoop06

  7.将配置好的hadoop拷贝到其他节点

scp -r /home/hadoop/hadoop-2.7.3 hadoop02:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop03:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop04:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop05:/home/hadoop
scp -r /home/hadoop/hadoop-2.7.3 hadoop06:/home/hadoop

启动


 

  1.启动zookeeper集群(分别在hadoop04hadoop05hadoop06上启动zookeeper)
           

1 zkServer.sh start

  2.查看zookeeper状态

1 zkServer.sh status

  包含一个leader,二个follower

  

  3.启动journalnode(分别在hadoop04,hadoop05,hadoop06上执行)          

hadoop-daemon.sh start journalnode

        运行jps命令检验,hadoop04,hadoop05,hadoop06上多了JournalNode进程

  4.格式化HDFS

     在hadoop01上执行命令:          

1 hdfs namenode -format

  检查是否成功看终端知否打印:

  

       格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里楼主配置的是/home/hadoop/hadoop-2.7.3/tmp,然后将/home/hadoop/hadoop-2.7.3/tmp拷贝到ihadoop02的/home/hadoop/hadoop-2.7.3/下。
           

1 scp -r tmp/ hadoop02:/hadoop/hadoop-2.7.3/

  5.格式化ZK(在hadoop01上执行即可)
          

1  hdfs zkfc -formatZK

  效果如下(前面有点多截不下来,只截取了后面一部分):

  6.启动HDFS(在hadoop01上执行)

1 start-dfs.sh

      7.启动YARN(在hadoop03上执行)          

1 start-yarn.sh

验证


 

  到此,hadoop-2.7.3集群全部配置完毕,下面我们来验证:
  

  浏览器访问http://192.168.8.101:50070    NameNode ‘hadoop01:9000’ (active)
             http://192.168.8.102:50070   NameNode ‘hadoop02:9000’ (standby)

 

 

  浏览器访问resourceManager:http://192.168.8.103:8088

    

 

    我们可以模拟NameNode(active)宕机,来验证HDFS的HA是否有效,NameNode(active)宕机后,NameNode(standby)会转为active状态,这里楼主不在演示。

结语


 

  官网给出的文档还是比较详细的,楼主也是提取了官网的QJM解决方案来进行搭建。另外,yarn的HA搭建官网也给出了具体配置,有兴趣的同学可以试一试。