Disclaimer: this page is based on a popular StackOverflow Q&A and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/45717658/

Date: 2020-11-03 08:51:59 | Source: igfitidea

What is a simple, effective way to debug custom Kafka connectors?

Tags: java, debugging, apache-kafka, slf4j, apache-kafka-connect

Asked by C. Ommen

I'm working on a couple of Kafka connectors, and I don't see any errors in their creation/deployment in the console output; however, I'm not getting the results I'm looking for (no results whatsoever, for that matter, desired or otherwise). I made these connectors based on Kafka's example FileStream connectors, so my debugging technique was based on the SLF4J Logger used in the examples. I've searched for the log messages that I thought would be produced in the console output, but to no avail. Am I looking in the wrong place for these messages? Or is there perhaps a better way to go about debugging these connectors?

Example uses of the SLF4J Logger that I referenced for my implementation:

Kafka FileStreamSinkTask

Kafka FileStreamSourceTask

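For context, whether a connector's SLF4J output appears at all is governed by the worker's log4j configuration. A minimal sketch of the relevant config, where the file path and the package name com.example.myconnector are assumptions for illustration:

```shell
# Sketch: raise the log level for your connector's package so its
# SLF4J messages actually show up in the worker's output.
LOG4J_PROPS=/tmp/connect-log4j.properties
cat > "$LOG4J_PROPS" <<'EOF'
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
# per-package logger for the (hypothetical) connector code
log4j.logger.com.example.myconnector=DEBUG
EOF
grep 'com.example.myconnector' "$LOG4J_PROPS"
```

If that per-package line is missing and the root level is INFO or higher, DEBUG-level messages from the connector are silently dropped, which can look like the logging "isn't working".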
Answered by Konstantine Karantasis

I will try to reply to your question in a broad way. A simple way to do Connector development could be as follows:

  • Structure and build your connector source code by looking at one of the many Kafka Connectors available publicly (you'll find an extensive list available here: https://www.confluent.io/product/connectors/)
  • Download the latest Confluent Open Source edition (>= 3.3.0) from https://www.confluent.io/download/
  • Make your connector package available to Kafka Connect in one of the following ways:

    1. Store all your connector jar files (connector jar plus dependency jars, excluding Connect API jars) in a location on your filesystem and enable plugin isolation by adding this location to the plugin.path property in the Connect worker properties. For instance, if your connector jars are stored in /opt/connectors/my-first-connector, you will set plugin.path=/opt/connectors in your worker's properties (see below).
    2. Store all your connector jar files in a folder under ${CONFLUENT_HOME}/share/java. For example: ${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector. (Needs to start with the kafka-connect- prefix to be picked up by the startup scripts.) $CONFLUENT_HOME is where you've installed Confluent Platform.
  • Optionally, increase your logging by changing the log level for Connect in ${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties to DEBUG or even TRACE.

  • Use Confluent CLI to start all the services, including Kafka Connect. Details here: http://docs.confluent.io/current/connect/quickstart.html

    Briefly: confluent start
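The plugin.path option from step 1 above can be sketched as follows; all paths and the connector name are assumptions for illustration:

```shell
# Stage the connector jars (connector jar + dependency jars,
# excluding Connect API jars) in one directory per connector.
mkdir -p /tmp/connectors/my-first-connector
# cp target/my-first-connector-*.jar /tmp/connectors/my-first-connector/

# Point the worker at the PARENT directory via plugin.path.
WORKER_PROPS=/tmp/connect-worker.properties
cat > "$WORKER_PROPS" <<'EOF'
bootstrap.servers=localhost:9092
plugin.path=/tmp/connectors
EOF

grep '^plugin.path' "$WORKER_PROPS"
```

Note that plugin.path names the directory containing the per-connector folders, not the folder of a single connector.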

Note: The Connect worker's properties file currently loaded by the CLI is ${CONFLUENT_HOME}/etc/schema-registry/connect-avro-distributed.properties. That's the file you should edit if you choose to enable classloading isolation but also if you need to change your Connect worker's properties.

  • Once you have Connect worker running, start your connector by running:

    confluent load <connector_name> -d <connector_config.properties>

    or

    confluent load <connector_name> -d <connector_config.json>

    The connector configuration can be either in java properties or JSON format.

  • Run confluent log connect to open the Connect worker's log file, or navigate directly to where your logs and data are stored by running

    cd "$( confluent current )"

Note: change where your logs and data are stored during a session of the Confluent CLI by setting the environment variable CONFLUENT_CURRENT appropriately. E.g. given that /opt/confluent exists and is where you want to store your data, run:

export CONFLUENT_CURRENT=/opt/confluent
confluent current

  • Finally, to interactively debug your connector, a possible way is to apply the following before starting Connect with the Confluent CLI:

    confluent stop connect
    export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
    confluent start connect

    and then attach your debugger (for instance, remotely to the Connect worker; default port: 5005). To stop running Connect in debug mode, just run unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG; when you are done.
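In shell terms, the debug toggle described above amounts to the following sketch (the confluent commands are commented out, since they need a running Confluent installation):

```shell
# Enable JDWP for the next Connect worker start; with suspend=y the JVM
# waits for a debugger to attach before doing anything.
export CONNECT_DEBUG=y
export DEBUG_SUSPEND_FLAG=y
# confluent stop connect && confluent start connect
# ...attach your debugger to localhost:5005, debug, and when finished:
unset CONNECT_DEBUG
unset DEBUG_SUSPEND_FLAG
```

Because the flags are plain environment variables read by the start script, they only affect workers started from the same shell session.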

I hope the above will make your connector development easier and ... more fun!

Answered by Datum Geek

I love the accepted answer. One thing - the environment variables didn't work for me... I'm using Confluent Community Edition 5.3.1...

Here's what I did that worked...

I installed the Confluent CLI from here: https://docs.confluent.io/current/cli/installing.html#tarball-installation

I ran Confluent using the command confluent local start

I got the Connect app details using the command ps -ef | grep connect

I copied the resulting command to an editor and added the arg (right after java):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

Then I stopped Connect using the command confluent local stop connect

Then I ran the connect command with the arg added.

Brief intermission ---

VS Code development is led by Erich Gamma - of Gang of Four fame - who also wrote Eclipse. VS Code is becoming a first-class Java IDE; see https://en.wikipedia.org/wiki/Erich_Gamma

Intermission over ---

Next I launched VS Code and opened the Debezium Oracle connector folder (cloned from https://github.com/debezium/debezium-incubator)

Then I chose Debug - Open Configurations

(screenshot)

And entered the highlighted debugging configuration:

(screenshot)

And then run the debugger - it will hit your breakpoints!

(screenshot)

The connect command should look something like this:

/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/logs -Dlog4j.configuration=file:/Users/myuserid/confluent-5.3.1/bin/../etc/kafka/connect-log4j.properties -cp /Users/myuserid/confluent-5.3.1/share/java/kafka/*:/Users/myuserid/confluent-5.3.1/share/java/confluent-common/*:/Users/myuserid/confluent-5.3.1/share/java/kafka-serde-tools/*:/Users/myuserid/confluent-5.3.1/bin/../share/java/kafka/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/dependant-libs-2.12.8/*:/Users/myuserid/confluent-5.3.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/* org.apache.kafka.connect.cli.ConnectDistributed /var/folders/yn/4k6t1qzn5kg3zwgbnf9qq_v40000gn/T/confluent.CYZjfRLm/connect/connect.properties
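The manual edit this answer describes - inserting the JDWP agent argument right after java in the captured command line - can be sketched with a shortened stand-in for the real command (in practice CMD comes from ps -ef | grep connect):

```shell
# Shortened stand-in for the worker's command line captured via ps.
CMD='java -Xms256M -Xmx2G org.apache.kafka.connect.cli.ConnectDistributed /tmp/connect.properties'
DEBUG_ARG='-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'
# Insert the agent argument right after "java" and record the result.
DEBUG_CMD="java $DEBUG_ARG ${CMD#java }"
echo "$DEBUG_CMD" > /tmp/connect-debug-cmd.txt
cat /tmp/connect-debug-cmd.txt
```

With suspend=y, the relaunched worker waits for the debugger to attach on port 5005 before it starts loading connectors, so breakpoints set in connector startup code are reachable.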

Answered by Shen liang

The connector module is executed by the Kafka Connect framework. For debugging, we can use standalone mode and configure the IDE to use the ConnectStandalone main function as the entry point.

  1. Create a debug configuration as follows. Remember to tick "Include dependencies with 'Provided' scope" if it is a Maven project. (screenshot)

  2. The connector properties file needs to specify the connector class name via "connector.class" for debugging. (screenshot)

  3. The worker properties file can be copied from the Kafka folder: /usr/local/etc/kafka/connect-standalone.properties
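A sketch of the two files this standalone setup needs; the connector class, topic name, and paths are assumptions for illustration:

```shell
# Connector config: connector.class is how standalone mode finds your code.
cat > /tmp/my-connector.properties <<'EOF'
name=my-first-connector
connector.class=com.example.MyFirstSourceConnector
tasks.max=1
topic=my-topic
EOF

# Worker config: start from Kafka's sample and adjust as needed.
# cp /usr/local/etc/kafka/connect-standalone.properties /tmp/worker.properties

# The IDE run configuration uses the same entry point as this CLI call:
# connect-standalone /tmp/worker.properties /tmp/my-connector.properties
grep '^connector.class' /tmp/my-connector.properties
```

ConnectStandalone takes the worker properties first and the connector properties second, which is why the IDE program arguments mirror the CLI invocation above.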