original:http://ift.tt/1OfjE5g 文章翻译自Introducing DataFrames in Spark for Large Scale Data Science,作者Reynold Xin(辛湜,@hashjoin),Michael Armbrust,Davies Liu。 以下为译文 今天,我们正式宣布Spark新的API — — DataFrame 。作为2014–2015年Spark最大的API改动,DataFrame能够使得大数据更为简单,从而拥有更广泛的受众群体。 我们最早在设计Spark的时候,其中一个很重要的目标就是给大数据生态圈提供基于通用编程语言的(Java、Scala、Python)简单易用的API。Spark原本的RDD API通过函数式编程的模式把分布式数据处理转换成分布式数据集(distributed collections)。原本需要上千行用Hadoop MapReduce实现的代码,在Spark这个API上减少到了数十行。 然后随着Spark的不断壮大,我们希望拥有更广泛的受众群体利用其进行分布式处理,不局限于“大数据”工程师。这个新的DataFrame API在R和Python data frame的设计灵感之上,专门为了数据科学应用设计,具有以下功能特性: 从KB到PB级的数据量支持; 多种数据格式和多种存储系统支持; 通过Spark SQL的Catalyst优化器进行先进的优化,生成代码; 通过Spark无缝集成所有大数据工具与基础设施; 为Python、Java、Scala和R语言(SparkR)API。 对于之前熟悉其他语言中data frames的新用户来说,这个新的API可以让Spark的初体验变得更加友好。而对于那些已经在使用的用户来说,这个API会让基于Spark的编程更加容易,同时其智能优化和代码生成也能帮助用户获得更好的性能。 初识DataFrames 在Spark中,DataFrame是一个以命名列方式组织的分布式数据集,等同于关系型数据库中的一个表,也相当于R/Python中的data frames(但是进行了更多的优化)。DataFrames可以由结构化数据文件转换而来,也可以从Hive中的表得来,以及可以转换自外部数据库或现有的RDD。 下面代码演示了如何使用Python构造DataFrames,而在Scala和Java中也有类似的API可以调用。 # Constructs a DataFrame from the users table in Hive. users = context.table(“users”) # from JSON files in S3 logs = context.load(“http://s3npath/to/data.json”, “json”) 一经构建,DataFrames就会为分布式数据处理提供一个指定的DSL(domain-specific language )。 # Create a new DataFrame that contains “young users” only young = users.filter(users.age < 21) # Alternatively, using Pandas-like syntax young = users[users.age < 21] # Increment everybody’s age by 1 young.select(young.name, young.age + 1) # Count the number of young users by gender young.groupBy(“gender”).count() # Join young users with another DataFrame called logs young.join(logs, logs.userId == users.userId, “left_outer”) 通过Spark SQL,你还可以用SQL的方式操作DataFrames。下面这个例子统计了“young” DataFrame中的用户数量。 young.registerTempTable(“young”) context.sql(“SELECT count(*) FROM young”) 在Python中,Pandas DataFrame和Spark DataFrame还可以自由转换。 # Convert Spark DataFrame to Pandas pandas_df = young.toPandas() # Create a Spark DataFrame from Pandas spark_df = context.createDataFrame(pandas_df) 类似于RDD,DataFrame同样使用了lazy的方式。也就是说,只有动作真正发生时(如显示结果,保存输出),计算才会进行。从而,通过一些技术,比如predicate push-downs和bytecode generation,执行过程可以进行适当的优化(详情见下文)。同时,所有的DataFrames也会自动的在集群上并行和分布执行。 数据格式和来源 现代的应用程序通常需要收集和分析来自各种不同数据源的数据,而DataFrame与生俱来就支持读取最流行的格式,包括JSON文件、Parquet文件和Hive表格。DataFrame还支持从多种类型的文件系统中读取,比如本地文件系统、分布式文件系统(HDFS)以及云存储(S3)。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。 DataFrames对数据源的支持能力允许应用程序可以轻松地组合来自不同数据源的数据。下面的代码片段则展示了存储在S3上网站的一个文本流量日志(textual traffic log)与一个PostgreSQL数据库的join操作,目的是计算网站用户访问该网站的次数。 users = context.jdbc(“jdbc:postgresql:production”, “users”) logs = context.load(“/path/to/traffic.log”) […]

 via WordPress http://ift.tt/1OfjElw