Spark Hands-On Example 1: RDD Operations
In [1]:
# import pyspark and create the Spark context
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
1. The first way to create an RDD: reading external data, such as a local file
In [2]:
rdd = sc.textFile('./dataset/Goodbye_Object_Oriented_Programming.txt')
In [3]:
# check the rdd type
type(rdd)
Out[3]:
pyspark.rdd.RDD
1.1 RDD Transformations
In [4]:
%%time
# map is one of the transformation operations; at this point only the DAG is built
rdd = rdd.map(lambda x: len(x))
CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 31 µs
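The 31 µs wall time shows that nothing was actually computed: `map` only records the step in the DAG. A rough plain-Python analogy for this laziness is a generator, which also defers work until it is consumed:

```python
# building the generator is instant — nothing is computed yet (like a transformation)
lengths = (len(x) for x in ["hello", "spark"])

# iterating forces the computation (like an action)
print(sum(lengths))  # 10
```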
1.2 RDD Actions
In [5]:
%%time
# reduce is one of the action operations; only now does the computation actually run
charCount = rdd.reduce(lambda x, y: x+y)
print(charCount)
13187 CPU times: user 12 ms, sys: 0 ns, total: 12 ms Wall time: 1.58 s
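The same fold can be sketched in plain Python with `functools.reduce`, on a hypothetical list of lines, to show exactly what the lambda does:

```python
from functools import reduce

# hypothetical sample lines standing in for the RDD's contents
lines = ["hello spark", "goodbye oop", ""]

# map each line to its length, then fold the lengths with the same lambda
char_count = reduce(lambda x, y: x + y, map(len, lines))
print(char_count)  # 11 + 11 + 0 = 22
```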
In [6]:
! wc ./dataset/Goodbye_Object_Oriented_Programming.txt
328 2260 13687 ./dataset/Goodbye_Object_Oriented_Programming.txt
(`wc` reports bytes, while the RDD sum counts characters per line; the stripped newlines and multi-byte characters such as curly quotes account for the gap between 13687 and 13187.)
1.3 Example: counting word occurrences
In [7]:
wordRdd = sc.textFile('./dataset/Goodbye_Object_Oriented_Programming.txt')
# take is also an Action; it returns the first n elements
wordRdd.take(2)
Out[7]:
['I’ve been programming in Object Oriented languages for decades. The first OO language I used was C++ and then Smalltalk and finally .NET and Java.', '']
In [8]:
# split each line of text into words
wordRdd = wordRdd.map(lambda line: line.split(' '))
In [9]:
wordRdd.take(2)
Out[9]:
[['I’ve', 'been', 'programming', 'in', 'Object', 'Oriented', 'languages', 'for', 'decades.', 'The', 'first', 'OO', 'language', 'I', 'used', 'was', 'C++', 'and', 'then', 'Smalltalk', 'and', 'finally', '.NET', 'and', 'Java.'], ['']]
In [10]:
# flatten the nested lists
wordRdd = wordRdd.flatMap(lambda x: x)
# count how many words there are
wordRdd.count()
Out[10]:
2493
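`flatMap` is a map followed by a one-level flatten. The same shape can be sketched in plain Python with `itertools.chain.from_iterable` (the sample lines are hypothetical):

```python
from itertools import chain

# hypothetical lines, mirroring the RDD after textFile
lines = ["goodbye object oriented", "", "programming"]

# map: split each line into words; flatMap: flatten one level
nested = [line.split(' ') for line in lines]
words = list(chain.from_iterable(nested))
print(words)  # ['goodbye', 'object', 'oriented', '', 'programming']
```

Note that the empty line survives as an empty string, which is why the notebook filters `''` out in a later step.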
In [11]:
# look at the first two elements
wordRdd.take(2)
Out[11]:
['I’ve', 'been']
In [12]:
# filter out empty strings
wordRdd = wordRdd.filter(lambda x: x != '')
# count how many words remain
wordRdd.count()
Out[12]:
2260
In [13]:
# convert to a key-value RDD, i.e. (key, value) pairs
wordRdd = wordRdd.map(lambda word: (word, 1))
wordRdd.take(2)
Out[13]:
[('I’ve', 1), ('been', 1)]
In [14]:
wordRdd = wordRdd.reduceByKey(lambda x, y: x+y)
# peek at the result
wordRdd.take(10)
# view everything
# wordRdd.collect()
Out[14]:
[('face', 1), ('was', 18), ('Monkey', 2), ('how', 4), ('Just', 1), ('for', 11), ('Directories', 1), ('could', 4), ('gained', 1), ('AGAIN', 1)]
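The whole pipeline — split, flatten, filter, pair up, reduce by key — can be sketched in plain Python (on hypothetical sample text) to show what `reduceByKey` computes per key:

```python
from collections import defaultdict

# hypothetical sample lines standing in for the file's contents
lines = ["to be or not to be", "", "that is the question"]

# map + flatMap + filter: one flat list of non-empty words
words = [w for line in lines for w in line.split(' ') if w != '']

# map to (word, 1) pairs, then reduce by key with the same lambda x+y
counts = defaultdict(int)
for word, one in ((w, 1) for w in words):
    counts[word] += one

print(counts['to'], counts['be'])  # 2 2
```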
In [15]:
# continue the analysis with pandas
import pandas as pd
In [16]:
df = pd.DataFrame(wordRdd.collect())
# set the column names
df.columns = ['word', 'count']
# look at the first 10 rows
df.head(10)
Out[16]:
|   | word | count |
|---|---|---|
| 0 | face | 1 |
| 1 | was | 18 |
| 2 | Monkey | 2 |
| 3 | how | 4 |
| 4 | Just | 1 |
| 5 | for | 11 |
| 6 | Directories | 1 |
| 7 | could | 4 |
| 8 | gained | 1 |
| 9 | AGAIN | 1 |
In [17]:
# view the ten most frequent words
df = df.sort_values(by='count', ascending=False)
df.head(10)
Out[17]:
|   | word | count |
|---|---|---|
| 263 | the | 121 |
| 271 | to | 57 |
| 576 | of | 47 |
| 358 | and | 45 |
| 589 | a | 41 |
| 797 | is | 38 |
| 136 | in | 35 |
| 593 | I | 32 |
| 685 | that | 29 |
| 645 | The | 26 |
In [18]:
# stop the Spark context
sc.stop()