Python: using foreachRDD and foreach to iterate over an rdd in pyspark
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, include the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/37492402/
using foreachRDD and foreach to iterate over an rdd in pyspark
Asked by tchoedak
Questions for Spark 1.6.1, pyspark
I have streaming data coming in like this:
{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}
I have a function that writes to HBase called writeToHBase(rdd), which expects an rdd containing tuples of the following structure:
(rowkey, [rowkey, column-family, key, value])
As you can see from the input format, I have to take my original dataset and iterate over all keys, sending each key/value pair with a send function call.
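In plain Python terms, the per-record flattening I have in mind would look roughly like this; the column family name "cf" is just a placeholder, and str() is there only because HBase values end up as strings/bytes:

def record_to_hbase_tuples(record):
    # Rough sketch: turn one parsed JSON record into the tuples writeToHBase expects.
    rowkey = record["row_key"]
    return [(rowkey, [rowkey, "cf", key, str(value)])
            for key, value in record.items()
            if key != "row_key"]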
From reading the Spark Streaming programming guide, section "Design Patterns for using foreachRDD" (http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13), it seems that it is recommended to use foreachRDD when doing something external to the dataset. In my case, I want to write data to HBase over the network, so I use foreachRDD on my streaming data and call the function that will handle sending the data:
stream.foreachRDD(lambda k: process(k))
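The pattern that section recommends is to create heavyweight resources (such as connections) once per partition rather than once per record, roughly like this; createConnection and sendToHBase are hypothetical stand-ins for whatever HBase client code actually gets used:

def process(rdd):
    def send_partition(records):
        # Hypothetical helpers standing in for the real HBase client calls.
        connection = createConnection()
        for record in records:
            sendToHBase(connection, record)
        connection.close()
    rdd.foreachPartition(send_partition)

stream.foreachRDD(process)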
My understanding of Spark functions is pretty limited right now, so I'm unable to figure out a way to iterate over my original dataset to use my write function. If it were a Python iterable, I'd be able to do this:
def process(rdd):
    for key, value in rdd.iteritems():
        writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))
where rowkey would have been obtained by finding it in the rdd itself:
rdd.map(lambda x: x['rowkey'])
How do I accomplish what process() is meant to do in pyspark? I see some examples that use foreach, but I'm not quite able to get it to do what I want.
Answered by Amit Kumar
Why do you want to iterate over the rdd when your writeToHBase function expects an rdd as an argument? Simply call writeToHBase(rdd) in your process function, and that's it.
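For example, a minimal sketch of such a process function, assuming the records arrive as parsed dicts shaped like the sample in the question and that writeToHBase takes an rdd of (rowkey, [rowkey, column-family, key, value]) tuples; "cf" below is only a placeholder column family:

def process(rdd):
    # Flatten each record into the (rowkey, [rowkey, cf, key, value]) tuples
    # that writeToHBase expects.
    hbase_rdd = rdd.flatMap(lambda record: [
        (record["row_key"], [record["row_key"], "cf", key, str(value)])
        for key, value in record.items()
        if key != "row_key"])
    writeToHBase(hbase_rdd)

stream.foreachRDD(process)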
If you need to fetch every record from the rdd, you can call:
def processRecord(record):
    print(record)

rdd.foreach(processRecord)
In the processRecord function you will get a single record to process.
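Wired into the streaming job, that looks roughly like this; keep in mind that processRecord runs on the executors, so its print output appears in the worker logs rather than on the driver console:

def process(rdd):
    rdd.foreach(processRecord)

stream.foreachRDD(process)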