Skip to content

Section 13 - Real time Datablock processors

Go back to Getting started guide

In this section we will:

Processing Pipeline

Ready for real time processing?
  1. If you completed all previous sections, you will have implemented data cleansers, data processors and data validators, as illustrated below.
  2. We also implemented Datablock collections, Data Sinks and scheduled Distributions.
  3. This means that all the cleansed, validated and enriched data will be distributed in a batch manner.
  4. Note: Our current implementation distributes collections of data, not single records.

    provide_diagram

  5. In this section, we will show how to publish each record as it becomes ready, in real time.

    provide_diagram

  6. In order to implement real time processing we need to update the CUSTOMER schema.

  7. All we have to do is add the following Data Block processor, as shown below:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
            <!-- Schema-level processors to add to the CUSTOMER schema for real time publishing. -->
            <schemaAppliedProcessors>
                <dataBlockProcessors>
                    <!--
                        PUBLISH_CUSTOMER_DATA: an ADHOC_JSON_SINK processor.
                        "jsonPayload" maps each output field to a #GRV expression that reads the
                        corresponding data point from CTX.
                        "dataSink" targets a HAZELCAST_CLUSTER_QUEUE sink whose queue is CUSTOMER_DATA.
                        NOTE(review): the payload sends the TFN data point as is; confirm that is
                        intended if a masked variant of TFN exists in the schema.
                    -->
                    <dataBlockProcessor name="PUBLISH_CUSTOMER_DATA" entity="ADHOC_JSON_SINK">
                        <config>
                            <![CDATA[
                                {
                                    "jsonPayload": {
                                        "bac": "#GRV{ CTX['BAC'] }",
                                        "first_name": "#GRV{ CTX['FIRST_NAME'] }",
                                        "last_name": "#GRV{ CTX['LAST_NAME'] }",
                                        "address": "#GRV{ CTX['ADDRESS'] }",
                                        "phone_number": "#GRV{ CTX['PHONE_NUMBER'] }",
                                        "age": "#GRV{ CTX['AGE'] }",
                                        "yearly_income": "#GRV{ CTX['YEARLY_INCOME'] }",
                                        "tfn": "#GRV{ CTX['TFN'] }",
                                        "portfolio_value": "#GRV{ CTX['PORTFOLIO_VALUE'] }"
                                    },
                                    "dataSink": {
                                        "name": "adhoc",
                                        "entity": "HAZELCAST_CLUSTER_QUEUE",
                                        "config" : {
                                            "queueName": "CUSTOMER_DATA"
                                        }
                                    }
                                }
                        ]]>
                        </config>
                    </dataBlockProcessor>
                </dataBlockProcessors>
            </schemaAppliedProcessors>
    
  8. The resulting file is shown in the next section below.
  9. Congratulations! You have just implemented real time processing by publishing into a Hazelcast queue every time a record becomes available.
  10. Note: The Hazelcast queue needs to be set up manually before you can start sending data to it. This is outside the scope of this section.

Configuration files

Completed configuration files
  • This is the updated SCHEMA_CUSTOMER.xml
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
        <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
        <!--
            Apiro configuration for the CUSTOMER schema.
            Declares the schema's identity key and data points (with their validators,
            processors and consolidation settings) and a schema-level data block processor
            that publishes customer records to a Hazelcast queue.
        -->
        <apiroConf version="1" xmlns="http://apiro.com/apiro/v1/root">
            <loadOrder>15</loadOrder>
            <schemas>
                <schema defBacked="false" historical="false" name="CUSTOMER">
                    <!-- BAC identifies a CUSTOMER record. -->
                    <identityKeys>
                        <identityKey>BAC</identityKey>
                    </identityKeys>
                    <dataPoints>
                        <!-- BAC: non-nullable; raw value must be one of the listed options (case ignored). -->
                        <dataPoint name="BAC" displayName="Bank Account Code" dataType="STRING" canEditValid="false" canEditViolated="false">
                            <nullable>false</nullable>
                            <rawDPValidators>
                                <rawDPValidator name="IN_BAC_SET_CHECK" entity="IN_SET">
                                    <config>
                                        <![CDATA[
                                            {
                                                ignoreCase : "true",
                                                options : [ "AAAA1111", "BBBB2222", "CCCC2222" ]
                                            }
                                        ]]>
                                    </config>
                                </rawDPValidator>
                            </rawDPValidators>
                            <rawDPProcessors/>
                            <consolidationAlgorithm/>
                            <consDPValidators/>
                            <consDPProcessors/>
                        </dataPoint>
                        <!--
                            FIRST_NAME: the raw processor replaces the current value with its upper-cased form.
                            NOTE(review): the processor is named CAPITALISE_LAST_NAME_RAW_PROC although it is
                            attached to FIRST_NAME; confirm nothing depends on the name before renaming it.
                        -->
                        <dataPoint displayName="First Name" name="FIRST_NAME" dataType="STRING" >
                            <rawDPProcessors>
                                <rawDPProcessor name="CAPITALISE_LAST_NAME_RAW_PROC" entity="GEN_EXPRESS">
                                    <config>
                                        <![CDATA[
                                            #GRV{
                                                    CTX['.'] = CTX['.'].toUpperCase()
                                                }
                                        ]]>
                                    </config>
    
                                </rawDPProcessor>
                            </rawDPProcessors>
                        </dataPoint>
                        <dataPoint displayName="Last Name" name="LAST_NAME" dataType="STRING" >
                        </dataPoint>
                        <dataPoint displayName="Address" name="ADDRESS" dataType="STRING" >
                        </dataPoint>
                        <dataPoint displayName="Phone Number" name="PHONE_NUMBER" dataType="STRING" >
                        </dataPoint>
                        <!-- AGE: two raw validators, a NOT_NULL check and a POSITIVE check. -->
                        <dataPoint displayName="Age" name="AGE" dataType="INTEGER" >
                            <rawDPValidators>
                                <rawDPValidator name="INVALID_IF_NULL" entity="NOT_NULL"/>
    
                                <rawDPValidator name="INVALID_IF_NEGATIVE" entity="POSITIVE">
                                    <lateBound>false</lateBound>
                                </rawDPValidator>
                            </rawDPValidators>
                        </dataPoint>
                        <dataPoint displayName="Yearly Income" name="YEARLY_INCOME" dataType="DECIMAL" >
                        </dataPoint>
                        <dataPoint displayName="Tax File Number" name="TFN" dataType="STRING" >
                        </dataPoint>
                        <!--
                            TFN_MASKED: HASH_MASK processor configured with the TFN data point as input and a salt.
                            NOTE(review): the salt is hard-coded in this file; confirm that is acceptable
                            outside a tutorial setting.
                        -->
                        <dataPoint displayName="Tax File Number Masked" name="TFN_MASKED" dataType="STRING" >
                            <consDPProcessors>
                                <consDPProcessor name="TFN_HASH_MASKING" entity="HASH_MASK">
                                    <config>
                                        <![CDATA[
                                    {
                                        "inputValue":"CTX['TFN']",
                                        "maskingSalt":"aqQwSxXcfgdejhbJhdygjyfdghjHGYYIdh!66gydshasGY!"
                                    }
                                ]]>
                                    </config>
                                </consDPProcessor>
                            </consDPProcessors>
                        </dataPoint>
                        <!--
                            PORTFOLIO_VALUE: raw validator flagging values above 100000000, an expression-based
                            consolidation algorithm, and a HISTORICAL_SHIFT validator on the consolidated value.
                        -->
                        <dataPoint displayName="Investment Portfolio Value" name="PORTFOLIO_VALUE" dataType="DECIMAL" >
                            <rawDPValidators>
                                <rawDPValidator name="CHECK_HIGH_PORTFOLIO_VALUE" entity="GEN_EXPRESS">
                                    <config>
                                        <![CDATA[
                                            #GRV{// CTX["."] refers to the current DP, eg.PORTFOLIO_VALUE. eg. CTX[AGE] would refer to the AGE data point
                                                if((CTX["."]>100000000)){ // If this condition is met
                                                    return true;          // A violation with the name `CHECK_HIGH_PORTFOLIO_VALUE` is raised
                                                }
                                                return false;
                                            }
                                        ]]>
                                    </config>
                                </rawDPValidator>
                            </rawDPValidators>
                            <!--
                                The expression below is now closed with a matching brace, like the other #GRV blocks.
                                NOTE(review): both weighted terms read CUSTOMERS_A_XLSX and the sum is halved;
                                confirm the second source and the divisor are what a weighted mean should use.
                            -->
                            <consolidationAlgorithm name="PORTF_VALUE_WEIGHTED_MEAN_01" entity="GEN_EXPRESS">
                                <config>
                                    <![CDATA[
                                                #GRV{
                                                    return (
                                                    (items.get(CUSTOMERS_A_XLSX) * 0.8) +
                                                    (items.get(CUSTOMERS_A_XLSX) * 0.2))/2
                                                }
                                                ]]>
                                </config>
                            </consolidationAlgorithm>
    
                            <consDPValidators>
                                <consDPValidator name="PORTF_VALUE_HISTORICAL_VALIDATOR" entity="HISTORICAL_SHIFT">
                                    <config>
                                        <![CDATA[
                                            {
                                                "priorValues" : 5,
                                                "percent" : 5.2,
                                                "comparisonMore" : true
                                            }
                                        ]]>
                                    </config>
                                </consDPValidator>
                            </consDPValidators>
                        </dataPoint>
                        <dataPoint displayName="Company Name" name="COMPANY_NAME" dataType="STRING" >
                        </dataPoint>
                        <dataPoint displayName="Company Address" name="COMPANY_ADDRESS" dataType="STRING" >
                        </dataPoint>
                        <dataPoint displayName="XML Root Doc" name="xmlRootDoc" dataType="XML" >
                        </dataPoint>
                        <dataPoint displayName="JSON Root Doc" name="jsonRootDoc" dataType="JSON" >
                        </dataPoint>
                        <dataPoint displayName="Profile Image" name="PROFILE_IMAGE" dataType="BLOB" >
                        </dataPoint>
                    </dataPoints>
    
                    <!--
                        Real time publishing: PUBLISH_CUSTOMER_DATA is an ADHOC_JSON_SINK processor whose
                        "jsonPayload" maps output fields to #GRV expressions over CTX data points and whose
                        "dataSink" targets the HAZELCAST_CLUSTER_QUEUE queue named CUSTOMER_DATA.
                        NOTE(review): the payload sends TFN rather than TFN_MASKED; confirm that is intended.
                    -->
                    <schemaAppliedProcessors>
                        <dataBlockProcessors>
                            <dataBlockProcessor name="PUBLISH_CUSTOMER_DATA" entity="ADHOC_JSON_SINK">
                                <config>
                                    <![CDATA[
                                        {
                                            "jsonPayload": {
                                                "bac": "#GRV{ CTX['BAC'] }",
                                                "first_name": "#GRV{ CTX['FIRST_NAME'] }",
                                                "last_name": "#GRV{ CTX['LAST_NAME'] }",
                                                "address": "#GRV{ CTX['ADDRESS'] }",
                                                "phone_number": "#GRV{ CTX['PHONE_NUMBER'] }",
                                                "age": "#GRV{ CTX['AGE'] }",
                                                "yearly_income": "#GRV{ CTX['YEARLY_INCOME'] }",
                                                "tfn": "#GRV{ CTX['TFN'] }",
                                                "portfolio_value": "#GRV{ CTX['PORTFOLIO_VALUE'] }"
                                            },
                                            "dataSink": {
                                                "name": "adhoc",
                                                "entity": "HAZELCAST_CLUSTER_QUEUE",
                                                "config" : {
                                                    "queueName": "CUSTOMER_DATA"
                                                }
                                            }
                                        }
                                ]]>
                                </config>
                            </dataBlockProcessor>
                        </dataBlockProcessors>
                    </schemaAppliedProcessors>
    
                </schema>
            </schemas>
        </apiroConf>
    
Deploy config files

Follow the steps in Config Deployment to deploy and start using your configuration files.