Saltearse al contenido

Motor del Workflow - Funcionamiento General

Un workflow es una secuencia de nodos conectados que procesan datos de forma ordenada. Cada nodo realiza una accion especifica (llamada HTTP, decision logica, transformacion de datos, consulta SQL, envio de correo, etc.) y pasa el resultado al siguiente nodo en la cadena.

El flujo de ejecucion es dirigido: los datos entran por un nodo inicial (trigger), se transforman y enriquecen a medida que pasan por cada nodo, y finalmente alcanzan un nodo terminal o se detienen por una condicion especifica.

Los workflows permiten automatizar procesos de negocio sin necesidad de escribir codigo, conectando modulos predefinidos mediante una interfaz visual de editor.


El motor utiliza un modelo de ejecucion basado en cola (queue-based execution). El proceso general es el siguiente:

La funcion principal es:

processWorkflowV1(workflowName, workflowId, queryParams, webHook)

Parametros:

  • workflowName: Nombre identificador del workflow a ejecutar.
  • workflowId: Identificador unico de la ejecucion. Si no se proporciona, se genera automaticamente.
  • queryParams: Parametros iniciales que se inyectan como datos de entrada.
  • webHook: Objeto de respuesta HTTP (cuando el workflow se inicia por webhook).
  1. Carga de la definicion: Se consulta la tabla workflows y se obtiene la columna workflow_json, que contiene toda la estructura del workflow (nodos, conexiones, configuraciones).
  2. Generacion de workflowId: Se genera un identificador unico (UUID) para esta ejecucion particular, permitiendo rastrear cada ejecucion de forma independiente.
  3. Carga de variables de memoria: Se consulta la tabla memorystorage para obtener las variables persistentes asociadas al workflow.
  4. Inicializacion del contexto de ejecucion: Se construye un objeto de contexto que incluye el mapeo de nodeLabels (etiquetas legibles de cada nodo), el historial de pasos (steps), las variables de memoria y el workflowId.

Cada modulo (nodo) recibe tres argumentos y devuelve un objeto con el resultado:

modulo(data, config, context)
  • data: La salida del nodo anterior. Es el dato principal que el nodo actual debe procesar. En el caso del primer nodo, corresponde a los datos iniciales del trigger.
  • config: La configuracion del nodo. Contiene los parametros definidos por el usuario en el editor visual (por ejemplo, la URL de una peticion HTTP, la consulta SQL, las condiciones de una decision, etc.).
  • context: El contexto de ejecucion. Incluye:
    • workflowId: Identificador unico de esta ejecucion.
    • steps: Array con el historial acumulado de ejecucion (resultados de nodos anteriores).
    • nodeLabels: Mapeo de identificadores de nodos a sus etiquetas legibles.
    • memoryVariables: Variables persistentes cargadas desde memorystorage.
{
nextModule: "id_del_siguiente_nodo", // o array, o null
data: { ... }, // datos de salida
error: null, // objeto de error si fallo
_meta_: { ... } // metadatos opcionales
}
  • nextModule: Determina que nodo se ejecuta a continuacion.
  • data: Los datos de salida que se convierten en la entrada del siguiente nodo.
  • error: Objeto de error en caso de fallo (null si fue exitoso).
  • meta: Metadatos opcionales del modulo (tiempo de ejecucion, flags, etc.).

El motor utiliza una cola FIFO (First In, First Out) llamada moduleQueue para gestionar el orden de ejecucion de los nodos.

1. Inicializar moduleQueue con el nodo de inicio (start)
2. Mientras moduleQueue no este vacia:
a. Tomar el siguiente elemento de la cola
b. Resolver variables dinamicas en la configuracion del nodo
c. Cargar el codigo del modulo correspondiente al tipo de nodo
d. Ejecutar el modulo con (data, config, context)
e. Almacenar el resultado en la tabla workflowstates
f. Agregar el resultado al array de steps del contexto
g. Determinar el/los siguiente(s) modulo(s)
h. Insertar el/los siguiente(s) modulo(s) en la cola
3. Fin de la ejecucion

Antes de ejecutar cada nodo, el motor resuelve las variables dinamicas presentes en la configuracion. Estas variables permiten referenciar datos de nodos anteriores o variables de memoria. El formato tipico es {{nombreVariable}} o referencias a pasos previos como {{steps.nodo_anterior.campo}}.

Tras cada ejecucion de nodo, el resultado se persiste en la tabla workflowstates con informacion como:

  • Identificador del workflow y del nodo
  • Estado de ejecucion (success, error)
  • Datos de entrada y salida
  • Tiempo de respuesta en milisegundos

El valor de nextModule retornado por cada nodo determina como continua el flujo:

El nodo devuelve un unico identificador. La ejecucion continua de forma secuencial al nodo indicado.

nextModule: "http_2"

El nodo devuelve multiples identificadores. Cada uno se agrega a la cola y se ejecutan en orden.

nextModule: ["rama_a_1", "rama_b_1"]

Los nodos de tipo decision evaluan una condicion y retornan la ruta correspondiente:

  • Si la condicion es verdadera: sigue por truePath
  • Si la condicion es falsa: sigue por falsePath
nextModule: condicion ? config.truePath : config.falsePath

Cuando nextModule es null, la rama actual termina. Si hay otros elementos en la cola, la ejecucion continua con ellos.

nextModule: null

Cuando un nodo retorna el flag _stopflowprocess, se detiene completamente la ejecucion del workflow y se retorna la respuesta HTTP al cliente. Este flag se usa para workflows iniciados por webhook que necesitan responder inmediatamente.


La definicion de un workflow se almacena como JSON en la columna workflow_json de la tabla workflows. La estructura general es la siguiente:

{
"id": "123",
"name": "mi_workflow",
"start": "start_1",
"active_schedule": true,
"schedule": "0 9 * * *",
"modules": {
"start_1": {
"type": "start",
"config": {
"label": "Inicio",
"nextModule": "http_2"
}
},
"http_2": {
"type": "http",
"config": {
"label": "Consultar API",
"method": "GET",
"url": "https://api.ejemplo.com/datos",
"headers": {},
"nextModule": "decision_3"
}
},
"decision_3": {
"type": "decision",
"config": {
"label": "Verificar resultado",
"conditions": [],
"truePath": "mail_4",
"falsePath": "end_5"
}
},
"mail_4": {
"type": "sendmail",
"config": {
"label": "Enviar notificacion",
"to": "admin@ejemplo.com",
"subject": "Resultado",
"nextModule": "end_5"
}
},
"end_5": {
"type": "end",
"config": {
"label": "Fin"
}
}
}
}
CampoDescripcion
idIdentificador unico del workflow
nameNombre del workflow (usado como referencia en la URL o programacion)
startIdentificador del nodo inicial
active_scheduleSi el workflow tiene programacion activa (true/false)
scheduleExpresion cron para ejecucion programada
modulesObjeto con todos los nodos del workflow, indexados por su identificador
CampoDescripcion
typeTipo de modulo (start, http, decision, sendmail, sql, etc.)
configConfiguracion especifica del modulo
config.labelEtiqueta visible del nodo en el editor
config.nextModuleSiguiente nodo a ejecutar

El motor implementa varios mecanismos para gestionar errores durante la ejecucion.

Por defecto, cada nodo tiene hasta 3 reintentos en caso de fallo. El motor reintenta la ejecucion del nodo antes de marcarlo como fallido definitivamente.

Cuando un nodo tiene activado el flag continueOnError, la ejecucion del workflow continua al siguiente nodo aunque el nodo actual haya fallado. Esto es util para nodos no criticos donde un fallo no debe detener todo el proceso.

Tipo de errorDescripcion
EMPTY_INPUT_DATAEl nodo recibio datos de entrada vacios o nulos
EMPTY_RETURN_DATAEl nodo no devolvio datos de salida
UNKNOWN_ERRORError no clasificado o inesperado

Todos los errores se registran en la tabla workflowstates junto con:

  • El tipo de error
  • El mensaje descriptivo
  • Los datos de entrada que causaron el fallo
  • El timestamp de la ocurrencia

Esto permite diagnosticar problemas revisando el historial de ejecucion de cada nodo.


El motor proporciona dos mecanismos de seguimiento para monitorear y depurar las ejecuciones.

Registra la ejecucion de cada nodo individual. Cada fila contiene:

CampoDescripcion
workflow_idIdentificador unico de la ejecucion
node_idIdentificador del nodo ejecutado
statusEstado de la ejecucion (success, error)
dataDatos de salida del nodo
response_time_msTiempo de ejecucion del nodo en milisegundos
created_atFecha y hora de la ejecucion

Almacena trazas detalladas con diferentes niveles de severidad:

NivelUso
TRACEInformacion de bajo nivel, flujo detallado de ejecucion
INFOEventos informativos generales
WARNAdvertencias que no detienen la ejecucion
ERRORErrores que afectan la ejecucion del nodo o workflow
DEBUGInformacion de depuracion para desarrollo

Durante la ejecucion, el contexto mantiene un array steps que acumula el resultado de cada nodo ejecutado. Este array esta disponible para todos los nodos posteriores, permitiendo acceder a datos de cualquier nodo anterior en el flujo.

context.steps = [
{ nodeId: "start_1", data: { ... }, status: "success" },
{ nodeId: "http_2", data: { ... }, status: "success" },
// ... mas pasos
]

El motor reconoce varios flags especiales que alteran el comportamiento normal de la ejecucion.

Detiene completamente la ejecucion del workflow y retorna una respuesta HTTP al cliente. Se usa principalmente en workflows iniciados por webhook donde se necesita enviar una respuesta inmediata.

{
nextModule: null,
data: { resultado: "ok" },
_stopflowprocess: true
}

Cuando este flag esta activo, la cola de ejecucion se vacia y los datos del nodo se envian como respuesta HTTP.

Permite enviar una respuesta al cliente HTTP sin detener la ejecucion del workflow. El workflow continua procesando en segundo plano mientras el cliente ya recibio su respuesta.

{
nextModule: "siguiente_nodo",
data: { ... },
_respondata: { mensaje: "Recibido, procesando..." }
}

Indica que el nodo actual esta en modo de iteracion por lotes. El motor ejecuta el mismo nodo (o subconjunto de nodos) multiples veces, una vez por cada elemento de un array de datos.

Contiene los resultados acumulados de cada iteracion cuando se usa el modo de iteracion por lotes. Al finalizar todas las iteraciones, este array se pasa al nodo merge para consolidar los resultados.

{
_batchSteps: [
{ iteration: 0, data: { ... } },
{ iteration: 1, data: { ... } },
// ... una entrada por cada elemento iterado
]
}

El siguiente diagrama ASCII muestra un ejemplo tipico de ejecucion de un workflow con decision:

+----------+
| START |
+----+-----+
|
v
+----------+
| Node 1 |
| (HTTP) |
+----+-----+
|
v
+-----------+
/ Decision \
/ (condicion) \
+--------+--------+
| |
true | | false
v v
+----------+ +----------+
| Node 2 | | Node 3 |
| (SendMail)| | (Logger) |
+----+-----+ +----+-----+
| |
v v
+----------------------+
| MERGE |
+----------+-----------+
|
v
+----------+
| END |
+----------+
  1. START: El trigger inicia la ejecucion y proporciona los datos iniciales.
  2. Node 1 (HTTP): Realiza una peticion HTTP a una API externa y pasa la respuesta al siguiente nodo.
  3. Decision: Evalua una condicion sobre los datos recibidos. Si es verdadera, continua por la rama izquierda (true); si es falsa, por la derecha (false).
  4. Node 2 (SendMail): Envia un correo electronico con los datos obtenidos.
  5. Node 3 (Logger): Registra los datos para auditoria.
  6. MERGE: Consolida las ramas paralelas en un unico flujo.
  7. END: Finaliza la ejecucion del workflow.