java - Spark streaming custom metrics
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/32843832/
Spark streaming custom metrics
Asked by Gideon
I'm working on a Spark Streaming program which retrieves a Kafka stream, does a very basic transformation on the stream and then inserts the data into a DB (VoltDB, if it's relevant). I'm trying to measure the rate at which I insert rows into the DB. I think metrics can be useful here (using JMX). However, I can't find how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread, but it doesn't work for me. I also enabled the JMX sink in the conf.metrics file. What's not working is that I don't see my custom metrics with JConsole.
Could someone explain how to add custom metrics (preferably via JMX) to Spark Streaming? Or alternatively, how to measure the insertion rate into my DB (specifically VoltDB)? I'm using Spark with Java 8.
Answered by Gideon
Ok, after digging through the source code I found how to add my own custom metrics. It requires 3 things:
- Create my own custom source. Sort of like this (a minimal sketch follows after this list)
- Enable the JMX sink in the Spark metrics.properties file. The specific line I used is:
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
which enables JmxSink for all instances
- Register my custom source in the SparkEnv metrics system. An example of how to do this can be seen here - I had actually viewed this link before but missed the registration part, which prevented me from actually seeing my custom metrics in JVisualVM
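For reference, a minimal sketch of steps 1 and 3 in Java, geared toward the question's goal of measuring insert rate (the class name InsertRateSource, the metric name, and the markInserts helper are illustrative assumptions, not the code behind the original links). The class sits in Spark's org.apache.spark.metrics.source package because the Source trait is package-private:

package org.apache.spark.metrics.source;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class InsertRateSource implements Source {
    private final MetricRegistry registry = new MetricRegistry();
    // A Meter exposes a total count plus 1/5/15-minute rates,
    // which JmxSink then publishes over JMX
    private final Meter insertMeter =
            registry.meter(MetricRegistry.name("voltdb", "inserts"));

    @Override
    public String sourceName() {
        return "InsertRateSource";
    }

    @Override
    public MetricRegistry metricRegistry() {
        return registry;
    }

    // Call after each successful batch of inserts
    public void markInserts(long rows) {
        insertMeter.mark(rows);
    }
}

Registering it (step 3) is then a single call on the driver: SparkEnv.get().metricsSystem().registerSource(new InsertRateSource());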
I'm still struggling with how to actually count the number of insertions into VoltDB, because the code runs on the executors, but that's a subject for a different topic :)
I hope this will help others
Answered by Martin McNulty
Groupon have a library called spark-metrics that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get automatically exported along with Spark's built-in metrics when you configure a metric sink as per the Spark docs.
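A hedged sketch of what usage might look like, based on the library's README (the UserMetricsSystem class and its initialize/meter methods are recalled assumptions; verify against the spark-metrics documentation for the current API):

import org.apache.spark.groupon.metrics.UserMetricsSystem;

// Driver side: initialize once, right after the SparkContext is created
// (assuming an existing SparkContext `sc`)
UserMetricsSystem.initialize(sc, "my_metric_namespace");

// Executor side (e.g. inside foreachPartition): updates are shipped
// back to the driver and land in Spark's metrics registry
UserMetricsSystem.meter("voltdbInserts").mark(rowCount);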
Answered by robert towne
To count the rows inserted into VoltDB, use accumulators - and then from your driver you can create a listener - maybe something like this to get you started:
sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) =>
      // here you have access to those combined accumulator values,
      // which you can then send to your sink
    }
  }
})
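Since the question uses Java 8, here is a minimal sketch of the same accumulator idea with the Spark 2.x API, reading the value on the driver after each batch instead of in a listener (the `stream` variable and the insertIntoVoltDb helper are hypothetical placeholders):

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.LongAccumulator;

// assuming an existing JavaStreamingContext `jssc` and a JavaDStream<String> `stream`
LongAccumulator inserted =
        jssc.sparkContext().sc().longAccumulator("rowsInsertedToVoltDB");

stream.foreachRDD(rdd -> {
    rdd.foreachPartition(records -> {
        long n = 0;
        while (records.hasNext()) {
            insertIntoVoltDb(records.next()); // hypothetical DB insert helper
            n++;
        }
        inserted.add(n); // executor-side update
    });
    // the foreachRDD body runs on the driver after the batch's action completes,
    // so the accumulator value is consistent to read here
    System.out.println("rows inserted so far: " + inserted.value());
});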
Answered by Erik Schmiegelow
Here's an excellent tutorial which covers all the steps you need to set up Spark's MetricsSystem with Graphite. That should do the trick:
http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/
Answered by Leon
Below is a working example in Java. It's tested with StreamingQuery (unfortunately StreamingQuery does not have out-of-the-box metrics like StreamingContext until Spark 2.3.1).
Steps:
Define a custom source in the same package as the Source class
package org.apache.spark.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/**
 * Metrics source for structured streaming query.
 */
public class StreamingQuerySource implements Source {
    private String appName;
    private MetricRegistry metricRegistry = new MetricRegistry();
    private final Progress progress = new Progress();

    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGauge("batchId", () -> progress.batchId());
        registerGauge("numInputRows", () -> progress.numInputRows());
        registerGauge("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGauge("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    private <T> Gauge<T> registerGauge(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }

    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private long batchId = -1;
        private long numInputRows = 0;
        private double inputRowsPerSecond = 0;
        private double processedRowsPerSecond = 0;
    }
}
Register the source right after SparkContext is created
querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
SparkEnv.get().metricsSystem().registerSource(querySource);
Update data in StreamingQueryListener.onQueryProgress(event)
querySource.updateProgress(event.progress());
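For completeness, a minimal sketch of wiring that call up in a listener (assuming a SparkSession named spark and the querySource created in the previous step):

import org.apache.spark.sql.streaming.StreamingQueryListener;

// Forward every progress report of every streaming query to the custom source
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        // nothing to do on start
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        querySource.updateProgress(event.progress());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // nothing to do on termination
    }
});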
Configure metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Sample output in the Graphite exporter (mapped to Prometheus format)
streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model="model1",qty="processedRowsPerSecond"} 0.81