"Hello" 发布的文章

1、Flink如何保证精确一次性消费

Flink 保证精确一次性消费主要依赖于两种Flink机制
1、Checkpoint机制
2、二阶段提交机制
Checkpoint机制
主要是当Flink开启Checkpoint的时候,会往Source端插入一条barrir,然后这个barrir随着数据流向一直流动,当流入到一个算子的时候,这个算子就开始制作checkpoint,制作的是从barrir来到之前的时候当前算子的状态,将状态写入状态后端当中。然后将barrir往下流动,当流动到keyby 或者shuffle算子的时候,例如当一个算子的数据,依赖于多个流的时候,这个时候会有barrir对齐,也就是当所有的barrir都来到这个算子的时候进行制作checkpoint,依次进行流动,当流动到sink算子的时候,并且sink算子也制作完成checkpoint会向jobmanager 报告 checkpoint n 制作完成。
二阶段提交机制
Flink 提供了CheckpointedFunction与CheckpointListener这样两个接口,CheckpointedFunction中有snapshotState方法,每次checkpoint触发执行方法,通常会将缓存数据放入状态中,可以理解为一个hook,这个方法里面可以实现预提交,CheckpointListyener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,这里可以做一些额外的操作。例如FLinkKafkaConumerBase使用这个来完成Kafka offset的提交,在这个方法里面可以实现提交操作。在2PC中提到如果对应流程例如某个checkpoint失败的话,那么checkpoint就会回滚,不会影响数据一致性,那么如果在通知checkpoint成功的之后失败了,那么就会在initalizeSate方法中完成事务的提交,这样可以保证数据的一致性。最主要是根据checkpoint的状态文件来判断的。
2、flink和spark区别

flink是一个类似spark的“开源技术栈”,因为它也提供了批处理,流式计算,图计算,交互式查询,机器学习等。flink也是内存计算,比较类似spark,但是不一样的是,spark的计算模型基于RDD,将流式计算看成是特殊的批处理,他的DStream其实还是RDD。而flink吧批处理当成是特殊的流式计算,但是批处理和流式计算的层的引擎是两个,抽象了DataSet和DataStream。flink在性能上也表现的很好,流式计算延迟比spark少,能做到真正的流式计算,而spark只能是准流式计算。而且在批处理上,当迭代次数变多,flink的速度比spark还要快,所以如果flink早一点出来,或许比现在的Spark更火。
3、Flink的状态可以用来做什么?

Flink状态主要有两种使用方式:
checkpoint的数据恢复
逻辑计算
4、Flink的waterMark机制,Flink watermark传递机制

Flink 中的watermark机制是用来处理乱序的,flink的时间必须是event time ,有一个简单的例子就是,假如窗口是5秒,watermark是2秒,那么 总共就是7秒,这个时候什么时候会触发计算呢,假设数据初始时间是1000,那么等到6999的时候会触发5999窗口的计算,那么下一个就是13999的时候触发10999的窗口
其实这个就是watermark的机制,在多并行度中,例如在kafka中会所有的分区都达到才会触发窗口
5、Flink的时间语义

Event Time 事件产生的时间
Ingestion time 事件进入Flink的时间
processing time 事件进入算子的时间
6、Flink window join

1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join
2、是coGoup 其实就是left join 和 right join,
3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。
7、flink窗口函数有哪些

Tumbing window
Silding window
Session window
Count winodw
8、keyedProcessFunction 是如何工作的。假如是event time的话

keyedProcessFunction 是有一个ontime 操作的,假如是 event时间的时候 那么 调用的时间就是查看,event的watermark 是否大于 trigger time 的时间,如果大于则进行计算,不大于就等着,如果是kafka的话,那么默认是分区键最小的时间来进行触发。
9、flink是怎么处理离线数据的例如和离线数据的关联?

1、async io
2、broadcast
3、async io + cache
4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存
10、flink支持的数据类型

DataSet Api 和 DataStream Api、Table Api
11、Flink出现数据倾斜怎么办

Flink数据倾斜如何查看:
在flink的web ui中可以看到数据倾斜的情况,就是每个subtask处理的数据量差距很大,例如有的只有一M 有的100M 这就是严重的数据倾斜了。
KafkaSource端发生的数据倾斜
例如上游kafka发送的时候指定的key出现了数据热点问题,那么就在接入之后,做一个负载均衡(前提下游不是keyby)。
聚合类算子数据倾斜
预聚合加全局聚合
12、flink 维表关联怎么做的

1、async io
2、broadcast
3、async io + cache
4、open方法中读取,然后定时线程刷新,缓存更新是先删除,之后再来一条之后再负责写入缓存
13、Flink checkpoint的超时问题 如何解决。

1、是否网络问题
2、是否是barrir问题
3、查看webui,是否有数据倾斜
4、有数据倾斜的话,那么解决数据倾斜后,会有改善,
14、flinkTopN与离线的TopN的区别

topn 无论是在离线还是在实时计算中都是比较常见的功能,不同于离线计算中的topn,实时数据是持续不断的,这样就给topn的计算带来很大的困难,因为要持续在内存中维持一个topn的数据结构,当有新数据来的时候,更新这个数据结构
15、sparkstreaming 和flink 里checkpoint的区别

sparkstreaming 的checkpoint会导致数据重复消费
但是flink的 checkpoint可以 保证精确一次性,同时可以进行增量,快速的checkpoint的,有三个状态后端,memery、rocksdb、hdfs
16、简单介绍一下cep状态编程

Complex Event Processing(CEP):
FLink Cep 是在FLink中实现的复杂时间处理库,CEP允许在无休止的时间流中检测事件模式,让我们有机会掌握数据中重要的部分,一个或多个由简单事件构成的时间流通过一定的规则匹配,然后输出用户想得到的数据,也就是满足规则的复杂事件。
17、 Flink cep连续事件的可选项有什么

18、如何通过flink的CEP来实现支付延迟提醒

19、Flink cep 你用过哪些业务场景

20、cep底层如何工作

21、cep怎么老化

22、cep性能调优

23、Flink的背压,介绍一下Flink的反压,你们是如何监控和发现的呢。

Flink 没有使用任何复杂的机制来解决反压问题,Flink 在数据传输过程中使用了分布式阻塞队列。我们知道在一个阻塞队列中,当队列满了以后发送者会被天然阻塞住,这种阻塞功能相当于给这个阻塞队列提供了反压的能力。
当你的任务出现反压时,如果你的上游是类似 Kafka 的消息系统,很明显的表现就是消费速度变慢,Kafka 消息出现堆积。
如果你的业务对数据延迟要求并不高,那么反压其实并没有很大的影响。但是对于规模很大的集群中的大作业,反压会造成严重的“并发症”。首先任务状态会变得很大,因为数据大规模堆积在系统中,这些暂时不被处理的数据同样会被放到“状态”中。另外,Flink 会因为数据堆积和处理速度变慢导致 checkpoint 超时,而 checkpoint 是 Flink 保证数据一致性的关键所在,最终会导致数据的不一致发生。
Flink Web UI
Flink 的后台页面是我们发现反压问题的第一选择。Flink 的后台页面可以直观、清晰地看到当前作业的运行状态。
Web UI,需要注意的是,只有用户在访问点击某一个作业时,才会触发反压状态的计算。在默认的设置下,Flink的TaskManager会每隔50ms触发一次反压状态监测,共监测100次,并将计算结果反馈给JobManager,最后由JobManager进行反压比例的计算,然后进行展示。
在生产环境中Flink任务有反压有三种OK、LOW、HIGH
OK正常
LOW一般
HIGH高负载
24、Flink的CBO,逻辑执行计划和物理执行计划

Flink的优化执行其实是借鉴的数据库的优化器来生成的执行计划。
CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Flink 的成本优化器也一样。Flink 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
// TODO
25、Flink中数据聚合,不使用窗口怎么实现聚合

valueState 用于保存单个值
ListState 用于保存list元素
MapState 用于保存一组键值对
ReducingState 提供了和ListState相同的方法,返回一个ReducingFunction聚合后的值。
AggregatingState和 ReducingState类似,返回一个AggregatingState内部聚合后的值
26、Flink中state有哪几种存储方式

Memery、RocksDB、HDFS
27、Flink 异常数据怎么处理

异常数据在我们的场景中,一般分为缺失字段和异常值数据。
异常值: 例如宝宝的年龄的数据,例如对于母婴行业来讲,一个宝宝的年龄是一个至关重要的数据,可以说是最重要的,因为宝宝大于3岁几乎就不会在母婴上面购买物品。像我们的有当日、未知、以及很久的时间。这样都属于异常字段,这些数据我们会展示出来给店长和区域经理看,让他们知道多少个年龄是不准的。如果要处理的话,可以根据他购买的时间来进行实时矫正,例如孕妇服装、奶粉的段位、纸尿裤的大小,以及奶嘴啊一些能够区分年龄段的来进行处理。我们并没有实时处理这些数据,我们会有一个底层的策略任务夜维去跑,一个星期跑一次。
缺失字段: 例如有的字段真的缺失的很厉害,能修补就修补。不能修补就放弃,就像上家公司中的新闻推荐过滤器。
28、Flink 监控你们怎么做的

1、我们监控了Flink的任务是否停止
2、我们监控了Flink的Kafka的LAG
3、我们会进行实时数据对账,例如销售额。
29、Flink 有数据丢失的可能吗

Flink有三种数据消费语义:
At Most Once 最多消费一次 发生故障有可能丢失
At Least Once 最少一次 发生故障有可能重复
Exactly-Once 精确一次 如果产生故障,也能保证数据不丢失不重复。
flink 新版本已经不提供 At-Most-Once 语义。
30、Flink interval join 你能简单的写一写吗

DataStream keyed1 = ds1.keyBy(o -> o.getString("key"))
DataStream keyed2 = ds2.keyBy(o -> o.getString("key"))
//右边时间戳-5s<=左边流时间戳<=右边时间戳-1s
keyed1.intervalJoin(keyed2).between(Time.milliseconds(-5), Time.milliseconds(5))
31、Flink 提交的时候 并行度如何制定,以及资源如何配置

并行度根据kafka topic的并行度,一个并行度3个G
32、Flink的boardcast join 的原理是什么

利用 broadcast State 将维度数据流广播到下游所有 task 中。这个 broadcast 的流可以与我们的事件流进行 connect,然后在后续的 process 算子中进行关联操作即可。
33、flink的source端断了,比如kafka出故障,没有数据发过来,怎么处理?

会有报警,监控的kafka偏移量也就是LAG。
34、flink有什么常用的流的API?

window join 啊 cogroup 啊 map flatmap,async io 等
35、flink的水位线,你了解吗,能简单介绍一下吗

Flink 的watermark是一种延迟触发的机制。
一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。
36、Flink怎么维护Checkpoint?在HDFS上存储的话会有小文件吗

默认情况下,如果设置了Checkpoint选项,Flink只保留最近成功生成的1个Checkpoint。当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活。Flink支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置指定最多需要保存Checkpoint的个数。
关于小文件问题可以参考代达罗斯之殇-大数据领域小文件问题解决攻略。
37、Spark和Flink的序列化,有什么区别吗?

Spark 默认使用的是 Java序列化机制,同时还有优化的机制,也就是kryo
Flink是自己实现的序列化机制,也就是TypeInformation
38、Flink是怎么处理迟到数据的?但是实际开发中不能有数据迟到,怎么做?

Flink 的watermark是一种延迟触发的机制。
一般watermark是和window结合来进行处理乱序数据的,Watermark最根本就是一个时间机制,例如我设置最大乱序时间为2s,窗口时间为5秒,那么就是当事件时间大于7s的时候会触发窗口。当然假如有数据分区的情况下,例如kafka中接入watermake的话,那么watermake是会流动的,取的是所有分区中最小的watermake进行流动,因为只有最小的能够保证,之前的数据都已经来到了,可以触发计算了。
39、画出flink执行时的流程图。

图片
40、Flink分区分配策略

41、Flink关闭后状态端数据恢复得慢怎么办?

42、了解flink的savepoint吗?讲一下savepoint和checkpoint的不同和各有什么优势

43、flink的状态后端机制

Flink的状态后端是Flink在做checkpoint的时候将状态快照持久化,有三种状态后端 Memery、HDFS、RocksDB
44、flink中滑动窗口和滚动窗口的区别,实际应用的窗口是哪种?用的是窗口长度和滑动步长是多少?

45、用flink能替代spark的批处理功能吗

Flink 未来的目标是批处理和流处理一体化,因为批处理的数据集你可以理解为是一个有限的数据流。Flink 在批出理方面,尤其是在今年 Flink 1.9 Release 之后,合入大量在 Hive 方面的功能,你可以使用 Flink SQL 来读取 Hive 中的元数据和数据集,并且使用 Flink SQL 对其进行逻辑加工,不过目前 Flink 在批处理方面的性能,还是干不过 Spark的。
目前看来,Flink 在批处理方面还有很多内容要做,当然,如果是实时计算引擎的引入,Flink 当然是首选。
46、flink计算的UV你们是如何设置状态后端保存数据

可以使用布隆过滤器。
47、sparkstreaming和flink在执行任务上有啥区别,不是简单的流处理和微批,sparkstreaming提交任务是分解成stage,flink是转换graph,有啥区别?

48、flink把streamgraph转化成jobGraph是在哪个阶段?

49、Flink中的watermark除了处理乱序数据还有其他作用吗?

还有kafka数据顺序消费的处理。
50、flink你一般设置水位线设置多少

我们之前设置的水位线是6s
52、Flink任务提交流程

图片
Flink任务提交后,Client向HDFS上传Flink的jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动 ApplicationMaster,ApplicationMaster启动后加载Flink的jar包和配置构建环境,然后启动JobManager;之后Application Master向ResourceManager申请资源启动TaskManager ,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在的节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动向JobManager发送心跳,并等待JobManager向其分配任务。
53、Flink技术架构图

图片
54、flink如何实现在指定时间进行计算。

55、手写Flink topN

57、Flink的Join算子有哪些

一般join是发生在window上面的:
1、window join,即按照指定的字段和滚动滑动窗口和会话窗口进行 inner join
2、是coGoup 其实就是left join 和 right join,
3、interval join 也就是 在窗口中进行join 有一些问题,因为有些数据是真的会后到的,时间还很长,那么这个时候就有了interval join但是必须要是事件时间,并且还要指定watermark和水位以及获取事件时间戳。并且要设置 偏移区间,因为join 也不能一直等的。
58、Flink1.10 有什么新特性吗?

内存管理及配置优化
Flink 目前的 TaskExecutor 内存模型存在着一些缺陷,导致优化资源利用率比较困难,例如:
流和批处理内存占用的配置模型不同
流处理中的 RocksDB state backend 需要依赖用户进行复杂的配置
为了让内存配置变的对于用户更加清晰、直观,Flink 1.10 对 TaskExecutor 的内存模型和配置逻辑进行了较大的改动 (FLIP-49 [7])。这些改动使得 Flink 能够更好地适配所有部署环境(例如 Kubernetes, Yarn, Mesos),让用户能够更加严格的控制其内存开销。
Managed 内存扩展
Managed 内存的范围有所扩展,还涵盖了 RocksDB state backend 使用的内存。尽管批处理作业既可以使用堆内内存也可以使用堆外内存,使用 RocksDB state backend 的流处理作业却只能利用堆外内存。因此为了让用户执行流和批处理作业时无需更改集群的配置,我们规定从现在起 managed 内存只能在堆外。
简化 RocksDB 配置
此前,配置像 RocksDB 这样的堆外 state backend 需要进行大量的手动调试,例如减小 JVM 堆空间、设置 Flink 使用堆外内存等。现在,Flink 的开箱配置即可支持这一切,且只需要简单地改变 managed 内存的大小即可调整 RocksDB state backend 的内存预算。
另一个重要的优化是,Flink 现在可以限制 RocksDB 的 native 内存占用,以避免超过总的内存预算—这对于 Kubernetes 等容器化部署环境尤为重要。
统一的作业提交逻辑
在此之前,提交作业是由执行环境负责的,且与不同的部署目标(例如 Yarn, Kubernetes, Mesos)紧密相关。这导致用户需要针对不同环境保留多套配置,增加了管理的成本。
在 Flink 1.10 中,作业提交逻辑被抽象到了通用的 Executor 接口。新增加的 ExecutorCLI (引入了为任意执行目标指定配置参数的统一方法。此外,随着引入 JobClient负责获取 JobExecutionResult,获取作业执行结果的逻辑也得以与作业提交解耦。
原生 Kubernetes 集成(Beta)
对于想要在容器化环境中尝试 Flink 的用户来说,想要在 Kubernetes 上部署和管理一个 Flink standalone 集群,首先需要对容器、算子及像 kubectl 这样的环境工具有所了解。
在 Flink 1.10 中,我们推出了初步的支持 session 模式的主动 Kubernetes 集成(FLINK-9953)。其中,“主动”指 Flink ResourceManager (K8sResMngr) 原生地与 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一样按需申请 pod。用户可以利用 namespace,在多租户环境中以较少的资源开销启动 Flink。这需要用户提前配置好 RBAC 角色和有足够权限的服务账号。
图片
Table API/SQL: 生产可用的 Hive 集成
Flink 1.9 推出了预览版的 Hive 集成。该版本允许用户使用 SQL DDL 将 Flink 特有的元数据持久化到 Hive Metastore、调用 Hive 中定义的 UDF 以及读、写 Hive 中的表。Flink 1.10 进一步开发和完善了这一特性,带来了全面兼容 Hive 主要版本的生产可用的 Hive 集成。
Batch SQL 原生分区支持
此前,Flink 只支持写入未分区的 Hive 表。在 Flink 1.10 中,Flink SQL 扩展支持了 INSERT OVERWRITE 和 PARTITION 的语法(FLIP-63 ),允许用户写入 Hive 中的静态和动态分区。
写入静态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
写入动态分区
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;
对分区表的全面支持,使得用户在读取数据时能够受益于分区剪枝,减少了需要扫描的数据量,从而大幅提升了这些操作的性能。
另外,除了分区剪枝,Flink 1.10 的 Hive 集成还引入了许多数据读取方面的优化,例如:
投影下推:Flink 采用了投影下推技术,通过在扫描表时忽略不必要的域,最小化 Flink 和 Hive 表之间的数据传输量。这一优化在表的列数较多时尤为有效。
LIMIT 下推:对于包含 LIMIT 语句的查询,Flink 在所有可能的地方限制返回的数据条数,以降低通过网络传输的数据量。
读取数据时的 ORC 向量化:为了提高读取 ORC 文件的性能,对于 Hive 2.0.0 及以上版本以及非复合数据类型的列,Flink 现在默认使用原生的 ORC 向量化读取器。
59、Flink的重启策略

固定延迟重启策略
固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数,则作业失败。在两次连续重启尝试之间,会有一个固定的延迟等待时间。
故障率重启策略
故障率重启策略在故障后重新作业,当设置的故障率(failure rate)超过每个时间间隔的故障时,作业最终失败。在两次连续重启尝试之间,重启策略延迟等待一段时间。
无重启策略
作业直接失败,不尝试重启。
后备重启策略
使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。
60、Flink什么时候用aggregate()或者process()

aggregate: 增量聚合
process: 全量聚合
当计算累加操作时候可以使用aggregate操作。
当计算窗口内全量数据的时候使用process,例如排序等操作。

1、彼得原理
每个组织都是由各种不同的职位、等级或阶层的排列所组成,每个人都隶属于其中的某个等级。彼得原理是美国学者劳伦斯·彼得在对组织中人员晋升的相关现象研究后,得出一个结论:在各种组织中,雇员总是趋向 ...
数据分析师 1、彼得原理
每个组织都是由各种不同的职位、等级或阶层的排列所组成,每个人都隶属于其中的某个等级。 彼得原理是美国学者劳伦斯·彼得在对组织中人员晋升的相关现象研究后,得出一个结论:在各种组织中,雇员总是趋向于晋升到其不称职的地位。 彼得原理有时也被称为向上爬的原理。 这种现象在现实生活中无处不在:一名称职的教授被提升为大学校长后,却无法胜任;一个优秀的运动员被提升为主管体育的官员,而无所作为。 对一个组织而言,一旦相当部分人员被推到其不称职的级别,就会造成组织的人浮于事,效率低下,导致平庸者出人头地,发展停滞。 因此,这就要求改变单纯的根据贡献决定晋升的企业员工晋升机制,不能因某人在某个岗位上干得很出色,就推断此人一定能够胜任更高一级的职务。将一名职工晋升到一个无法很好发挥才能的岗位,不仅不是对本人的奖励,反而使其无法很好发挥才能,也给企业带来损失。 这个问题其实是普遍存在的,就不多说了。但是还有一种情况就是,上司总是趋向于把你放在你能力暂时达不到的职位。而过一段时间之后,你会通过压力、调节、学习等来达到与该职位要求相符的能力。这样便达到了个人的提高。两种情况其实都有道理,还是因人而异。决定性因素是领导看人与用人的标准。
2、酒与污水定律
酒与污水定律是指把一匙酒倒进一桶污水,得到的是一桶污水;如果把一匙污水倒进一桶酒,得到的还是一桶污水。 在任何组织里,几乎都存在几个难弄的人物,他们存在的目的似乎就是为了把事情搞糟。最糟糕的是,他们像果箱里的烂苹果,如果不及时处理,它会迅速传染,把果箱里其他苹果也弄烂。烂苹果的可怕之处,在于它那惊人的破坏力。 一个正直能干的人进入一个混乱的部门可能会被吞没,而一个无德无才者能很快将一个高效的部门变成一盘散沙。组织系统往往是脆弱的,是建立在相互理解、妥协和容忍的基础上的,很容易被侵害、被毒化。破坏者能力非凡的另一个重要原因在于,破坏总比建设容易。一个能工巧匠花费时日精心制作的陶瓷器,一头驴子一秒钟就能毁坏掉。如果一个组织里有这样的一头驴子,即使拥有再多的能工巧匠,也不会有多少像样的工作成果。如果你的组织里有这样的一头驴子,你应该马上把它清除掉,如果你无力这样做,就应该把它拴起来。 我们部门有这样的人,之前差点把所有花瓶都打了,还好,及时把这头驴子栓起来才防止了损失的进一步扩大。对于这个理论,我深有体会。作管理,要及时的发现身边谁是这头搞破坏的驴子。找到这头驴子之后的措施倒是简单得多了。

3、木桶定律
水桶定律是讲一只水桶能装多少水,这完全取决于它最短的那块木板。 这就是说任何一个组织,可能面临的一个共同问题,即构成组织的各个部分往往是优劣不齐的,而劣势部分往往决定整个组织的水平。水桶定律与酒与污水定律不同,后者讨论的是组织中的破坏力量,最短的木板却是组织中有用的一个部分,只不过比其他部分差一些,你不能把它们当成烂苹果扔掉。 强弱只是相对而言的,无法消除,问题在于你容忍这种弱点到什么程度,如果严重到成为阻碍工作的瓶颈,你就不得不有所动作。 与你合作的同事、受你支配的下属其实都是各个不同长短的板子,绝对不可能有一样齐的情况,除了对于特别矮的板子要剔除之外,我们要作的就是让相对较矮的那块板子提升高度。一直这样循环往复,你的桶子里面装的水将越来越多。
引用
另外现在除了木桶定律之外,还有一个箍桶理论。 木桶能装多少水,除了木板的高低之外,还有各个板子之间的紧密程度。如果板子都很高,但是之间的缝隙很大,那么水一样会从中间漏出来。这个箍,就是团队合作。我非常重视这个箍。

4、马太效应
《新约·马太福音》中有这样一个故事:一个国王远行前,交给3个仆人每人一锭银子,吩咐道:你们去做生意,等我回来时,再来见我。国王回来时,第一个仆人说:主人,你交给我的一锭银子,我已赚了10锭。于是,国王奖励他10座城邑。第二个仆人报告:主人,你给我的一锭银子,我已赚了5锭。于是,国王奖励他5座城邑。第三仆人报告说:主人,你给我的1锭银子,我一直包在手帕里,怕丢失,一直没有拿出来。于是,国王命令将第三个仆人的1锭银子赏给第一个仆人,说:凡是少的,就连他所有的,也要夺过来。凡是多的,还要给他,叫他多多益善,这就是马太效应,反应当今社会中存在的一个普遍现象,即赢家通吃。 对企业经营发展而言,马太效应告诉我们,要想在某一个领域保持优势,就必须在此领域迅速做大。当你成为某个领域的领头羊时,即便投资回报率相同,你也能更轻易地获得比弱小的同行更大的收益。而若没有实力迅速在某个领域做大,就要不停地寻找新的发展领域,才能保证获得较好的回报。 不要抱怨这个世界为什么不公平,因为这个世界从来就没有公平过。

5、零和游戏原理
零和游戏是指一项游戏中,游戏者有输有赢,一方所赢正是另一方所输,游戏的总成绩永远为零,零和游戏原理之所以广受关注,主要是因为人们在社会的方方面面都能发现与零和游戏类似的局面,胜利者的光荣后面往往隐藏着失败者的辛酸和苦涩。 20世纪,人类经历两次世界大战、经济高速增长,科技进步、全球一体化以及日益严重的环境污染,零和游戏观念正逐渐被双赢观念所取代。人们开始认识到利已不一定要建立在损人的基础上。通过有效合作皆大欢喜的结局是可能出现的。 但从零和游戏走向双赢,要求各方面要有真诚合作的精神和勇气,在合作中不要小聪明,不要总想占别人的小便宜,要遵守游戏规则,否则双赢的局面就不可能出现,最终吃亏的还是合作者自己。

6、华盛顿合作规律
华盛顿合作规律说的是一个人敷衍了事,两个人互相推诿,三个人则永无成事之日。多少有点类似于我们三个和尚的故事。 人与人的合作,不是人力的简单相加,而是要复杂和微妙得多。在这种合作中,假定每个人的能力都为1,那么,10个人的合作结果有时比10大得多,有时,甚至比1还要小。因为人不是静止物,而更像方向各异的能量,相互推动时,自然事半功倍,相互抵触时,则一事无成。 我们传统的管理理论中,对合作研究得并不多,最直观的反映就是,目前的大多数管理制度和行为都是致力于减少人力的无谓消耗,而非利用组织提高人的效能。换言之,不妨说管理的主要目的不是让每个人做得更好,而是避免内耗过多。 跟箍桶原理类似的,这也是强调了合作的重要性。

7、手表定理
手表定理是指一个人有一只表时,可以知道现在是几点钟,当他同时拥有两只表时,却无法确定。两只手表并不能告诉一个人更准确的时间,反而会让看表的人失去对准确时间的信心。 手表定理在企业经营管理方面,给我们一种非常直观的启发,就是对同一个人或同一个组织的管理,不能同时采用两种不同的方法,不能同时设置两个不同的目标,甚至每一个人不能由两个人同时指挥,否则将使这个企业或这个人无所适从。 手表定理所指的另一层含义在于,每个人都不能同时选择两种不同的价值观,否则,你的行为将陷于混乱。 我个人觉得这个是在管理中最容易出现问题的环节,也是最难以解决的。因为这个的问题出在管理层,管理层的标准不统一,对下属员工造成的影响是巨大的。 奖励标准、惩罚标准不统一,不同领导不一样,同一个领导对待不通员工标准不一样,这都是会影响到团队士气的重要因素,因此,作为中间管理层,最应该做好的是与上级统一管理标准,同时不要轻易的惩罚或奖励一个下属,评定好事情的级别再做出奖惩反应也不迟。

8、不值得定律
不值得定律最直观的表述是:不值得做的的事情,就不值得做好。这个定律再简单不过了,重要性却时时被人们忽视遗忘。 不值得定律反映人们的一种心理,一个人如果从事的是一份自认为不值得做的事情,往往会保持冷嘲热讽,敷衍了事的态度,不仅成功率低,而且即使成功,也不觉得有多大的成就感。 因此,对个人来说,应在多种可供选择的奋斗目标及价值观中挑选一种,然后为之奋斗。选择你所爱的,爱你所选择的,才可能激发我们的斗志,也可以心安理得。而对一个企业或组织来说,则要很好地分析员工的性格特性,合理分配工作,如让成就欲较强的职工单独或牵头完成具有一定风险和难度的工作,并在其完成时,给予及时的肯定和赞扬;让依附欲较强的职工,更多地参加到某个团体协同工作;让权力欲较强的职工,担任一个与之能力相适应的主管。同时要加强员工对企业目标的认同感,让员工感觉到自己所做的工作是值得的,这样才能激发职工的热情。

9、蘑菇管理
蘑菇管理是许多组织对待初出茅庐者的一种管理方法,初学者被置于阴暗的角落(不受重视的部门,或打杂跑腿的工作),浇上一头大粪(无端的批评、指责、代人受过),任其自生自灭(得不到必要的指导和提携)。 相信很多人都有过这样一段蘑菇的经历,这不一定是什么坏事,尤其是当一切刚刚开始的时候,当几天蘑菇,能够消除我们很多不切实际的幻想,让我们更加接近现实,看问题也更加实际。一个组织,一般对新进的人员都是一视同仁,从起薪到工作都不会有大的差别。无论你是多么优秀的人才,在刚开始的时候,都只能从最简单的事情做起,蘑菇的经历,对于成长中的年轻人来说,就象蚕茧,是羽化前必须经历的一步。 所以,如何高效率地走过生命的这一段,从中尽可能汲取经验,成熟起来,并树立良好的值得信赖的个人形象,是每个刚入社会的年轻人必须面对的课题。 对于刚出校园的学生来说,一般都有一些通病:自命不凡,激情四射,骄傲浮躁,不甘心作配角等。采用蘑菇管理其实是对毕业生的磨练,我很赞成使用这种方法来对待毕业生。

10、奥卡姆剃刀定律
12世纪,英国奥卡姆的威廉主张唯名论,只承认确实存在的东西,认为那些空洞无物的普遍性概念都是无用的累赘,应当被无情地剃除。他主张如无必要,勿增实体。这就是常说的奥卡姆剃刀。这把剃刀曾使很多人感到威胁,被认为是异端邪说,威廉本人也因此受到迫害。然而,并未损害这把刀的锋利,相反,经过数百年的岁月,奥卡姆剃刀已被历史磨得越来越快,并早已超载原来狭窄的领域,而具有广泛、丰富、深刻的意义。 奥卡姆剃刀定律在企业管理中可进一步演化为简单与复杂定律:把事情变复杂很简单,把事情变简单很复杂。这个定律要求,我们在处理事情时,要把握事情的主要实质,把握主流,解决最根本的问题,尤其要顺应自然,不要把事情人为地复杂化,这样才能把事情处理好

1. yarn-cluster模式

与standalone模式不同,yarn-cluster是基于yarn集群,yarn集群上有ResourceManager(RM)和NodeManager(NM)。

1、发送请求到RM,请求启动AM

2、RM会分配container,在某个NM上启动AM

3、AM启动后,和RM通讯,请求container来启动executor

4、RM会给AM提供一批container用于启动executor

5、AM连接其他NM启动executor

6、executor启动后向AM反向注册

7、后续的DAGScheduler、TaskScheduler、Shuffle等操作都是和standaloe一样。
1555667308(1).png

2.yarn-client模式

1555669634(1).png

从上图可以看出,yarn-client和yarn-cluster的区别就在于,Driver是运行在本地客户端,它的AM只是作为一个Executor启动器,并没有Driver进程。

1、yarn-client,driver运行在本地客户端,负责调度Application,会与yarn集群产生大量的网络通信,从而导致网卡流量激增。好处是,执行时可以在本地看到所有的log,便于调试。所以一般用于测试环境。

2、yarn-cluster,driver运行在NodeManager,每次运行都是随机分配到NM机器上去,不会有网卡流量激增的问题。缺点就是本地提交后看不到log,只能通过yarn application-logs application id命令来查看。比较麻烦。

1.概述

1.1 Kylin是什么

    Apache Kylin(Extreme OLAP Engine for Big Data)是一个开源的分布式分析引擎,为Hadoop等大型分布式数据平台之上的超大规模数据集通过标准SQL查询及多维分析(OLAP)功能,提供亚秒级的交互式分析能力。

1.2 Kylin的由来

    Apache Kylin,中文名麒麟,是Hadoop动物园的重要成员。Apache Kylin是一个开源的分布式分析引擎,最初由eBay开发贡献至开源社区。它提供Hadoop之上的SQL查询接口及多维分析(OLAP)能力以支持大规模数据,能够处理TB乃至PB级别的分析任务,能够在亚秒级查询巨大的Hive表,并支持高并发。
    Apache Kylin于2014年10月在github开源,并很快在2014年11月加入Apache孵化器,于2015年11月正式毕业成为Apache顶级项目,也成为首个完全由中国团队设计开发的Apache顶级项目。于2016年3月,Apache Kylin核心开发成员创建了Kyligence公司,力求更好地推动项目和社区的快速发展。

1.3 为什么需要Kylin

    在大数据的背景下,Hadoop的出现解决了数据存储问题,但如何对海量数据进行OLAP查询,却一直令人十分头疼。
企业中大数据查询大致分为两种:即席查询和定制查询。
① 即席查询
    Hive、SparkSQL等OLAP引擎,虽然在很大程度上降低了数据分析的难度,但它们都只适用于即席查询的场景。它们的优点是查询灵活,但是随着数据量和计算复杂度的增长,响应时间不能得到保证。
② 定制查询
    多数情况下是对用户的操作做出实时反应,Hive等查询引擎很难满足实时查询,一般只能对数据仓库中的数据进行提前计算,然后将结果存入Mysql等关系型数据库,最后提供给用户进行查询。
    在上述背景下,Apache Kylin应运而生。不同于"大规模并行处理"Hive等架构,Apache Kylin采用"预计算"的模式,用户只需要提前定义好查询维度,Kylin将帮助我们进行计算,并将结果存储到HBase中,为海量数据的查询和分析提供亚秒级返回,是一种典型的"空间换时间"的解决方案。Apache Kylin的出现不仅很好地解决了海量数据快速查询的问题,也避免了手动开发和维护提前计算程序带来的一系列麻烦。

2.核心概念

2.1 数据仓库

    Data Warehouse,简称DW,中文名数据仓库,是商业智能(BI)中的核心部分。主要是将不同数据源的数据整合到一起,通过多维分析等方式为企业提供决策支持和报表生成。
数据仓库与数据库主要区别:用途不同
①、数据库面向事务,而数据仓库面向分析。
②、数据库一般存储在线的业务数据,需要对上层业务的改变做出实时反应,涉及到增删查改等操作,所以需要遵循三大范式,需要ACID。而数据仓库中存储的则主要是历史数据,主要目的是为企业决策提供支持,所以可能存在大量数据冗余,但利于多个维度查询,为决策者提供更多观察视角。
    在传统BI领域中,数据仓库的数据同样存储在Oracle、MySQL等数据库中,而在大数据领域中最常用的数据仓库就是Apache Hive,Hive也是Apache Kylin默认的数据源。

2.2 OLAP与OLTP

    OLAP(Online Analytical Process),联机分析处理,以多维度的方式分析数据,一般带有主观的查询需求,多应用在数据仓库。
    OLTP(Online Transaction Process),联机事务处理,侧重于数据库的增删查改等常用业务操作。

2.3 维度和度量

维度和度量是数据分析领域中两个常用的概念。
简单地说,维度就是观察数据的角度。比如气象站的采集数据,可以从时间的维度来观察:
表格.jpg

也可以从时间和气象站两个角度来观察:
1555654839(1).png

维度一般是离散的值,比如时间维度上的每一个独立的日期,或者气象站维度上的每一个独立的气象站ID。因此统计时可以把维度相同的记录聚合在一起,然后应用聚合函数做累加、均值、最大值、最小值等聚合计算。
度量就是被聚合的统计值,也就是聚合运算的结果,它一般是连续的值,如以上两个图中的温度值,或是其他测量点,比如风速、湿度、降雨量等等。通过对度量的比较和分析,我们就可以对数据做出评估,比如今年平均气温是否在正常范围,某个气象站的平均气温是否明显高于往年平均气温等等。

2.4 Cube和Cuboid

确定好了维度和度量之后,然后根据定义好的维度和度量,我们就可以构建Cube。对于一个给定的数据模型,我们可以对其上的所有维度进行组合。对于N个维度来说,组合所有可能性共有2的N次方种。对于每一种维度的组合,将度量做聚合计算,然后将运算的结果保存为一个物化视图,称为Cuboid。所有维度组合的Cuboid作为一个整体,被称为Cube。

1555655083(1).jpg

假设有一个电商的销售数据集,其中维度包括时间(Time)、商品(Item)、地点(Location)和供应
商(Supplier),度量为销售额(GMV)。那么所有维度的组合就有2的4次方,即16种。
 一维度(1D)的组合:有[Time]、[Item]、[Location]、[Supplier]4种。
 二维度(2D)的组合:有[Time Item]、[Time Location]、[Time Supplier]、[Item Location]、[Item Supplier]、[Location Supplier]6种。
 三维度(3D)的组合:有[Time Item Location][Item Location Supplier][Time Location Supplier][Time Item Supplier]4种。
 最后零维度(0D)和四维度(4D)的组合各有[]和[Time Item Location Supplier]1种。计算Cubiod,即按维度来聚合销售额。如果用SQL语句来表达计算Cuboid [Time, Item],那么SQL语句为:select Time, Item, Sum(GMV) as GMV from Sales group by Time, Item将计算的结果保存为物化视图,所有Cuboid物化视图的总称就是Cube。

2.5 事实表和维度表

事实表(Fact Table)是指存储有事实记录的表,如系统日志、销售记录、用户访问记录等。事实表的记录是动态增长的,所以它的体积通常远大于维度表。
维度表(Dimension Table)或维表,也称为查找表(Lookup Table),是与事实表相对应的一种表。它保存了维度的属性值,可以跟事实表做关联;相当于将事实表上经常重复的属性抽取、规范出来用一张表进行管理。常见的维度表有:日期表(存储与日期对应的周、月、季度等属性)、地区表(包含国家、省/州、城市等属性)等。维度表的变化通常不会太大。
使用维度表有许多好处:
①、缩小了事实表的大小。
②、便于维度的管理和维护,增加、删除和修改维度的属性,不必对事实表的大量记录进行改动。
③、 维度表可以为多个事实表重用。

2.6 星形模型

维度建模通常又分为星型模型、雪花模型
星形模型(Star Schema)是数据挖掘中常用的几种多维数据模型之一。它的特点是只有一张事实表,以及零到多个维度表,事实表与维度表通过主外键相关联,维度表之间没有关联,就像许多小星星围绕在一颗恒星周围,所以名为星形模型。

1555655396(1).png

雪花模型(SnowFlake Schema),就是将星形模型中的某些维表抽取成更细粒度的维表,然后让维表之间也进行关联,这种形状酷似雪花的的模型称为雪花模型
1555655496(1).png

3.运行原理

Kylin的核心思想是预计算,即对多维分析可能用到的度量进行预计算,将计算好的结果保存成Cube,供查询时直接访问。把高复杂度的聚合运算、多表连接等操作转换成对预计算结果的查询,这决定了Kylin能够拥有很好的快速查询和高并发能力。

3.1 技术架构

Apache Kylin系统主要可以分为在线查询和离线构建两部分,具体架构图如下:

1555655661(1).png

Kylin提供了一个称作Layer Cubing的算法,来构建Cube。简单来说,就是按照dimension数量从大到小的顺序,从Base Cuboid开始,依次基于上一层Cuboid的结果进行再聚合。每一层的计算都是一个单独的Map Reduce(Spark)任务。
MapReduce的计算结果最终保存到HBase中,HBase中每行记录的Rowkey由dimension组成,
measure会保存在column family中。为了减小存储代价,这里会对dimension和measure进行编码。查询阶段,利用HBase列存储的特性就可以保证Kylin有良好的快速响应和高并发。

3.2 特性

SQL接口

 Kylin主要的对外接口就是以SQL的形式提供的。SQL简单易用的特性极大地降低了Kylin的学习成本,不论是数据分析师还是Web开发程序员都能从中收益。

支持海量数据集

 不论是Hive、SparkSQL,还是Impala,它们的查询时间都随着数据量的增长而线性增长。而Apache Kylin使用预计算技术打破了这一点。Kylin在数据集规模上的局限性主要取决于维度的个数和基数,而不是数据集的大小,所以Kylin能更好地支持海量数据集的查询。

亚秒级响应

 受益于预计算技术,Kylin的查询速度非常快,因为复杂的连接、聚合等操作都在Cube的构建过程中已经完成了。    

水平扩展

Apache Kylin同样可以使用集群部署方式进行水平扩展。但部署多个节点只能提高Kylin处理查询的能力,而不能提升它的预计算能力。

可视化集成

Kylin提供与BI工具的整合能力,如Tableau,PowerBI/Excel,MSTR,QlikSense,Hue和SuperSet。

构建多维立方体(Cube)

用户能够在Kylin里为百亿以上数据集定义数据模型并构建立方体。

4.Kylin服务器模式

Kylin 实例是无状态的,其运行时状态存储在 HBase (由 conf/kylin.properties 中的 kylin.metadata.url 指定) 中的 metadata 中。出于负载均衡的考虑,建议运行多个Kylin 实例共享一个 metadata ,因此他们在表结构中共享同一个状态,比如job 状态, Cube 状态, 等等。
每一个 Kylin 实例在 conf/kylin.properties 中都有一个 “kylin.server.mode” entry,指定了运行时的模式,有 3 个选项:

job : 在实例中运行 job engine; Kylin job engine 管理集群 的 jobs。
query : 只运行 query engine; Kylin query engine 接收和回应你的 SQL 查询。
all : 在实例中既运行 job engine 也运行 query engines。

注意默认情况下只有一个实例可以运行 job engine (“all” 或 “job” 模式), 其它需要是 “query” 模式

5. 企业应用案例

Apache Kylin虽然还很年轻,但已经在多个企业的生产项目中得到了应用。下面我们来看一看Kylin在国内两个著名企业内的应用。
百度地图
大数据计算分析的三大痛点:

1.百亿级海量数据多维指标动态计算耗时问题,Apache Kylin通过预计算生成Cube结果数据集并存储到HBase的方式解决;
2.复杂条件筛选问题,用户查询时,Apache Kylin利用router查找算法及优化的HBase Coprocessor解决;
3.跨月、季度、年等大时间区间查询问题,对于预计算结果的存储,Apache Kylin利用Cube的Data Segment分区存储管理解决。

这3个痛点的解决,使百度地图在百亿级大数据规模下,且数据模型确定的具体多维分析产品中,达到单条SQL毫秒级响应。

1.环境说明

1.1 版本选择

环境配置.png

1.2Kylin 安装与配置

1.2.1 软件下载

下载地址:https://archive.apache.org/dist/kylin/apache-kylin-2.3.2/

1.2.2 解压

tar -zxvf apache-kylin-2.3.2-bin-cdh57.tar.gz

1.2.3 建立软连接

ln -s apache-kylin-2.3.2-bin kylin

1.2.4 添加 Kylin 相关环境变量

Kylin 集群节点环境变量配置
vim /etc/profile
export HADOOP_HOME=/home/hadoop/app/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
export HBASE_HOME=/home/hadoop/app/hbase
export PATH=$HBASE_HOME/bin:$PATH
export HIVE_HOME=/home/hadoop/app/hive
export HIVE_CONF_HOME=$HIVE_HOME/conf
export PATH=$HIVE_HOME/bin:$PATH
export KAFKA_HOME=/home/hadoop/app/kafka
export PATH=$KAFKA_HOME/bin:$PATH
export HCAT_HOME=$HIVE_HOME/hcatalog
export PATH=$HCAT_HOME/bin:$PATH
export KYLIN_HOME=/home/hadoop/app/kylin
export PATH=$KYLIN_HOME/bin:$PATH
使环境变量生效
source /etc/profile
备注:kylin hive 安装目录在每个节点都需要(后面会拷贝)
scp -r hive hadoop@hadoop02:/home/hadoop/app/
scp -r hive hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop02:/home/hadoop/app/

1.2.5 修改 kylin.properties 配置文件

vi kylin.properties
#配置节点类型(kylin 主节点模式为 all,从节点的模式为 query)
kylin.server.mode=all
#kylin 集群节点配置
kylin.server.cluster-servers=hadoop01:7070,hadoop02:7070,hadoop03:7070
#定义 kylin 用于 MR jobs 的 job.jar 包和 hbase 的协处理 jar 包,用于提升性能(添加项)
kylin.job.jar=/home/hadoop/app/kylin/lib/kylin-job-2.3.2.jar
kylin.coprocessor.local.jar=/home/hadoop/app/kylin/lib/kylin-coprocessor-2.3.2.jar

1.2.6kylin 安装目录同步其他节点

scp -r apache-kylin-2.3.2-bin hadoop@hadoop01:/home/hadoop/app/
scp -r apache-kylin-2.3.2-bin hadoop@hadoop02:/home/hadoop/app/

1.2.7 修改其他节点 kylin.properties 配置文件

ln -s apache-kylin-2.3.2-bin kylin
vi kylin.properties
#从节点为 query 模式
kylin.server.mode=query

1.3 服务启动

1.3.1 启动 Zookeeper 集群

runRemoteCmd.sh "/home/hadoop/app/zookeeper/bin/zkServer.sh start" all

1.3.2 启动 hadoop 集群

#启动 hdfs
sbin/start-dfs.sh
#启动 yarn
sbin/start-yarn.sh
#开启 jobhistoryserver
sbin/mr-jobhistory-daemon.sh start historyserver

1.3.3 启动 HBase 集群

bin/start-hbase.sh

1.3.4 启动 Hive

#启动 mysql 服务
sudo service mysqld start
#启动 hive Metastore
bin/hive --service metastore &

1.3.5 启动 Kafka 集群【可选】

bin/kafka-server-start.sh config/server.properties

1.3.6 依赖检查(所有 kylin 节点检查)

#执行下面检查命令会在 hdfs 上创建 kylin 目录
./check-env.sh
hive 依赖检查

hive检查.png

hbase检查

hbase检查.png

1.3.7 启动 kylin 服务

所有节点启动:bin/kylin.sh start

1.3.8web ui 访问 kylin

访问地址:http://hadoop01:7070/kylin(其他节点也可以访问)
默认秘钥:admin/KYLIN