- 虚拟机数量:1
- 系统版本:Centos 7.5
- Spark版本:Apache Spark 2.1.1
- Eclipse版本:Neon.3 (4.6.3)
- 收集一组自选日志,构造mydata.log数据集,上传到实验环境中,重新完成对自选日志的分析。
- 获取自选数据集
- 自选数据集计算的需求分析
- 编程设计实现
- 运行程序
| 字段 | 数据类型 | 说明信息 |
| --- | --- | --- |
| dispatching_base_number | String | 区域编号 |
| date | String | 日期 |
| active_vehicles | int | 使用的机动车数量 |
| trips | int | 旅游次数 |
实现 Ordered 和 Serializable 接口,定义用于自定义排序的 Key;
将要进行二次排序的文件加载进来生成 key-value 类型的 RDD
使用 sortByKey 基于自定义的 Key 进行二次排序
去除掉排序的 key,只保留排序结果
4.3.1 创建工程:打开 Eclipse IDE,点击 File -> New -> Other,搜索 maven -> 点击 Maven Project -> Next -> Next,选中 maven-archetype-quickstart -> Next,输入 Group Id 为 org.zkpk;Artifact Id 为 spark;Package 为 org.zkpk.spark -> Finish。升级 maven 工程。
升级完成:更改 JRE,右键 JRE System Library -> Properties,选中 jdk1.8.0_131 -> OK;点击 spark 项目 -> 修改 pom.xml -> 保存,会自动下载 jar 包。下载 Jar 包:最终 pom.xml 如下:
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>org.zkpk</groupId> <artifactId>spark</artifactId> <version>0.0.1</version> <packaging>jar</packaging>
<name>spark</name> <url>http://maven.apache.org</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.10</artifactId> <version>1.5.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.3.5</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.3.1</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/main/test</testSourceDirectory> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> 
<execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>cn.com.syl.spark.App</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
利用实验一的实验环境编写java程序,同样是SecondarySortKey.java和SecondarySortDemo.java两个java文件。其中SecondarySortKey.java中有两个类,一个是serDemo类,用于存储未排序的数据,一个是SecondarySortKey类,里面重写了一系列排序函数,用于按照自己的规则排序;SecondarySortDemo.java有一个类SecondarySortDemo,里面有主函数和创建Spark RDD,求和,sortByKey等函数,是主要的逻辑运行的地方。
| /**
 * Sort key made of a date, a number of active vehicles and a number of trips.
 *
 * <p>This excerpt shows only the state, the constructors and the
 * {@code equals}/{@code hashCode}/{@code toString} overrides; the comparison
 * methods required by {@code Ordered} are not part of this excerpt.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private static final long serialVersionUID = 3702442700882342403L;

    private String date;
    private int activeVehicles;
    private int trips;

    /** Creates a key with a {@code null} date and zero counters. */
    public SecondarySortKey() {
        super();
    }

    /**
     * Creates a fully populated key.
     *
     * @param date           date stored in the key
     * @param activeVehicles number of active vehicles stored in the key
     * @param trips          number of trips stored in the key
     */
    public SecondarySortKey(String date, int activeVehicles, int trips) {
        super();
        this.trips = trips;
        this.activeVehicles = activeVehicles;
        this.date = date;
    }

    /**
     * Two keys are equal when they are of the same class and all three
     * fields match.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        SecondarySortKey candidate = (SecondarySortKey) o;
        if (activeVehicles != candidate.activeVehicles) {
            return false;
        }
        if (trips != candidate.trips) {
            return false;
        }
        return Objects.equals(date, candidate.date);
    }

    /** Hash over the same three fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(date, activeVehicles, trips);
    }

    /** Renders the key in the report format printed by the driver program. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("SortResult{");
        text.append("截止日期:").append(date);
        text.append(", 使用的机动车总数量:").append(activeVehicles);
        text.append(", 旅游总次数:").append(trips);
        text.append('}');
        return text.toString();
    }
}
| ......
/**
 * Scala {@code >} operator: tells whether this key sorts after {@code other}.
 *
 * <p>Keys are compared by active vehicles first, then by trips, and finally
 * by date (string order).
 *
 * @param other key to compare against
 * @return {@code true} when this key is strictly greater than {@code other}
 */
public boolean $greater(SecondarySortKey other) {
    if (activeVehicles != other.activeVehicles) {
        return activeVehicles > other.activeVehicles;
    }
    if (trips != other.trips) {
        return trips > other.trips;
    }
    // Both counters tie: the date decides.
    return date.compareTo(other.date) > 0;
}
/**
 * Compares this key with {@code other} by active vehicles, then trips, then
 * date (string order).
 *
 * <p>The date is included as the last tie-breaker so that a result of
 * {@code 0} means the same thing as {@code equals}, which also compares the
 * date. {@link Integer#compare(int, int)} is used instead of subtraction
 * because a difference of two ints can overflow and flip the sign.
 *
 * @param other key to compare against; its date and this key's date must be
 *              non-null when both counters tie
 * @return a negative value, zero, or a positive value when this key is less
 *         than, equal to, or greater than {@code other}
 */
public int compare(SecondarySortKey other) {
    int byVehicles = Integer.compare(activeVehicles, other.activeVehicles);
    if (byVehicles != 0) {
        return byVehicles;
    }
    int byTrips = Integer.compare(trips, other.trips);
    if (byTrips != 0) {
        return byTrips;
    }
    return date.compareTo(other.date);
}
/**
 * {@code Comparable} entry point: orders keys by active vehicles, then
 * trips, then date (string order).
 *
 * <p>The date is included as the last tie-breaker so that a result of
 * {@code 0} means the same thing as {@code equals}, which also compares the
 * date. {@link Integer#compare(int, int)} is used instead of subtraction
 * because a difference of two ints can overflow and flip the sign.
 *
 * @param other key to compare against; its date and this key's date must be
 *              non-null when both counters tie
 * @return a negative value, zero, or a positive value when this key is less
 *         than, equal to, or greater than {@code other}
 */
public int compareTo(SecondarySortKey other) {
    int byVehicles = Integer.compare(activeVehicles, other.activeVehicles);
    if (byVehicles != 0) {
        return byVehicles;
    }
    int byTrips = Integer.compare(trips, other.trips);
    if (byTrips != 0) {
        return byTrips;
    }
    return date.compareTo(other.date);
}
| /**
 * Runs the secondary-sort job in local mode: loads the log file, aggregates
 * the records per key, sorts the aggregated keys with {@code sortByKey(false)}
 * and prints the first ten results.
 *
 * <p>The Spark context is closed in a {@code finally} block so that it is
 * released even when a job step throws.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if any Spark step fails
 */
public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("SecondarySortDemo").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    try {
        jsc.setLogLevel("WARN");
        JavaRDD<String> textfile = jsc.textFile("/home/zkpk/experiment/mydata.log");
        JavaPairRDD<String, serDemo> pairRdd = mapTfRDD2Pair(textfile);
        JavaPairRDD<String, serDemo> getlines = aggregateByDeviceID(pairRdd);
        JavaPairRDD<SecondarySortKey, String> telval = mapRDDKey2SortKey(getlines);
        JavaPairRDD<SecondarySortKey, String> sortedRdd = telval.sortByKey(false);
        List<Tuple2<SecondarySortKey, String>> getTop = sortedRdd.take(10);
        System.out.println("============ result ============");
        for (Tuple2<SecondarySortKey, String> dt : getTop) {
            System.out.println("区域编号:" + dt._2 + ", " + dt._1);
        }
    } finally {
        jsc.close();
    }
}
| /**
 * Turns every comma-separated log line into a pair whose key is the first
 * field and whose value is a {@code serDemo} built from the second field
 * (date) and the third and fourth fields parsed as integers.
 *
 * @param accessLogRDD raw log lines
 * @return pair RDD keyed by the first field of each line
 */
private static JavaPairRDD<String, serDemo> mapTfRDD2Pair(JavaRDD<String> accessLogRDD) {
    PairFunction<String, String, serDemo> lineToPair = new PairFunction<String, String, serDemo>() {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, serDemo> call(String line) throws Exception {
            String[] fields = line.split(",");
            String baseNumber = fields[0];
            String day = fields[1];
            int vehicles = Integer.parseInt(fields[2]);
            int tripCount = Integer.parseInt(fields[3]);
            return new Tuple2<String, serDemo>(baseNumber, new serDemo(day, vehicles, tripCount));
        }
    };
    return accessLogRDD.mapToPair(lineToPair);
}
| /**
 * Reduces all records that share a key into one record: the later of the two
 * dates (string order) is kept, and the active-vehicle and trip counters are
 * summed.
 *
 * @param accessLogPairRDD records keyed by dispatching base number
 * @return one aggregated record per key
 */
private static JavaPairRDD<String, serDemo> aggregateByDeviceID(JavaPairRDD<String, serDemo> accessLogPairRDD) {
    Function2<serDemo, serDemo, serDemo> mergeRecords = new Function2<serDemo, serDemo, serDemo>() {
        private static final long serialVersionUID = 1L;

        public serDemo call(serDemo d1, serDemo d2) throws Exception {
            String latestDate;
            if (d1.getDate().compareTo(d2.getDate()) > 0) {
                latestDate = d1.getDate();
            } else {
                latestDate = d2.getDate();
            }
            int vehicleSum = d1.getActiveVehicles() + d2.getActiveVehicles();
            int tripSum = d1.getTrips() + d2.getTrips();
            return new serDemo(latestDate, vehicleSum, tripSum);
        }
    };
    return accessLogPairRDD.reduceByKey(mergeRecords);
}
| /**
 * Swaps the roles of key and value: the aggregated record becomes a
 * {@code SecondarySortKey} and the former key (the dispatching base number)
 * becomes the value, so the RDD can be sorted by the aggregated figures.
 *
 * @param aggrAccessLogPairRDD aggregated records keyed by dispatching base number
 * @return pair RDD keyed by the sort key
 */
private static JavaPairRDD<SecondarySortKey, String> mapRDDKey2SortKey(
        JavaPairRDD<String, serDemo> aggrAccessLogPairRDD) {
    PairFunction<Tuple2<String, serDemo>, SecondarySortKey, String> toSortKey =
            new PairFunction<Tuple2<String, serDemo>, SecondarySortKey, String>() {
                private static final long serialVersionUID = 1L;

                public Tuple2<SecondarySortKey, String> call(Tuple2<String, serDemo> entry) throws Exception {
                    serDemo totals = entry._2;
                    SecondarySortKey sortKey = new SecondarySortKey(
                            totals.getDate(), totals.getActiveVehicles(), totals.getTrips());
                    return new Tuple2<SecondarySortKey, String>(sortKey, entry._1);
                }
            };
    return aggrAccessLogPairRDD.mapToPair(toSortKey);
}
| package org.zkpk.spark;
import scala.math.Ordered;
import java.io.Serializable; import java.util.Objects;
/**
 * Serializable holder for one record: a date string, a number of active
 * vehicles and a number of trips. All three values are readable and writable
 * through plain getters and setters.
 */
class serDemo implements Serializable {
    private static final long serialVersionUID = 5749943279909593929L;

    private String date;
    private int activeVehicles;
    private int trips;

    /** Creates an empty record: {@code null} date and both counters at zero. */
    public serDemo() {
        this(null, 0, 0);
    }

    /**
     * Creates a populated record.
     *
     * @param date           date of the record
     * @param activeVehicles number of active vehicles
     * @param trips          number of trips
     */
    public serDemo(String date, int activeVehicles, int trips) {
        super();
        this.date = date;
        this.activeVehicles = activeVehicles;
        this.trips = trips;
    }

    /** @return the serialization version identifier of this class */
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    /** @return the stored date */
    public String getDate() {
        return this.date;
    }

    /** @param date the date to store */
    public void setDate(String date) {
        this.date = date;
    }

    /** @return the stored number of active vehicles */
    public int getActiveVehicles() {
        return this.activeVehicles;
    }

    /** @param activeVehicles the number of active vehicles to store */
    public void setActiveVehicles(int activeVehicles) {
        this.activeVehicles = activeVehicles;
    }

    /** @return the stored number of trips */
    public int getTrips() {
        return this.trips;
    }

    /** @param trips the number of trips to store */
    public void setTrips(int trips) {
        this.trips = trips;
    }
}
/**
 * Composite sort key used for the secondary sort.
 *
 * <p>Keys are ordered by the number of active vehicles, then by the number of
 * trips, and finally by the date (string order). Every comparison method of
 * this class is derived from {@link #compare(SecondarySortKey)}, so the Scala
 * operators, {@code compare} and {@code compareTo} always agree with each
 * other, and a comparison result of {@code 0} matches {@link #equals(Object)}.
 *
 * <p>The date must be non-null on both keys whenever the two counters tie,
 * because the date is then compared with {@link String#compareTo(String)}.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private static final long serialVersionUID = 3702442700882342403L;

    private String date;
    private int activeVehicles;
    private int trips;

    /** Creates a key with a {@code null} date and zero counters. */
    public SecondarySortKey() {
        super();
    }

    /**
     * Creates a fully populated key.
     *
     * @param date           date stored in the key (last tie-breaker)
     * @param activeVehicles number of active vehicles (primary sort field)
     * @param trips          number of trips (secondary sort field)
     */
    public SecondarySortKey(String date, int activeVehicles, int trips) {
        super();
        this.date = date;
        this.activeVehicles = activeVehicles;
        this.trips = trips;
    }

    /** @return the serialization version identifier of this class */
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    /** @return the stored date */
    public String getDate() {
        return date;
    }

    /** @param date the date to store */
    public void setDate(String date) {
        this.date = date;
    }

    /** @return the stored number of active vehicles */
    public int getActiveVehicles() {
        return activeVehicles;
    }

    /** @param activeVehicles the number of active vehicles to store */
    public void setActiveVehicles(int activeVehicles) {
        this.activeVehicles = activeVehicles;
    }

    /** @return the stored number of trips */
    public int getTrips() {
        return trips;
    }

    /** @param trips the number of trips to store */
    public void setTrips(int trips) {
        this.trips = trips;
    }

    /** Scala {@code >} operator. */
    public boolean $greater(SecondarySortKey other) {
        return compare(other) > 0;
    }

    /** Scala {@code >=} operator. */
    public boolean $greater$eq(SecondarySortKey other) {
        return compare(other) >= 0;
    }

    /** Scala {@code <} operator. */
    public boolean $less(SecondarySortKey other) {
        return compare(other) < 0;
    }

    /** Scala {@code <=} operator. */
    public boolean $less$eq(SecondarySortKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares this key with {@code other} by active vehicles, then trips,
     * then date.
     *
     * <p>{@link Integer#compare(int, int)} is used instead of subtraction
     * because a difference of two ints can overflow and flip the sign.
     *
     * @param other key to compare against
     * @return a negative value, zero, or a positive value when this key is
     *         less than, equal to, or greater than {@code other}
     */
    public int compare(SecondarySortKey other) {
        int byVehicles = Integer.compare(activeVehicles, other.activeVehicles);
        if (byVehicles != 0) {
            return byVehicles;
        }
        int byTrips = Integer.compare(trips, other.trips);
        if (byTrips != 0) {
            return byTrips;
        }
        // Last tie-breaker; without it two keys that differ only by date
        // would compare as equal although equals() reports them as different.
        return date.compareTo(other.date);
    }

    /**
     * {@code Comparable} entry point; same ordering as
     * {@link #compare(SecondarySortKey)}.
     */
    public int compareTo(SecondarySortKey other) {
        return compare(other);
    }

    /**
     * Two keys are equal when they are of the same class and all three
     * fields match.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SecondarySortKey that = (SecondarySortKey) o;
        return activeVehicles == that.activeVehicles
                && trips == that.trips
                && Objects.equals(date, that.date);
    }

    /** Hash over the same three fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(date, activeVehicles, trips);
    }

    /** Renders the key in the report format printed by the driver program. */
    @Override
    public String toString() {
        return "SortResult{" + "截止日期:" + date + ", 使用的机动车总数量:" + activeVehicles + ", 旅游总次数:" + trips + '}';
    }
}
| package org.zkpk.spark;
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2;
import java.util.List;
/**
 * Spark driver for the secondary-sort exercise.
 *
 * <p>The job reads comma-separated records
 * ({@code dispatching_base_number,date,active_vehicles,trips}), aggregates
 * them per dispatching base number, converts every aggregate into a
 * {@link SecondarySortKey}, sorts with {@code sortByKey(false)} and prints the
 * first ten results.
 */
public class SecondarySortDemo {
    /** Minimum number of comma-separated fields a record must contain. */
    private static final int FIELD_COUNT = 4;

    /**
     * Runs the job in local mode.
     *
     * <p>The Spark context is closed in a {@code finally} block so that it is
     * released even when a job step throws.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if any Spark step fails
     */
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("SecondarySortDemo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            jsc.setLogLevel("WARN");
            JavaRDD<String> textfile = jsc.textFile("/home/zkpk/experiment/mydata.log");
            JavaPairRDD<String, serDemo> pairRdd = mapTfRDD2Pair(textfile);
            JavaPairRDD<String, serDemo> getlines = aggregateByRegionID(pairRdd);
            JavaPairRDD<SecondarySortKey, String> telval = mapRDDKey2SortKey(getlines);
            JavaPairRDD<SecondarySortKey, String> sortedRdd = telval.sortByKey(false);
            List<Tuple2<SecondarySortKey, String>> getTop = sortedRdd.take(10);
            System.out.println("============ result ============");
            for (Tuple2<SecondarySortKey, String> dt : getTop) {
                System.out.println("区域编号:" + dt._2 + ", " + dt._1);
            }
        } finally {
            jsc.close();
        }
    }

    /**
     * Parses every log line into a pair of dispatching base number and
     * record.
     *
     * <p>Numeric fields are trimmed before parsing so that stray whitespace
     * does not abort the job. A line with fewer than four fields is rejected
     * with an {@link IllegalArgumentException} that quotes the offending
     * line.
     *
     * @param accessLogRDD raw log lines
     * @return pair RDD keyed by dispatching base number
     */
    private static JavaPairRDD<String, serDemo> mapTfRDD2Pair(JavaRDD<String> accessLogRDD) {
        return accessLogRDD.mapToPair(new PairFunction<String, String, serDemo>() {
            private static final long serialVersionUID = 1L;

            public Tuple2<String, serDemo> call(String lines) throws Exception {
                String[] split = lines.split(",");
                if (split.length < FIELD_COUNT) {
                    throw new IllegalArgumentException(
                            "Malformed record, expected " + FIELD_COUNT
                                    + " comma-separated fields: \"" + lines + "\"");
                }
                String dispatchingBaseNumber = split[0];
                String date = split[1];
                // parseInt yields the primitive directly; valueOf would box and unbox.
                int activeVehicles = Integer.parseInt(split[2].trim());
                int trips = Integer.parseInt(split[3].trim());
                serDemo dataInfo = new serDemo(date, activeVehicles, trips);
                return new Tuple2<String, serDemo>(dispatchingBaseNumber, dataInfo);
            }
        });
    }

    /**
     * Reduces all records of one dispatching base number into a single
     * record: the later date (string order) is kept and both counters are
     * summed.
     *
     * @param accessLogPairRDD records keyed by dispatching base number
     * @return one aggregated record per dispatching base number
     */
    private static JavaPairRDD<String, serDemo> aggregateByRegionID(JavaPairRDD<String, serDemo> accessLogPairRDD) {
        return accessLogPairRDD.reduceByKey(new Function2<serDemo, serDemo, serDemo>() {
            private static final long serialVersionUID = 1L;

            public serDemo call(serDemo d1, serDemo d2) throws Exception {
                String date = d1.getDate().compareTo(d2.getDate()) > 0 ? d1.getDate() : d2.getDate();
                int activeVehicles = d1.getActiveVehicles() + d2.getActiveVehicles();
                int trips = d1.getTrips() + d2.getTrips();
                return new serDemo(date, activeVehicles, trips);
            }
        });
    }

    /**
     * Makes the aggregated figures the key and the dispatching base number
     * the value, so that the RDD can be sorted by the aggregated figures.
     *
     * @param aggrAccessLogPairRDD aggregated records keyed by dispatching base number
     * @return pair RDD keyed by {@link SecondarySortKey}
     */
    private static JavaPairRDD<SecondarySortKey, String> mapRDDKey2SortKey(
            JavaPairRDD<String, serDemo> aggrAccessLogPairRDD) {
        return aggrAccessLogPairRDD.mapToPair(
                new PairFunction<Tuple2<String, serDemo>, SecondarySortKey, String>() {
                    private static final long serialVersionUID = 1L;

                    public Tuple2<SecondarySortKey, String> call(Tuple2<String, serDemo> tuple) throws Exception {
                        String dispatchingBaseNumber = tuple._1;
                        serDemo accessLogInfo = tuple._2;
                        SecondarySortKey accessLogSortKey = new SecondarySortKey(
                                accessLogInfo.getDate(),
                                accessLogInfo.getActiveVehicles(),
                                accessLogInfo.getTrips());
                        return new Tuple2<SecondarySortKey, String>(accessLogSortKey, dispatchingBaseNumber);
                    }
                });
    }
}
| import random import string import time
# Generates the mydata.log sample data set: 20 random dispatching base numbers
# (one uppercase letter plus one digit 1-9), each with one record per day for
# 100 consecutive days starting on 2020-01-02. Every record has the form
# "dispatching_base_number,date,active_vehicles,trips".
from datetime import date, timedelta

ch = string.ascii_uppercase
dispatching_base_number_list = []
for j in range(20):
    dispatching_base_number_list.append(random.choice(ch) + str(random.randint(1, 9)))

# Calendar arithmetic on date objects keeps the result independent of the
# local time zone (mixing time.mktime with time.gmtime shifts the day in
# zones ahead of UTC).
start_day = date(2020, 1, 2)

records = []
for i in range(100):
    day = (start_day + timedelta(days=i)).isoformat()
    for dispatching_base_number in dispatching_base_number_list:
        records.append(",".join([
            dispatching_base_number,
            day,
            str(random.randint(100, 10000)),
            str(random.randint(1000, 100000)),
        ]))

# Write mode (not append) so that re-running the script replaces the data set
# instead of gluing new records onto the unterminated last line; joining with
# "\n" leaves no trailing newline and writes every record exactly once.
with open("mydata.log", mode='w') as log_file:
    log_file.write("\n".join(records))