大家好,又见面了,我是你们的朋友全栈君。
转: https://blog.csdn.net/weixin_38750084/article/details/82769600
这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式,
【方式1】
下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下
public class sparkTestCon {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
System.out.println(sc);
}
}
下面是运行在本机,把上面的第6行代码改为如下
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
RDD的创建有两种方式
1.引用外部文件系统的数据集(HDFS)
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)
第一种方式创建
下面通过代码来理解RDD和怎么操作RDD
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
* 引用外部文件系统的数据集(HDFS)创建RDD
* 匿名内部类定义函数传给spark
* @author 汤高
*
*/
public class RDDOps {
//完成对所有行的长度求和
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
System.out.println(sc);
//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) {
System.out.println("每行长度"+s.length());
return s.length(); }
});
//运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
System.out.println(totalLength);
//为了以后复用 持久化到内存...
lineLengths.persist(StorageLevel.MEMORY_ONLY());
}
}
如果觉得刚刚那种写法难以理解,可以看看第二种写法
package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
* 引用外部文件系统的数据集(HDFS)创建RDD
* 外部类定义函数传给spark
* @author 汤高
*
*/
public class RDDOps2 {
// 完成对所有行的长度求和
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
//运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
int totalLength = lineLengths.reduce(new Sum());
System.out.println("总长度"+totalLength);
// 为了以后复用 持久化到内存...
lineLengths.persist(StorageLevel.MEMORY_ONLY());
}
//定义map函数
//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
static class GetLength implements Function<String, Integer> {
public Integer call(String s) {
return s.length();
}
}
//定义reduce函数
//第一个参数为内容,第三个参数为函数操作完后返回的结果类型
static class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) {
return a + b;
}
}
}
【方式2】 (java编程推荐)
并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)
package com.tg.spark;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
* 并行化一个已经存在于驱动程序中的集合创建RDD
* @author 汤高
*
*/
public class RDDOps3 {
// 完成对所有数求和
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
System.out.println(sc);
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
//并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
JavaRDD<Integer> distData = sc.parallelize(data);
JavaRDD<Integer> lineLengths = distData.map(new GetLength());
// 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
// 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
int totalLength = lineLengths.reduce(new Sum());
System.out.println("总和" + totalLength);
// 为了以后复用 持久化到内存...
lineLengths.persist(StorageLevel.MEMORY_ONLY());
}
// 定义map函数
static class GetLength implements Function<Integer, Integer> {
@Override
public Integer call(Integer a) throws Exception {
return a;
}
}
// 定义reduce函数
static class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) {
return a + b;
}
}
}
注意:上面的写法是基于jdk1.7或者更低版本
基于jdk1.8有更简单的写法
下面是官方文档的说明
所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写
JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式
参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/
扩展:
SparkContext的parallelize的参数
通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。
var data = [1, 2, 3, 4, 5]
var distData = sc.parallelize(data)
在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。
例如sc.parallelize(0 until numMappers, numMappers)
创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。
在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。
一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).
参考:https://blog.csdn.net/caoli98033/article/details/41777065
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/136308.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...