spark的强大功能之一就是可以让用户自定义函数。
用户自定义函数(UDF)让用户可以使用Scala或Python编写自己的自定义
转换(Transformation)操作
,甚至可以使用外部库。
UDF可以将一个或多个列作为输入,同时也可以返回一个或多个列。Spark的UDF允许使用多种不同的编程语言编写。
这些函数只是描述了(一个接一个地)处理数据记录的方法
。 默认情况下,这些函数被注册为SparkSession或者Context的临时函数。
这里用一个简单的函数来说明:设计一个
power3
函数,该函数接受一个数字并返回它的三次幂。
// 先创建一个DF
val udfExampleDF = spark.range(5).toDF("num")
// 第一步:设计一个实际的函数
// 注意:这个自定义函数有指定接受参数类型,和返回类型
def power3(number:Double): Double = number * number * number
可以对此函数进行测试,可以看到函数按预期的那样执行。
Spark将在驱动进程上序列化该函数,并将它通过网络传递到所有执行进程,无论使用何种语言都是这个过程。
// 第二步,在spark上注册该UDF
val power3UDF = udf(power3(_: Double): Double)
// val power3UDF = udf(power3 _) //简写形式
当使用该函数的时候,基本上有两种不同的情况发生。如果该函数时用Scala或Java编写的,则可以在JVM中使用它。这意味着除了不能使用Spark为内置函数提供的代码生成功能以外(事实就是如此),几乎没有性能损失。
如果函数时用Python编写的,则会出现一些截然不同的情况。Spark在worker上启动一个Python进程,将所有数据序列化为Python可执行的格式(请记住,在此之前数据位于JVM中)。在Python进程中对该数据逐行执行函数,最终将对每行的操作结果返回给JVM和Spark。此过程如下图所示:
注意:
启动此Python进程代价很高
,但主要代价是将数据序列化为Python可理解的格式的这个过程。造成代价高的原因有两个,一是计算昂贵,另一个是数据进入Python后Spark无法管理worker的内存,在这意味着,如果某个worker因资源受限而失败(因为JVM和Python都在同一台计算机上争夺内存),则可能会导致该worker出现故障。所以建议使用Scala或Java编写UDF函数,不仅编写程序的时间少,还能提高性能。当然也可以使用Python编写函数。
编写代码如下:
udfExampleDF.select(power3UDF(col("num"))).show()
此时,就将此UDF作为DataFrame函数来使用了。
如下,我使用的是本地允许的,
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
object Test {
def main(args: Array[String]): Unit = {
val spark = new SparkSession.Builder()
.appName("chemaDemo1")
.config("spark.testing.memory", "471859200")
.master("local[" +
"4]")
.getOrCreate()
val udfExampleDF: DataFrame = spark.range(5).toDF("num")
println(udfExampleDF.schema)
val power3UDF = udf(power3 _)
udfExampleDF.select(power3UDF(col("num"))).show()
}
def power3(number:Long): Long = number * number * number
}
文章来源互联网,如有侵权,请联系管理员删除。邮箱:417803890@qq.com / QQ:417803890
Python Free
邮箱:417803890@qq.com
QQ:417803890
皖ICP备19001818号-4
© 2019 copyright www.pythonf.cn - All rights reserved
微信扫一扫关注公众号:
Python Free