HTTP节点
1.概述
该节点用于执行 HTTP 类型的任务,例如常见的 POST、GET 等请求类型,基于相应状态码判断节点是否成功
2.节点配置
为当前的任务节点填写基本信息(非必填),在输入框中填写节点名称、描述,这些信息可以帮助您更好地理解和管理节点,使整个离线开发任务的流程更加清晰和易于维护。
- 请求地址:支持 GET/POST,默认为 GET,指定HTTP请求的URL地址
- 请求参数配置:
配置项 | 说明 |
请求头 | 由 key/value 组成,告诉服务端需要的资源类型。 |
认证 | 当前只支持 API 密钥、Token 令牌 2 种认证方式):API 密钥:API 密钥是赋予某种形式的秘密令牌的名称,与 Web 服务(或类似的)请求一起提交以识别请求的来源Token 令牌:Token 是指在指定有效时间内可以代表用户角色,具有请求接口的权限。 |
请求参数 | 需要获取的字段和条件,且支持全局参数详见全局参数、时间宏、工作流参数详见工作流参数。![]() |
请求体 | 只限 JSON 格式:封装 POST 请求消息的请求参数,支持全局参数、时间宏、工作流参数。![]() |
- 校验条件:满足所设条件后该节点将运行成功,响应状态码非2开头,比如200 201等,则报错结束。
- 回调轮询配置:当接口请求时间较长时可配置回调接口轮询访问请求状态。
配置项 | 说明 |
轮询间隔时长(秒) | 每间隔X秒,轮询一次 |
回调地址 | 支持 GET/POST,默认为 GET,指定回调的URL地址 |
请求头 | 由 key/value 组成,告诉服务端需要的资源类型。 |
认证 | 当前只支持 API 密钥、Token 令牌 2 种认证方式:API 密钥:API 密钥是赋予某种形式的秘密令牌的名称,与 Web 服务(或类似的)请求一起提交以识别请求的来源Token 令牌:Token 是指在指定有效时间内可以代表用户角色,具有请求接口的权限 |
请求参数 | 需要获取的字段和条件,且支持全局参数详见全局参数、时间宏、工作流参数详见工作流参数。 |
请求体 | 只限 JSON 格式:封装 POST 请求消息的请求参数,支持全局参数、时间宏、工作流参数。 |
回调注意事项:
- 主节点请求返回的参数必须为
{
"taskId": "xxxxxx",
...
}
-
bi会自动提取taskId,并拼接到回调地址上
例如该url,自动拼接后为:http://127.0.0.1:8004/status/xxxxxx
-
回调返回结果校验:
- 返回结果格式为: { "status": "xxxxxx" .......}
- 如果响应状态码非2开头,比如200 201等,则直接报错结束
- 如果status值为PROCESSING,则继续轮询,否则结束。status为SUCCESS表示成功,否则为失败
- 注意服务死循环问题
3.运行选项
-
运行标志
- 禁止执行:工作流运行至该节点后将直接跳过执行,常用于临时数据问题排查、部分任务运行控制等场景。
- 正常:按照既有调度策略运行该节点,节点默认运行标志。
-
失败重试:与数据集自身配置的失败重试策略保持同步。
4.通过HTTP调用python服务实践
Python API服务所需依赖
flask
request
jsonify
cachetools
Python 服务脚本
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify
from concurrent.futures import ThreadPoolExecutor
from cachetools import TTLCache
import time
import uuid
import io
import contextlib
import sys
app = Flask(__name__)
# 线程池(最大 10 个并发任务)
executor = ThreadPoolExecutor(max_workers=10)
# 2天转换为秒
two_days_in_seconds = 2 * 24 * 60 * 60
# 创建一个缓存对象,每个条目的TTL为2天
task_cache = TTLCache(maxsize=100, ttl=two_days_in_seconds)
def run_python_code(code, taskId):
""" 执行 Python 代码,并存储结果 """
try:
result = execute_code(code)
task_cache[taskId] = {"taskId": taskId, "status": "FINISHED", "log": result}
except Exception as e:
task_cache[taskId] = {"taskId": taskId, "status": "FAILED", "error": str(e)}
@app.route("/run_code", methods=["POST"])
def run_code():
""" 提交代码任务,异步执行 """
data = request.get_json()
code = data.get("code", "").strip()
if not code:
return jsonify({"error": "No code provided"}), 400
# 生成唯一 taskId
taskId = str(uuid.uuid4())
# 记录任务开始
task_cache[taskId] = {"taskId": taskId, "status": "PROCESSING"}
# 提交任务到线程池
executor.submit(run_python_code, code, taskId)
return jsonify({"taskId": taskId, "status": "PROCESSING"})
@app.route("/status/<taskId>", methods=["GET"])
def check_status(taskId):
""" 查询任务状态 """
if taskId not in task_cache:
return jsonify({"taskId": taskId, "status": "FAILED", "error": "Task not found"}), 404
task_info = task_cache[taskId]
if task_info["status"] == "FAILED":
return jsonify({"taskId": taskId, "status": "FAILED", "error": task_info["error"]}), 500
return jsonify(task_info)
# 执行代码的函数
def execute_code(code):
# 使用 StringIO 捕获 print 输出
output_buffer = io.StringIO()
try:
# 重定向标准输出到 output_buffer
with contextlib.redirect_stdout(output_buffer):
exec(code)
# 获取捕获的输出
output = output_buffer.getvalue()
return output.strip() # 返回捕获的输出
except Exception as e:
# 如果代码执行失败,返回错误信息
return f"Code execution failed: {str(e)}"
finally:
# 关闭 StringIO 缓冲区
output_buffer.close()
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8004, debug=True) # 监听所有地址,端口 8004
配置HTTP节点
在请求体中配置参数
配置回调接口