¡Hola, desarrolladores!
En esta serie de artículos hemos hablado del framework iris-datapipe, de cómo nos ayuda a crear "pipes" de datos para la ingesta y procesamiento, y de cómo instalarlo. Vamos a profundizar en cómo implementar uno de esos "pipes" paso a paso.
Si llegaste directamente a este artículo, te recomiendo revisar los anteriores y recordar que iris-datapipe incluye un QuickStart para que puedas explorar sus funcionalidades rápidamente 👌.
El ejemplo que abordaremos está incluido en el QuickStart, por lo que puedes utilizarlo como referencia.
Definir un nuevo Pipe
Comienza por definir un nuevo Pipe en la interfaz gráfica. Básicamente, solo se trata de asignar un código y una descripción.
Opcionalmente, puedes especificar un recurso de seguridad de IRIS que se requerirá para poder operar con ese Pipe (esto es útil si necesitas crear pipes que solo sean accesibles a determinados usuarios).
En el ejemplo, definimos un pipe llamado REST-API
, que se encargará de procesar datos recibidos desde una API REST, donde llegarán datos sobre personas.
Procesamiento de los datos
Para procesar los datos utilizando un "pipe", necesitamos seguir los siguientes pasos:
1) Definir un modelo DataPipe
Debemos definir un modelo para los datos que queremos procesar.
Un modelo no es más que una clase que hereda o extiende de DataPipe.Model.cls donde tendrás que implementar algunos métodos.
Tu modelo debe implementar:
- Cómo serializar/deserializar tus datos (por ejemplo, usando XML o JSON).
- Cómo normalizar y validar tus datos.
- Y finalmente, qué operación quieres ejecutar sobre tus datos una vez están normalizados y validados.
En mi ejemplo, el modelo que utilizaré será DataPipe.Test.REST.Models.Person.cls.
Dado que el modelo es una clase convencional en InterSystems IRIS, puedes añadir herencia u otro comportamiento que necesites. En mi caso, heredo de DataPipe.Test.REST.Msg.PersonData, donde tengo definidas las propiedades que me interesa tratar.
El modelo DataPipe.Test.REST.Models.Person.cls tiene diferentes métodos:
Serialize
,Deserialize
: Se utilizan para indicar cómo serializar/deserializar el modelo. En este caso, utilizo JSON.Normalize
: Especifica cómo quiero normalizar el modelo. En mi caso, solo quiero llamar a una transformación de datos.
/// Normalize model
Method Normalize(Output obj As DataPipe.Model) As %Status
{
set ret = $$$OK
try {
// call normalizaton data transform
set sc = $classmethod("DataPipe.Test.REST.DT.PersonNormalize", "Transform", $this, .obj)
$$$ThrowOnError(sc)
} catch ex {
set ret = ex.AsStatus()
}
quit ret
}
Validate
: Indica cómo quiero validar si mi modelo es correcto o no. Puedo añadir "warnings" también. Puedes implementar lo que necesites:
/// Validate model
Method Validate(Output errorList As %List) As %Status
{
#define AddError(%list, %code, %desc) set error = ##class(DataPipe.Data.ErrorInfo).%New() set error.Code=%code set error.Desc=%desc do %list.Insert(error)
set ret = $$$OK
try {
set errorList = ##class(%ListOfObjects).%New()
// date of birth
if ..DOB="" {
$$$AddError(errorList, "V001", "DOB required")
} else {
set yearDOB = $extract($zdate(..DOB,8),1,4)
if (yearDOB < 1930) $$$AddError(errorList, "V002", "DOB must be greater than 1930")
if (yearDOB < 1983) $$$AddError(errorList, "W083", "Warning! Older than 1983")
}
// model is invalid if errors (not warnings) found
for i=1:1:errorList.Count() {
set error = errorList.GetAt(i)
set errorCode = error.Code
// in this sample model, all warnings start with "W"
if errorCode'["W" {
$$$ThrowStatus($$$ERROR($$$GeneralError, "Invalid"))
}
}
} catch ex {
set ret = ex.AsStatus()
}
quit ret
}
GetOperation
: Indica qué Business Operation quiero que ejecute la operación sobre los datos (lo entenderás mejor cuando comentemos los componentes de interoperabilidad).
/// Return the Business Operation name that will run the operation with the model
/// Each Business Operation can be used to hold different queues
Method GetOperation() As %Status
{
quit "Person Operation"
}
RunOperation
: Esta es la operación que quiero ejecutar sobre mis datos. Puedo guardarlos en la base de datos o llamar a otro componente de interoperabilidad para continuar el procesamiento más adelante. En mi ejemplo, los guardo en una global y también los envío a otro Business Process.
/// Run final operation with the model
/// This method can be used to persit data from the model to an operational data store
Method RunOperation(Output errorList As %List, Output log As %Stream.Object, bOperation As Ens.BusinessOperation = "", Output delayedProcessing As %Boolean = 0) As %Status
{
#define AddError(%list, %code, %desc) set error = ##class(DataPipe.Data.ErrorInfo).%New() set error.Code=%code set error.Desc=%desc do %list.Insert(error)
#define AddLog(%log, %msg) do %log.WriteLine("["_$zdt($h,3)_"] "_%msg)
set errorList = ##class(%ListOfObjects).%New()
set log = ##class(%Stream.GlobalCharacter).%New()
set ret = $$$OK
try {
TSTART
$$$AddLog(log, "Transaction Started")
// simulate an operation error
if ##class(Ens.Util.FunctionSet).In(..Name, ##class(DataPipe.Test.HL7.Helper).OperationErrorNames()) {
$$$ThrowStatus($$$ERROR($$$GeneralError, "Simulated Operation Error"))
}
// store serialized model
$$$ThrowOnError(..Serialize(.stream))
set ^zDataPipe($i(^zDataPipe)) = stream.Read()
$$$AddLog(log, "Model Stored in ^zDataPipe("_$get(^zDataPipe)_")")
TCOMMIT
$$$AddLog(log, "Transaction Commited")
// you can send messages to other production components (while you are not on an open transaction)
// you can use this feature to continue processing the record in other component (delayed processing)
set delayedProcessing = 1
if $isobject(bOperation) {
set req = bOperation.OperRequest
$$$ThrowOnError(bOperation.SendRequestAsync("REST Delayed Oper Update", req))
}
} catch ex {
TROLLBACK
$$$AddLog(log, "Rollback!")
set ret = ex.AsStatus()
$$$AddLog(log, "Error catched: "_$system.Status.GetOneStatusText(ret))
// include exception errors into errorList
do $system.Status.DecomposeStatus(ret, .errors)
for i=1:1:errors {
$$$AddError(errorList, "Exception", errors(i))
}
}
quit ret
}
2) Añadir componentes de interoperabilidad
Después de definir tu modelo, necesitas configurar una producción de interoperabilidad usando componentes de DataPipe. iris-datapipe incluye componentes preconstruidos que pueden operar con un modelo DataPipe como el que hemos definido anteriormente.
El único proceso que debes implementar es el proceso de ingestión.
2.1) Crear un Proceso de Ingestión
Necesitas crear un nuevo Business Process que utilice como contexto DataPipe.Ingestion.BP.IngestionManagerContext.
Este proceso recibirá la entrada de datos que decidas (en este ejemplo, el mensaje que envía una API REST que actúa como Business Service) y debe implementar:
Identificación de los datos que procesas (InboxAttributes):
- Debes identificar el registro que estás procesando y proporcionar los InboxAttributes.
- El "pipe" al que pertenece este registro debe indicarse en este momento.
- Estos atributos describirán el registro que estás tratando y luego se utilizarán para la búsqueda desde la interfaz gráfica.
- Para realizar lo anterior, puedes utilizar transformaciones de datos, código, ¡lo que necesites!
Convertir los datos de entrada en el modelo de datos que definiste previamente:
- Debes utilizar transformaciones, código o lo que prefieras para transformar la información de entrada al modelo de datos que has desarrollado previamente.
2.2) Añadir el resto de componentes
El resto de los componentes de interoperabilidad son proporcionados por iris-datapipe y ya están preconstruidos. Estos componentes llamarán a los distintos métodos implementados en tu modelo.
En general, necesitarás añadir a la producción, por cada "pipe" diferente que quieras implementar:
- El proceso de ingestión que recibe los datos, proporciona los InboxAttributes y transforma esos datos al modelo correspondiente.
- Un proceso DataPipe.Staging.BP.StagingManager.
- Un proceso DataPipe.Oper.BP.OperManager.
- Y una operación DataPipe.Oper.BO.OperationHandler.
Aquí tienes la producción de ejemplo que se utiliza en el QuickStart.
Con todo esto, cuando comiencen a llegar datos y se procesen, podrás verlos directamente desde la interfaz gráfica.
¡Espero que os sea útil!