Skip to content

Commit d226bf2

Browse files
author
James Lee
committed
add StackOverFlowSurvey
1 parent 96011cb commit d226bf2

File tree

3 files changed

+123
-0
lines changed

3 files changed

+123
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ repositories {
2020

2121
dependencies {
2222
compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '2.0.0'
23+
compile group: 'org.apache.spark', name: 'spark-sql_2.10', version: '2.1.0'
2324
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.sparkTutorial.advanced.broadcast;
2+
3+
import org.apache.spark.SparkConf;
4+
import org.apache.spark.api.java.JavaRDD;
5+
import org.apache.spark.api.java.JavaSparkContext;
6+
7+
import java.io.File;
8+
import java.io.FileNotFoundException;
9+
import java.util.*;
10+
11+
public class UkMarketSpacesWithoutBroadcaset {
12+
13+
public static void main(String[] args) throws Exception {
14+
15+
SparkConf conf = new SparkConf().setAppName("UkMarketSpaces").setMaster("local[1]");
16+
17+
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
18+
19+
final Map<String, String> postCodeMap = loadPostCodeMap();
20+
21+
JavaRDD<String> marketsRdd = javaSparkContext.textFile("in/uk-market-spaces-identifiable-data.csv");
22+
23+
JavaRDD<String> regions = marketsRdd
24+
.filter(line -> !line.split(",", -1)[0].equals("Timestamp"))
25+
.map(line -> {
26+
List<String> postCodePrefixes = getPostPrefixes(line);
27+
for (String postCodePrefix: postCodePrefixes) {
28+
if (postCodeMap.containsKey(postCodePrefix)) {
29+
return postCodeMap.get(postCodePrefix);
30+
}
31+
}
32+
return "Unknown";
33+
});
34+
for (Map.Entry<String, Long> regionCounts : regions.countByValue().entrySet()) {
35+
System.out.println(regionCounts.getKey() + " : " + regionCounts.getValue());
36+
}
37+
}
38+
39+
private static List<String> getPostPrefixes(String line) {
40+
String[] splits = line.split(",", -1);
41+
String postcode = splits[4];
42+
String cleanedPostCode = postcode.replaceAll("\\s+", "");
43+
ArrayList<String> prefixes = new ArrayList<>();
44+
for (int i = 1; i <= cleanedPostCode.length(); i ++) {
45+
prefixes.add(cleanedPostCode.substring(0, i));
46+
}
47+
return prefixes;
48+
}
49+
50+
private static Map<String, String> loadPostCodeMap() throws FileNotFoundException {
51+
Scanner postCode = new Scanner(new File("in/uk-postcode.csv"));
52+
Map<String, String> postCodeMap = new HashMap<>();
53+
while (postCode.hasNextLine()) {
54+
String line = postCode.nextLine();
55+
String[] splits = line.split(",", -1);
56+
postCodeMap.put(splits[0], splits[7]);
57+
}
58+
return postCodeMap;
59+
}
60+
61+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.sparkTutorial.sparkSql;
2+
3+
import org.apache.log4j.Level;
4+
import org.apache.log4j.Logger;
5+
import org.apache.spark.sql.Column;
6+
import org.apache.spark.sql.Dataset;
7+
import org.apache.spark.sql.Row;
8+
import org.apache.spark.sql.SparkSession;
9+
10+
import static org.apache.spark.sql.functions.avg;
11+
import static org.apache.spark.sql.functions.max;
12+
13+
public class StackOverFlowSurvey {
14+
15+
private static final String AGE_MIDPOINT = "age_midpoint";
16+
private static final String SALARY_MIDPOINT = "salary_midpoint";
17+
18+
public static void main(String[] args) throws Exception {
19+
20+
Logger.getLogger("org").setLevel(Level.ERROR);
21+
SparkSession session = SparkSession.builder().appName("StackOverFlowSurvey").master("local[1]").getOrCreate();
22+
23+
Dataset<Row> responses = session.read().option("header","true").csv("in/2016-stack-overflow-survey-responses.csv");
24+
25+
System.out.println("=== Print out schema ===");
26+
responses.printSchema();
27+
28+
System.out.println("=== Creates a temporary view called response ===");
29+
responses.createOrReplaceTempView("response");
30+
31+
System.out.println("=== Print 20 records of responses table ===");
32+
responses.show(20);
33+
34+
System.out.println("=== Print the so_region and self_identification columns of gender table ===");
35+
responses.select(new Column("so_region"), new Column("self_identification")).show();
36+
37+
System.out.println("=== Print records where the response is from Afghanistan ===");
38+
responses.filter(new Column("country").equalTo("Afghanistan")).show();
39+
40+
System.out.println("=== Print the count of occupations ===");
41+
responses.groupBy(new Column("occupation")).count().show();
42+
43+
44+
System.out.println("=== Cast the salary mid point and age mid point to integer ===");
45+
Dataset<Row> castedResponse = responses.withColumn(SALARY_MIDPOINT, new Column(SALARY_MIDPOINT).cast("integer"))
46+
.withColumn(AGE_MIDPOINT, new Column(AGE_MIDPOINT).cast("integer"));
47+
48+
System.out.println("=== Print out casted schema ===");
49+
castedResponse.printSchema();
50+
51+
System.out.println("=== Print records with average mid age less than 20 ===");
52+
castedResponse.filter(new Column(AGE_MIDPOINT).$less(20)).show();
53+
54+
System.out.println("=== Print the result with salary middle point in descending order ===");
55+
castedResponse.orderBy(new Column(SALARY_MIDPOINT ).desc()).show();
56+
57+
System.out.println("=== Group by country and aggregate by average salary middle point and max age middle point ===");
58+
castedResponse.groupBy("country").agg(avg(SALARY_MIDPOINT), max(AGE_MIDPOINT)).show();
59+
60+
}
61+
}

0 commit comments

Comments
 (0)