Skip to content

Kafka

往Kafka队列发送消息

业务属性

属性含义说明输入限定示例值
enable是否启用该控件,未启用时不编译是否编译选择值启用
name控件返回值命名控件输出值变量名常量kafka
text与控件关联的文本设计页控件显示常量sendKafkaMessage
kafka消息队列可选已在资源配置中配置的Kafka队列资源选择值myKafka
topictopic名称指定发送消息的topic常量/流程变量topic1
message消息内容发送的消息内容常量,Json数据节点可用流程变量/流程变量{"key":"#{value}"}
key消息分区分配的密钥决定消息发送的分区常量/流程变量#{key}
throwable发生异常时中断请求启用:控件执行异常时立即中断请求,并返回错误
禁用:控件执行异常时将被忽略,并继续执行其后的流程
选择值启用

输出值

  • 消息发送成功时输出值

控件返回以属性name为名称的实体,该实体包含属性result,并且resultboolean类型true

  • throwable为“禁用”,消息发送失败时输出值

控件返回以属性name为名称的实体,该实体包含表示异常信息的属性err(此时属性resultnil)。

使用示例

假设已创建Kafka资源,资源名称为:myKafka,需要向主题名称为exampleTopic发送消息。

发送常量消息

  • Kafka控件设置

添加Kafka控件并命名为sendResult,属性设置为:

kafkamykafkatopicexampleTopic

messagethis is a example message

img_kafka_1.png
  • 输出发送结果

添加输出控件Returndata设置为:

json
{
  "sendResult": "#{sendResult}"
}
{
  "sendResult": "#{sendResult}"
}
img_kafka_2.png
  • 接口调用

使用curl请求接口:

$ curl http://localhost:6636/api/kafka
$ curl http://localhost:6636/api/kafka

接口返回:

json
{
    "sendResult": {
        "result": true
    }
}
{
    "sendResult": {
        "result": true
    }
}

使用流程变量发送消息

  • Kafka控件设置

在前面示例的基础上,修改message属性设置为:

json
{
  "message": "#{body.message}"
}
{
  "message": "#{body.message}"
}
img_kafka_3.png
  • 接口调用

使用curl请求接口:

$ curl http://localhost:6636/api/kafka -d "{\"message\":\"This is an example message sent to kafka\"}"
$ curl http://localhost:6636/api/kafka -d "{\"message\":\"This is an example message sent to kafka\"}"

接口返回:

json
{
    "sendResult": {
        "result": true
    }
}
{
    "sendResult": {
        "result": true
    }
}