Switflow pipeline / Part 1 – Dataflow to BigQuery (nested format)

Switflow pipeline / Part 1 – Dataflow to BigQuery (nested format)

  • By website_admin
  • Comments Off on Switflow pipeline / Part 1 – Dataflow to BigQuery (nested format)

dsds

Table of Contents
< All Topics

Goal

Create and execute a real-time pipeline tailored for exploratory analytics on SWIFT MT using Pub/Sub, Dataflow, Swiftflow and BigQuery, and delivering data in a nested format, persisted as JSON strings.

At the end of this tutorial, a streaming data pipeline will be continuously writing data into BigQuery that is ready for downstream explorative analytics, for example by using the JSON functions available in Standard SQL.

Content

This tutorial shows how to ingest a real-time feed of SWIFT MT messages parsed, enriched and structured via Swiftflow into BigQuery. It follows the steps:

  • Read a real-time feed of SWIFT MT raw messages (string) from Pub/Sub
  • Transform those messages to enriched JSON objects via a Dataflow job that sends requests to the Swiftflow API and that uses the Python SDK of Apache Beam
  • Ingest the JSON objects as JSON strings into BigQuery
  • Show some examples of queries using JSON functions and JSONPath user defined function (UDF) within BigQuery

Motivation

Data

Transactional data like SWIFT MT is a promising candidate to deliver new insights about business processes, the entities involved and the content of their transactions, hence enabling cost savings potential and new insights for banks. Being a string-based format with a quite complex syntax and structure, SWIFT MT represents challenges for the persistence and processing systems that deal with analytics use cases. In addition, SWIFT messages enter and exit the bank as a real-time flow which asks for dedicated platforms.

Toolkit

Pub/Sub is a flexible, reliable, real-time messaging service for independent applications to publish and subscribe to asynchronous events.

Dataflow is a fully managed streaming analytics service that minimizes latency, processing time, and cost through autoscaling and batch processing.

Swiftflow is our API that transforms, enriches, and structures SWIFT MT messages. It runs as a scalable application on the managed service Google Kubernetes Engine and can be deployed directly from the GCP marketplace or on-premises.

BigQuery is the serverless data warehouse that enables scalable analysis over petabytes of data, including built-in machine learning capabilities 1.

Apache Beam is an open source unified programming model to define and execute data processing pipelines, including extract-transform-load (ETL), batch and stream (continuous) processing. Beam Pipelines are defined using one of the provided SDKs (Java, Python, Go) and are executed in one of the Beam’s supported runners (distributed processing back-ends) including Apache FlinkApache SamzaApache Spark, and Dataflow 2.

Pre-requisites

Pipeline creation

Let us go through the different components.

GCP credentials and project

The following global variables are the only ones you need to edit in any case, since you are required to have a billable project and a key to access this project. Please refer to the above links for more details on how to create those.

PROJECT="my-swiftflow-pipeline"
GOOGLE_APPLICATION_CREDENTIALS="path_to_key/swiftflow-pipeline-poc.json"

Swiftflow configs

The users who do not want to use Swiftflow can proceed further by setting SWIFTFLOW_EXECUTION="disabled". Once Swiftflow is provisioned, it is required to expose an internal (or even an external) endpoint, or to forward the port to your localhost, as per those steps. Once done, you must report the URL to the SWIFTFLOW_ENDPOINT variable below. You can activate the functional endpoint nested (returning effectively a transformed value) or proceed first with the test endpoint (returning always the same value) in case you want to make sure the service is up and running: those endpoints are documented here.

SWIFTFLOW_EXECUTION = "disabled" # Or "nested" or "test"
SWIFTFLOW_ENDPOINT = "http://<IP exposed by Swiftflow>:9000"

Runner

The next variable defines the runner that will execute your Beam pipeline. On one hand, the DirectRunner is used for development and goes with local execution, interactive logging and without code parallelization. On the other hand, the DataflowRunner is used for production and goes with remote execution on GCP workers, remote logging and code parallelization (with autoscaling enabled by default). In this latter case, some additional time is required to create the cluster of virtual machines (VMs) before the defined job can effectively start executing.

RUNNER="DataflowRunner" # Or "DirectRunner"

GCP configs

All the following variables can be used as-is or be changed by the user. They define the different artifacts used in the pipeline. While not actively used in this tutorial, we keep the REQUIREMENTS_FILE to show additional PyPI packages can be deployed to the worker nodes of Dataflow.

REGION="europe-west6" # Zurich data center, but chose yours
BUCKET=f"tutorial_swiftflow_{random.getrandbits(128)}"
DATASET="tutorial"
TABLE="mt_nested"
COLUMN_NAME="msg"

TOPIC="tutorial_swiftflow_topic"
SUBSCRIPTION="tutorial_swiftflow_subscription"
OUTPUT_TOPIC=f"projects/{PROJECT}/topics/{TOPIC}"

TEMP_LOCATION="gs://"+BUCKET+"/temp"
STAGING_LOCATION="gs://"+BUCKET+"/staging"

REQUIREMENTS_FILE="./requirements.txt"

Sample SWIFT MT messages

Two samples are given in the below. A MT910 message used in the Payments context (Confirmation of Payment) and a MT541 (Receive against Payment) used in Securities post trade3.

MT_910 = "{1:F01MASKCHZZX80A2596106739}{2:O9101900160703MASKCHZHXXXX25961067391607031900N}{3:{108:110110149709}}{4:\r\n:20:ALPINAREF-1\r\n:21:SOMEREF\r\n:25P:1234567890\r\nCRESCHZZ80A\r\n:13D:1712010800-0500\r\n:32A:180919CHF1000000000,00\r\n:50K:/CH3604835011405382000 \r\nSchweizerische-  und \r\nFernsehgesellschaft \r\nGiacomettistrasse 1 \r\nCH/3006 Zurich\r\n:52A:COPRATWW\r\n:56D:BANK OF JAMES \r\nTOKYO STREET  \r\n Manilla \r\nPhilippines\r\n:72:CVR OF DIR PYMT \r\nSSN:123456 \r\n-}"

MT_541 = "{1:F01ABNACHZ8XXXX2596106739}{2:O5411345160418ICBKCNBJXBJM00897254971604181345N}{3:{108:110110149709}}{4:\r\n:16R:GENL\r\n:20C::SEME//1234567890123456\r\n:23G:NEWM\r\n:98C::PREP//20181123165256\r\n:16S:GENL\r\n:16R:TRADDET\r\n:98A::TRAD//20181123\r\n:98A::SETT//20181127\r\n:35B:ISIN CH0012138530\r\nCREDIT SUISSE GROUP\r\n:16S:TRADDET\r\n:16R:FIAC\r\n:36B::SETT//UNIT/10,\r\n:97A::SAFE//0123-1234567-05-001\r\n:94F::SAFE//NCSD/INSECHZZXXX\r\n:16S:FIAC\r\n:16R:SETDET\r\n:22F::SETR//TRAD\r\n:16R:SETPRTY\r\n:95R::DEAG/SCOM/CH123456\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::SELL//ABCDABABXXX\r\n:97A::SAFE//123456789\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::PSET//INSECHZZ\r\n:16S:SETPRTY\r\n:16R:AMT\r\n:19A::SETT//CHF218,4\r\n:16S:AMT\r\n:16S:SETDET\r\n-}"

Schema

Since we are ingesting JSON strings, the schema for BigQuery is simple.

SCHEMA = {'fields': [{'name': COLUMN_NAME, 'type': 'STRING', 'mode': 'NULLABLE'}]}

Authentication

One simply passes the credentials defined earlier to the Python environment.

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_APPLICATION_CREDENTIALS

GCP utils

The following methods will be used only once to create all required artifacts and allow to execute the pipeline later on.

# GCS utils

def create_bucket():
    
    bucket_name = BUCKET
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    new_bucket = storage_client.create_bucket(bucket, location=REGION)

    print("Created bucket {} in {}".format(new_bucket.name, new_bucket.location))

# PubSub utils

def create_topic():
    
    publisher = pubsub_v1.PublisherClient()
    topic_name = f"projects/{PROJECT}/topics/{TOPIC}"
    publisher.create_topic(topic_name)

def create_subscriber():
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(PROJECT, TOPIC)
    sub_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
    subscriber.create_subscription(name=sub_path,topic=topic_path)

def publish(msg=MT_541.encode()):

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, TOPIC)
    publisher.publish(topic_path,msg,spam='eggs')
    
# BigQuery utils

def create_dataset():
    
    dataset_id = f"{PROJECT}.{DATASET}"
    client = bigquery.Client()
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = REGION
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

Swiftflow utils

There is only one processing step. It is defined as a custom class MtToJson that inherits from a DoFn class, which represents a Beam SDK class that defines a distributed processing function4. It contains the logic necessary to the request to the Swiftflow endpoint and formatting. The other class XyzOptions in the below is a wrapper used to pass custom arguments to our pipeline.

class MtToJson(beam.DoFn):
    
    def __init__(self, pipeline_options):
        
        self.execution = pipeline_options.swiftflow_execution
        self.endpoint = pipeline_options.swiftflow_endpoint
        self.column_name = pipeline_options.column_name

    def process(self, mtMessage):
        
        import requests as rq # Required to avoid variable conflicts on Dataflow VM
          
        # Real request
        if self.execution == "nested":
            
            url = self.endpoint+"/nested"
            response_str = rq.post(url,json.dumps({"data":mtMessage}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
        
        # Test request
        elif self.execution == "test":
            
            url = self.endpoint+"/example"
            response_str =  rq.post(url,json.dumps({"data":"testing"}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
              
        # Swiftflow not available, falls back to dummy JSON
        else:
            
            payload_str = '{"a":["a1","a2"],"b":["b1","b2","b3"]}' 
            
        # Return
        
        yield {self.column_name:payload_str}

class XyzOptions(PipelineOptions):
    
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument('--swiftflow_execution', default='nested')
            parser.add_argument('--swiftflow_endpoint', default='none'),
            parser.add_argument('--column_name', default='msg')

Pipeline

Finally, the run() method contains the options passed to the pipeline, and the pipeline workflow definition itself.

def run():
     
  # Pipeline options
    
    pipeline_options = None 
    
    if RUNNER == "DirectRunner" :
    
        pipeline_options = XyzOptions(
                            save_main_session=True, streaming=True,
                            swiftflow_execution=SWIFTFLOW_EXECUTION,
                            swiftflow_endpoint=SWIFTFLOW_ENDPOINT,
                            column_name=COLUMN_NAME)
    
    elif RUNNER == "DataflowRunner" :
        
        pipeline_options = XyzOptions(
                            save_main_session=True, streaming=True,
                            runner='DataflowRunner',
                            project=PROJECT,
                            region=REGION,
                            temp_location=TEMP_LOCATION,
                            staging_location=STAGING_LOCATION,
                            requirements_file=REQUIREMENTS_FILE,
                            swiftflow_execution=SWIFTFLOW_EXECUTION,
                            swiftflow_endpoint=SWIFTFLOW_ENDPOINT,
                            column_name=COLUMN_NAME)
                                          
    # Pipeline workflow
    
    with beam.Pipeline(options=pipeline_options) as p:
        
      input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"
      output_table=f"{PROJECT}:{DATASET}.{TABLE}"
      
      _ = (
          
          p
          | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                    subscription=input_subscription).with_output_types(bytes)
          | 'Conversion UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
          | "Transformation MT to JSON" >> beam.ParDo(MtToJson(pipeline_options))
          | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                    output_table,
                    schema = SCHEMA,
                    custom_gcs_temp_location=BUCKET, 
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
          
          )

Putting all together

The following snippet contains the entire code we need for the execution.

# Packages

from __future__ import absolute_import

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
import json
from google.cloud import bigquery
from google.cloud import storage
import random

import logging
logging.getLogger().setLevel(logging.INFO)

# GCP credentials and project

PROJECT="my-swiftflow-pipeline"
GOOGLE_APPLICATION_CREDENTIALS="path_to_key/swiftflow-pipeline-poc.json"

# Swiftflow configs

SWIFTFLOW_EXECUTION = "disabled" # Or "nested" or "test"
SWIFTFLOW_ENDPOINT = "http://<IP exposed by Swiftflow>:9000"

# Runner

RUNNER="DataflowRunner" # Or "DirectRunner"

# GCP configs

REGION="europe-west6" # Zurich data center, but chose yours
BUCKET=f"tutorial_swiftflow_{random.getrandbits(128)}"
DATASET="tutorial"
TABLE="mt_nested"
COLUMN_NAME="msg"

TOPIC="tutorial_swiftflow_topic"
SUBSCRIPTION="tutorial_swiftflow_subscription"
OUTPUT_TOPIC=f"projects/{PROJECT}/topics/{TOPIC}"

TEMP_LOCATION="gs://"+BUCKET+"/temp"
STAGING_LOCATION="gs://"+BUCKET+"/staging"

REQUIREMENTS_FILE="./requirements.txt"

# Sample messages

MT_910 = "{1:F01MASKCHZZX80A2596106739}{2:O9101900160703MASKCHZHXXXX25961067391607031900N}{3:{108:110110149709}}{4:\r\n:20:ALPINAREF-1\r\n:21:SOMEREF\r\n:25P:1234567890\r\nCRESCHZZ80A\r\n:13D:1712010800-0500\r\n:32A:180919CHF1000000000,00\r\n:50K:/CH3604835011405382000 \r\nSchweizerische-  und \r\nFernsehgesellschaft \r\nGiacomettistrasse 1 \r\nCH/3006 Zurich\r\n:52A:COPRATWW\r\n:56D:BANK OF JAMES \r\nTOKYO STREET  \r\n Manilla \r\nPhilippines\r\n:72:CVR OF DIR PYMT \r\nSSN:123456 \r\n-}"

MT_541 = "{1:F01ABNACHZ8XXXX2596106739}{2:O5411345160418ICBKCNBJXBJM00897254971604181345N}{3:{108:110110149709}}{4:\r\n:16R:GENL\r\n:20C::SEME//1234567890123456\r\n:23G:NEWM\r\n:98C::PREP//20181123165256\r\n:16S:GENL\r\n:16R:TRADDET\r\n:98A::TRAD//20181123\r\n:98A::SETT//20181127\r\n:35B:ISIN CH0012138530\r\nCREDIT SUISSE GROUP\r\n:16S:TRADDET\r\n:16R:FIAC\r\n:36B::SETT//UNIT/10,\r\n:97A::SAFE//0123-1234567-05-001\r\n:94F::SAFE//NCSD/INSECHZZXXX\r\n:16S:FIAC\r\n:16R:SETDET\r\n:22F::SETR//TRAD\r\n:16R:SETPRTY\r\n:95R::DEAG/SCOM/CH123456\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::SELL//ABCDABABXXX\r\n:97A::SAFE//123456789\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::PSET//INSECHZZ\r\n:16S:SETPRTY\r\n:16R:AMT\r\n:19A::SETT//CHF218,4\r\n:16S:AMT\r\n:16S:SETDET\r\n-}" 
  
# Schema: single string column

SCHEMA = {'fields': [{'name': COLUMN_NAME, 'type': 'STRING', 'mode': 'NULLABLE'}]}

# Authentication

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_APPLICATION_CREDENTIALS

# GCS utils

def create_bucket():
    
    bucket_name = BUCKET
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    new_bucket = storage_client.create_bucket(bucket, location=REGION)

    print("Created bucket {} in {}".format(new_bucket.name, new_bucket.location))

# PubSub utils

def create_topic():
    
    publisher = pubsub_v1.PublisherClient()
    topic_name = f"projects/{PROJECT}/topics/{TOPIC}"
    publisher.create_topic(topic_name)

def create_subscriber():
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(PROJECT, TOPIC)
    sub_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION)
    subscriber.create_subscription(name=sub_path,topic=topic_path)

def publish(msg=MT_541.encode()):

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT, TOPIC)
    publisher.publish(topic_path,msg,spam='eggs')
    
# BigQuery utils

def create_dataset():
    
    dataset_id = f"{PROJECT}.{DATASET}"
    client = bigquery.Client()
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = REGION
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

# Swiftflow utils

class MtToJson(beam.DoFn):
    
    def __init__(self, pipeline_options):
        
        self.execution = pipeline_options.swiftflow_execution
        self.endpoint = pipeline_options.swiftflow_endpoint
        self.column_name = pipeline_options.column_name

    def process(self, mtMessage):
        
        import requests as rq # Required to avoid variable conflicts on Dataflow VM
          
        # Real request
        if self.execution == "nested":
            
            url = self.endpoint+"/nested"
            response_str = rq.post(url,json.dumps({"data":mtMessage}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
        
        # Test request
        elif self.execution == "test":
            
            url = self.endpoint+"/example"
            response_str =  rq.post(url,json.dumps({"data":"testing"}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
              
        # Swiftflow not available, falls back to dummy JSON
        else:
            
            payload_str = '{"a":["a1","a2"],"b":["b1","b2","b3"]}' 
            
        # Return
        
        yield {self.column_name:payload_str}

class XyzOptions(PipelineOptions):
    
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument('--swiftflow_execution', default='nested')
            parser.add_argument('--swiftflow_endpoint', default='none'),
            parser.add_argument('--column_name', default='msg')

# Run

def run():
     
  # Pipeline options
    
    pipeline_options = None 
    
    if RUNNER == "DirectRunner" :
    
        pipeline_options = XyzOptions(
                            save_main_session=True, streaming=True,
                            swiftflow_execution=SWIFTFLOW_EXECUTION,
                            swiftflow_endpoint=SWIFTFLOW_ENDPOINT,
                            column_name=COLUMN_NAME)
    
    elif RUNNER == "DataflowRunner" :
        
        pipeline_options = XyzOptions(
                            save_main_session=True, streaming=True,
                            runner='DataflowRunner',
                            project=PROJECT,
                            region=REGION,
                            temp_location=TEMP_LOCATION,
                            staging_location=STAGING_LOCATION,
                            requirements_file=REQUIREMENTS_FILE,
                            swiftflow_execution=SWIFTFLOW_EXECUTION,
                            swiftflow_endpoint=SWIFTFLOW_ENDPOINT,
                            column_name=COLUMN_NAME)
                                          
    # Pipeline workflow
    
    with beam.Pipeline(options=pipeline_options) as p:
        
      input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"
      output_table=f"{PROJECT}:{DATASET}.{TABLE}"
      
      _ = (
          
          p
          | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                    subscription=input_subscription).with_output_types(bytes)
          | 'Conversion UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
          | "Transformation MT to JSON" >> beam.ParDo(MtToJson(pipeline_options))
          | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                    output_table,
                    schema = SCHEMA,
                    custom_gcs_temp_location=BUCKET, 
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
          
          )

Pipeline execution

1. Create artifacts

Simply execute the following commands in a Python console.

create_bucket()
create_topic()
create_subscriber()

2. Build and run the pipeline

Once the above commands are completed, type the following.

run()

If you are executing your code with the DataflowRunner you can monitor the process on the GCP using the Dataflow console. Note that is might take several minutes until the pipeline is ready to stream data.

3. Publish a message

Execute the following code in a new Python console. You can use the default value or pass another value. Note that a UTF-8 byte string is expected by PubSub, so use the conversions b"message" or msg.encode().

publish() # In a new Python console

# Or publish(msg=MT_910.encode())

At this point, the execution of Dataflow will automatically be triggered: the raw message be we read from Pub/Sub, a table will be created in BigQuery if not yet existing, and the message received from Swiftflow will be ingested in this table. Let us briefly explore the results.

Analytics

You can use the BigQuery console to explore data, its schema, and run queries, from a similar view as the one below. Note that the stream job defined previously will keep streaming data into BigQuery even when you are querying.

Payments – MT910

Raw

This is the raw message as it is received from the SWIFT network.

MT_910 = "{1:F01MASKCHZZX80A2596106739}{2:O9101900160703MASKCHZHXXXX25961067391607031900N}{3:{108:110110149709}}{4:\r\n:20:ALPINAREF-1\r\n:21:SOMEREF\r\n:25P:1234567890\r\nCRESCHZZ80A\r\n:13D:1712010800-0500\r\n:32A:180919CHF1000000000,00\r\n:50K:/CH3604835011405382000 \r\nSchweizerische-  und \r\nFernsehgesellschaft \r\nGiacomettistrasse 1 \r\nCH/3006 Zurich\r\n:52A:COPRATWW\r\n:56D:BANK OF JAMES \r\nTOKYO STREET  \r\n Manilla \r\nPhilippines\r\n:72:CVR OF DIR PYMT \r\nSSN:123456 \r\n-}"
BigQuery/JSON

The below view shows the message transformed by Swiftflow and stored as a JSON string in one single column.

BigQuery/Extract

BigQuery supports many JSON functions in Standard SQL. In the following query, we extract the name/address fields and country codes of the Ordering Customer and the Intermediary Party involved in the transaction with the geolocalization feature of Swiftflow. Observe that the full paths of the fields are required to access the field values of interest.

SELECT
JSON_EXTRACT(msg,'$.block4.Other.50K[0].Name_and_Address') AS customer_name_address,
JSON_EXTRACT(msg,'$.block4.Other.50K[0].country_code[0]') AS customer_country_code,
JSON_EXTRACT(msg,'$.block4.Other.56D[0].Name_and_Address') AS intemediary_name_address,
JSON_EXTRACT(msg,'$.block4.Other.56D[0].country_code[0]') AS intemediary_country_code
FROM `swiftflow-pipeline-poc.tutorial.mt`

BigQuery offers the possibility to define and run user defined functions (UDF), as a way to extend the built-in capabilities. In the following example, the JSONPath is added as a third-party JavaScript UDF. A key feature of JSONPath is that is allows for using the recursive operator .., hence enabling to query fields using only a partial path, for example using only tags or business names and without knowing the full context of inside the message. Even better, it is also possible to define some patterns for the search. In the below example, one retrieves all attributes related to the Intermediary by using only the tag 50 and without knowing anything else about the field location.

CREATE TEMPORARY FUNCTION CUSTOM_JSON_EXTRACT(json_path STRING,json STRING)
RETURNS STRING 
LANGUAGE js AS """
  return JSONPath(json_path,JSON.parse(json));
"""
OPTIONS (
  library="gs://tutorial_swiftflow/jsonpath_plus_eval.js"
);

SELECT CUSTOM_JSON_EXTRACT('$..[?(@path.includes("[\'50"))]',msg) AS intemediary_attributes
FROM `swiftflow-pipeline-poc.tutorial.mt_nested`

This leads to following output.

Securities – MT541

Raw

Let us document again the raw message as it is received from the SWIFT network.

MT_541 = "{1:F01ABNACHZ8XXXX2596106739}{2:O5411345160418ICBKCNBJXBJM00897254971604181345N}{3:{108:110110149709}}{4:\r\n:16R:GENL\r\n:20C::SEME//1234567890123456\r\n:23G:NEWM\r\n:98C::PREP//20181123165256\r\n:16S:GENL\r\n:16R:TRADDET\r\n:98A::TRAD//20181123\r\n:98A::SETT//20181127\r\n:35B:ISIN CH0012138530\r\nCREDIT SUISSE GROUP\r\n:16S:TRADDET\r\n:16R:FIAC\r\n:36B::SETT//UNIT/10,\r\n:97A::SAFE//0123-1234567-05-001\r\n:94F::SAFE//NCSD/INSECHZZXXX\r\n:16S:FIAC\r\n:16R:SETDET\r\n:22F::SETR//TRAD\r\n:16R:SETPRTY\r\n:95R::DEAG/SCOM/CH123456\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::SELL//ABCDABABXXX\r\n:97A::SAFE//123456789\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::PSET//INSECHZZ\r\n:16S:SETPRTY\r\n:16R:AMT\r\n:19A::SETT//CHF218,4\r\n:16S:AMT\r\n:16S:SETDET\r\n-}"

BigQuery/JSON

The below view shows the message ingested in the table.

BigQuery/Extract

Securities messages (e.g MT5XX) are generally much longer and deeper (more nesting levels) than payment messages (MT1XX, MT2XX, MT910, …). It is therefore critical for fast and granular exploration to be able to query all specific fields without knowing the exact path of the field within the message (e.g. under which tag or which section it is located), as already evidenced. In the following we extract the Place of Settlement of the trade and with only some partial knowledge of the paths.

CREATE TEMPORARY FUNCTION CUSTOM_JSON_EXTRACT(json STRING,json_path STRING)
RETURNS STRING 
LANGUAGE js AS """
  return jsonPath(JSON.parse(json),json_path);
"""
OPTIONS (
  library="gs://tutorial_swiftflow/jsonpath-0.8.0.js"
);

SELECT
CUSTOM_JSON_EXTRACT(msg,'$..PSET..Identifier_Code_Bank_Code') AS pset_bank_code,
CUSTOM_JSON_EXTRACT(msg,'$..PSET..Identifier_Code_Country_Code') AS pset_country_code,
CUSTOM_JSON_EXTRACT(msg,'$..PSET..Identifier_Code_Location_Code') AS pset_location_code
FROM `swiftflow-pipeline-poc.tutorial.mt`

Next steps

Once dedicated views are created on the data and using the proposed syntax, following steps are possible:

  • Direct analysis within BigQuery
  • Downstream programmatic analytics (e.g. conversion to pandas package using the Python client)
  • File export as (button “Save results”)
  • Business intelligence (BI) and dashboarding via Data Studio (button “Explore data”). Note that Data Studio is a free tool.


Another possible next step is to use BigQuery Machine Learning (ML) which lets you create and execute machine learning models using standard SQL queries. Have a look on the various tutorials available.

Limitations

While the proposed approach allows you to create your own views with some partial knowledge of the exact field addresses, it comes with a few limitations:

  • It requires the user to have a quite clear idea on the attributes to be extracted
  • When the requirements go beyond a few tenths of attributes, the queries get very long and difficult to design, execute, resonate about, and maintain
  • Unless you persist views, every new query execution will have to start by extracting the required field from the JSON string object, which might induce some performance losses and unnecessary costs
  • Finally and most importantly for machine learning applications, you do no get a holistic view of the raw data. Since every field has to be known in advance, the user can never be user that he managed to extract all possible fields, which is by the way not possible to do by hand.

An alternative is presented in the following tutorial: Switflow pipeline / Part 2 – Dataflow To BigQuery (tabular format).

Sources

  1. https://en.wikipedia.org/wiki/BigQuery
  2. https://en.wikipedia.org/wiki/Apache_Beam
  3. https://www.credit-suisse.com/media/assets/private-banking/docs/ch/unternehmen/institutional-clients/mt54x-swift-guide.pdf
  4. https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms