Spark Quick Start

Note: before Spark 2.0, Spark's main programming interface was the RDD (Resilient Distributed Dataset). Since 2.0, the primary interface has been the Dataset. The RDD API is still supported (see the RDD programming guide), but you are strongly encouraged to switch to Datasets, which perform better than RDDs. See the SQL programming guide for more details on Datasets.
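For reference, a Dataset still exposes its underlying RDD when you need the old API; a minimal spark-shell sketch (the README.md path is just an example):

val ds = spark.read.textFile("README.md")  // Dataset[String]
val legacyRdd = ds.rdd                     // drop down to the RDD API
legacyRdd.count()                          // same count, computed via the RDD interface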

Interactive Analysis with the Spark Shell

Basics

Start the shell with ./bin/spark-shell. In Spark 2.x the shell pre-creates a SparkSession for you, available as the variable spark.

Basic operations:

val textFile = spark.read.textFile("README.md")
textFile.count() // Number of items in this Dataset
textFile.first() // first item in this Dataset

You can transform a Dataset into a new one. Calling filter returns a new Dataset that is a subset of the original file:

val linesWithSpark = textFile.filter(t => t.contains("Spark"))
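Transformations and actions can also be chained together; for example, counting how many lines contain "Spark" (filter and count are standard Dataset methods):

textFile.filter(line => line.contains("Spark")).count()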

A question that comes up here: how do you print the contents of a Dataset?
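One answer, using standard Dataset methods on the linesWithSpark Dataset defined above (collect pulls every row to the driver, so prefer show or take on large data):

linesWithSpark.show(5, truncate = false)   // render the first 5 rows as a table
linesWithSpark.take(3).foreach(println)    // print just the first 3 rows
linesWithSpark.collect().foreach(println)  // print everything, driver memory permitting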

More Dataset Operations

Dataset actions and transformations can be combined for more complex computations. For example, to find the line with the most words:

textFile.map(line => line.split(" ").size).reduce((a,b) => if (a > b) a else b)
res4: Int = 15

The arguments to map and reduce are Scala anonymous functions, and you can also call into any Scala/Java library. For example, Math.max() makes this computation easier to read:

import java.lang.Math
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a,b))

One common data flow pattern is MapReduce, and Spark can express MapReduce flows easily. Here is a word count:

val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
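flatMap splits each line into words, groupByKey(identity) groups identical words, and count tallies each group into a Dataset[(String, Long)]. To materialize the counts at the driver:

wordCounts.collect()  // Array[(String, Long)] of (word, count) pairs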

Caching

Spark supports pulling a Dataset into a cluster-wide in-memory cache. This is very useful when the same data is accessed repeatedly, e.g. a small "hot" dataset that is queried many times, or an iterative algorithm.
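A minimal sketch, reusing the linesWithSpark Dataset from above (cache is lazy: nothing is stored until an action runs):

linesWithSpark.cache()  // mark the Dataset for caching
linesWithSpark.count()  // first action computes the result and populates the cache
linesWithSpark.count()  // served from the in-memory cache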

Self-Contained Application

Now let's build a self-contained application using the Spark API.

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

SparkSession.builder constructs a SparkSession: appName sets the application name, and getOrCreate returns an existing SparkSession or creates a new one. Unlike in the shell, a self-contained application has to create its own SparkSession.
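If you run the application locally without spark-submit, you can also set the master directly on the builder; a minimal sketch (hard-coding a master is only sensible for local testing, since spark-submit normally supplies it):

val spark = SparkSession.builder
  .appName("Simple Application")
  .master("local[*]")  // run locally on all cores; for local testing only
  .getOrCreate()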

You can also use Maven to manage the build and its dependencies.

Directory layout:

./pom.xml
./src
./src/main
./src/main/scala
./src/main/scala/didi
./src/main/scala/didi/map
./src/main/scala/didi/map/pointsys
./src/main/scala/didi/map/pointsys/App.scala
./src/test
./src/test/scala
./src/test/scala/didi
./src/test/scala/didi/map
./src/test/scala/didi/map/pointsys

Contents of App.scala:

package didi.map.pointsys

import org.apache.spark.sql.SparkSession
object MyFunctions {
  // A named helper that could be passed to Dataset transformations;
  // it is not used by SimpleApp below.
  def func1(s: String): String = {
    s.concat("yankai")
  }
}
// spark-submit --class="didi.map.pointsys.SimpleApp" parking-user-profile-1.0-SNAPSHOT.jar
object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "README.md" // Should be some file on your system
    // enableHiveSupport() requires Hive classes on the classpath (supplied here by the cluster's Spark)
    val ss = SparkSession.builder().appName("Simple Application").enableHiveSupport().getOrCreate()
    val logData = ss.read.textFile(logFile)
    //val pairs = logData.map(s => (s, 1))

    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    ss.stop()
  }
}

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>didi.map.pointsys</groupId>
  <artifactId>parking-user-profile</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <encoding>UTF-8</encoding>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.2</version>
      <scope>provided</scope>
    </dependency>

  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- bind the maven-assembly-plugin to the package phase; this creates a
          jar-with-dependencies suitable for deployment to a cluster (the
          provided-scope Spark/Hadoop dependencies are not bundled). -->

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2-beta-5</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- disable surefire -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <!-- enable scalatest -->
      <!-- <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      -->
    </plugins>

    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>

  </build>
</project>

Packaging the project generates a jar file under the target directory.
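The standard Maven build, run from the directory containing pom.xml:

mvn clean package

Then submit the jar with spark-submit from the target directory (or pass the full path under target/):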

spark-submit --class="didi.map.pointsys.SimpleApp" parking-user-profile-1.0-SNAPSHOT.jar