Spark 分组 TOPN 排序

本贴最后更新于 3013 天前,其中的信息可能已经时移世改
/**
 * Created by zhangshuai on 2016/9/22.
 */
//        输入文件
//        Spark 100
//        Hadoop 65
//        Spark 99
//        Hadoop 61
//        Spark 195
//        Hadoop 60
//        Spark 98
//        Hadoop 69
//        Spark 91
//        Hadoop 64
//        Spark 89
//        Hadoop 98
//        Spark 88
//        Hadoop 99
//        Spark 68
//        Hadoop 60
//        Spark 79
//        Hadoop 97
//        Spark 69
//        Hadoop 96



//        结果输出
//        Group key :Spark
//        195
//        100
//        99
//        98
//        91
//        *********************
//        Group key :Hadoop
//        99
//        98
//        97
//        96
//        69
//        **********************

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;


public class TopNGroupJava {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("TopNGroupJava")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf); //其底层实际上就是Scala的SparkContext
        JavaRDD<String> lines = sc.textFile(
                "E://topn.txt");

        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line)
                    throws Exception {
                // TODO Auto-generated method stub
                String[] splitedLine = line.split(" ");
                System.out.println(splitedLine[0]);

                return new Tuple2<String, Integer>(splitedLine[0],Integer.valueOf(splitedLine[1]));
            }
        });

        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();

        JavaPairRDD<String, Iterable<Integer>> top5 = groupedPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            /**
             *
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData) throws Exception {
                Integer[] top5 = new Integer[5]; //保存top5本身
                String groupedKey = groupedData._1(); //获取分组组名
                Iterator<Integer> groupedValue = groupedData._2().iterator(); //获取每组的内容集合

                while (groupedValue.hasNext()) { //查看下一个元素,如果有继续循环
                    Integer value = groupedValue.next(); //获取当前循环的元素本身内容

                    for (int i = 0; i < 5; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;

                            break;
                        } else if (value > top5[i]) {
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j - 1];
                            }

                            top5[i] = value;

                            break;
                        }
                    }
                }

                return new Tuple2<String, Iterable<Integer>>(groupedKey,
                        Arrays.asList(top5));
            }
        });

        top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> topped)
                    throws Exception {
                System.out.println("Group key :" + topped._1());

                Iterator<Integer> toppedValue = topped._2().iterator();

                while (toppedValue.hasNext()) {
                    Integer value = toppedValue.next();
                    System.out.println(value);
                }

                System.out.println("**********************");
            }
        });
    }
}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by zhangshuai on 2016/9/22.
 */
//输入文件
//Spark,100
//Hadoop,62
//Flink,77
//Kafka,91
//Hadoop,93
//Spark,78
//Hadoop,69
//Spark,98
//Hadoop,62
//Spark,99
//Hadoop,61
//Spark,70
//Hadoop,75
//Spark,88
//Hadoop,68
//Spark,90
//Hadoop,61



//结果输出
//Flink:
//77
//Hadoop:
//61
//61
//62
//62
//68
//Kafka:
//91
//Spark:
//70
//78
//88
//90
//98
object TopNGroupScala {
  def main(args: Array[String]) {
     val conf=new SparkConf().setAppName("TopNGroupScala").setMaster("local")

     val sc=new SparkContext(conf)
     sc.setLogLevel("WARN")
     val lines=sc.textFile("E://topn.txt",1)

     val pairs=lines.map{(line =>(line.split(",")(0),line.split(",")(1).toInt))}

     val grouped=pairs.groupByKey()

     val groupedTop5=grouped.map(grouped =>
      {
        (grouped._1,grouped._2.toList.sortWith(_<_).take(5))
      }
     )
     val groupedKeySorted=groupedTop5.sortByKey()

     groupedKeySorted.collect().foreach(pair =>
     {
       println(pair._1+":")
       pair._2.foreach{println}
     }
     )
    sc.stop()

  }
}

 

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 560 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 401 关注
  • MySQL

    MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一。

    692 引用 • 535 回帖
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    28 引用 • 108 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 316 关注
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    247 引用 • 1348 回帖 • 1 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 418 关注
  • jsDelivr

    jsDelivr 是一个开源的 CDN 服务,可为 npm 包、GitHub 仓库提供免费、快速并且可靠的全球 CDN 加速服务。

    5 引用 • 31 回帖 • 72 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 318 关注
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    21 引用 • 140 回帖 • 3 关注
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 4 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 478 关注
  • OkHttp

    OkHttp 是一款 HTTP & HTTP/2 客户端库,专为 Android 和 Java 应用打造。

    16 引用 • 6 回帖 • 76 关注
  • SQLServer

    SQL Server 是由 [微软] 开发和推广的关系数据库管理系统(DBMS),它最初是由 微软、Sybase 和 Ashton-Tate 三家公司共同开发的,并于 1988 年推出了第一个 OS/2 版本。

    21 引用 • 31 回帖 • 5 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 158 关注
  • 安全

    安全永远都不是一个小问题。

    200 引用 • 816 回帖
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖 • 1 关注
  • Gzip

    gzip (GNU zip)是 GNU 自由软件的文件压缩程序。我们在 Linux 中经常会用到后缀为 .gz 的文件,它们就是 Gzip 格式的。现今已经成为互联网上使用非常普遍的一种数据压缩格式,或者说一种文件格式。

    9 引用 • 12 回帖 • 147 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    77 引用 • 430 回帖
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    26 引用 • 196 回帖 • 17 关注
  • Openfire

    Openfire 是开源的、基于可拓展通讯和表示协议 (XMPP)、采用 Java 编程语言开发的实时协作服务器。Openfire 的效率很高,单台服务器可支持上万并发用户。

    6 引用 • 7 回帖 • 101 关注
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 484 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 26 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 3 关注
  • 倾城之链
    23 引用 • 66 回帖 • 138 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 637 关注