Original source: http://stackoverflow.com/questions/41799578/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
RestAPI service call from Spark Streaming
Asked by nilesh1212
I have a use case where, after messages are read from Kafka, I need to call a REST API from Spark Streaming to perform some calculation, and then save the result to HDFS and to a third-party application.
I have a few doubts here:
- How can we call a REST API directly from Spark Streaming?
- How can we manage the REST API timeout in relation to the streaming batch time?
Answered by mrsrinivas
This code will not compile as is, but it outlines the approach for the given use case.
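import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))
// zkQuorum, group and topicMap are not defined here; supply your own values
val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
dstream.foreachRDD { (rdd, time) =>
  // Write the RDD to HDFS directly. A per-batch directory is used because
  // saveAsTextFile fails if the output path already exists.
  rdd.saveAsTextFile(s"hdfs/location/to/save/${time.milliseconds}")
  // Loop through each partition in the RDD
  rdd.foreachPartition { partitionOfRecords =>
    // 1. Create the HttpClient object here
    // 2.a POST the data to the API (partition level)
    // Use the loop below instead if you want record-level control
    partitionOfRecords.foreach { record =>
      // 2.b POST the data to the API (record level)
      record.toString
    }
  }
  // Use 2.a or 2.b to POST data as per your requirement
}
ssc.start()
ssc.awaitTermination()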
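The choice between 2.a and 2.b can be sketched without Spark, since `foreachPartition` simply hands its function an `Iterator` of records. The following is a minimal sketch and not the answer's own code: `PartitionPosting`, `postPartition`, `batchSize`, and `send` are hypothetical names, and `send` stands in for whatever HTTP client call is used and is assumed to return the HTTP status code.

```scala
object PartitionPosting {
  /** Posts the records of one partition in groups of `batchSize`.
    * `send` is a hypothetical stand-in for the real HTTP POST and is
    * assumed to return the HTTP status code of the response.
    * Returns the number of requests that came back with a 2xx status.
    */
  def postPartition(records: Iterator[String], batchSize: Int)(send: String => Int): Int =
    records
      .grouped(batchSize)                        // batchSize = 1 gives record-level control (2.b)
      .map(group => send(group.mkString("\n")))  // one request per group of records
      .count(status => status >= 200 && status < 300)
}
```

Inside `rdd.foreachPartition`, the partition iterator would be passed as `records`, and the client created once per partition would be captured by `send`. A larger `batchSize` moves toward a single call per partition (2.a).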
Most HTTP clients (for REST calls) support request timeouts.
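One way to relate the request timeout to the batch time is a worst-case estimate: if a partition makes its calls sequentially and every call runs into the timeout, that partition takes roughly (number of calls) × (timeout), and a batch that takes longer than the batch interval causes later batches to queue up. The sketch below is an illustration under those assumptions, not part of the original answer; `TimeoutBudget`, `maxTimeoutMs`, and `safetyFraction` are hypothetical names.

```scala
object TimeoutBudget {
  /** Largest per-request timeout (ms) such that `callsPerPartition` sequential
    * calls, all timing out, still fit inside a fraction of the batch interval.
    * `safetyFraction` (hypothetical, in (0, 1]) leaves room for the HDFS
    * write and other work done in the same batch.
    */
  def maxTimeoutMs(batchIntervalMs: Long,
                   callsPerPartition: Int,
                   safetyFraction: Double = 0.5): Long = {
    require(callsPerPartition > 0, "callsPerPartition must be positive")
    require(safetyFraction > 0 && safetyFraction <= 1, "safetyFraction must be in (0, 1]")
    ((batchIntervalMs * safetyFraction) / callsPerPartition).toLong
  }
}
```

For example, `TimeoutBudget.maxTimeoutMs(1000L, 10)` gives 50 ms. By the same estimate, a 20-second timeout combined with a 1-second batch interval means a single timed-out request already exceeds the interval, so either the interval would need to be larger or the timeout smaller.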
Sample HTTP POST call with a timeout using Apache HttpClient:
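import org.apache.http.client.config.RequestConfig
import org.apache.http.client.entity.EntityBuilder
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}

val CONNECTION_TIMEOUT_MS = 20000 // Timeout in millis (20 sec)
val requestConfig = RequestConfig.custom()
  .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS) // waiting for a connection from the pool
  .setConnectTimeout(CONNECTION_TIMEOUT_MS)           // establishing the connection
  .setSocketTimeout(CONNECTION_TIMEOUT_MS)            // waiting for data on the socket
  .build()
val client: CloseableHttpClient = HttpClientBuilder.create().build()
val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url)
// Apply the timeout config to the request
post.setConfig(requestConfig)
post.setEntity(EntityBuilder.create.setText("some text to post to API").build())
val response: CloseableHttpResponse = client.execute(post)
try {
  // Read response.getStatusLine / response.getEntity here
} finally {
  response.close()
  client.close()
}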
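When a timeout fires, the client call throws an exception rather than returning a response (for a socket timeout, a `java.net.SocketTimeoutException`), and an exception that escapes `foreachPartition` fails the task. A minimal sketch of catching it per request follows. `SafePost`, `PostResult`, `safePost`, and the `call` parameter are hypothetical names, and `call` stands in for something like `client.execute(post)` reduced to a status code. Only the socket timeout is singled out here; other I/O errors are reported as `Failed`.

```scala
import java.io.IOException
import java.net.SocketTimeoutException
import scala.util.{Failure, Success, Try}

object SafePost {
  sealed trait PostResult
  final case class Ok(status: Int) extends PostResult
  case object TimedOut extends PostResult
  final case class Failed(reason: String) extends PostResult

  /** Runs `call` (a hypothetical stand-in for the real HTTP request) and
    * turns timeouts and other I/O errors into values instead of exceptions.
    */
  def safePost(call: () => Int): PostResult =
    Try(call()) match {
      case Success(status)                    => Ok(status)
      case Failure(_: SocketTimeoutException) => TimedOut
      case Failure(e: IOException)            => Failed(String.valueOf(e.getMessage))
      case Failure(e)                         => throw e // not an I/O problem: let it propagate
    }
}
```

Returning a value per request lets the caller decide whether to skip the record, retry it, or fail the batch, instead of having the first timeout abort the whole partition.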

