Pyspark DataFrame UDF on Text Column
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/34803855/
Asked by dreyco676
I'm trying to do some NLP text clean up of some Unicode columns in a PySpark DataFrame. I've tried in Spark 1.3, 1.5 and 1.6 and can't seem to get things to work for the life of me. I've also tried using Python 2.7 and Python 3.4.
I've created an extremely simple udf as seen below that should just return a string back for each record in a new column. Other functions will manipulate the text and then return the changed text back in a new column.
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())
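As a side note (not from the original question): udf() only needs a callable plus the declared Spark SQL return type, so an equivalent sketch could wrap a lambda directly:

# Equivalent sketch: wrap a lambda instead of a named function.
dummy_function_udf = udf(lambda data_str: 'dummyData', StringType())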
Some sample data can be unzipped from here.
Here is the code I use to import the data and then apply the udf:
# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))
# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
training_df.show(5)
+--------------------+--------------+
| tweet|classification|
+--------------------+--------------+
|rt @jiffyclub: wi...| python|
|rt @arnicas: ipyt...| python|
|rt @treycausey: i...| python|
|what's my best op...| python|
|rt @raymondh: #py...| python|
+--------------------+--------------+
# Apply UDF function
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show(5)
When I run the df.show(5) I get the following error. I understand that the problem most likely doesn't stem from the show() but the trace doesn't give me much help.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-0b21c233c724> in <module>()
1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
----> 2 df.show(5)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate)
255 +---+-----+
256 """
--> 257 print(self._jdf.showString(n, truncate))
258
259 def __repr__(self):
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
43 def deco(*a, **kw):
44 try:
---> 45 return f(*a, **kw)
46 except py4j.protocol.Py4JJavaError as e:
47 s = e.java_exception.toString()
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1538)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute.apply(DataFrame.scala:1538)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at org.apache.spark.sql.DataFrame$$anonfun$head.apply(DataFrame.scala:1414)
at org.apache.spark.sql.DataFrame$$anonfun$head.apply(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
The actual function I'm trying is:
from nltk import pos_tag  # pos_tag comes from NLTK

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break the string into 'words'
    text = data_str.split()

    # tag the text and keep only words with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str

tag_and_remove_udf = udf(tag_and_remove, StringType())
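Note (my addition, not from the original post): pos_tag relies on NLTK's tagger model, and with a UDF that model has to be importable on every Spark worker, not just the driver. A minimal one-time setup sketch, assuming a recent NLTK version:

# Sketch: make sure NLTK's POS tagger data is installed wherever the
# Python workers run (the resource name may differ across NLTK versions).
import nltk
nltk.download('averaged_perceptron_tagger')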
Accepted answer by AChampion
Your dataset isn't clean: 985 lines split('\t') to only one value:
>>> from operator import add
>>> lines = sc.textFile("classified_tweets.txt")
>>> parts = lines.map(lambda l: l.split("\t"))
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect()
[(2, 149195), (1, 985)]
>>> parts.filter(lambda l: len(l) == 1).take(5)
[['"show me the money!” at what point do you start trying to monetize your #startup? tweet us with #startuplife.'],
['a good pitch can mean money in the bank for your #startup. see how body language plays a key role: (via: ajalumnify)'],
['100+ apps in five years? @2359media did it using microsoft #azure: #azureapps'],
['does buying better coffee make you a better leader? little things can make a big difference: (via: @jmbrandonbb)'],
['.@msftventures graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']]
So changing your code to:
>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip()))
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
>>> df.show(5)
+--------------------+--------------+---------+
| tweet|classification| dummy|
+--------------------+--------------+---------+
|rt @jiffyclub: wi...| python|dummyData|
|rt @arnicas: ipyt...| python|dummyData|
|rt @treycausey: i...| python|dummyData|
|what's my best op...| python|dummyData|
|rt @raymondh: #py...| python|dummyData|
+--------------------+--------------+---------+
only showing top 5 rows
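A possible variant (my sketch, not from the accepted answer): if you would rather keep the malformed lines than drop them, you could give them a placeholder label instead of filtering. The parse_line helper below is hypothetical:

# Hypothetical helper: keep lines without a tab, labelling them 'unknown'
# instead of filtering them out.
def parse_line(line):
    fields = line.split("\t")
    if len(fields) == 2:
        return (fields[0], fields[1].strip())
    return (fields[0], "unknown")

training = lines.map(parse_line)
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])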
Answered by Kirk Broadhurst
I think you're misdefining the problem, and maybe simplifying your lambda for the purposes of this question but hiding the real problem.
Your stack trace reads:
File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range
When I run this code it works fine:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

training_df = sqlContext.sql("select 'foo' as tweet, 'bar' as classification")

def dummy_function(data_str):
    cleaned_str = 'dummyData'
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())

df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show()
+-----+--------------+---------+
|tweet|classification| dummy|
+-----+--------------+---------+
| foo| bar|dummyData|
+-----+--------------+---------+
Are you sure there isn't some other bug in your dummy_function_udf? What is the 'real' udf you are using, apart from this sample version?
Answered by Vicky
The one below works with Spark 2:
import hashlib
import uuid
import datetime
from pyspark.sql.types import StringType

def customencoding(s):
    m = hashlib.md5()
    m.update(s.encode('utf-8'))
    d = m.hexdigest()
    return d

spark.udf.register("udf_customhashing32adadf", customencoding, StringType())

spark.sql("SELECT udf_customhashing32adadf('test') as rowid").show(10, False)
You can implement it in the same way.
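For completeness (my sketch, not from this answer): the same function can also be wrapped for the DataFrame API instead of Spark SQL. The DataFrame and the 'tweet' column below are assumptions carried over from the question:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the same Python function for use with withColumn().
customencoding_udf = udf(customencoding, StringType())

df = training_df.withColumn("rowid", customencoding_udf(training_df["tweet"]))
df.show(5, False)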