Spark SQL, DataFrames and Datasets Guide
##概述:
Spark SQL时候spark中处理结构化数据的模块。
Spark SQL可以从已经安装好的hive中读取数据,也可以通过命令行 or JDBC/ODBC 读取数据。
Datasets and DataFrames:
Dataset 是数据的分布式集合(A Dataset is a distributed collection of data)。Dataset可以从JVM对象中构建,也可以使用转化进行操作(transformations 如map,flatMap,filter 等等)。
DataFrame 是将Dataset组织为命名列(named columns),在python/R 中等同于关系型数据库中的表(table)。
DataFrame Operations:
df.printSchema()
df.select(“name”).show()
df.select(df[‘name’], df[‘age’] + 1).show()
df.filter(df[‘age’] > 21).show()
df.groupBy(“age”).count().show()
Running SQL Queries Programmatically
在SparkSession中的sql函数会执行sql查询语句,并返回 DataFrame的结果。
sqlDF = spark.sql(“SELECT * FROM people”)
sqlDF.show()
Programmatically Specifying the Schema
官方引导的方式个人感觉有点繁琐。 http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
可以使用to.DF()的方式来做。
org.apache.spark.sql.types._1 2 3 4 5 6 7 8 9 10 11 12
| from pyspark.sql import Row
lines = sc.textFile("file:///usr/local/Cellar/spark/2.0.1/examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) peoples = parts.map(lambda p: Row(name=p[0], age=p[1])) peoples.toDF()
// Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people")
|
Data Sources
Spark SQL 通过DataFrame接口 支持不同类型的数据源。
将DataFrame注册为一个临时视图(temporary view),允许你可以通过SQL查询数据。
这部分描述的是加载和保存数据的通用方法,并会介绍built-in data sources的特殊选项。
Generic Load/Save Functions
1 2
| df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
|
手工指定特殊参数
1 2
| df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
|
例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| #-*- coding:utf-8 -*-
import sys reload(sys) sys.setdefaultencoding('utf8') import json import subprocess import time import pandas as pd from pyspark.sql import Row
import math from datetime import datetime, timedelta from pyspark import SparkContext, SparkConf from pyspark import HiveContext import collections
def hiveSql(): sql = ''' select param['datatype'] as datatype from xxx between aaa and bbb ''' return sql
def parse_poi(row): # 第二列取值: # 1 参数错误并且经纬度为0, # 2 参数错误并且经纬度不是0 # 3 query为空 # 4 其他异常情况 parse_result = "empty\tempty\tempty" json_str = row.json_str if type(json_str) != str and type(json_str) != unicode: return parse_result try: res = json.loads(json_str) except: return parse_result
if row.searchname != '': parse_result = '%s\t%s\t%s'%(row.nnn, 0, row.oooo) return parse_result
def parse_mmm(sc): hc = HiveContext(sc)
all_sug_poi = sc.parallelize([]) sql = hiveSql() print sql df = hc.sql(sql) mmmm = df.rdd.map(parse_poi).map(lambda x:x.split("\t"))
ooooo = mmmm.map(lambda p: Row(searchname=p[0], stattype=p[1], datatype=p[2]))
dudtf = ooooo.toDF() dudtf.printSchema() dudtf.groupBy('stattype').count().show() dudtf.groupBy('stattype', 'datatype').count().show()
if __name__ == "__main__": conf = SparkConf() conf.setAppName('mmmmxxxx') sc = SparkContext(conf = conf) parse_mmm(sc)
|
在这个例子中,
- 首先查询hive,要使用HiveContext去查询。
- 再次,由于数据存在一定的污染,因此在进行json.loads之前必须对row.json_str进行类型断言;并且当json.loads抛出异常时,进行捕获。
- 在执行mmmm.map时使用map,而不是flatMap。
##参考文章: