Goal
As in the previous tutorial, TxFlow Pipeline / Part 1 – Dataflow To BigQuery (nested format), the goal here is to create and execute a real-time pipeline tailored for exploratory analytics on SWIFT MT using Pub/Sub, Dataflow, TxFlow and BigQuery. The difference is the output format: a tabular format (flattened and exploded), whereas the previous tutorial produced a nested format (JSON string).
At the end of this tutorial, you will run a streaming data pipeline that continuously writes data into BigQuery in a format that is ready for exploratory analytics with Standard SQL.
Content
This tutorial shows how to ingest into BigQuery a real-time feed of SWIFT MT and ISO 20022 messages that are parsed, enriched and structured via TxFlow. It covers the following steps:
- Read a real-time feed of SWIFT MT or ISO 20022 raw messages (string) from Pub/Sub
- Transform those messages into enriched JSON objects via a Dataflow job, defined using the Apache Beam SDK in Python, which sends requests to the TxFlow API
- Transform the returned JSON objects into exploded/flattened data and ingest it into BigQuery with dynamic schema evolution
- Show some examples of SQL queries

Motivation
As discussed in the previous tutorial, TxFlow Pipeline / Part 1 – Dataflow To BigQuery (nested), the nested format comes with a few limitations: it needs precise requirements, and it leads to complex queries, limited query performance and a limited view of the data.
The proposed approach is holistic. It transforms all the fields found in the message into dedicated columns of a table, while keeping full track of their initial context in the raw message (section, qualifier, tag, …) plus their position in case they can appear as repeated elements (e.g. a list of multiple entities, accounts, …). The entire context information is stored in the column names.
This opens the way to data consumption in Standard SQL and related applications: Business Intelligence tools (usually natively tailored for SQL) or downstream Machine Learning applications. It does so for all message types and all fields at the same time, which can enable the definition of cross-business transactions.
Pre-requisites
Please run the previous tutorial prior to this one. The following concentrates on the new features that are built upon it.
Pipeline creation
Schema
The only difference is that no schema is required, as it will be derived by BigQuery thanks to the schema auto-detection feature. The table width will dynamically evolve over time as more messages and new fields are ingested in the pipeline, stabilizing at some point once all the variants have been encountered. Therefore, no SCHEMA variable is required.
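The way the table width grows and then stabilizes can be illustrated with a small simulation. This is only a sketch in plain Python, not BigQuery code: the helper evolve_schema and the field names are hypothetical, and the logic loosely mimics what schema auto-detection combined with field addition achieves on the real table.

```python
def evolve_schema(columns, row):
    """Add the fields of `row` that are not yet known to `columns`.

    `columns` is the ordered list of known column names and is updated in
    place. The function returns the list of newly added column names.
    """
    added = [name for name in row if name not in columns]
    columns.extend(added)
    return added


# Hypothetical flattened rows; the field names are made up for illustration.
rows = [
    {"message_type": "541", "trade_date": "20181123"},
    {"message_type": "541", "trade_date": "20181123", "settlement_amount": "218.4"},
    {"message_type": "910", "value_date": "20181127"},
    {"message_type": "541", "trade_date": "20181124"},
]

table_columns = []
widths = []
for row in rows:
    evolve_schema(table_columns, row)
    widths.append(len(table_columns))

# The width grows while new fields keep appearing, then stays constant.
print(widths)
```

In this simulation the last row brings no new field, so the width no longer changes, which is the stabilization described above.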
TxFlow utils
We need two additional classes here. The class MtToDataframe handles the requests to the TxFlow endpoint plus the logic used to explode and flatten the resulting JSON into a pandas dataframe by using the flatsplode package [1]. The second class, WriteDataframeToBQ, relies on an existing proposal [2] and takes care of the BigQuery client initialization and configuration, using the parameter autodetect=True for the schema-free ingestion.
class MtToDataframe(beam.DoFn):
        
    def __init__(self, pipeline_options):
        self.execution = pipeline_options.swiftflow_execution
        self.endpoint = pipeline_options.swiftflow_endpoint
               
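    # Comply with BigQuery column name requirements
    def clean_column_names(self, df):

        import re

        # Column names must start with a letter or an underscore:
        # prefix names that start with a digit with an underscore
        df = df.rename(lambda x: '_' + x if re.match(r'[0-9]', x) else x, axis=1)
        # Replace spaces and dots, which are not allowed in column names
        df = df.rename(lambda x: x.replace(" ", "_"), axis=1)
        df = df.rename(lambda x: x.replace(".", "_"), axis=1)

        return df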
        
    def process(self, mtMessage):
        
        import json
        import requests as rq # Required to avoid variable conflicts on Dataflow VM
        import pandas as pd
        from flatsplode import flatsplode
        
        # Real request
        if self.execution == "nested":
            
            url = self.endpoint+"/nested"
            response_str = rq.post(url,json.dumps({"data":mtMessage}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
        
        # Test request
        elif self.execution == "test":
            
            url = self.endpoint+"/example"
            response_str =  rq.post(url,json.dumps({"data":"testing"}))
            response_dict = json.loads(response_str.content)
            payload_str = response_dict['data']
              
        # Swiftflow not available, falls back to dummy JSON
        else:
            
            payload_str = '{"foo":{"bar":{"name1":"john","name2":"jane"}}}'
        
        # Flatsploding
        
        payload_dict = json.loads(payload_str)
        df = pd.DataFrame(list(flatsplode(payload_dict)))     
        df = self.clean_column_names(df)
        
        yield df
class WriteDataframeToBQ(beam.DoFn):
    
    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id
    def start_bundle(self):
        
        from google.cloud import bigquery
        self.client = bigquery.Client()

    def process(self, df):
        
        import logging
        from google.cloud import bigquery
        
        table_id = f"{self.bq_dataset}.{self.bq_table}"
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED
        )
        try:
            load_job = self.client.load_table_from_dataframe(
                df,
                table_id,
                job_config=job_config
            )
            load_job.result()
            if load_job.errors:
                # Report load problems at error level so they stand out in the logs
                logging.error(f"result={load_job.error_result}")
                logging.error(f"errors={load_job.errors}")
            else:
                logging.info(f"Loaded {len(df)} rows.")
        except Exception as error:
            logging.error(f"Error while loading dataframe: {error}")
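To make the "flatsploding" step of MtToDataframe more concrete, the following is a minimal sketch of the idea in plain Python. It is not the flatsplode implementation: the functions explode, flatten and flatsplode_sketch, as well as the field names in the payload, are hypothetical and written only for illustration (the values are taken from the MT541 example used later in this tutorial). Unlike the real pipeline output, this sketch does not record the position of repeated elements.

```python
from itertools import product


def explode(item):
    """Yield one copy of `item` per combination of list elements it contains."""
    if isinstance(item, dict):
        keys = list(item)
        # Explode every value, then combine the alternatives across keys.
        alternatives = [list(explode(item[key])) for key in keys]
        for combo in product(*alternatives):
            yield dict(zip(keys, combo))
    elif isinstance(item, list):
        # An empty list is kept as a single missing value.
        for element in item or [None]:
            yield from explode(element)
    else:
        yield item


def flatten(item, join=".", prefix=""):
    """Turn nested dictionaries into one flat dictionary with joined keys."""
    flat = {}
    for key, value in item.items():
        name = f"{prefix}{join}{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, join, name))
        else:
            flat[name] = value
    return flat


def flatsplode_sketch(item):
    """Explode first, then flatten each resulting record."""
    for exploded in explode(item):
        yield flatten(exploded)


# Hypothetical nested payload with one repeated element (two parties).
payload = {
    "trade": {"isin": "CH0012138530"},
    "party": [
        {"role": "DEAG", "id": "CH123456"},
        {"role": "SELL", "id": "ABCDABABXXX"},
    ],
}

rows = list(flatsplode_sketch(payload))
for row in rows:
    print(row)
```

Each repeated party produces its own record, and the non-repeated field trade.isin is copied into every record. In the pipeline, the dots in such column names are then replaced by underscores in clean_column_names.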
Pipeline
with beam.Pipeline(options=pipeline_options) as p:    
        
        input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"
      
        _ = (
          
            p
            | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
                      subscription=input_subscription).with_output_types(bytes)
            | "Conversion UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode('utf-8'))
            | "Transformation MT to Dataframe" >> beam.ParDo(MtToDataframe(pipeline_options))
            | "Write Raw Data to Big Query" >> beam.ParDo(
                      WriteDataframeToBQ(project_id=PROJECT, bq_dataset=DATASET, bq_table=TABLE))
         
            )
Pipeline execution
Please refer to the previous tutorial: TxFlow Pipeline / Part 1 – Dataflow To BigQuery (nested)
Analytics
Securities – MT541
Raw
This is the raw message as received from the SWIFT network.
MT_541 = "{1:F01ABNACHZ8XXXX2596106739}{2:O5411345160418ICBKCNBJXBJM00897254971604181345N}{3:{108:110110149709}}{4:\r\n:16R:GENL\r\n:20C::SEME//1234567890123456\r\n:23G:NEWM\r\n:98C::PREP//20181123165256\r\n:16S:GENL\r\n:16R:TRADDET\r\n:98A::TRAD//20181123\r\n:98A::SETT//20181127\r\n:35B:ISIN CH0012138530\r\nCREDIT SUISSE GROUP\r\n:16S:TRADDET\r\n:16R:FIAC\r\n:36B::SETT//UNIT/10,\r\n:97A::SAFE//0123-1234567-05-001\r\n:94F::SAFE//NCSD/INSECHZZXXX\r\n:16S:FIAC\r\n:16R:SETDET\r\n:22F::SETR//TRAD\r\n:16R:SETPRTY\r\n:95R::DEAG/SCOM/CH123456\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::SELL//ABCDABABXXX\r\n:97A::SAFE//123456789\r\n:16S:SETPRTY\r\n:16R:SETPRTY\r\n:95P::PSET//INSECHZZ\r\n:16S:SETPRTY\r\n:16R:AMT\r\n:19A::SETT//CHF218,4\r\n:16S:AMT\r\n:16S:SETDET\r\n-}"
BigQuery/JSON
The view below shows the message transformed by TxFlow and ingested as a tabular structure into the table.

BigQuery/Extract
A powerful feature of the approach is that one simple SELECT * statement gives you a fully structured list of all fields found in that message.
SELECT * FROM `swiftflow-pipeline-poc.tutorial.mt_flat`
We can observe the effect of:
- The flattening – expresses all fields in their context (section, tag, business name) as a flat list of columns
- The explosion – creates as many records as there are combinations of repetitive elements
The latter feature makes it possible to get all attributes belonging to specific entities in separate columns. For example, because the context is preserved from the original message, it is possible to deduce that the value Account Number = 123456789 belongs to Entity 2 in the picture below.
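The kind of lookup described here can be tried locally with a small SQL example. The sketch below uses an in-memory SQLite database purely as a stand-in for BigQuery, so the SQL dialect and the table layout are simplified. The column names party_role, party_id and party_account are hypothetical and do not reflect the actual column names produced by the pipeline; the values come from the MT541 message above.

```python
import sqlite3

# In-memory database used as a local stand-in for the BigQuery table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE mt_flat (party_role TEXT, party_id TEXT, party_account TEXT)"
)

# One record per settlement party, as produced by the explosion step.
conn.executemany(
    "INSERT INTO mt_flat VALUES (?, ?, ?)",
    [
        ("DEAG", "CH123456", None),
        ("SELL", "ABCDABABXXX", "123456789"),
        ("PSET", "INSECHZZ", None),
    ],
)

# Find which entity the account number belongs to.
owners = conn.execute(
    "SELECT party_role, party_id FROM mt_flat WHERE party_account = ?",
    ("123456789",),
).fetchall()
print(owners)

conn.close()
```

Because each entity sits in its own record together with its attributes, a plain WHERE clause is enough to relate the account number to the second entity (the seller).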

Payments – MT910
The same applies to payment messages: all fields are extracted, and no up-front definition is required before starting the extraction.
Next steps
Analytics
At this stage, data is ready for:
- Direct analysis within BigQuery using standard SQL
- Business intelligence (BI) with any third-party tool, since the tabular format and SQL compatibility are the standard
- Business intelligence (BI) and dashboarding via Data Studio (button “Explore data”). Note that Data Studio is a free tool.
- File export (button “Save results”)
Machine learning
Possible next steps are:
- Use BigQuery Machine Learning (ML), which lets you create and execute machine learning models using standard SQL queries.
- Develop Machine Learning applications using the Python client for BigQuery
Indeed, the tabular holistic format is the preferred approach for machine learning development, and potentially its execution, as it makes it possible to explore and select from all the features (depending on the feature selection during the training phase), hence avoiding data loss and performance limitations of the models.