spark 操作的几个步骤
1 数据关联 textFile 和 parallelize
2 转换操作(JavaRDD 可以通过 mapToPair 和 flatMapToPair 转换为 JavaPairRDD)
3 action操作,获取数据结果
一、wordcount的例子
/**
 * Word count example: prints every distinct token of a text file with its number of occurrences.
 *
 * <p>Reads the hard-coded path {@code e://log1.log} (requesting one partition), splits each line
 * with the {@code SPACE} splitter declared elsewhere in this file, maps every token to
 * {@code (token, 1)}, sums the ones per token, collects the totals and prints them as
 * {@code token : count}.
 *
 * @param ctx Spark context used to open the input file
 */
public static void wordCount(JavaSparkContext ctx ){
    JavaRDD<String> lines = ctx.textFile("e://log1.log", 1);

    // Line -> tokens.
    FlatMapFunction<String, String> tokenize = new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String line) {
            return Arrays.asList(SPACE.split(line));
        }
    };

    // Token -> (token, 1).
    PairFunction<String, String, Integer> toUnitPair = new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String token) {
            return new Tuple2<String, Integer>(token, 1);
        }
    };

    // Adds two partial counts belonging to the same token.
    Function2<Integer, Integer, Integer> sum = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer left, Integer right) {
            return left + right;
        }
    };

    JavaPairRDD<String, Integer> counts = lines.flatMap(tokenize).mapToPair(toUnitPair).reduceByKey(sum);

    for (Tuple2<String, Integer> entry : counts.collect()) {
        System.out.println(entry._1() + " : " + entry._2());
    }
}
二、 各种Transformations 和action测试准备数据
/**
 * Scratch harness that prepares sample RDDs for trying out the transformations and actions
 * defined in this file.
 *
 * <p>Only the three input RDDs are created; every experiment below is commented out and is
 * meant to be enabled by hand. The result variables are declared up front so that any of the
 * commented-out lines can be enabled without further declarations.
 *
 * @param ctx Spark context used to parallelize the sample collections
 */
public static void testMap(JavaSparkContext ctx) {
    // Integers with duplicates (useful for distinct / intersection).
    JavaRDD<Integer> distData = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5,1,2,3));
    // A second integer set overlapping the first one.
    JavaRDD<Integer> distData2 = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5,6));
    // Comma-separated entries; several of them contain a space.
    String[] entries = "wang zhan,xiao ming,li xin,wang qiang,e,f".split(",");
    JavaRDD<String> data3RDD = ctx.parallelize(Arrays.asList(entries));

    // Result holders for the experiments below.
    JavaRDD<Integer> lineLengths ;
    JavaRDD<String> returnStr;
    JavaPairRDD<String, Integer> returnStr2;
    JavaPairRDD<String, Integer> returnStr3;

    // --- transformations ---
    // lineLengths = map( distData);
    // lineLengths = filter(distData );
    // lineLengths = sample(distData );
    // lineLengths = union(distData, distData2) ;
    // lineLengths = intersection(distData, distData2) ;
    // lineLengths = distinct(distData ) ;
    // returnStr = flatMap(data3RDD); // flatten each entry into its tokens
    // returnStr3 = mapToPair(returnStr);// turn the tokens into key/value pairs
    // reduceByKey(returnStr3); // aggregate the values of each key
    // List<Integer> list = lineLengths.collect();
    // WordCount.print(list);
    // List<String> list2 = returnStr.collect() ;
    // WordCount.print(list2);
    // testPersist( data3RDD );
    // List<Tuple2<String, Integer>> listTuple = returnStr2.collect();
    // printTuple( listTuple );
    // returnStr3 = reduceByKey(returnStr2 );
    // List<Tuple2<String, Integer>> listTuple2 = returnStr3.collect();
    // printTuple( listTuple2 );
    //reduceByKey(returnStr2 );

    // --- actions ---
    //reduce( returnStr2);
    // count(returnStr2 );
}
三、groupByKey
/**
 * Groups the values of every key with {@code groupByKey} and sums each group.
 *
 * <p>This is the group-then-aggregate form of a per-key sum. When only the sum is needed,
 * prefer {@code reduceByKey} (see the sibling method in this file) rather than grouping first.
 *
 * <p>Previously the grouped RDD was discarded and {@code null} was returned; the method now
 * returns the per-key sums so the result can actually be used.
 *
 * @param returnStr3 pair RDD whose values are grouped by key
 * @return a pair RDD holding, for every key, the sum of its values
 */
private static JavaPairRDD<String, Integer> groupByKey(JavaPairRDD<String, Integer> returnStr3) {
    JavaPairRDD<String, Iterable<Integer>> grouped = returnStr3.groupByKey();
    return grouped.mapValues(new Function<Iterable<Integer>, Integer>() {
        @Override
        public Integer call(Iterable<Integer> values) throws Exception {
            int total = 0;
            for (Integer value : values) {
                total += value;
            }
            return total;
        }
    });
}
四、数据去重
/**
 * Removes duplicate elements from an RDD and prints the result.
 *
 * <p>Returns the de-duplicated RDD (the earlier version returned {@code null}, which made the
 * result unusable for callers that assign it, unlike the sibling helpers such as
 * {@code union} or {@code filter}).
 *
 * @param distData RDD that may contain repeated values
 * @return an RDD containing each distinct value of {@code distData}
 */
private static JavaRDD<Integer> distinct(JavaRDD<Integer> distData) {
    JavaRDD<Integer> unique = distData.distinct();
    print(unique);
    return unique;
}
五、交集数据
/**
 * Computes the intersection of two RDDs and prints it.
 *
 * <p>Returns the intersection RDD (the earlier version returned {@code null}, which made the
 * result unusable for callers that assign it, unlike the sibling helpers such as
 * {@code union} or {@code filter}).
 *
 * @param distData  first RDD
 * @param distData2 second RDD
 * @return an RDD with the elements present in both inputs
 */
private static JavaRDD<Integer> intersection(JavaRDD<Integer> distData,
        JavaRDD<Integer> distData2) {
    JavaRDD<Integer> common = distData.intersection(distData2);
    print(common);
    return common;
}
六、数据持久化
/**
 * Marks the given RDD for persistence at the {@code MEMORY_ONLY} storage level.
 *
 * <p>NOTE(review): the printed message talks about persisting "to a directory", but the
 * storage level used here is memory-only; the checkpoint calls are left commented out.
 *
 * @param data3RDD RDD to persist
 */
private static void testPersist(JavaRDD<String> data3RDD) {
    System.out.println( "持久化数据到目录。。。");
    final StorageLevel level = StorageLevel.MEMORY_ONLY();
    data3RDD.persist(level);
    // Checkpointing alternative, currently disabled:
    // data3RDD.checkpoint();
    // data3RDD.isCheckpointed() ;
}
七、count统计
/**
 * Demonstrates several actions on a pair RDD: {@code count}, {@code first}, {@code take},
 * {@code countByKey} and {@code foreach}.
 *
 * <p>Each result is printed with its own label. The earlier version reused the
 * "total elements" label for the {@code first()} and {@code take(2)} output, which made the
 * printed lines misleading.
 *
 * @param returnStr2 pair RDD to inspect
 */
private static void count(JavaPairRDD<String, Integer> returnStr2) {
    System.out.println("元素总数:"+returnStr2.count() ); // total number of elements
    System.out.println("第一个元素:"+returnStr2.first() ); // first element
    System.out.println("前2个元素:"+returnStr2.take(2) ); // first two elements
    System.out.println("元素countByKey总数:"+returnStr2.countByKey( ) ); // number of elements per key
    //returnStr2.saveAsTextFile("E://test.txt") ; // would save the data to a file
    // Print every pair from within the foreach action.
    returnStr2.foreach(new VoidFunction<Tuple2<String,Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println( t );
        }
    });
}
八、reduce操作
/**
 * Folds the whole pair RDD into a single tuple with {@code reduce} and prints it.
 *
 * <p>Elements are handed to the combiner two at a time; the combiner prints both operands,
 * concatenates their keys and adds their values.
 *
 * @param returnStr2 pair RDD to reduce
 */
private static void reduce(JavaPairRDD<String, Integer> returnStr2) {
    Function2<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>> combine =
            new Function2<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<String, Integer> left, Tuple2<String, Integer> right) throws Exception {
            System.out.println( left +" "+right );
            String joinedKey = left._1() + right._1();
            int summedValue = left._2() + right._2();
            return new Tuple2<String, Integer>(joinedKey, summedValue);
        }
    };

    Tuple2<String, Integer> folded = returnStr2.reduce(combine);
    System.out.println("reduce结果 :"+folded._1() +" "+folded._2());
}
九、reduceByKey使用
/**
 * Sums the values of each key with {@code reduceByKey}, prints the totals and returns them.
 *
 * <p>The combiner receives two values that share a key (a value and the running result),
 * prints both, and returns their sum.
 *
 * @param rdd pair RDD whose values are summed per key
 * @return a pair RDD with one total per key
 */
public static JavaPairRDD<String, Integer> reduceByKey(JavaPairRDD<String, Integer> rdd ) {
    // Type parameters: the two operands, then the result type.
    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer accumulated, Integer next) {
            System.out.println(accumulated+" == "+next);
            return accumulated + next;
        }
    };

    JavaPairRDD<String, Integer> totals = rdd.reduceByKey(add);
    printPair(totals);
    return totals;
}
十、flatMap使用
/**
 * Splits every element of the RDD into tokens and flattens all tokens into one RDD.
 *
 * <p>Each input string is split with the {@code SPACE} splitter declared elsewhere in this
 * file; the resulting tokens of all elements end up in a single RDD, which is printed and
 * returned.
 *
 * @param rdd RDD of strings to split
 * @return an RDD containing every token of every input string
 */
public static JavaRDD<String> flatMap(JavaRDD<String> rdd ) {
    // First type parameter is the input type, second the element type of the output.
    FlatMapFunction<String, String> splitter = new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String element) throws Exception {
            String[] tokens = SPACE.split(element);
            return Arrays.asList(tokens);
        }
    };

    JavaRDD<String> flattened = rdd.flatMap(splitter);
    printStr(flattened);
    return flattened;
}
十一、mapToPair使用
/**
 * Converts a plain RDD of strings into a pair RDD by attaching the value 1 to each element,
 * e.g. {@code a -> (a, 1)}, so it can be aggregated by key afterwards.
 *
 * @param rdd RDD of strings to use as keys
 * @return a pair RDD of {@code (element, 1)}
 */
public static JavaPairRDD<String, Integer> mapToPair(JavaRDD<String> rdd ){
    PairFunction<String, String, Integer> withOne = new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String key) throws Exception {
            return new Tuple2<String, Integer>(key, 1);
        }
    };
    return rdd.mapToPair(withOne);
}
十二、 合并Rdd
/**
 * Merges two RDDs with {@code union}, prints the merged RDD and returns it.
 *
 * @param rdd  first RDD
 * @param rdd2 second RDD
 * @return the union of {@code rdd} and {@code rdd2}
 */
public static JavaRDD<Integer> union(JavaRDD<Integer> rdd , JavaRDD<Integer> rdd2) {
    final JavaRDD<Integer> merged = rdd.union(rdd2);
    print(merged);
    return merged;
}
十三、抽样
/**
 * Returns a sampled subset of the RDD, taken without replacement at a fraction of 0.4,
 * after printing it.
 *
 * @param rdd RDD to sample from
 * @return the sampled RDD
 */
public static JavaRDD<Integer> sample(JavaRDD<Integer> rdd) {
    final boolean withReplacement = false;
    final double fraction = 0.4;
    JavaRDD<Integer> subset = rdd.sample(withReplacement, fraction);
    print(subset);
    return subset;
}
十四、map使用
/**
 * Applies a function to every element of the source RDD, producing a new RDD with one
 * transformed element per input element. Here each integer is incremented by one; the
 * result is printed and returned.
 *
 * @param rdd RDD of integers to transform
 * @return an RDD holding every input value plus one
 */
public static JavaRDD<Integer> map(JavaRDD<Integer> rdd) {
    Function<Integer, Integer> increment = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer value) throws Exception {
            return value + 1;
        }
    };

    JavaRDD<Integer> incremented = rdd.map(increment);
    print(incremented);
    return incremented;
}
十五、打印数据
/**
 * Prints every element of an integer RDD between a start and an end marker.
 *
 * <p>If the RDD holds too much data, use {@code take} to fetch only part of it for printing.
 * Two ways to print:
 * <ol>
 *   <li>{@code rdd.foreach} prints the data on the individual executors;</li>
 *   <li>{@code rdd.collect()} followed by iteration prints the data on the driver
 *       (the approach used here).</li>
 * </ol>
 *
 * @param lineLengths RDD whose elements are printed
 */
public static void print(JavaRDD<Integer> lineLengths ){
    // Executor-side alternative (disabled): lineLengths.foreach(new VoidFunction<Integer>() { ... });
    System.out.println("开始打印");
    for (Integer value : lineLengths.collect()) {
        System.out.println( value);
    }
    System.out.println("结束打印");
}
/**
 * Collects a string RDD and prints every element between a start and an end marker.
 *
 * @param lineLengths RDD whose elements are printed
 */
public static void printStr(JavaRDD<String> lineLengths ){
    System.out.println("开始打印");
    for (String value : lineLengths.collect()) {
        System.out.println( value);
    }
    System.out.println("结束打印");
}
/**
 * Collects a pair RDD and prints every tuple as {@code key value} between a start and an
 * end marker.
 *
 * @param rdd pair RDD whose tuples are printed
 */
public static void printPair( JavaPairRDD<String, Integer> rdd ){
    System.out.println("开始打印");
    for (Tuple2<String, Integer> pair : rdd.collect()) {
        System.out.println( pair._1() +" "+pair._2() );
    }
    System.out.println("结束打印");
}
/**
 * Prints every element of a list on its own line.
 *
 * <p>Does nothing when the list is {@code null} or empty. The parameter is declared as
 * {@code List<?>} instead of a raw {@code List} so the method accepts any list without
 * raw-type warnings; existing callers are unaffected.
 *
 * @param list elements to print; may be {@code null}
 */
public static void print(List<?> list) {
    if (list == null || list.isEmpty()) {
        return;
    }
    for (Object element : list) {
        System.out.println(element);
    }
}
/**
 * Prints the collected contents of a pair RDD, one tuple per line.
 *
 * <p>Does nothing when the list is {@code null} or empty.
 *
 * @param listTuple tuples to print; may be {@code null}
 */
public static void printTuple(List<Tuple2<String, Integer>> listTuple) {
    boolean nothingToPrint = (listTuple == null) || listTuple.size() == 0;
    if (nothingToPrint) {
        return;
    }
    for (Tuple2<String, Integer> tuple : listTuple) {
        System.out.println(tuple);
    }
}
十六、过滤
/**
 * Filters the RDD element by element: elements for which the predicate returns
 * {@code false} are dropped. Here every value equal to 1 is removed; the remaining
 * elements are printed and returned.
 *
 * @param rdd RDD of integers to filter
 * @return an RDD without the elements equal to 1
 */
public static JavaRDD<Integer> filter(JavaRDD<Integer> rdd) {
    Function<Integer, Boolean> notOne = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer value) throws Exception {
            return value != 1;
        }
    };

    JavaRDD<Integer> kept = rdd.filter(notOne);
    print(kept);
    return kept;
}