Section 13 - Real-time Data Block processors
In this section we will:
- show how to create a Data Block processor to distribute data in real time.
Please ensure you have completed all previous sections before proceeding.
Processing Pipeline
Ready for real time processing?
- If you completed all previous sections, you will have implemented data cleansers, data processors and data validators.
- We also implemented Datablock collections, Data Sinks and scheduledDistributions.
- This means that all the cleansed, validated and enriched data is distributed in batches.
- Note: Our current implementation distributes collections of data, not single records.
- In this section, we will show how to publish each record as it becomes ready, in real time.
- To implement real-time processing we need to update the CUSTOMER schema.
- All we have to do is add the following Data Block processor:
<schemaAppliedProcessors>
    <dataBlockProcessors>
        <dataBlockProcessor name="PUBLISH_CUSTOMER_DATA" entity="ADHOC_JSON_SINK">
            <config>
                <![CDATA[
                {
                    "jsonPayload": {
                        "bac": "#GRV{ CTX['BAC'] }",
                        "first_name": "#GRV{ CTX['FIRST_NAME'] }",
                        "last_name": "#GRV{ CTX['LAST_NAME'] }",
                        "address": "#GRV{ CTX['ADDRESS'] }",
                        "phone_number": "#GRV{ CTX['PHONE_NUMBER'] }",
                        "age": "#GRV{ CTX['AGE'] }",
                        "yearly_income": "#GRV{ CTX['YEARLY_INCOME'] }",
                        "tfn": "#GRV{ CTX['TFN'] }",
                        "portfolio_value": "#GRV{ CTX['PORTFOLIO_VALUE'] }"
                    },
                    "dataSink": {
                        "name": "adhoc",
                        "entity": "HAZELCAST_CLUSTER_QUEUE",
                        "config" : {
                            "queueName": "CUSTOMER_DATA"
                        }
                    }
                }
                ]]>
            </config>
        </dataBlockProcessor>
    </dataBlockProcessors>
</schemaAppliedProcessors>
- The resulting file is shown in the next section.
- Congratulations! You have just implemented real-time processing by publishing to a Hazelcast queue every time a record becomes available.
- Note: The Hazelcast queue must be set up manually before you can start sending data to it. Setting it up is outside the scope of this section.
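To make the per-record behaviour more concrete, the following Python sketch mimics what the PUBLISH_CUSTOMER_DATA processor is configured to do: resolve each `#GRV{ CTX['...'] }` lookup against a single record and put the resulting JSON on a queue. It is an illustration only, not the platform's implementation. The helper names (`render_payload`, `publish`), the sample record, and the in-memory `queue.Queue` standing in for the Hazelcast `CUSTOMER_DATA` queue are all assumptions, and the sketch does not claim to reproduce how the platform types or serialises values.

```python
import json
import queue
import re

# Payload template taken from the PUBLISH_CUSTOMER_DATA config (shortened to three fields).
PAYLOAD_TEMPLATE = {
    "bac": "#GRV{ CTX['BAC'] }",
    "first_name": "#GRV{ CTX['FIRST_NAME'] }",
    "portfolio_value": "#GRV{ CTX['PORTFOLIO_VALUE'] }",
}

# Matches only the simple lookup form used above: #GRV{ CTX['NAME'] }
_LOOKUP = re.compile(r"^#GRV\{\s*CTX\['([^']+)'\]\s*\}$")


def render_payload(template, record):
    """Resolve each simple CTX lookup against one record (hypothetical helper)."""
    payload = {}
    for key, expression in template.items():
        match = _LOOKUP.match(expression)
        if match is None:
            raise ValueError(f"unsupported expression: {expression}")
        # Data points missing from the record resolve to None in this sketch.
        payload[key] = record.get(match.group(1))
    return payload


def publish(record, sink):
    """Put one rendered record on the sink as a JSON string (hypothetical helper)."""
    sink.put(json.dumps(render_payload(PAYLOAD_TEMPLATE, record)))


# Hypothetical record; the in-memory queue stands in for the CUSTOMER_DATA queue.
customer_data = queue.Queue()
publish(
    {"BAC": "AAAA1111", "FIRST_NAME": "JANE", "PORTFOLIO_VALUE": 250000.0},
    customer_data,
)
```

Each call to `publish` produces exactly one message, which is the difference from the batch distributions configured in earlier sections, where a whole collection is sent at once.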
Configuration files
Completed configuration files
- This is the updated SCHEMA_CUSTOMER.xml:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<apiroConf version="1" xmlns="http://apiro.com/apiro/v1/root">
    <loadOrder>15</loadOrder>
    <schemas>
        <schema defBacked="false" historical="false" name="CUSTOMER">
            <identityKeys>
                <identityKey>BAC</identityKey>
            </identityKeys>
            <dataPoints>
                <dataPoint name="BAC" displayName="Bank Account Code" dataType="STRING" canEditValid="false" canEditViolated="false">
                    <nullable>false</nullable>
                    <rawDPValidators>
                        <rawDPValidator name="IN_BAC_SET_CHECK" entity="IN_SET">
                            <config>
                                <![CDATA[
                                {
                                    ignoreCase : "true",
                                    options : [ "AAAA1111", "BBBB2222", "CCCC2222" ]
                                }
                                ]]>
                            </config>
                        </rawDPValidator>
                    </rawDPValidators>
                    <rawDPProcessors/>
                    <consolidationAlgorithm/>
                    <consDPValidators/>
                    <consDPProcessors/>
                </dataPoint>
                <dataPoint displayName="First Name" name="FIRST_NAME" dataType="STRING">
                    <rawDPProcessors>
                        <rawDPProcessor name="CAPITALISE_LAST_NAME_RAW_PROC" entity="GEN_EXPRESS">
                            <config>
                                <![CDATA[
                                #GRV{ CTX['.'] = CTX['.'].toUpperCase() }
                                ]]>
                            </config>
                        </rawDPProcessor>
                    </rawDPProcessors>
                </dataPoint>
                <dataPoint displayName="Last Name" name="LAST_NAME" dataType="STRING"/>
                <dataPoint displayName="Address" name="ADDRESS" dataType="STRING"/>
                <dataPoint displayName="Phone Number" name="PHONE_NUMBER" dataType="STRING"/>
                <dataPoint displayName="Age" name="AGE" dataType="INTEGER">
                    <rawDPValidators>
                        <rawDPValidator name="INVALID_IF_NULL" entity="NOT_NULL"/>
                        <rawDPValidator name="INVALID_IF_NEGATIVE" entity="POSITIVE">
                            <lateBound>false</lateBound>
                        </rawDPValidator>
                    </rawDPValidators>
                </dataPoint>
                <dataPoint displayName="Yearly Income" name="YEARLY_INCOME" dataType="DECIMAL"/>
                <dataPoint displayName="Tax File Number" name="TFN" dataType="STRING"/>
                <dataPoint displayName="Tax File Number Masked" name="TFN_MASKED" dataType="STRING">
                    <consDPProcessors>
                        <consDPProcessor name="TFN_HASH_MASKING" entity="HASH_MASK">
                            <config>
                                <![CDATA[
                                {
                                    "inputValue":"CTX['TFN']",
                                    "maskingSalt":"aqQwSxXcfgdejhbJhdygjyfdghjHGYYIdh!66gydshasGY!"
                                }
                                ]]>
                            </config>
                        </consDPProcessor>
                    </consDPProcessors>
                </dataPoint>
                <dataPoint displayName="Investment Portfolio Value" name="PORTFOLIO_VALUE" dataType="DECIMAL">
                    <rawDPValidators>
                        <rawDPValidator name="CHECK_HIGH_PORTFOLIO_VALUE" entity="GEN_EXPRESS">
                            <config>
                                <![CDATA[
                                #GRV{
                                    // CTX["."] refers to the current DP, e.g. PORTFOLIO_VALUE. CTX["AGE"] would refer to the AGE data point.
                                    if (CTX["."] > 100000000) { // If this condition is met
                                        return true; // A violation with the name `CHECK_HIGH_PORTFOLIO_VALUE` is raised
                                    }
                                    return false;
                                }
                                ]]>
                            </config>
                        </rawDPValidator>
                    </rawDPValidators>
                    <consolidationAlgorithm name="PORTF_VALUE_WEIGHTED_MEAN_01" entity="GEN_EXPRESS">
                        <config>
                            <![CDATA[
                            #GRV{ return ( (items.get(CUSTOMERS_A_XLSX) * 0.8) + (items.get(CUSTOMERS_A_XLSX) * 0.2))/2 }
                            ]]>
                        </config>
                    </consolidationAlgorithm>
                    <consDPValidators>
                        <consDPValidator name="PORTF_VALUE_HISTORICAL_VALIDATOR" entity="HISTORICAL_SHIFT">
                            <config>
                                <![CDATA[
                                {
                                    "priorValues" : 5,
                                    "percent" : 5.2,
                                    "comparisonMore" : true
                                }
                                ]]>
                            </config>
                        </consDPValidator>
                    </consDPValidators>
                </dataPoint>
                <dataPoint displayName="Company Name" name="COMPANY_NAME" dataType="STRING"/>
                <dataPoint displayName="Company Address" name="COMPANY_ADDRESS" dataType="STRING"/>
                <dataPoint displayName="XML Root Doc" name="xmlRootDoc" dataType="XML"/>
                <dataPoint displayName="JSON Root Doc" name="jsonRootDoc" dataType="JSON"/>
                <dataPoint displayName="Profile Image" name="PROFILE_IMAGE" dataType="BLOB"/>
            </dataPoints>
            <schemaAppliedProcessors>
                <dataBlockProcessors>
                    <dataBlockProcessor name="PUBLISH_CUSTOMER_DATA" entity="ADHOC_JSON_SINK">
                        <config>
                            <![CDATA[
                            {
                                "jsonPayload": {
                                    "bac": "#GRV{ CTX['BAC'] }",
                                    "first_name": "#GRV{ CTX['FIRST_NAME'] }",
                                    "last_name": "#GRV{ CTX['LAST_NAME'] }",
                                    "address": "#GRV{ CTX['ADDRESS'] }",
                                    "phone_number": "#GRV{ CTX['PHONE_NUMBER'] }",
                                    "age": "#GRV{ CTX['AGE'] }",
                                    "yearly_income": "#GRV{ CTX['YEARLY_INCOME'] }",
                                    "tfn": "#GRV{ CTX['TFN'] }",
                                    "portfolio_value": "#GRV{ CTX['PORTFOLIO_VALUE'] }"
                                },
                                "dataSink": {
                                    "name": "adhoc",
                                    "entity": "HAZELCAST_CLUSTER_QUEUE",
                                    "config" : {
                                        "queueName": "CUSTOMER_DATA"
                                    }
                                }
                            }
                            ]]>
                        </config>
                    </dataBlockProcessor>
                </dataBlockProcessors>
            </schemaAppliedProcessors>
        </schema>
    </schemas>
</apiroConf>
Deploy config files
Follow the steps in Config Deployment to deploy and start using your configuration files.