跳到主要内容

HTTP节点

1.概述

该节点用于执行 HTTP 类型的任务,例如常见的 POST、GET 等请求类型,基于相应状态码判断节点是否成功

2.节点配置

为当前的任务节点填写基本信息(非必填),在输入框中填写节点名称、描述,这些信息可以帮助您更好地理解和管理节点,使整个离线开发任务的流程更加清晰和易于维护。

  • 请求地址:支持 GET/POST,默认为 GET,指定HTTP请求的URL地址
  • 请求参数配置:
配置项说明
请求头由 key/value 组成,告诉服务端需要的资源类型。
认证当前只支持 API 密钥、Token 令牌 2 种认证方式):API 密钥:API 密钥是赋予某种形式的秘密令牌的名称,与 Web 服务(或类似的)请求一起提交以识别请求的来源Token 令牌:Token 是指在指定有效时间内可以代表用户角色,具有请求接口的权限。
请求参数需要获取的字段和条件,且支持全局参数详见全局参数、时间宏、工作流参数详见工作流参数

请求体只限 JSON 格式:封装 POST 请求消息的请求参数,支持全局参数、时间宏、工作流参数。

  • 校验条件:满足所设条件后该节点将运行成功,响应状态码非2开头,比如200 201等,则报错结束。
  • 回调轮询配置:当接口请求时间较长时可配置回调接口轮询访问请求状态。
配置项说明
轮询间隔时长(秒)每间隔X秒,轮询一次
回调地址支持 GET/POST,默认为 GET,指定回调的URL地址
请求头由 key/value 组成,告诉服务端需要的资源类型。
认证当前只支持 API 密钥、Token 令牌 2 种认证方式:API 密钥:API 密钥是赋予某种形式的秘密令牌的名称,与 Web 服务(或类似的)请求一起提交以识别请求的来源Token 令牌:Token 是指在指定有效时间内可以代表用户角色,具有请求接口的权限
请求参数需要获取的字段和条件,且支持全局参数详见全局参数、时间宏、工作流参数详见工作流参数
请求体只限 JSON 格式:封装 POST 请求消息的请求参数,支持全局参数、时间宏、工作流参数。

回调注意事项:

  1. 主节点请求返回的参数必须为
{
"taskId": "xxxxxx",
...
}
  1. bi会自动提取taskId,并拼接到回调地址上

    例如该url,自动拼接后为:http://127.0.0.1:8004/status/xxxxxx

  2. 回调返回结果校验:

    1. 返回结果格式为: { "status": "xxxxxx" .......}
    2. 如果响应状态码非2开头,比如200 201等,则直接报错结束
    3. 如果status值为PROCESSING,则继续轮询,否则结束。status为SUCCESS表示成功,否则为失败
    4. 注意服务死循环问题

3.运行选项

  • 运行标志

    • 禁止执行:工作流运行至该节点后将直接跳过执行,常用于临时数据问题排查、部分任务运行控制等场景。
    • 正常:按照既有调度策略运行该节点,节点默认运行标志。
  • 失败重试:与数据集自身配置的失败重试策略保持同步。

4.通过HTTP调用python服务实践

Python API服务所需依赖

flask
request
jsonify
cachetools

Python 服务脚本

# -*- coding: utf-8 -*-

from flask import Flask, request, jsonify
from concurrent.futures import ThreadPoolExecutor
from cachetools import TTLCache
import time
import uuid
import io
import contextlib
import sys


app = Flask(__name__)

# 线程池(最大 10 个并发任务)
executor = ThreadPoolExecutor(max_workers=10)

# 2天转换为秒
two_days_in_seconds = 2 * 24 * 60 * 60

# 创建一个缓存对象,每个条目的TTL为2天
task_cache = TTLCache(maxsize=100, ttl=two_days_in_seconds)
def run_python_code(code, taskId):
""" 执行 Python 代码,并存储结果 """
try:
result = execute_code(code)
task_cache[taskId] = {"taskId": taskId, "status": "FINISHED", "log": result}
except Exception as e:
task_cache[taskId] = {"taskId": taskId, "status": "FAILED", "error": str(e)}

@app.route("/run_code", methods=["POST"])
def run_code():
""" 提交代码任务,异步执行 """
data = request.get_json()
code = data.get("code", "").strip()

if not code:
return jsonify({"error": "No code provided"}), 400

# 生成唯一 taskId
taskId = str(uuid.uuid4())

# 记录任务开始
task_cache[taskId] = {"taskId": taskId, "status": "PROCESSING"}

# 提交任务到线程池
executor.submit(run_python_code, code, taskId)

return jsonify({"taskId": taskId, "status": "PROCESSING"})

@app.route("/status/<taskId>", methods=["GET"])
def check_status(taskId):
""" 查询任务状态 """
if taskId not in task_cache:
return jsonify({"taskId": taskId, "status": "FAILED", "error": "Task not found"}), 404

task_info = task_cache[taskId]

if task_info["status"] == "FAILED":
return jsonify({"taskId": taskId, "status": "FAILED", "error": task_info["error"]}), 500
return jsonify(task_info)

# 执行代码的函数
def execute_code(code):
# 使用 StringIO 捕获 print 输出
output_buffer = io.StringIO()
try:
# 重定向标准输出到 output_buffer
with contextlib.redirect_stdout(output_buffer):
exec(code)
# 获取捕获的输出
output = output_buffer.getvalue()
return output.strip() # 返回捕获的输出
except Exception as e:
# 如果代码执行失败,返回错误信息
return f"Code execution failed: {str(e)}"
finally:
# 关闭 StringIO 缓冲区
output_buffer.close()


if __name__ == "__main__":
app.run(host="0.0.0.0", port=8004, debug=True) # 监听所有地址,端口 8004

配置HTTP节点

在请求体中配置参数

配置回调接口

文档AI助手
观远AI助手关闭