1、spark概述

  • spark是基于内存的一个计算框架,计算速度非常的快。这里面没有涉及到任何存储,如果想要处理外部的数据源,比如数据在HDFS上,此时我们就需要先搭建一个hadoop集群。

2、spark的特点

  • 1、速度快(比mapreduce在内存中快100倍,比在磁盘中快10倍)

    • (1)spark在处理的数据中间结果数据可以不落地,mapreduce每次中间结果都要落地。
    • (2)在mapreduce计算的时候,mapTask,reduceTask,每一个task都对应一个jvm进程。
      在spark中,它同样会按照hadoop中切片逻辑,会有N个task,而这些task都是运行在worker节点上,worker上会有executor进程,而这些task会以线程的方式运行在executor上面。
  • 2、易用性

    • 可以使用多种语言来编写spark应用程序
      • java
      • scala
      • Python
      • R
  • 3、通用性

    • 可以使用sparksql、sparkStreaming、Mlib、Graphx
  • 4、兼容性

    • 可以运行在不同的资源调度平台
      • yarn(resourceManger分配资源)
      • mesos(是apache下开源的资源调度框架)
      • standAlone(master进行资源的分配)

3、spark集群安装

  • 1、下载对应版本的安装包
  • 2、上传安装包到服务器上
  • 3、规划一下安装目录
  • 4、解压安装包到指定的安装目录
  • 5、重命名安装目录
  • 6、修改配置文件 cd conf
    • (1) spark-env.sh.template (需要 mv spark-env.sh.template spark-env.sh)
      • 配置javahome export JAVA_HOME=/export/servers/jdk
      • 配置master的Host export SPARK_MASTER_HOST=node1
      • 配置master的Port export SPARK_MASTER_PORT=7077
    • (2)slaves.template (需要 mv slaves.template slaves)
      • 添加worker节点
        • node2
        • node3
  • 7、配置一下spark的环境变量
  • 8、通过scp命令分发到其他节点中
    • spark安装目录
    • /etc/profile
  • 9、所有机器都要source /etc/profile
  • 10、可以启动spark集群
    • $SPARK_HOME/sbin/start-all.sh
    • 可以通过web界面访问master
      • http://node01:8080
  • 11、停止spark集群
    • $SPARK_HOME/sbin/stop-all.sh

4、spark高可用集群配置

  • 1、需要先zk集群
  • 2、修改spark配置(spark-env.sh)
    • (1)注释掉master的地址
    • (2) 引入zk配置
      • export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181 -Dspark.deploy.zookeeper.dir=/spark"
  • 3、启动
  • 启动zk集群
  • 需要在spark集群中任意一台机器上启动 start-all.sh
    • 产生master进程
    • 并且会根据 slaves,去对应的主机名上启动worker进程
  • 在其他worker节点上单独启动master
    • start-master.sh

5、初识spark程序

  • 已经知道那个master是活着的master
  • --master spark://node1:7077
  • 有很多的master时候
  • --master spark://node1:7077,node2:7077,node3:7077

6、spark-shell使用

  • 1、spark-shell --master local[N] (本地单机版)
    • local[N]:表示在本地模拟N个线程来运行当前任务
  • 2、spark-shell --master local[*] (本地单机版)
    • 这个*表示当前机器上所有可用的资源
  • 3、spark-shell --master spark://node2:7077
  • 4、spark-shell 读取hdfs上的数据文件
    • sc.textFile("hdfs://node1:9000/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

7、spark整合hdfs

  • 1、修改配置文件(spark-env.sh)

    • 添加配置参数
      • export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
        • 通过scp分发配置到其他节点
  • 2、可以sc.textFile("/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

8、scala语言编程spark单词计数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//通过scala编写spark的单词计数程序
object WordCount {

  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName和master地址,local[2]表示本地使用2个线程来进行计算
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    //2、创建SparkContext对象,这个对象很重要,它会创建DAGScheduler和TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    //设置日志输出级别
    sc.setLogLevel("WARN")
    //3、读取数据文件
    //val data: RDD[String] = sc.textFile(args(0))
    val data: RDD[String] = sc.textFile("E:\\wordcount\\input\\words.txt")
    //4、切分每一行,并且压平  hello、you、me
    val words: RDD[String] = data.flatMap(_.split(" "))
    //5、每个单词记位1
    val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    //6、相同单词出现的次数进行累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //按照单词出现的次数降序排序
    val sortResult: RDD[(String, Int)] = result.sortBy(_._2,false)
    //7、收集数据,打印输出
    val finalResult: Array[(String, Int)] = sortResult.collect()
    //打印结果
    finalResult.foreach(x=>println(x))
    //关闭
    sc.stop()
  }
}

9、java语言编程spark单词计数

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//利用java语言来实现spark的单词计数
public class WordCount_Java {
    public static void main(String[] args) {
        //1、创建SparkConf对象,设置appName和master地址
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、创建javaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、读取数据文件
        JavaRDD<String> dataJavaRDD = jsc.textFile("E:\\wordcount\\input\\words.txt");

        //4、对每一行进行切分压平
        JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override               //line表示每一行记录
            public Iterator<String> call(String line) throws Exception {
                //切分每一行
                String[] words = line.split(" ");

                return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词记为1
        JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        //6、把相同单词出现的次数累加  (_+_)
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照单词出现的次数降序排序
        //需要将(单词,次数)进行位置颠倒 (次数,单词)
        JavaPairRDD<Integer, String> sortJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        }).sortByKey(false);

        //将(次数,单词)变为(单词,次数)
        JavaPairRDD<String, Integer> finalSortJavaPairRDD = sortJavaPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        //7、收集打印
        List<Tuple2<String, Integer>> finalResult = finalSortJavaPairRDD.collect();

        for(Tuple2<String, Integer> t:finalResult){
            System.out.println(t);
        }

        jsc.stop();
    }
}

10、上面的程序所依赖的pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>18</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Spark运行基本流程图

图片1.png

1.数组

  • 1.1.数组
    • 1.1.定长数组和变长数组
      (1)定长数组定义格式:
      val arr=new Array[T] (数组长度)
      (2)变长数组定义格式:
      val arr = ArrayBuffer[T] () 注意需要导包:import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
object ArrayDemo {
  def main(args: Array[String]) {

    //初始化一个长度为8的定长数组,其所有元素均为0
    val arr1 = new Array[Int](8)
    //直接打印定长数组,内容为数组的hashcode值
    println(arr1)
    //将数组转换成数组缓冲,就可以看到原数组中的内容了
    //toBuffer会将数组转换长数组缓冲
    println(arr1.toBuffer)

    //注意:如果new,相当于调用了数组的apply方法,直接为数组赋值
    //初始化一个长度为1的定长数组
    val arr2 = Array[Int](10)
    println(arr2.toBuffer)

    //定义一个长度为3的定长数组
    val arr3 = Array("hadoop", "storm", "spark")
    //使用()来访问元素
    println(arr3(2))

    //////////////////////////////////////////////////
    //变长数组(数组缓冲)
    //如果想使用数组缓冲,需要导入import scala.collection.mutable.ArrayBuffer包
    val ab = ArrayBuffer[Int]()
    //向数组缓冲的尾部追加一个元素
    //+=尾部追加元素
    ab += 1
    //追加多个元素
    ab += (2, 3, 4, 5)
    //追加一个数组++=
    ab ++= Array(6, 7)
    //追加一个数组缓冲
    ab ++= ArrayBuffer(8,9)
    //打印数组缓冲ab

    //在数组某个位置插入元素用insert,从某下标插入
    ab.insert(0, -1, 0)
    //删除数组某个位置的元素用remove  按照下标删除
    ab.remove(0)
    println(ab)

  }
}
  • 1.2. 遍历数组

    • (1).增强for循环
    • (2).好用的until会生成脚标,0 until 10 包含0不包含10
object ForArrayDemo {
  def main(args: Array[String]) {
    //初始化一个数组
    val arr = Array(1,2,3,4,5,6,7,8)
    //增强for循环
    for(i <- arr)
      println(i)

    //好用的until会生成一个Range
    //reverse是将前面生成的Range反转
    for(i <- (0 until arr.length).reverse)
      println(arr(i))
  }
}
  • 1.3.数组转换
    yield关键字将原始的数组进行转换会产生一个新的数组,原始的数组不变
    图片4.png
object ArrayYieldDemo {
  def main(args: Array[String]) {
    //定义一个数组
    val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    //将偶数取出乘以10后再生成一个新的数组
    val res = for (e <- arr if e % 2 == 0) yield e * 10
    println(res.toBuffer)

    //更高级的写法,用着更爽
    //filter是过滤,接收一个返回值为boolean的函数
    //map相当于将数组中的每一个元素取出来,应用传进去的函数
    val r = arr.filter(_ % 2 == 0).map(_ * 10)
    println(r.toBuffer)

  }
}
  • 1.4数组常用算法
    在Scala中,数组上的某些方法对数组进行相应的操作非常方便
    图片5.png

2.映射

在Scala中,把哈希表这种数据结构叫做映射

  • 2.1构建映射

    • (1)val map=Map(键->值,键->值....)
    • (2)利用元组构建 val map=Map((键,值),(键,值),(键,值)....)
      图片6.png
  • 2.2.获取和修改映射中的值

    • 获取映射中的值:
      值=map(键)
      好用的getOrElse (如果映射中有值,返回映射中的值,没有就返回默认值)
      注意:在Scala中,有两种Map,一个是immutable包下的Map,该Map中的内容不可变;另一个是mutable包下的Map,该Map中的内容可变
      通常我们在创建一个集合是会用val这个关键字修饰一个变量(相当于java中的final),那么就意味着该变量的引用不可变,该引用中的内容是不是可变,取决于这个引用指向的集合的类型

3.元组

映射是K/V对偶的集合,对偶是元组的最简单形式,元组可以装着多个不同类型的值。

  • 3.1创建元组

    • (1)元组是不同类型的值的聚集;对偶是最简单的元组。
    • (2)元组表示通过将不同的值用小括号括起来,即表示元组。

    创建元组格式:
    val tuple=(元素,元素...)

  • 3.2获取元组中的值

    • 获取元组中的值格式:
      使用下划线加脚表 ,例如 t._1 t._2 t._3
      注意:元组中的元素脚标是从1开始的
  • 3.3将对偶的集合转换成映射

    • 将对偶的集合装换成映射:
      调用其toMap 方法
      图片7.png
  • 3.4.拉链操作

    • (1).使用zip命令可以将多个值绑定在一起
      图片8.png
      注意:如果两个数组的元素个数不一致,拉链操作后生成的数组的长度为较小的那个数组的元素个数

    • (2).如果其中一个元素的个数比较少,可以使用zipAll用默认的元素填充
      图片9.png

4.集合

Scala的集合有三大类:序列Seq、Set、映射Map,所有的集合都扩展自Iterable特质
在Scala中集合有可变(mutable)和不可变(immutable)两种类型,immutable类型的集合初始化后就不能改变了(注意与val修饰的变量进行区别)
  • 4.1 List

(1)不可变的序列 import scala.collection.immutable._

在Scala中列表要么为空(Nil表示空列表)要么是一个head元素加上一个tail列表。
9 :: List(5, 2)  :: 操作符是将给定的头和尾创建一个新的列表 
注意::: 操作符是右结合的,如9 :: 5 :: 2 :: Nil相当于 9 :: (5 :: (2 :: Nil))


list常用的操作符:
++[B](that: GenTraversableOnce[B]): List[B] 从列表的尾部添加另外一个列表
++: [B >: A, That](that: collection.Traversable[B])(implicit bf: CanBuildFrom[List[A], B, That]): That 在列表的头部添加一个列表
+: (elem: A): List[A] 在列表的头部添加一个元素
:+ (elem: A): List[A] 在列表的尾部添加一个元素
:: (x: A): List[A]     在列表的头部添加一个元素
::: (prefix: List[A]): List[A] 在列表的头部添加另外一个列表
val left = List(1,2,3)
val right = List(4,5,6)
//以下操作等价
left ++ right      // List(1,2,3,4,5,6)
left ++: right     // List(1,2,3,4,5,6)
right.++:(left)    // List(1,2,3,4,5,6)
right.:::(left)     // List(1,2,3,4,5,6)
//以下操作等价
0 +: left    //List(0,1,2,3)
left.+:(0)   //List(0,1,2,3)
//以下操作等价
left :+ 4    //List(1,2,3,4)
left.:+(4)   //List(1,2,3,4)
//以下操作等价
0 :: left      //List(0,1,2,3)
left.::(0)     //List(0,1,2,3)

(2)可变的序列 import scala.collection.mutable._

import scala.collection.mutable.ListBuffer

object MutListDemo extends App{
  //构建一个可变列表,初始有3个元素1,2,3
  val lst0 = ListBuffer[Int](1,2,3)
  //创建一个空的可变列表
  val lst1 = new ListBuffer[Int]
  //向lst1中追加元素,注意:没有生成新的集合
  lst1 += 4
  lst1.append(5)

  //将lst1中的元素最近到lst0中, 注意:没有生成新的集合
  lst0 ++= lst1

  //将lst0和lst1合并成一个新的ListBuffer 注意:生成了一个集合
  val lst2= lst0 ++ lst1

  //将元素追加到lst0的后面生成一个新的集合
  val lst3 = lst0 :+ 5

  //删除元素,注意:没有生成新的集合
  val lst4 = ListBuffer[Int](1,2,3,4,5)
  lst4 -= 5

  //删除一个集合列表,生成了一个新的集合
  val lst5=lst4--List(1,2)

  //把可变list 转换成不可变的list 直接加上toList
  val lst6=lst5.toList

  //把可变list 转变数组用toArray
  val lst7=lst5.toArray

  println(lst0)
  println(lst1)
  println(lst2)
  println(lst3)
  println(lst4)
  println(lst5)
  println(lst6)
  println(lst7)

}
  • 4.2.Set
    (1)不可变的Set import scala.collection.immutable._ Set代表一个没有重复元素的集合;将重复元素加入Set是没有用的,而且 Set 是不保证插入顺序的,即 Set 中的元素是乱序的。
    定义:val set=Set(元素,元素,.....)
//定义一个不可变的Set集合
scala> val set =Set(1,2,3,4,5,6,7)
set: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 4)

//元素个数
scala> set.size
res0: Int = 7

//取集合最小值
scala> set.min
res1: Int = 1

//取集合最大值
scala> set.max
res2: Int = 7

//将元素和set1合并生成一个新的set,原有set不变
scala> set + 8
res3: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 8, 4)

scala> val set1=Set(7,8,9)
set1: scala.collection.immutable.Set[Int] = Set(7, 8, 9)

//两个集合的交集
scala> set & set1
res4: scala.collection.immutable.Set[Int] = Set(7)

//两个集合的并集
scala> set ++ set1
res5: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 9, 2, 7, 3, 8, 4)

//在第一个set基础上去掉第二个set中存在的元素
scala> set -- set1
res6: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)

//返回第一个不同于第二个set的元素集合
scala> set &~ set1
res7: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)


//计算符合条件的元素个数
scala> set.count(_ >5)
res8: Int = 2

/返回第一个不同于第二个的元素集合
scala> set.diff(set1)
res9: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)

/返回第一个不同于第二个的元素集合
scala> set1.diff(set)
res10: scala.collection.immutable.Set[Int] = Set(8, 9)

//取子set(2,5为元素位置, 从0开始,包含头不包含尾)
scala> set.slice(2,5)
res11: scala.collection.immutable.Set[Int] = Set(6, 2, 7)

//迭代所有的子set,取指定的个数组合
scala> set1.subsets(2).foreach(x=>println(x))
Set(7, 8)
Set(7, 9)
Set(8, 9)

(2)可变的Set import scala.collection.mutable._

//导入包
scala> import scala.collection.mutable
import scala.collection.mutable
//定义一个可变的Set
scala> val set1=new HashSet[Int]()
set1: scala.collection.mutable.HashSet[Int] = Set()

//添加元素
scala> set1 += 1
res1: set1.type = Set(1)

//添加元素  add等价于+=
scala> set1.add(2)
res2: Boolean = true
scala> set1
res3: scala.collection.mutable.HashSet[Int] = Set(1, 2)

//向集合中添加元素集合
scala> set1 ++=Set(1,4,5)
res5: set1.type = Set(1, 5, 2, 4)

//删除一个元素
scala> set1 -=5
res6: set1.type = Set(1, 2, 4)

//删除一个元素
scala> set1.remove(1)
res7: Boolean = true
scala> set1
res8: scala.collection.mutable.HashSet[Int] = Set(2, 4)

  • 4.3.Map
    (1)不可变的Map import scala.collection.immutable._
定义Map集合
1.val map=Map(键->值,键->值...)
2.利用元组构建  val map=Map((键,值),(键,值),(键,值)....)
展现形式:
val  map = Map(“zhangsan”->30,”lisi”->40)
val  map = Map((“zhangsan”,30),(“lisi”,40))

3.操作map集合
获取值: 值=map(键)
原则:通过先获取键,在获取键对应值。

4.遍历map集合


scala> val imap=Map("zhangsan" -> 20,"lisi" ->30)
imap: scala.collection.immutable.Map[String,Int] = Map(zhangsan -> 20, lisi -> 30)
//方法一:显示所有的key
scala> imap.keys
res0: Iterable[String] = Set(zhangsan, lisi)

//方法二:显示所有的key
scala> imap.keySet
res1: scala.collection.immutable.Set[String] = Set(zhangsan, lisi)

//通过key获取value
scala> imap("lisi")
res2: Int = 30

//通过key获取value 有key对应的值则返回,没有就返回默认值0,
scala> imap.getOrElse("zhangsan",0)
res4: Int = 20

//没有对应的key,返回默认0
scala> imap.getOrElse("zhangsan1",0)
res5: Int = 0
//由于是不可变map,故不能向其添加、删除、修改键值对
* (2)可变的Map  import scala.collection.mutable._
//导包
import scala.collection.mutable
//声明一个可变集合
scala> val user =mutable.HashMap("zhangsan"->50,"lisi" -> 100)
user: scala.collection.mutable.HashMap[String,Int] = Map(lisi -> 100, zhangsan -> 50)

//添加键值对
scala> user +=("wangwu" -> 30)
res0: user.type = Map(lisi -> 100, zhangsan -> 50, wangwu -> 30)

//添加多个键值对
scala> user += ("zhangsan0" -> 30,"lisi0" -> 20)
res1: user.type = Map(zhangsan0 -> 30, lisi -> 100, zhangsan -> 50, lisi0 -> 20,wangwu -> 30)

//方法一:显示所有的key
scala> user.keys
res2: Iterable[String] = Set(zhangsan0, lisi, zhangsan, lisi0, wangwu)

//方法二:显示所有的key
scala> user.keySet
res3: scala.collection.Set[String] = Set(zhangsan0, lisi, zhangsan, lisi0, wangwu)

//通过key获取value
scala> user("zhangsan")
res4: Int = 50

//通过key获取value 有key对应的值则返回,没有就返回默认值0,
scala> user.getOrElse("zhangsan",0)
res5: Int = 50

//没有对应的key,返回默认0
scala> user.getOrElse("zhangsan1",0)
res6: Int = 0

//更新键值对
scala> user("zhangsan") = 55
scala> user("zhangsan")
res8: Int = 55

//更新多个键值对
scala> user += ("zhangsan" -> 60, "lisi" -> 50)
res9: user.type = Map(zhangsan0 -> 30, lisi -> 50, zhangsan -> 60, lisi0 -> 20,wangwu -> 30)

//删除key
scala> user -=("zhangsan")
res14: user.type = Map(zhangsan0 -> 30, lisi -> 50, lisi0 -> 20, wangwu -> 30)

//删除key
scala>user.remove("zhangsan0")

//遍历map 方法一:模式匹配
scala> for((x,y) <- user) println(x+" -> "+y)
lisi -> 50
lisi0 -> 20
wangwu -> 30

//遍历map 方法二:通过key值
scala> for(x<- user.keys) println(x+" -> "+user(x))
lisi -> 50
lisi0 -> 20
wangwu -> 30

//遍历map 方法三:通过foreach
scala>  user.foreach{case (x,y) => println(x+" -> "+y)}
lisi -> 50
lisi0 -> 20
wangwu -> 30

1.Scala概述

  • 什么是Scala
    • Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各种特性。Scala运行于Java平台(Java虚拟机),并兼容现有的Java程序。

2.Scala编译器安装

  • 2.1、安装JDK(因为Scala是运行在JVM平台上的,所以安装Scala之前要安装JDK)
  • 2.2、安装Scala
    • Windows安装Scala编译器
      直接解压之后,配置环境变量即可

3.Scala基础

  • 1.声明变量
object VariableDemo {
  def main(args: Array[String]) {
    //使用val定义的变量值是不可变的,相当于java里用final修饰的变量
    val i = 1
    //使用var定义的变量是可变得,在Scala中鼓励使用val
    var s = "hello"
    //Scala编译器会自动推断变量的类型,必要的时候可以指定类型
    //变量名在前,类型在后
    val str: String = "itcast"
  }
}
  • 2.常用类型
    Scala和Java一样,有7种数值类型Byte、Char、Short、Int、Long、Float和Double(无包装类型)和一个Boolean类型
  • 3.条件表达式
object ConditionDemo {
  def main(args: Array[String]) {
    val x = 1
    //判断x的值,将结果赋给y
    val y = if (x > 0) 1 else -1
    //打印y的值
    println(y)

    //支持混合类型表达式
    val z = if (x > 1) 1 else "error"
    //打印z的值
    println(z)

    //如果缺失else,相当于if (x > 2) 1 else ()
    val m = if (x > 2) 1
    println(m)

    //在scala中每个表达式都有值,scala中有个Unit类,写做(),相当于Java中的void
    val n = if (x > 2) 1 else ()
    println(n)

    //if和else if
    val k = if (x < 0) 0
    else if (x >= 1) 1 else -1
    println(k)
  }
}
  • 4.块表达式
object BlockExpressionDemo {
  def main(args: Array[String]) {
    val x = 0
    //在scala中{}中包含一系列表达式,块中最后一个表达式的值就是块的值
    //下面就是一个块表达式
    val result = {
      if (x < 0){
        -1  
      } else if(x >= 1) {
        1
      } else {
        "error"
      }
    }
    //result的值就是块表达式的结果
    println(result)
  }
}
  • 5.循环在scala中有for循环和while循环,用for循环比较多, for循环语法结构:for (i <- 表达式/数组/集合)
object ForDemo {
  def main(args: Array[String]) {
    //for(i <- 表达式),表达式1 to 10返回一个Range(区间)
    //每次循环将区间中的一个值赋给i
    for (i <- 1 to 10)
      println(i)

    //for(i <- 数组)
    val arr = Array("a", "b", "c")
    for (i <- arr)
      println(i)

    //高级for循环
    //每个生成器都可以带一个条件,注意:if前面没有分号
    for(i <- 1 to 3; j <- 1 to 3 if i != j)
      print((10 * i + j) + " ")
    println()

    //for推导式:如果for循环的循环体以yield开始,则该循环会构建出一个集合
    //每次迭代生成集合中的一个值
    val v = for (i <- 1 to 10) yield i * 10
    println(v)
  }
}
  • 6.调用方法和函数
Scala中的+ - * / %等操作符的作用与Java一样,位操作符 & | ^ >> <<也一样。只是有
一点特别的:这些操作符实际上是方法。例如:
a + b
是如下方法调用的简写:
a.+(b)
a 方法 b可以写成 a.方法(b)
  • 7.定义方法
    图片1.png

方法的返回值类型可以不写,编译器可以自动推断出来,但是对于递归函数,必须指定返回类型

  • 8.定义函数
    图片2.png

方法和函数的区别
在函数式编程语言中,函数是“头等公民”,它可以像 任何其他数据类型一样被传递和操 作,函数是一个对象,继承自FuctionN, 函数对象有apply,curried,toString,tupled这些方法。 而方法不具有这些特性。如果想把方法转换成一个函数,可以用方法名跟上下划线的方式.

1.基础环境准备

前提是安装好jdk,zookeeper这里不做描述

2.安装storm

  1. 下载安装包

    wget    http://124.202.164.6/files/1139000006794ECA/apache.fayea.com/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz

  2. 解压安装包

    tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/
    cd /export/servers/

  3. 修改配置文件

    mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak
    vi /export/servers/storm/conf/storm.yaml

  4. 输入以下内容:

    #指定storm使用的zk集群
    storm.zookeeper.servers:
    - "zk01"
    - "zk02"
    - "zk03"
    #指定storm集群中的nimbus节点所在的服务器
    nimbus.host: "storm01"
    #指定nimbus启动JVM最大可用内存大小
    nimbus.childopts: "-Xmx1024m"
    #指定supervisor启动JVM最大可用内存大小
    supervisor.childopts: "-Xmx1024m"
    #指定supervisor节点上,每个worker启动JVM最大可用内存大小
    worker.childopts: "-Xmx768m"
    #指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。
    ui.childopts: "-Xmx768m"
    #指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker
    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

  5. 分发安装包

    scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers
    scp -r /export/servers/apache-storm-0.9.5 storm03:/export/servers

  6. 启动集群

    在nimbus.host所属的机器上启动 nimbus服务
     cd /export/servers/storm/bin/
     nohup ./storm nimbus &

    在nimbus.host所属的机器上启动ui服务
    cd /export/servers/storm/bin/
    nohup ./storm ui &

    在其它个点击上启动supervisor服务
    cd /export/servers/storm/bin/
    nohup ./storm supervisor &
  1. 查看集群

    访问nimbus.host:/8080,即可看到storm的ui界面

  2. Storm常用操作命令

  • 有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。
  • 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
bin/storm jar examples/storm-starter/storm-starter-topologies-0.10.0.jar storm.starter.WordCountTopology wordcount
  • 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10
  • 停用任务命令格式:storm deactivte 【拓扑名称】
storm deactivte topology-name

我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

  • 启用任务命令格式:storm activate【拓扑名称】
storm activate topology-name
  • 重新部署任务命令格式:storm rebalance 【拓扑名称】
storm rebalance topology-name

再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。

1. producer端配置文件说明

#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092

# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner

# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none

# 指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder

# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=

# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
# 0: producer不会等待broker发送ack 
# 1: 当leader接收到消息之后发送ack 
# -1: 当所有的follower都同步消息成功后发送ack. 
request.required.acks=0 

# 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) 
request.timeout.ms=10000

# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync

# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000

# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500

# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) 
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 
# -1: 无阻塞超时限制,消息不会被抛弃 
# 0:立即清空队列,消息被抛弃 
queue.enqueue.timeout.ms=-1


# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数 
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失) 
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3

# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况 
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新 
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000 
topic.metadata.refresh.interval.ms=60000

2.broker端配置文件说明

#broker的全局唯一编号,不能重复
broker.id=0

#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092

#处理网络请求的线程数量
num.network.threads=3

#用来处理磁盘IO的现成数量
num.io.threads=8

#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

#kafka运行日志存放的路径
log.dirs=/export/data/kafka/

#topic在当前broker上的分片个数
num.partitions=2

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除
log.retention.hours=1

#滚动生成新的segment文件的最大时间
log.roll.hours=1

#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#周期性检查文件大小的时间
log.retention.check.interval.ms=300000

#日志清理是否打开
log.cleaner.enable=true

#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessfu* 错误!
host.name=kafka01

advertised.host.name=192.168.140.128

3.consumer端配置文件说明

# zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000

#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000

# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000

#指定消费 
group.id=itcast

# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true

# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000

# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx 

# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx

# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50

# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新  的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数. 
rebalance.max.retries=5

# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600

# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360

# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest

# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder

4.Kafka整体概念梳理

  • Producer :消息生产者,就是向kafka broker发消息的客户端。
  • Consumer :消息消费者,向kafka broker取消息的客户端
  • Topic :名称。
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
  • Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。
  • Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。
  • ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。

5.使用Kafka Producer生产数据的分发策略

The default partitioning strategy:
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
分发策略:
1)如果指定了partition,直接使用
2)如果没有指定partition,但是制定了key,可以使用key做hash取模
3)如果没有指定partition,又没有指定key,使用轮训的方式
//在ProducerRecord构造参数中有key的情况下,会根据key进行hash取模,得到partition的编号
kafkaProducer.send(new ProducerRecord<String, String>("topic01","num","Consumer Group "));
// 如果沒有key,也沒有partition就會轮询
kafkaProducer.send(new ProducerRecord<String, String>("topic01", "afka Web "));
//如果指定了partition,就會使用partition
kafkaProducer.send(new ProducerRecord<String, String>("topic01",1,"num","value"));

6.producer数据生产不丢失的问题

  • 如果是同步模式下
    • 将发送状态设置为-1,是最为妥当的。但是,由于-1是让所有的副本都确定收到数据,这个过程会有较长的等待。面对海量的数据,如果每条消息都确认的话,效率会大大降低。
    • 一般做法的做法: 设置让leader接收到数据就确认,就也是1,提高效率,这个方案可能会有丢失的风险。
  • 如果是在异步模式下(也有ack)
    • 生产的数据并不会立即发送给broker,会在produer段有个容器(队列)来临时缓存数据。
    • 针对这个容器,有个阻塞设置。如果设置为0,就是立即丢弃数据。如果这是为-1,就永久阻塞。
      • 如果在producer永久阻塞时,人为关闭producer代码所在进程,会立即清空队列中的数据,导致数据丢失。