Creating User Defined Function in Spark-SQL

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow.

Original URL: http://stackoverflow.com/questions/25031129/
Asked by user2728024
I am new to Spark and Spark SQL, and I am trying to query some data using Spark SQL.
I need to fetch the month from a date that is given as a string.
I think it is not possible to query the month directly in Spark SQL, so I was thinking of writing a user defined function in Scala.
Is it possible to write a UDF in Spark SQL, and if so, can anybody suggest the best method of writing one?
Please help.
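For context, the core task the question describes (pulling the month out of a "YYYY-MM-DD" string) can be sketched in plain Python; `extract_month` is a hypothetical helper for illustration, not part of any answer below:

```python
from datetime import datetime

def extract_month(date_str: str) -> int:
    # Parse a "YYYY-MM-DD" string and return the month as an integer.
    return datetime.strptime(date_str, "%Y-%m-%d").month

print(extract_month("2014-08-15"))  # → 8
```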
Answered by Spiro Michaylov
You can do this, at least for filtering, if you're willing to use a language-integrated query.
For a data file dates.txt containing:
one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15
You can pack as much Scala date magic into your UDF as you want, but I'll keep it simple:
def myDateFilter(date: String) = date contains "-08-"
Set it all up as follows -- a lot of this is from the Programming guide.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
// case class for your records
case class Entry(name: String, when: String)
// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))
You can use the UDF as part of your WHERE clause:
val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)
and see the results:
augustEntries.map(r => r(0)).collect().foreach(println)
Notice the version of the where method I've used, declared as follows in the doc:
def where[T1](arg1: Symbol)(udf: (T1) => Boolean): SchemaRDD
So, the UDF can only take one argument, but you can compose several .where() calls to filter on multiple columns.
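Outside Spark, composing single-column predicates works the same way. Here is a minimal plain-Python sketch of that composition pattern (the `where` helper and the sample data are hypothetical stand-ins, not Spark API):

```python
# Each record mirrors the Entry case class: (name, when).
entries = [("one", "2014-06-01"), ("four", "2014-08-15"), ("five", "2014-09-15")]

def where(rows, col, pred):
    # Filter rows on a single column, like SchemaRDD.where with a one-argument UDF.
    return [r for r in rows if pred(r[col])]

# Compose two single-column filters, like chaining .where() calls.
august = where(where(entries, 1, lambda d: "-08-" in d), 0, lambda n: len(n) == 4)
print(august)  # [('four', '2014-08-15')]
```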
Edit for Spark 1.2.0 (and really 1.1.0 too)
While it's not really documented, Spark now supports registering a UDF so it can be queried from SQL.
The above UDF could be registered using:
sqlContext.registerFunction("myDateFilter", myDateFilter)
and if the table was registered
sqlContext.registerRDDAsTable(entries, "entries")
it could be queried using
sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")
For more details see this example.
Answered by Apurva Singh
In Spark 2.0, you can do this:
// define the UDF
def convert2Years(date: String) = date.substring(7, 11)
// register to session
sparkSession.udf.register("convert2Years", convert2Years(_: String))
val moviesDf = getMoviesDf // create dataframe usual way
moviesDf.createOrReplaceTempView("movies") // 'movies' is used in sql below
val years = sparkSession.sql("select convert2Years(releaseDate) from movies")
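Note that substring(7, 11) only makes sense for one specific date layout. Assuming releaseDate strings look like "12 Jun 1998" (my guess; the answer doesn't say), the same slice in plain Python:

```python
def convert2years(date: str) -> str:
    # Scala's substring(7, 11) is Python's slice [7:11]: characters 7 through 10,
    # which is the 4-digit year in a "DD Mon YYYY" string.
    return date[7:11]

print(convert2years("12 Jun 1998"))  # → '1998'
```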
Answered by Sohel Khan
In PySpark 1.5 and above, we can easily achieve this with a built-in function.
Following is an example:
from pyspark.sql.functions import to_date

raw_data = [("2016-02-27 23:59:59", "Gold", 97450.56),
            ("2016-02-28 23:00:00", "Silver", 7894.23),
            ("2016-02-29 22:59:58", "Titanium", 234589.66)]

Time_Material_revenue_df = sqlContext.createDataFrame(raw_data, ["Sold_time", "Material", "Revenue"])

Day_Material_revenue_df = Time_Material_revenue_df.select(to_date("Sold_time").alias("Sold_day"), "Material", "Revenue")
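What to_date does here is truncate a timestamp down to its date part. A rough plain-Python sketch of that behavior (a hypothetical stand-in, not the pyspark function itself, and assuming "YYYY-MM-DD HH:MM:SS" input):

```python
from datetime import datetime

def to_date(sold_time: str) -> str:
    # Keep only the date part of a "YYYY-MM-DD HH:MM:SS" timestamp string.
    return datetime.strptime(sold_time, "%Y-%m-%d %H:%M:%S").date().isoformat()

print(to_date("2016-02-27 23:59:59"))  # → '2016-02-27'
```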