scala: RestAPI service call from Spark Streaming

Disclaimer: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original URL: http://stackoverflow.com/questions/41799578/

Time: 2020-10-22 09:02:20  Source: igfitidea

RestAPI service call from Spark Streaming

scala rest apache-spark spark-streaming

Asked by nilesh1212

I have a use case where, after messages are read from Kafka, I need to call a REST API from Spark Streaming to perform some calculation, then save the result back to HDFS and to a third-party application.

I have a few questions:

  • How can we call a REST API directly from Spark Streaming?
  • How do we manage the REST API timeout relative to the streaming batch interval?

Answered by mrsrinivas

This code will not compile as is, but it shows the approach for the given use case.

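import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))

// zkQuorum, group and topicMap are not defined here; supply them for your Kafka setup
val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

dstream.foreachRDD { (rdd, time) =>

  // Write the RDD to HDFS directly; a per-batch path avoids failing
  // on the second batch because the output directory already exists
  rdd.saveAsTextFile(s"hdfs/location/to/save/${time.milliseconds}")

  // Loop through each partition in the RDD
  rdd.foreachPartition { partitionOfRecords =>

    // 1. Create the HttpClient object here (once per partition, on the executor)
    // 2.a POST the partition's data to the API

    // Use this loop if you want record-level control within the partition
    partitionOfRecords.foreach { record =>
      // 2.b POST the record to the API
      record.toString
    }
  }
  // Use 2.a or 2.b to POST data, depending on your requirements
}

ssc.start()
ssc.awaitTermination()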
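
Steps 1 and 2.b in the skeleton above can be sketched as follows. This is only an illustration under stated assumptions: it uses the JDK's HttpURLConnection instead of Apache HttpClient so that the block needs no extra dependency, and postRecord, postPartition, and the endpoint parameter are hypothetical names that do not come from the original answer.

```scala
import java.net.{HttpURLConnection, URI}
import java.nio.charset.StandardCharsets

// Hypothetical helper: POSTs one record body and returns the HTTP status code.
def postRecord(endpoint: String, body: String, timeoutMs: Int): Int = {
  val conn = URI.create(endpoint).toURL.openConnection().asInstanceOf[HttpURLConnection]
  try {
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setConnectTimeout(timeoutMs) // limit on establishing the connection
    conn.setReadTimeout(timeoutMs)    // limit on waiting for response data
    val out = conn.getOutputStream
    try out.write(body.getBytes(StandardCharsets.UTF_8)) finally out.close()
    conn.getResponseCode              // sends the request and reads the status line
  } finally conn.disconnect()
}

// Hypothetical partition handler, intended to run inside rdd.foreachPartition.
// Returns how many records the API answered with a 2xx status.
def postPartition(endpoint: String, records: Iterator[String], timeoutMs: Int): Int =
  records.count { record =>
    val status = postRecord(endpoint, record, timeoutMs)
    status >= 200 && status < 300
  }
```

Inside the streaming job this would be called as something like rdd.foreachPartition(p => postPartition(apiUrl, p.map(_.toString), 5000)), where apiUrl is again a placeholder. A timeout surfaces as a java.net.SocketTimeoutException, so whether to catch it per record or let the task fail and retry is a choice left to the job.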


Most HTTP clients (for REST calls) support a request timeout.

Sample HTTP POST call with a timeout, using Apache HttpClient:

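import org.apache.http.client.config.RequestConfig
import org.apache.http.client.entity.EntityBuilder
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}

val CONNECTION_TIMEOUT_MS = 20000 // Timeout in millis (20 sec)

// Same limit for leasing a pooled connection, connecting, and socket reads
val requestConfig = RequestConfig.custom()
  .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
  .setConnectTimeout(CONNECTION_TIMEOUT_MS)
  .setSocketTimeout(CONNECTION_TIMEOUT_MS)
  .build()

val client: CloseableHttpClient = HttpClientBuilder.create().build()

val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url)

// Set the timeout config on the POST request
post.setConfig(requestConfig)

post.setEntity(EntityBuilder.create.setText("some text to post to API").build())

// Close the response and the client when done to release the connection
val response: CloseableHttpResponse = client.execute(post)
try {
  println(response.getStatusLine.getStatusCode)
} finally {
  response.close()
  client.close()
}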
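
On the second question, note that the sample above uses 20-second timeouts while the streaming skeleton uses a 1-second batch interval, so a single slow call could hold up many batches. When processing takes longer than the batch interval, Spark Streaming queues the following batches behind it. One possible way to relate the two numbers is to derive the per-request timeout from the batch interval. The helper below is only a sketch of that idea: perRequestTimeout, requestsPerPartition, and the 80% budget are assumptions for illustration, not part of the original answer or of any Spark or HttpClient API.

```scala
import scala.concurrent.duration._

// Hypothetical helper: the largest per-request timeout such that
// `requestsPerPartition` sequential calls, each hitting its timeout, still fit
// inside `budgetPercent` of the batch interval. Never returns less than 1 ms.
def perRequestTimeout(
    batchInterval: FiniteDuration,
    requestsPerPartition: Int,
    budgetPercent: Int = 80): FiniteDuration = {
  require(requestsPerPartition > 0, "requestsPerPartition must be positive")
  require(budgetPercent > 0 && budgetPercent <= 100, "budgetPercent must be in 1..100")
  val budgetMs = batchInterval.toMillis * budgetPercent / 100
  math.max(1L, budgetMs / requestsPerPartition).millis
}

// Example: a 10-second batch with roughly 4 sequential calls per partition
val timeoutMs: Int = perRequestTimeout(10.seconds, 4).toMillis.toInt
```

The resulting timeoutMs can be passed to setConnectTimeout and setSocketTimeout in place of the fixed 20000. If the API genuinely needs longer than the budget allows, the alternative is a longer batch interval rather than a longer timeout.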