2017年11月

1、什么是RDD

  • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
    • Dataset:一个数据集,简单的理解为集合,用于存放数据的
    • Distributed:它的数据分布式存储,并且可以做分布式的计算
    • Resilient:弹性的
      • 它表示的是数据可以保存在磁盘,也可以保存在内存中

2、RDD的五大特性

    • A list of partitions
    • 每个RDD都有一个分区列表
    • A function for computing each split
    • 作用在每个分区上面的函数
    • A list of dependencies on other RDDs
    • 一个RDD依赖其他多个RDD,这个特性很重要,rdd的容错机制就是根据这个特性而来的
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • 可选项:针对于 kv 键值对的RDD才具有该分区特性
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for
      an HDFS file)
    • 可选项 : 数据本地性,数据最优,选择尽量存储在worker节点上的数据节点。

3、创建rdd方式

  • 1、通过已经存在的集合

    • val rdd1=sc.parallelize(Array(1,2,3,4))
  • 2、读取外部数据源

    • val rdd2=sc.textFile("文件")
  • 3、由一个rdd转化成一个新的RDD

    • val rdd3=rdd2.flatMap

4、RDD的算子分类

  • transformation:转换操作,将一个rdd转换生成一个新的rdd(flatMap/map/reduceByKey),它属于懒加载,延迟执行,并不会立即触发任务的执行。
  • action :此时才会真正的触发任务的计算。

5、rdd中的2种依赖关系

  • 窄依赖:每个父RDD的partition最多只被子的rdd的一个partition
  • 宽依赖:子rdd的partition会依赖于父rdd的多个partiiton

6、lineage(血统)

  • 它会记录下当前作用在rdd上的分区数据(元数据)和一系列的转换行为,当子rdd中某个分区数据丢失之后,只需要通过血统来重新计算恢复当前丢失数据的分区(spark 具备容错机制)

7、RDD的缓存方式

  • cache:直接将rdd中的数据,保存在内存中,其本质是persist(StorageLevel.MEMOEY_ONLY)
  • persist:可以有丰富的缓存级别
  • 当rdd设置了缓存之后,如果下面有需要用到该RDD的数据的时候,就不要重复计算,可以直接从缓存中获取得到。

8、checkpoint

  • 会对数据进行一个持久化操作,保存在hdfs

  • 使用的时候:

    • 需要sc.setCheckpointdir 来设置一个检查点目录
    • 对需要缓存的rdd调用checkpoint
    • 注意:同样在执行数据缓存的时候,需要有对应的action算子操作,才会真正触发持久化操作。
  • 在做checkpoint操作的时候,此时会先执行对应触发action算子的rdd结果,计算完成之后又会开辟一个新的job来计算你设置了checkpoint的rdd的结果。

  • 在设置了checkpoint之后,对应这个rdd会改变之前的依赖关系,如果当前数据丢失了,只有重头计算得到。

  • 如何使用checkpoint

    • 可以对要做checkPoint的rdd,先进行一个cache
    • 在做一个checkpint操作
  • 数据恢复的一般顺序

    • 内存---------->checkpoint------------>重新计算得到

1、spark概述

  • spark是基于内存的一个计算框架,计算速度非常的快。这里面没有涉及到任何存储,如果想要处理外部的数据源,比如数据在HDFS上,此时我们就需要先搭建一个hadoop集群。

2、spark的特点

  • 1、速度快(比mapreduce在内存中快100倍,比在磁盘中快10倍)

    • (1)spark在处理的数据中间结果数据可以不落地,mapreduce每次中间结果都要落地。
    • (2)在mapreduce计算的时候,mapTask,reduceTask,每一个task都对应一个jvm进程。
      在spark中,它同样会按照hadoop中切片逻辑,会有N个task,而这些task都是运行在worker节点上,worker上会有executor进程,而这些task会以线程的方式运行在executor上面。
  • 2、易用性

    • 可以使用多种语言来编写spark应用程序
      • java
      • scala
      • Python
      • R
  • 3、通用性

    • 可以使用sparksql、sparkStreaming、Mlib、Graphx
  • 4、兼容性

    • 可以运行在不同的资源调度平台
      • yarn(resourceManger分配资源)
      • mesos(是apache下开源的资源调度框架)
      • standAlone(master进行资源的分配)

3、spark集群安装

  • 1、下载对应版本的安装包
  • 2、上传安装包到服务器上
  • 3、规划一下安装目录
  • 4、解压安装包到指定的安装目录
  • 5、重命名安装目录
  • 6、修改配置文件 cd conf
    • (1) spark-env.sh.template (需要 mv spark-env.sh.template spark-env.sh)
      • 配置javahome export JAVA_HOME=/export/servers/jdk
      • 配置master的Host export SPARK_MASTER_HOST=node1
      • 配置master的Port export SPARK_MASTER_PORT=7077
    • (2)slaves.template (需要 mv slaves.template slaves)
      • 添加worker节点
        • node2
        • node3
  • 7、配置一下spark的环境变量
  • 8、通过scp命令分发到其他节点中
    • spark安装目录
    • /etc/profile
  • 9、所有机器都要source /etc/profile
  • 10、可以启动spark集群
    • $SPARK_HOME/sbin/start-all.sh
    • 可以通过web界面访问master
      • http://node01:8080
  • 11、停止spark集群
    • $SPARK_HOME/sbin/stop-all.sh

4、spark高可用集群配置

  • 1、需要先zk集群
  • 2、修改spark配置(spark-env.sh)
    • (1)注释掉master的地址
    • (2) 引入zk配置
      • export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181 -Dspark.deploy.zookeeper.dir=/spark"
  • 3、启动
  • 启动zk集群
  • 需要在spark集群中任意一台机器上启动 start-all.sh
    • 产生master进程
    • 并且会根据 slaves,去对应的主机名上启动worker进程
  • 在其他worker节点上单独启动master
    • start-master.sh

5、初识spark程序

  • 已经知道那个master是活着的master
  • --master spark://node1:7077
  • 有很多的master时候
  • --master spark://node1:7077,node2:7077,node3:7077

6、spark-shell使用

  • 1、spark-shell --master local[N] (本地单机版)
    • local[N]:表示在本地模拟N个线程来运行当前任务
  • 2、spark-shell --master local[*] (本地单机版)
    • 这个*表示当前机器上所有可用的资源
  • 3、spark-shell --master spark://node2:7077
  • 4、spark-shell 读取hdfs上的数据文件
    • sc.textFile("hdfs://node1:9000/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

7、spark整合hdfs

  • 1、修改配置文件(spark-env.sh)

    • 添加配置参数
      • export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
        • 通过scp分发配置到其他节点
  • 2、可以sc.textFile("/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

8、scala语言编程spark单词计数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//通过scala编写spark的单词计数程序
object WordCount {

  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName和master地址,local[2]表示本地使用2个线程来进行计算
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    //2、创建SparkContext对象,这个对象很重要,它会创建DAGScheduler和TaskScheduler
    val sc: SparkContext = new SparkContext(sparkConf)
    //设置日志输出级别
    sc.setLogLevel("WARN")
    //3、读取数据文件
    //val data: RDD[String] = sc.textFile(args(0))
    val data: RDD[String] = sc.textFile("E:\\wordcount\\input\\words.txt")
    //4、切分每一行,并且压平  hello、you、me
    val words: RDD[String] = data.flatMap(_.split(" "))
    //5、每个单词记位1
    val wordAndOne: RDD[(String, Int)] = words.map((_,1))
    //6、相同单词出现的次数进行累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //按照单词出现的次数降序排序
    val sortResult: RDD[(String, Int)] = result.sortBy(_._2,false)
    //7、收集数据,打印输出
    val finalResult: Array[(String, Int)] = sortResult.collect()
    //打印结果
    finalResult.foreach(x=>println(x))
    //关闭
    sc.stop()
  }
}

9、java语言编程spark单词计数

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//利用java语言来实现spark的单词计数
public class WordCount_Java {
    public static void main(String[] args) {
        //1、创建SparkConf对象,设置appName和master地址
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、创建javaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、读取数据文件
        JavaRDD<String> dataJavaRDD = jsc.textFile("E:\\wordcount\\input\\words.txt");

        //4、对每一行进行切分压平
        JavaRDD<String> wordsJavaRDD = dataJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override               //line表示每一行记录
            public Iterator<String> call(String line) throws Exception {
                //切分每一行
                String[] words = line.split(" ");

                return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词记为1
        JavaPairRDD<String, Integer> wordAndOneJavaPairRDD = wordsJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        //6、把相同单词出现的次数累加  (_+_)
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOneJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照单词出现的次数降序排序
        //需要将(单词,次数)进行位置颠倒 (次数,单词)
        JavaPairRDD<Integer, String> sortJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        }).sortByKey(false);

        //将(次数,单词)变为(单词,次数)
        JavaPairRDD<String, Integer> finalSortJavaPairRDD = sortJavaPairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        //7、收集打印
        List<Tuple2<String, Integer>> finalResult = finalSortJavaPairRDD.collect();

        for(Tuple2<String, Integer> t:finalResult){
            System.out.println(t);
        }

        jsc.stop();
    }
}

10、上面的程序所依赖的pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>18</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Spark运行基本流程图

图片1.png

1.数组

  • 1.1.数组
    • 1.1.定长数组和变长数组
      (1)定长数组定义格式:
      val arr=new Array[T] (数组长度)
      (2)变长数组定义格式:
      val arr = ArrayBuffer[T] () 注意需要导包:import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
object ArrayDemo {
  def main(args: Array[String]) {

    //初始化一个长度为8的定长数组,其所有元素均为0
    val arr1 = new Array[Int](8)
    //直接打印定长数组,内容为数组的hashcode值
    println(arr1)
    //将数组转换成数组缓冲,就可以看到原数组中的内容了
    //toBuffer会将数组转换长数组缓冲
    println(arr1.toBuffer)

    //注意:如果new,相当于调用了数组的apply方法,直接为数组赋值
    //初始化一个长度为1的定长数组
    val arr2 = Array[Int](10)
    println(arr2.toBuffer)

    //定义一个长度为3的定长数组
    val arr3 = Array("hadoop", "storm", "spark")
    //使用()来访问元素
    println(arr3(2))

    //////////////////////////////////////////////////
    //变长数组(数组缓冲)
    //如果想使用数组缓冲,需要导入import scala.collection.mutable.ArrayBuffer包
    val ab = ArrayBuffer[Int]()
    //向数组缓冲的尾部追加一个元素
    //+=尾部追加元素
    ab += 1
    //追加多个元素
    ab += (2, 3, 4, 5)
    //追加一个数组++=
    ab ++= Array(6, 7)
    //追加一个数组缓冲
    ab ++= ArrayBuffer(8,9)
    //打印数组缓冲ab

    //在数组某个位置插入元素用insert,从某下标插入
    ab.insert(0, -1, 0)
    //删除数组某个位置的元素用remove  按照下标删除
    ab.remove(0)
    println(ab)

  }
}
  • 1.2. 遍历数组

    • (1).增强for循环
    • (2).好用的until会生成脚标,0 until 10 包含0不包含10
object ForArrayDemo {
  def main(args: Array[String]) {
    //初始化一个数组
    val arr = Array(1,2,3,4,5,6,7,8)
    //增强for循环
    for(i <- arr)
      println(i)

    //好用的until会生成一个Range
    //reverse是将前面生成的Range反转
    for(i <- (0 until arr.length).reverse)
      println(arr(i))
  }
}
  • 1.3.数组转换
    yield关键字将原始的数组进行转换会产生一个新的数组,原始的数组不变
    图片4.png
object ArrayYieldDemo {
  def main(args: Array[String]) {
    //定义一个数组
    val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    //将偶数取出乘以10后再生成一个新的数组
    val res = for (e <- arr if e % 2 == 0) yield e * 10
    println(res.toBuffer)

    //更高级的写法,用着更爽
    //filter是过滤,接收一个返回值为boolean的函数
    //map相当于将数组中的每一个元素取出来,应用传进去的函数
    val r = arr.filter(_ % 2 == 0).map(_ * 10)
    println(r.toBuffer)

  }
}
  • 1.4数组常用算法
    在Scala中,数组上的某些方法对数组进行相应的操作非常方便
    图片5.png

2.映射

在Scala中,把哈希表这种数据结构叫做映射

  • 2.1构建映射

    • (1)val map=Map(键->值,键->值....)
    • (2)利用元组构建 val map=Map((键,值),(键,值),(键,值)....)
      图片6.png
  • 2.2.获取和修改映射中的值

    • 获取映射中的值:
      值=map(键)
      好用的getOrElse (如果映射中有值,返回映射中的值,没有就返回默认值)
      注意:在Scala中,有两种Map,一个是immutable包下的Map,该Map中的内容不可变;另一个是mutable包下的Map,该Map中的内容可变
      通常我们在创建一个集合是会用val这个关键字修饰一个变量(相当于java中的final),那么就意味着该变量的引用不可变,该引用中的内容是不是可变,取决于这个引用指向的集合的类型

3.元组

映射是K/V对偶的集合,对偶是元组的最简单形式,元组可以装着多个不同类型的值。

  • 3.1创建元组

    • (1)元组是不同类型的值的聚集;对偶是最简单的元组。
    • (2)元组表示通过将不同的值用小括号括起来,即表示元组。

    创建元组格式:
    val tuple=(元素,元素...)

  • 3.2获取元组中的值

    • 获取元组中的值格式:
      使用下划线加脚表 ,例如 t._1 t._2 t._3
      注意:元组中的元素脚标是从1开始的
  • 3.3将对偶的集合转换成映射

    • 将对偶的集合装换成映射:
      调用其toMap 方法
      图片7.png
  • 3.4.拉链操作

    • (1).使用zip命令可以将多个值绑定在一起
      图片8.png
      注意:如果两个数组的元素个数不一致,拉链操作后生成的数组的长度为较小的那个数组的元素个数

    • (2).如果其中一个元素的个数比较少,可以使用zipAll用默认的元素填充
      图片9.png

4.集合

Scala的集合有三大类:序列Seq、Set、映射Map,所有的集合都扩展自Iterable特质
在Scala中集合有可变(mutable)和不可变(immutable)两种类型,immutable类型的集合初始化后就不能改变了(注意与val修饰的变量进行区别)
  • 4.1 List

(1)不可变的序列 import scala.collection.immutable._

在Scala中列表要么为空(Nil表示空列表)要么是一个head元素加上一个tail列表。
9 :: List(5, 2)  :: 操作符是将给定的头和尾创建一个新的列表 
注意::: 操作符是右结合的,如9 :: 5 :: 2 :: Nil相当于 9 :: (5 :: (2 :: Nil))


list常用的操作符:
++[B](that: GenTraversableOnce[B]): List[B] 从列表的尾部添加另外一个列表
++: [B >: A, That](that: collection.Traversable[B])(implicit bf: CanBuildFrom[List[A], B, That]): That 在列表的头部添加一个列表
+: (elem: A): List[A] 在列表的头部添加一个元素
:+ (elem: A): List[A] 在列表的尾部添加一个元素
:: (x: A): List[A]     在列表的头部添加一个元素
::: (prefix: List[A]): List[A] 在列表的头部添加另外一个列表
val left = List(1,2,3)
val right = List(4,5,6)
//以下操作等价
left ++ right      // List(1,2,3,4,5,6)
left ++: right     // List(1,2,3,4,5,6)
right.++:(left)    // List(1,2,3,4,5,6)
right.:::(left)     // List(1,2,3,4,5,6)
//以下操作等价
0 +: left    //List(0,1,2,3)
left.+:(0)   //List(0,1,2,3)
//以下操作等价
left :+ 4    //List(1,2,3,4)
left.:+(4)   //List(1,2,3,4)
//以下操作等价
0 :: left      //List(0,1,2,3)
left.::(0)     //List(0,1,2,3)

(2)可变的序列 import scala.collection.mutable._

import scala.collection.mutable.ListBuffer

object MutListDemo extends App{
  //构建一个可变列表,初始有3个元素1,2,3
  val lst0 = ListBuffer[Int](1,2,3)
  //创建一个空的可变列表
  val lst1 = new ListBuffer[Int]
  //向lst1中追加元素,注意:没有生成新的集合
  lst1 += 4
  lst1.append(5)

  //将lst1中的元素最近到lst0中, 注意:没有生成新的集合
  lst0 ++= lst1

  //将lst0和lst1合并成一个新的ListBuffer 注意:生成了一个集合
  val lst2= lst0 ++ lst1

  //将元素追加到lst0的后面生成一个新的集合
  val lst3 = lst0 :+ 5

  //删除元素,注意:没有生成新的集合
  val lst4 = ListBuffer[Int](1,2,3,4,5)
  lst4 -= 5

  //删除一个集合列表,生成了一个新的集合
  val lst5=lst4--List(1,2)

  //把可变list 转换成不可变的list 直接加上toList
  val lst6=lst5.toList

  //把可变list 转变数组用toArray
  val lst7=lst5.toArray

  println(lst0)
  println(lst1)
  println(lst2)
  println(lst3)
  println(lst4)
  println(lst5)
  println(lst6)
  println(lst7)

}
  • 4.2.Set
    (1)不可变的Set import scala.collection.immutable._ Set代表一个没有重复元素的集合;将重复元素加入Set是没有用的,而且 Set 是不保证插入顺序的,即 Set 中的元素是乱序的。
    定义:val set=Set(元素,元素,.....)
//定义一个不可变的Set集合
scala> val set =Set(1,2,3,4,5,6,7)
set: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 4)

//元素个数
scala> set.size
res0: Int = 7

//取集合最小值
scala> set.min
res1: Int = 1

//取集合最大值
scala> set.max
res2: Int = 7

//将元素和set1合并生成一个新的set,原有set不变
scala> set + 8
res3: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 7, 3, 8, 4)

scala> val set1=Set(7,8,9)
set1: scala.collection.immutable.Set[Int] = Set(7, 8, 9)

//两个集合的交集
scala> set & set1
res4: scala.collection.immutable.Set[Int] = Set(7)

//两个集合的并集
scala> set ++ set1
res5: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 9, 2, 7, 3, 8, 4)

//在第一个set基础上去掉第二个set中存在的元素
scala> set -- set1
res6: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)

//返回第一个不同于第二个set的元素集合
scala> set &~ set1
res7: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)


//计算符合条件的元素个数
scala> set.count(_ >5)
res8: Int = 2

/返回第一个不同于第二个的元素集合
scala> set.diff(set1)
res9: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)

/返回第一个不同于第二个的元素集合
scala> set1.diff(set)
res10: scala.collection.immutable.Set[Int] = Set(8, 9)

//取子set(2,5为元素位置, 从0开始,包含头不包含尾)
scala> set.slice(2,5)
res11: scala.collection.immutable.Set[Int] = Set(6, 2, 7)

//迭代所有的子set,取指定的个数组合
scala> set1.subsets(2).foreach(x=>println(x))
Set(7, 8)
Set(7, 9)
Set(8, 9)

(2)可变的Set import scala.collection.mutable._

//导入包
scala> import scala.collection.mutable
import scala.collection.mutable
//定义一个可变的Set
scala> val set1=new HashSet[Int]()
set1: scala.collection.mutable.HashSet[Int] = Set()

//添加元素
scala> set1 += 1
res1: set1.type = Set(1)

//添加元素  add等价于+=
scala> set1.add(2)
res2: Boolean = true
scala> set1
res3: scala.collection.mutable.HashSet[Int] = Set(1, 2)

//向集合中添加元素集合
scala> set1 ++=Set(1,4,5)
res5: set1.type = Set(1, 5, 2, 4)

//删除一个元素
scala> set1 -=5
res6: set1.type = Set(1, 2, 4)

//删除一个元素
scala> set1.remove(1)
res7: Boolean = true
scala> set1
res8: scala.collection.mutable.HashSet[Int] = Set(2, 4)

  • 4.3.Map
    (1)不可变的Map import scala.collection.immutable._
定义Map集合
1.val map=Map(键->值,键->值...)
2.利用元组构建  val map=Map((键,值),(键,值),(键,值)....)
展现形式:
val  map = Map(“zhangsan”->30,”lisi”->40)
val  map = Map((“zhangsan”,30),(“lisi”,40))

3.操作map集合
获取值: 值=map(键)
原则:通过先获取键,在获取键对应值。

4.遍历map集合


scala> val imap=Map("zhangsan" -> 20,"lisi" ->30)
imap: scala.collection.immutable.Map[String,Int] = Map(zhangsan -> 20, lisi -> 30)
//方法一:显示所有的key
scala> imap.keys
res0: Iterable[String] = Set(zhangsan, lisi)

//方法二:显示所有的key
scala> imap.keySet
res1: scala.collection.immutable.Set[String] = Set(zhangsan, lisi)

//通过key获取value
scala> imap("lisi")
res2: Int = 30

//通过key获取value 有key对应的值则返回,没有就返回默认值0,
scala> imap.getOrElse("zhangsan",0)
res4: Int = 20

//没有对应的key,返回默认0
scala> imap.getOrElse("zhangsan1",0)
res5: Int = 0
//由于是不可变map,故不能向其添加、删除、修改键值对
* (2)可变的Map  import scala.collection.mutable._
//导包
import scala.collection.mutable
//声明一个可变集合
scala> val user =mutable.HashMap("zhangsan"->50,"lisi" -> 100)
user: scala.collection.mutable.HashMap[String,Int] = Map(lisi -> 100, zhangsan -> 50)

//添加键值对
scala> user +=("wangwu" -> 30)
res0: user.type = Map(lisi -> 100, zhangsan -> 50, wangwu -> 30)

//添加多个键值对
scala> user += ("zhangsan0" -> 30,"lisi0" -> 20)
res1: user.type = Map(zhangsan0 -> 30, lisi -> 100, zhangsan -> 50, lisi0 -> 20,wangwu -> 30)

//方法一:显示所有的key
scala> user.keys
res2: Iterable[String] = Set(zhangsan0, lisi, zhangsan, lisi0, wangwu)

//方法二:显示所有的key
scala> user.keySet
res3: scala.collection.Set[String] = Set(zhangsan0, lisi, zhangsan, lisi0, wangwu)

//通过key获取value
scala> user("zhangsan")
res4: Int = 50

//通过key获取value 有key对应的值则返回,没有就返回默认值0,
scala> user.getOrElse("zhangsan",0)
res5: Int = 50

//没有对应的key,返回默认0
scala> user.getOrElse("zhangsan1",0)
res6: Int = 0

//更新键值对
scala> user("zhangsan") = 55
scala> user("zhangsan")
res8: Int = 55

//更新多个键值对
scala> user += ("zhangsan" -> 60, "lisi" -> 50)
res9: user.type = Map(zhangsan0 -> 30, lisi -> 50, zhangsan -> 60, lisi0 -> 20,wangwu -> 30)

//删除key
scala> user -=("zhangsan")
res14: user.type = Map(zhangsan0 -> 30, lisi -> 50, lisi0 -> 20, wangwu -> 30)

//删除key
scala>user.remove("zhangsan0")

//遍历map 方法一:模式匹配
scala> for((x,y) <- user) println(x+" -> "+y)
lisi -> 50
lisi0 -> 20
wangwu -> 30

//遍历map 方法二:通过key值
scala> for(x<- user.keys) println(x+" -> "+user(x))
lisi -> 50
lisi0 -> 20
wangwu -> 30

//遍历map 方法三:通过foreach
scala>  user.foreach{case (x,y) => println(x+" -> "+y)}
lisi -> 50
lisi0 -> 20
wangwu -> 30

1.Scala概述

  • 什么是Scala
    • Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各种特性。Scala运行于Java平台(Java虚拟机),并兼容现有的Java程序。

2.Scala编译器安装

  • 2.1、安装JDK(因为Scala是运行在JVM平台上的,所以安装Scala之前要安装JDK)
  • 2.2、安装Scala
    • Windows安装Scala编译器
      直接解压之后,配置环境变量即可

3.Scala基础

  • 1.声明变量
object VariableDemo {
  def main(args: Array[String]) {
    //使用val定义的变量值是不可变的,相当于java里用final修饰的变量
    val i = 1
    //使用var定义的变量是可变得,在Scala中鼓励使用val
    var s = "hello"
    //Scala编译器会自动推断变量的类型,必要的时候可以指定类型
    //变量名在前,类型在后
    val str: String = "itcast"
  }
}
  • 2.常用类型
    Scala和Java一样,有7种数值类型Byte、Char、Short、Int、Long、Float和Double(无包装类型)和一个Boolean类型
  • 3.条件表达式
object ConditionDemo {
  def main(args: Array[String]) {
    val x = 1
    //判断x的值,将结果赋给y
    val y = if (x > 0) 1 else -1
    //打印y的值
    println(y)

    //支持混合类型表达式
    val z = if (x > 1) 1 else "error"
    //打印z的值
    println(z)

    //如果缺失else,相当于if (x > 2) 1 else ()
    val m = if (x > 2) 1
    println(m)

    //在scala中每个表达式都有值,scala中有个Unit类,写做(),相当于Java中的void
    val n = if (x > 2) 1 else ()
    println(n)

    //if和else if
    val k = if (x < 0) 0
    else if (x >= 1) 1 else -1
    println(k)
  }
}
  • 4.块表达式
object BlockExpressionDemo {
  def main(args: Array[String]) {
    val x = 0
    //在scala中{}中包含一系列表达式,块中最后一个表达式的值就是块的值
    //下面就是一个块表达式
    val result = {
      if (x < 0){
        -1  
      } else if(x >= 1) {
        1
      } else {
        "error"
      }
    }
    //result的值就是块表达式的结果
    println(result)
  }
}
  • 5.循环在scala中有for循环和while循环,用for循环比较多, for循环语法结构:for (i <- 表达式/数组/集合)
object ForDemo {
  def main(args: Array[String]) {
    //for(i <- 表达式),表达式1 to 10返回一个Range(区间)
    //每次循环将区间中的一个值赋给i
    for (i <- 1 to 10)
      println(i)

    //for(i <- 数组)
    val arr = Array("a", "b", "c")
    for (i <- arr)
      println(i)

    //高级for循环
    //每个生成器都可以带一个条件,注意:if前面没有分号
    for(i <- 1 to 3; j <- 1 to 3 if i != j)
      print((10 * i + j) + " ")
    println()

    //for推导式:如果for循环的循环体以yield开始,则该循环会构建出一个集合
    //每次迭代生成集合中的一个值
    val v = for (i <- 1 to 10) yield i * 10
    println(v)
  }
}
  • 6.调用方法和函数
Scala中的+ - * / %等操作符的作用与Java一样,位操作符 & | ^ >> <<也一样。只是有
一点特别的:这些操作符实际上是方法。例如:
a + b
是如下方法调用的简写:
a.+(b)
a 方法 b可以写成 a.方法(b)
  • 7.定义方法
    图片1.png

方法的返回值类型可以不写,编译器可以自动推断出来,但是对于递归函数,必须指定返回类型

  • 8.定义函数
    图片2.png

方法和函数的区别
在函数式编程语言中,函数是“头等公民”,它可以像 任何其他数据类型一样被传递和操 作,函数是一个对象,继承自FuctionN, 函数对象有apply,curried,toString,tupled这些方法。 而方法不具有这些特性。如果想把方法转换成一个函数,可以用方法名跟上下划线的方式.

1.基础环境准备

前提是安装好jdk,zookeeper这里不做描述

2.安装storm

  1. 下载安装包

    wget    http://124.202.164.6/files/1139000006794ECA/apache.fayea.com/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz

  2. 解压安装包

    tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/
    cd /export/servers/

  3. 修改配置文件

    mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak
    vi /export/servers/storm/conf/storm.yaml

  4. 输入以下内容:

    #指定storm使用的zk集群
    storm.zookeeper.servers:
    - "zk01"
    - "zk02"
    - "zk03"
    #指定storm集群中的nimbus节点所在的服务器
    nimbus.host: "storm01"
    #指定nimbus启动JVM最大可用内存大小
    nimbus.childopts: "-Xmx1024m"
    #指定supervisor启动JVM最大可用内存大小
    supervisor.childopts: "-Xmx1024m"
    #指定supervisor节点上,每个worker启动JVM最大可用内存大小
    worker.childopts: "-Xmx768m"
    #指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。
    ui.childopts: "-Xmx768m"
    #指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker
    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

  5. 分发安装包

    scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers
    scp -r /export/servers/apache-storm-0.9.5 storm03:/export/servers

  6. 启动集群

    在nimbus.host所属的机器上启动 nimbus服务
     cd /export/servers/storm/bin/
     nohup ./storm nimbus &

    在nimbus.host所属的机器上启动ui服务
    cd /export/servers/storm/bin/
    nohup ./storm ui &

    在其它个点击上启动supervisor服务
    cd /export/servers/storm/bin/
    nohup ./storm supervisor &
  1. 查看集群

    访问nimbus.host:/8080,即可看到storm的ui界面

  2. Storm常用操作命令

  • 有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。
  • 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
bin/storm jar examples/storm-starter/storm-starter-topologies-0.10.0.jar storm.starter.WordCountTopology wordcount
  • 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10
  • 停用任务命令格式:storm deactivte 【拓扑名称】
storm deactivte topology-name

我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

  • 启用任务命令格式:storm activate【拓扑名称】
storm activate topology-name
  • 重新部署任务命令格式:storm rebalance 【拓扑名称】
storm rebalance topology-name

再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。