Java,Python,Scala三种语言开发并部署Spark的WordCount程序_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java,Python,Scala三种语言开发并部署Spark的WordCount程序

Java,Python,Scala三种语言开发并部署Spark的WordCount程序

 2018/3/20 15:47:25  溪头卧剥莲蓬  程序员俱乐部  我要评论(0)
  • 摘要:本文转载自:http://www.javaxxz.com/thread-359537-1-1.html#Java,Python,Scala三种语言开发并部署Spark的WordCount程序一、Java开发并部署Spark的wordcountJava实现WordCount程序:```javapackagecom.spark.wordcount;importjava.util.Arrays;importjava.util.Iterator;importjava.util.Map
  • 标签:程序 Java Python 开发

本文转载自:http://www.javaxxz.com/thread-359537-1-1.html

?

# Java,Python,Scala三种语言开发并部署Spark的WordCount程序

一、Java开发并部署Spark的wordcount

Java实现WordCount程序:

```java
package com.spark.wordcount;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;


public class WordCountApp {

? ? public static void main(String[] args) {
? ?? ???/**
? ?? ?? ?* 1、创建SparkConf对象,设置Spark应用程序的配置信息
? ?? ?? ?*/
? ?? ???SparkConf conf =??new SparkConf();
? ?? ???//设置spark应用程序的名称
? ?? ???conf.setAppName(WordCountApp.class.getSimpleName());
? ?? ???conf.setMaster("local");
? ?? ???if(args.length>2) {
? ?? ???? ? ? ? conf.setMaster(args[0]);
? ?? ???}
? ?? ???/**
? ?? ?? ?* 2、创建sparkContext对象--Java开发使用JavaSparkContext,scala开发使用SparkContext
? ?? ?? ?*? ? 在saprk中SparkContext负责连接spark集群,创建RDD、累计量、广播量等
? ?? ?? ?*? ? Master参数是为了创建TaskSchedule(较低级的调度器,高层次的调度器为DAGSchedule),如下:
? ?? ?? ?*? ? 如果setMaster("local")则创建LocalSchedule;
? ?? ?? ?*? ? 如果setMaster("spark")则创建SparkDeploySchedulerBackend。在SparkDeploySchedulerBackend的start函数,会启动一个Client对象,连接到Spark集群。
? ?? ?? ?*/
? ?? ???JavaSparkContext sc = new JavaSparkContext(conf);

? ?? ???/**
? ?? ?? ?* 3、sc中提供了textFile方法是SparkContext中定义的,如下:
? ?? ?? ?*? ?? ?def textFile(path: String): JavaRDD[String] = sc.textFile(path)
? ?? ?? ?*? ? 用来读取HDFS上的文本文件、集群中节点的本地文本文件或任何支持Hadoop的文件系统上的文本文件,它的返回值是JavaRDD[String],是文本文件每一行
? ?? ?? ?*/
? ?? ???
? ?? ???String filePath = "\\new_workspace\\SparkTest\\src\\com\\spark\\wordcount\\wordCount.txt";
? ?? ???if(args.length>1) {
? ?? ???? ? ? ? filePath = args[1];
? ?? ???}
? ?? ???
? ?? ???JavaRDD lines = sc.textFile(filePath);
? ?? ???System.out.println(conf);
? ?? ???
? ?? ???/**
? ?? ?? ?* 4、将行文本内容拆分为多个单词
? ?? ?? ?* lines调用flatMap这个transformation算子(参数类型是FlatMapFunction接口实现类)返回每一行的每个单词
? ?? ?? ?*/
? ?? ???JavaRDD words = lines.flatMap(new FlatMapFunction(){
? ?? ?? ?? ?private static final long serialVersionUID = -3243665984299496473L;
? ?? ?? ?? ?@Override
? ?? ?? ?? ?public Iterator call(String line) throws Exception {
? ?? ?? ?? ?? ? return Arrays.asList(line.split(" ")).iterator();
? ?? ?? ?? ?}
? ?? ?? ?? ?
? ?? ???});
? ?? ???
? ?? ???/**
? ?? ?? ?* 5、将每个单词的初始数量都标记为1个
? ?? ?? ?* words调用mapToPair这个transformation算子(参数类型是PairFunction接口实现类,
? ?? ?? ?* PairFunction的三个参数是),返回一个新的RDD,即JavaPairRDD
? ?? ?? ?*/
? ?? ???JavaPairRDD pairs = words.mapToPair(new PairFunction() {
? ?? ?? ?? ?private static final long serialVersionUID = -7879847028195817507L;
? ?? ?? ?? ?@Override
? ?? ?? ?? ?public Tuple2 call(String word) throws Exception {
? ?? ?? ?? ?? ? return new Tuple2(word, 1);
? ?? ?? ?? ?}
? ?? ???});

? ?? ???/**
? ?? ?? ?* 6、计算每个相同单词出现的次数
? ?? ?? ?* pairs调用reduceByKey这个transformation算子(参数是Function2接口实现类)
? ?? ?? ?* 对每个key的value进行reduce操作,返回一个JavaPairRDD,这个JavaPairRDD中的每一个Tuple的key是单词、value则是相同单词次数的和
? ?? ?? ?*/
? ?? ???JavaPairRDD wordCount = pairs.reduceByKey(new Function2() {
? ?? ?? ?? ?private static final long serialVersionUID = -4171349401750495688L;
? ?? ?? ?? ?@Override
? ?? ?? ?? ?public Integer call(Integer v1, Integer v2) throws Exception {
? ?? ?? ?? ?? ? return v1+v2;
? ?? ?? ?? ?}
? ?? ???});

? ?? ???/**
? ?? ?? ?* 7、使用foreach这个action算子提交Spark应用程序
? ?? ?? ?* 在Spark中,每个应用程序都需要transformation算子计算,最终由action算子触发作业提交
? ?? ?? ?*/
? ?? ???wordCount.foreach(new VoidFunction>() {
? ?? ?? ?? ?private static final long serialVersionUID = -5926812153234798612L;
? ?? ?? ?? ?@Override
? ?? ?? ?? ?public void call(Tuple2 wordCount) throws Exception {
? ?? ?? ?? ?? ? System.out.println(wordCount._1+":"+wordCount._2);
? ?? ?? ?? ?}
? ?? ???});

? ?? ???/**
? ?? ?? ?* 8、将计算结果文件输出到文件系统
? ?? ?? ?*??HDFS:使用新版API(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;)
? ?? ?? ?*??wordCount.saveAsNewAPIHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, TextOutputFormat.class, new Configuration());
? ?? ?? ?*? ?? ?? ?? ? 使用旧版API(org.apache.hadoop.mapred.JobConf;org.apache.hadoop.mapred.OutputFormat;)
? ?? ?? ?*? ?? ?? ?? ?? ???wordCount.saveAsHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class, OutputFormat.class, new JobConf(new Configuration()));
? ?? ?? ?*? ?? ?? ?? ? 使用默认TextOutputFile写入到HDFS(注意写入HDFS权限,如无权限则执行:hdfs dfs -chmod -R 777 /spark)
? ?? ?? ?*? ?? ?? ?? ?? ???wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
? ?? ?? ?*/
? ?? ???Map map = wordCount.collectAsMap();
? ?? ???for(String key : map.keySet()) {
? ?? ???? ? ? ? System.out.println(key+":"+map.get(key));
? ?? ???}

? ?? ???/**
? ?? ?? ?* 9、关闭SparkContext容器,结束本次作业
? ?? ?? ?*/
? ?? ???sc.close();

? ? }
}
```

运算结果:

```
JSDLF:1
HELLOWORLD:22
SJF:1
LDSDJEWUROWJ:1
FDSLKFJS:1
FDSK:1
COUNT:2
:1
```

部署:

```shell
spark-submit wordcount.jar local file:/data0/wordcount/wordcount.txt
spark-submit wordcount.jar spark hdfs:/data/logs/wordcount/wordcount.txt
```

二、使用Python开发Spark的wordcount

Python实现wordcount程序

```python
from operator import add
from pyspark import SparkContext

INPUT_FILE = "hdfs://dmp/data/logs/wordcount/wordcount.txt"
MASTER = "spark://n1:7077"

sc = SparkContext(MASTER,"WordCountApp")

text_file = sc.textFile(INPUT_FILE)

counts = text_file.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a,b:a+b);

results = counts.collectAsMap();

for key in results.iterkeys():
? ? print key + ":" + str(results[key])
```

运行结果:

```
COUNT:2
:1
FDSLKFJS:1
FDSK:1
JSDLF:1
HELLOWORLD:22
LDSDJEWUROWJ:1
SJF:1
```

部署:

```shell
spark-submit wordcount.py
```

三、使用scala开始Spark的wordcount

scala实现wordcount程序

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCountApp {
??
??def main(args: Array[String]) {
? ? val conf = new SparkConf();
? ? conf.setMaster("spark://n1:7077");
? ? conf.setAppName("WordCountApp");? ??
? ? val sc = new SparkContext(conf);
? ? val lines = sc.textFile("hdfs://dmp/data/logs/wordcount/wordcount.txt");
? ? lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println);
? ? sc.stop();
??}
??
}
```

运行结果:

```scala
(FDSLKFJS,1)
(SJF,1)
(,1)
(JSDLF,1)
(COUNT,2)
(FDSK,1)
(HELLOWORLD,22)
(LDSDJEWUROWJ,1)
```

部署:

```shell
spark-submit --class com.spark.wordcount.WordCountApp wordcount.jar
```

发表评论
用户名: 匿名