1.概述

1.1 Kylin是什么

    Apache Kylin(Extreme OLAP Engine for Big Data)是一个开源的分布式分析引擎,为Hadoop等大型分布式数据平台之上的超大规模数据集通过标准SQL查询及多维分析(OLAP)功能,提供亚秒级的交互式分析能力。

1.2 Kylin的由来

    Apache Kylin,中文名麒麟,是Hadoop动物园的重要成员。Apache Kylin是一个开源的分布式分析引擎,最初由eBay开发贡献至开源社区。它提供Hadoop之上的SQL查询接口及多维分析(OLAP)能力以支持大规模数据,能够处理TB乃至PB级别的分析任务,能够在亚秒级查询巨大的Hive表,并支持高并发。
    Apache Kylin于2014年10月在github开源,并很快在2014年11月加入Apache孵化器,于2015年11月正式毕业成为Apache顶级项目,也成为首个完全由中国团队设计开发的Apache顶级项目。于2016年3月,Apache Kylin核心开发成员创建了Kyligence公司,力求更好地推动项目和社区的快速发展。

1.3 为什么需要Kylin

    在大数据的背景下,Hadoop的出现解决了数据存储问题,但如何对海量数据进行OLAP查询,却一直令人十分头疼。
企业中大数据查询大致分为两种:即席查询和定制查询。
① 即席查询
    Hive、SparkSQL等OLAP引擎,虽然在很大程度上降低了数据分析的难度,但它们都只适用于即席查询的场景。它们的优点是查询灵活,但是随着数据量和计算复杂度的增长,响应时间不能得到保证。
② 定制查询
    多数情况下是对用户的操作做出实时反应,Hive等查询引擎很难满足实时查询,一般只能对数据仓库中的数据进行提前计算,然后将结果存入Mysql等关系型数据库,最后提供给用户进行查询。
    在上述背景下,Apache Kylin应运而生。不同于"大规模并行处理"Hive等架构,Apache Kylin采用"预计算"的模式,用户只需要提前定义好查询维度,Kylin将帮助我们进行计算,并将结果存储到HBase中,为海量数据的查询和分析提供亚秒级返回,是一种典型的"空间换时间"的解决方案。Apache Kylin的出现不仅很好地解决了海量数据快速查询的问题,也避免了手动开发和维护提前计算程序带来的一系列麻烦。

2.核心概念

2.1 数据仓库

    Data Warehouse,简称DW,中文名数据仓库,是商业智能(BI)中的核心部分。主要是将不同数据源的数据整合到一起,通过多维分析等方式为企业提供决策支持和报表生成。
数据仓库与数据库主要区别:用途不同
①、数据库面向事务,而数据仓库面向分析。
②、数据库一般存储在线的业务数据,需要对上层业务的改变做出实时反应,涉及到增删查改等操作,所以需要遵循三大范式,需要ACID。而数据仓库中存储的则主要是历史数据,主要目的是为企业决策提供支持,所以可能存在大量数据冗余,但利于多个维度查询,为决策者提供更多观察视角。
    在传统BI领域中,数据仓库的数据同样存储在Oracle、MySQL等数据库中,而在大数据领域中最常用的数据仓库就是Apache Hive,Hive也是Apache Kylin默认的数据源。

2.2 OLAP与OLTP

    OLAP(Online Analytical Process),联机分析处理,以多维度的方式分析数据,一般带有主观的查询需求,多应用在数据仓库。
    OLTP(Online Transaction Process),联机事务处理,侧重于数据库的增删查改等常用业务操作。

2.3 维度和度量

维度和度量是数据分析领域中两个常用的概念。
简单地说,维度就是观察数据的角度。比如气象站的采集数据,可以从时间的维度来观察:
表格.jpg

也可以从时间和气象站两个角度来观察:
1555654839(1).png

维度一般是离散的值,比如时间维度上的每一个独立的日期,或者气象站维度上的每一个独立的气象站ID。因此统计时可以把维度相同的记录聚合在一起,然后应用聚合函数做累加、均值、最大值、最小值等聚合计算。
度量就是被聚合的统计值,也就是聚合运算的结果,它一般是连续的值,如以上两个图中的温度值,或是其他测量点,比如风速、湿度、降雨量等等。通过对度量的比较和分析,我们就可以对数据做出评估,比如今年平均气温是否在正常范围,某个气象站的平均气温是否明显高于往年平均气温等等。

2.4 Cube和Cuboid

确定好了维度和度量之后,然后根据定义好的维度和度量,我们就可以构建Cube。对于一个给定的数据模型,我们可以对其上的所有维度进行组合。对于N个维度来说,组合所有可能性共有2的N次方种。对于每一种维度的组合,将度量做聚合计算,然后将运算的结果保存为一个物化视图,称为Cuboid。所有维度组合的Cuboid作为一个整体,被称为Cube。

1555655083(1).jpg

假设有一个电商的销售数据集,其中维度包括时间(Time)、商品(Item)、地点(Location)和供应
商(Supplier),度量为销售额(GMV)。那么所有维度的组合就有2的4次方,即16种。
 一维度(1D)的组合:有[Time]、[Item]、[Location]、[Supplier]4种。
 二维度(2D)的组合:有[Time Item]、[Time Location]、[Time Supplier]、[Item Location]、[Item Supplier]、[Location Supplier]6种。
 三维度(3D)的组合:有[Time Item Location][Item Location Supplier][Time Location Supplier][Time Item Supplier]4种。
 最后零维度(0D)和四维度(4D)的组合各有[]和[Time Item Location Supplier]1种。计算Cubiod,即按维度来聚合销售额。如果用SQL语句来表达计算Cuboid [Time, Item],那么SQL语句为:select Time, Item, Sum(GMV) as GMV from Sales group by Time, Item将计算的结果保存为物化视图,所有Cuboid物化视图的总称就是Cube。

2.5 事实表和维度表

事实表(Fact Table)是指存储有事实记录的表,如系统日志、销售记录、用户访问记录等。事实表的记录是动态增长的,所以它的体积通常远大于维度表。
维度表(Dimension Table)或维表,也称为查找表(Lookup Table),是与事实表相对应的一种表。它保存了维度的属性值,可以跟事实表做关联;相当于将事实表上经常重复的属性抽取、规范出来用一张表进行管理。常见的维度表有:日期表(存储与日期对应的周、月、季度等属性)、地区表(包含国家、省/州、城市等属性)等。维度表的变化通常不会太大。
使用维度表有许多好处:
①、缩小了事实表的大小。
②、便于维度的管理和维护,增加、删除和修改维度的属性,不必对事实表的大量记录进行改动。
③、 维度表可以为多个事实表重用。

2.6 星形模型

维度建模通常又分为星型模型、雪花模型
星形模型(Star Schema)是数据挖掘中常用的几种多维数据模型之一。它的特点是只有一张事实表,以及零到多个维度表,事实表与维度表通过主外键相关联,维度表之间没有关联,就像许多小星星围绕在一颗恒星周围,所以名为星形模型。

1555655396(1).png

雪花模型(SnowFlake Schema),就是将星形模型中的某些维表抽取成更细粒度的维表,然后让维表之间也进行关联,这种形状酷似雪花的的模型称为雪花模型
1555655496(1).png

3.运行原理

Kylin的核心思想是预计算,即对多维分析可能用到的度量进行预计算,将计算好的结果保存成Cube,供查询时直接访问。把高复杂度的聚合运算、多表连接等操作转换成对预计算结果的查询,这决定了Kylin能够拥有很好的快速查询和高并发能力。

3.1 技术架构

Apache Kylin系统主要可以分为在线查询和离线构建两部分,具体架构图如下:

1555655661(1).png

Kylin提供了一个称作Layer Cubing的算法,来构建Cube。简单来说,就是按照dimension数量从大到小的顺序,从Base Cuboid开始,依次基于上一层Cuboid的结果进行再聚合。每一层的计算都是一个单独的Map Reduce(Spark)任务。
MapReduce的计算结果最终保存到HBase中,HBase中每行记录的Rowkey由dimension组成,
measure会保存在column family中。为了减小存储代价,这里会对dimension和measure进行编码。查询阶段,利用HBase列存储的特性就可以保证Kylin有良好的快速响应和高并发。

3.2 特性

SQL接口

 Kylin主要的对外接口就是以SQL的形式提供的。SQL简单易用的特性极大地降低了Kylin的学习成本,不论是数据分析师还是Web开发程序员都能从中收益。

支持海量数据集

 不论是Hive、SparkSQL,还是Impala,它们的查询时间都随着数据量的增长而线性增长。而Apache Kylin使用预计算技术打破了这一点。Kylin在数据集规模上的局限性主要取决于维度的个数和基数,而不是数据集的大小,所以Kylin能更好地支持海量数据集的查询。

亚秒级响应

 受益于预计算技术,Kylin的查询速度非常快,因为复杂的连接、聚合等操作都在Cube的构建过程中已经完成了。    

水平扩展

Apache Kylin同样可以使用集群部署方式进行水平扩展。但部署多个节点只能提高Kylin处理查询的能力,而不能提升它的预计算能力。

可视化集成

Kylin提供与BI工具的整合能力,如Tableau,PowerBI/Excel,MSTR,QlikSense,Hue和SuperSet。

构建多维立方体(Cube)

用户能够在Kylin里为百亿以上数据集定义数据模型并构建立方体。

4.Kylin服务器模式

Kylin 实例是无状态的,其运行时状态存储在 HBase (由 conf/kylin.properties 中的 kylin.metadata.url 指定) 中的 metadata 中。出于负载均衡的考虑,建议运行多个Kylin 实例共享一个 metadata ,因此他们在表结构中共享同一个状态,比如job 状态, Cube 状态, 等等。
每一个 Kylin 实例在 conf/kylin.properties 中都有一个 “kylin.server.mode” entry,指定了运行时的模式,有 3 个选项:

job : 在实例中运行 job engine; Kylin job engine 管理集群 的 jobs。
query : 只运行 query engine; Kylin query engine 接收和回应你的 SQL 查询。
all : 在实例中既运行 job engine 也运行 query engines。

注意默认情况下只有一个实例可以运行 job engine (“all” 或 “job” 模式), 其它需要是 “query” 模式

5. 企业应用案例

Apache Kylin虽然还很年轻,但已经在多个企业的生产项目中得到了应用。下面我们来看一看Kylin在国内两个著名企业内的应用。
百度地图
大数据计算分析的三大痛点:

1.百亿级海量数据多维指标动态计算耗时问题,Apache Kylin通过预计算生成Cube结果数据集并存储到HBase的方式解决;
2.复杂条件筛选问题,用户查询时,Apache Kylin利用router查找算法及优化的HBase Coprocessor解决;
3.跨月、季度、年等大时间区间查询问题,对于预计算结果的存储,Apache Kylin利用Cube的Data Segment分区存储管理解决。

这3个痛点的解决,使百度地图在百亿级大数据规模下,且数据模型确定的具体多维分析产品中,达到单条SQL毫秒级响应。

1.环境说明

1.1 版本选择

环境配置.png

1.2Kylin 安装与配置

1.2.1 软件下载

下载地址:https://archive.apache.org/dist/kylin/apache-kylin-2.3.2/

1.2.2 解压

tar -zxvf apache-kylin-2.3.2-bin-cdh57.tar.gz

1.2.3 建立软连接

ln -s apache-kylin-2.3.2-bin kylin

1.2.4 添加 Kylin 相关环境变量

Kylin 集群节点环境变量配置
vim /etc/profile
export HADOOP_HOME=/home/hadoop/app/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
export HBASE_HOME=/home/hadoop/app/hbase
export PATH=$HBASE_HOME/bin:$PATH
export HIVE_HOME=/home/hadoop/app/hive
export HIVE_CONF_HOME=$HIVE_HOME/conf
export PATH=$HIVE_HOME/bin:$PATH
export KAFKA_HOME=/home/hadoop/app/kafka
export PATH=$KAFKA_HOME/bin:$PATH
export HCAT_HOME=$HIVE_HOME/hcatalog
export PATH=$HCAT_HOME/bin:$PATH
export KYLIN_HOME=/home/hadoop/app/kylin
export PATH=$KYLIN_HOME/bin:$PATH
使环境变量生效
source /etc/profile
备注:kylin hive 安装目录在每个节点都需要(后面会拷贝)
scp -r hive hadoop@hadoop02:/home/hadoop/app/
scp -r hive hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop02:/home/hadoop/app/

1.2.5 修改 kylin.properties 配置文件

vi kylin.properties
#配置节点类型(kylin 主节点模式为 all,从节点的模式为 query)
kylin.server.mode=all
#kylin 集群节点配置
kylin.server.cluster-servers=hadoop01:7070,hadoop02:7070,hadoop03:7070
#定义 kylin 用于 MR jobs 的 job.jar 包和 hbase 的协处理 jar 包,用于提升性能(添加项)
kylin.job.jar=/home/hadoop/app/kylin/lib/kylin-job-2.3.2.jar
kylin.coprocessor.local.jar=/home/hadoop/app/kylin/lib/kylin-coprocessor-2.3.2.jar

1.2.6kylin 安装目录同步其他节点

scp -r apache-kylin-2.3.2-bin hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop02:/home/hadoop/app/

1.2.7 修改其他节点 kylin.properties 配置文件

ln -s apache-kylin-2.3.2-bin kylin
vi kylin.properties
#从节点为 query 模式
kylin.server.mode=query

1.3 服务启动

1.3.1 启动 Zookeeper 集群

runRemoteCmd.sh "/home/hadoop/app/zookeeper/bin/zkServer.sh start" all

1.3.2 启动 hadoop 集群

#启动 hdfs
sbin/start-dfs.sh
#启动 yarn
sbin/start-yarn.sh
#开启 jobhistoryserver
sbin/mr-jobhistory-daemon.sh start historyserver

1.3.3 启动 HBase 集群

bin/start-hbase.sh

1.3.4 启动 Hive

#启动 mysql 服务
sudo service mysqld start
#启动 hive Metastore
bin/hive --service metastore &

1.3.5 启动 Kafka 集群【可选】

bin/kafka-server-start.sh config/server.properties

1.3.6 依赖检查(所有 kylin 节点检查)

#执行下面检查命令会在 hdfs 上创建 kylin 目录
./check-env.sh
hive 依赖检查

hive检查.png

hbase检查

hbase检查.png

1.3.7 启动 kylin 服务

所有节点启动:bin/kylin.sh start

1.3.8web ui 访问 kylin

访问地址:http://hadoop01:7070/kylin(其他节点也可以访问)
默认秘钥:admin/KYLIN

1、PV的案例代码展示

//利用spark程序统计运行商pv总量
object PV {

  def main(args: Array[String]): Unit = {
    // 1、创建sparkConf,设置appName和master
    val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")

    // 2、创建sparkContext
    val sc: SparkContext = new SparkContext(sparkConf)

    //设置日志等级
    sc.setLogLevel("WARN")

    //3、读取日志数据
    val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.txt")

    //4、统计PV
    //方法一:
    val dataOne: RDD[(String, Int)] = dataRDD.map(x=>("PV",1))
    val result: RDD[(String, Int)] = dataOne.reduceByKey(_+_)

   result.foreach(println)
    //方法二:
    println("方法二 PV总量: "+dataRDD.count())

    sc.stop()
    
  }

}

2、UV的案例代码展示

//利用spark程序统计运营商uv总量
object UV  extends App{

  //1、创建sparkConf,设置appName和master
  val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")

  // 2、创建sparkContext
  val sc: SparkContext = new SparkContext(sparkConf)

  //设置日志等级
  sc.setLogLevel("WARN")

  //3、读取日志数据
  val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.txt")

  // 4、切分每一行,获取对应的ip地址
  val ips: RDD[String] = dataRDD.map(_.split(" ")(0))

  // 5、去重
  val ipNum: Long = ips.distinct().count()

  //6、输出结果
  println("总的UV量: "+ipNum)

  sc.stop()
 
}

3、TopN的案例代码展示

object TopN extends App{
  // 1、创建sparkConf,设置appName和master
  val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")

  // 2、创建sparkContext
  val sc: SparkContext = new SparkContext(sparkConf)

  //设置日志等级
  sc.setLogLevel("WARN")

  //3、读取日志数据
  val dataRDD: RDD[String] = sc.textFile("E:\\data\\access.txt")

  //4、过滤掉缺失的字段的记录,切分每一行  获取url, 每个url记为1

  val urlAndOne: RDD[(String, Int)] = dataRDD.filter(_.split(" ").length>10).map(x=>(x.split(" ")(10),1))
  //过滤 "-"
  val urls: RDD[(String, Int)] = urlAndOne.filter(_._1.size>3)

  // 5、相同url出现的次数累加
  val result: RDD[(String, Int)] = urls.reduceByKey(_+_)

  //6、访问url最多的排序
  val resultSort: RDD[(String, Int)] = result.sortBy(_._2,false)

  // 7、取前五位
  val finalResultSort: Array[(String, Int)] = resultSort.take(5)

  //8、打印输出
  finalResultSort.foreach(println)

  sc.stop()
}

Pom.xml如下所示

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>18</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

资源如下:

access.txt

1、什么是RDD

  • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
    • Dataset:一个数据集,简单的理解为集合,用于存放数据的
    • Distributed:它的数据分布式存储,并且可以做分布式的计算
    • Resilient:弹性的
      • 它表示的是数据可以保存在磁盘,也可以保存在内存中

2、RDD的五大特性

    • A list of partitions
    • 每个RDD都有一个分区列表
    • A function for computing each split
    • 作用在每个分区上面的函数
    • A list of dependencies on other RDDs
    • 一个RDD依赖其他多个RDD,这个特性很重要,rdd的容错机制就是根据这个特性而来的
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • 可选项:针对于 kv 键值对的RDD才具有该分区特性
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
      an HDFS file)
    • 可选项 : 数据本地性,数据最优,选择尽量存储在worker节点上的数据节点。

3、创建rdd方式

  • 1、通过已经存在的集合

    • val rdd1=sc.parallelize(Array(1,2,3,4))
  • 2、读取外部数据源

    • val rdd2=sc.textFile("文件")
  • 3、由一个rdd转化成一个新的RDD

    • val rdd3=rdd2.flatMap

4、RDD的算子分类

  • transformation:转换操作,将一个rdd转换生成一个新的rdd(flatMap/map/reduceByKey),它属于懒加载,延迟执行,并不会立即触发任务的执行。
  • action :此时才会真正的触发任务的计算。

5、rdd中的2种依赖关系

  • 窄依赖:每个父RDD的partition最多只被子的rdd的一个partition
  • 宽依赖:子rdd的partition会依赖于父rdd的多个partiiton

6、lineage(血统)

  • 它会记录下当前作用在rdd上的分区数据(元数据)和一系列的转换行为,当子rdd中某个分区数据丢失之后,只需要通过血统来重新计算恢复当前丢失数据的分区(spark 具备容错机制)

7、RDD的缓存方式

  • cache:直接将rdd中的数据,保存在内存中,其本质是persist(StorageLevel.MEMOEY_ONLY)
  • persist:可以有丰富的缓存级别
  • 当rdd设置了缓存之后,如果下面有需要用到该RDD的数据的时候,就不要重复计算,可以直接从缓存中获取得到。

8、checkpoint

  • 会对数据进行一个持久化操作,保存在hdfs

  • 使用的时候:

    • 需要sc.setCheckpointdir 来设置一个检查点目录
    • 对需要缓存的rdd调用checkpoint
    • 注意:同样在执行数据缓存的时候,需要有对应的action算子操作,才会真正触发持久化操作。
  • 在做checkpoint操作的时候,此时会先执行对应触发action算子的rdd结果,计算完成之后又会开辟一个新的job来计算你设置了checkpoint的rdd的结果。

  • 在设置了checkpoint之后,对应这个rdd会改变之前的依赖关系,如果当前数据丢失了,只有重头计算得到。

  • 如何使用checkpoint

    • 可以对要做checkPoint的rdd,先进行一个cache
    • 在做一个checkpint操作
  • 数据恢复的一般顺序

    • 内存---------->checkpoint------------>重新计算得到