Loading Fixed-Width, Multi-Format Files with Python Snowpark
I know these are less common than they used to be, but trust me: they’re out there.
In my previous article on using Python Snowpark and dbt to load the TPC-DI dataset into Snowflake, I mentioned that I would dive deeper into a few of the fun and challenging file types that the data generator produced.
In this article, I’ll describe how I processed the FINWIRE files, which are “financial newswire” files generated by the DIGen.jar utility. There is a FINWIRE file for every quarter going back to 1967, and each file commingles Security records, Financial records, and Company records. The first two fixed-width fields are the same for every record: PTS, or “posting timestamp”, and RecType, or “record type”. After those initial two fields, the remaining field names and values depend on whether the value of RecType is CMP, FIN, or SEC. A sample of the FINWIRE file FINWIRE2015Q4 is shown below, though it may not render well on mobile devices:
20151230-152248FIN201542015100120151230 4880880089.63 2473473307.30 4.82 4.42 0.51 200321223.47 139284472514.02 9402305760.19 512872010 5597926720000001595
20151230-152511SECAAAAAAAAAAAAKVDPREF_AACTVDJBJXyQHLBvn EEOGAOvUNgL XwrOxQUBMrgPv AMEX 982113436 1903022619730704 1.200000000254
20151230-162115SECAAAAAAAAAAAAKWACOMMONACTVWuCIIWEoJqDOHTOu XdIzDZVrIOntWKzIBCydhxHa RyrhgxKejtdKfLBTv PCX 115499859 2008081219900601 1.510000003778
20151230-163207CMPWWfcsOHprIDIUsPfRLrcLPlxaQ 0000004432ACTVMCA 1873092521088 Vessey Crescent M5D 1Z1 Winnipeg AL United States of AmericaMoreno rlRIDCNz dVGrEzomCXIvZVZzFzxCzbGYIEbAXJMJlsYUQEV
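Before reaching for Snowpark, it’s worth seeing how little is needed to pick apart the common prefix: every record starts with a 15-character PTS followed by a 3-character RecType, so plain string slices recover them. A quick sketch, using a truncated copy of the SEC sample line above:

```python
# Every FINWIRE record shares the same fixed-width prefix:
# chars 1-15 are PTS (posting timestamp), chars 16-18 are RecType.
line = "20151230-152511SECAAAAAAAAAAAAKVDPREF_AACTV"  # truncated sample

pts_raw = line[0:15]    # 1-based positions 1-15  -> "20151230-152511"
rec_type = line[15:18]  # 1-based positions 16-18 -> "SEC"
print(pts_raw, rec_type)
```

Note that Python slices are 0-based while the fixed-width layout (and Snowflake’s SUBSTRING) is 1-based, hence the off-by-one in the slice bounds.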
If you’re new to data engineering, this file format might sound completely bananas. At the time the TPC-DI specification was written — a time when mainframes roamed the Earth — files like this were normal, and I had to solve for them on most of the projects I worked on during this epoch. Although not extinct, mainframes are certainly endangered, and I suppose IBM didn’t understand the irony in 2003 of code-naming their latest mainframe after an animal from the Cretaceous period.
Uploading the FINWIRE Files to a Stage
As mentioned in the previous article, I used a credentials.json file to store my credentials, and it looks like this:
{
"account": "myaccount",
"user": "myuser",
"password": "mypassword",
"role": "myrole",
"warehouse": "stewart_dev",
"database": "tpc_di",
"schema": "digen"
}
The code samples below are simplified snippets from the tpcdi.py loader in the GitHub repository, with abstractions removed so they are easier to follow. We first make our connection, then glob all the FINWIRE files, and put them in the stage:
import sys, json
from pathlib import Path

from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import *

# Read the credentials.json file
with open("credentials.json") as jsonfile:
    credentials_dict = json.load(jsonfile)

# build the session
session = (
    Session
    .builder
    .configs(credentials_dict)
    .create()
)
# glob the files
pathlist = (
    Path('/Users/stewartbryson/dev/tpcdi-output/Batch1')
    .glob("FINWIRE??????")
)

stage_path = "@tpcdi/Batch1/FINWIRE"
for file in pathlist:
    # put the file(s) in the stage
    put_result = (
        session
        .file
        .put(
            str(file),
            stage_path,
            parallel=4,
            auto_compress=True
        )
    )
    for result in put_result:
        print(f"File {result.source}: {result.status}")
File FINWIRE2001Q3: UPLOADED
File FINWIRE2001Q4: UPLOADED
File FINWIRE1997Q4: UPLOADED
File FINWIRE1970Q3: UPLOADED
File FINWIRE1997Q3: UPLOADED
File FINWIRE1970Q4: UPLOADED
File FINWIRE1999Q2: UPLOADED
File FINWIRE1991Q1: UPLOADED
File FINWIRE2005Q2: UPLOADED
File FINWIRE1976Q1: UPLOADED
File FINWIRE1993Q2: UPLOADED
{ Output truncated }
Create FINWIRE Tables
The CMP, SEC, and FIN records all have two fields in common, so we want to create a generic DataFrame that contains the shared logic, and we’ll save that DataFrame as a Snowflake temporary table called FINWIRE:
# These are fixed-width fields, so read the entire line in as "line"
schema = StructType([
    StructField("line", StringType(), False),
])

# Generic DataFrame for all record types, saved as a temporary table.
# Use a field delimiter that never occurs in the data,
# so each line is read as a single column.
(
    session
    .read
    .schema(schema)
    .option('field_delimiter', '|')
    .csv(stage_path)
    .with_column(
        'pts',
        to_timestamp(
            substring(col("line"), lit(1), lit(15)),
            lit("yyyymmdd-hhmiss")
        )
    )
    .with_column(
        'rec_type',
        substring(col("line"), lit(16), lit(3))
    )
    .write
    .mode("overwrite")
    .save_as_table("finwire", table_type="temporary")
)
# let's see the table
session.table('finwire').show()
-----------------------------------------------------------------------------------------
|"LINE" |"PTS" |"REC_TYPE" |
-----------------------------------------------------------------------------------------
|19680401-015042FIN196821968040119680401 6833... |1968-04-01 01:50:42 |FIN |
|19680401-035527FIN196821968040119680401 603... |1968-04-01 03:55:27 |FIN |
|19680401-143845FIN196821968040119680401 7474... |1968-04-01 14:38:45 |FIN |
|19680402-044244FIN196821968040119680402 9310... |1968-04-02 04:42:44 |FIN |
|19680402-071107FIN196821968040119680402 1594... |1968-04-02 07:11:07 |FIN |
|19680402-115506FIN196821968040119680402 1362... |1968-04-02 11:55:06 |FIN |
|19680402-230322FIN196821968040119680402 4179... |1968-04-02 23:03:22 |FIN |
|19680403-091309FIN196821968040119680403 1959... |1968-04-03 09:13:09 |FIN |
|19680403-192649FIN196821968040119680403 3421... |1968-04-03 19:26:49 |FIN |
|19680403-233552FIN196821968040119680403 4432... |1968-04-03 23:35:52 |FIN |
-----------------------------------------------------------------------------------------
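The format string passed to to_timestamp maps cleanly onto Python’s strptime directives (yyyy → %Y, mm → %m, dd → %d, hh → %H, mi → %M, ss → %S), which makes it easy to sanity-check the parse locally. This isn’t part of the loader — just a quick check against the first PTS value in the output above:

```python
from datetime import datetime

# Snowflake's "yyyymmdd-hhmiss" format corresponds to
# "%Y%m%d-%H%M%S" in Python's strptime.
pts = datetime.strptime("19680401-015042", "%Y%m%d-%H%M%S")
print(pts)  # 1968-04-01 01:50:42
```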
We’ll use the temporary table as a source for three new DataFrames, each of which creates a new table for a specific record type, using the where method to filter the records. We then make extensive use of the with_column method, combined with substring, to construct the schema for the new table. I’m only showing the example for the SEC table, but the others are done similarly. Notice we drop the line and rec_type columns, as neither one is meaningful in the final table:
# SEC record types
table_name = 'sec'
(
    session
    .table('finwire')
    .where(col('rec_type') == 'SEC')
    .with_column(
        'symbol',
        substring(col("line"), lit(19), lit(15))
    )
    .with_column(
        'issue_type',
        substring(col("line"), lit(34), lit(6))
    )
    .with_column(
        'status',
        substring(col("line"), lit(40), lit(4))
    )
    .with_column(
        'name',
        substring(col("line"), lit(44), lit(70))
    )
    .with_column(
        'ex_id',
        substring(col("line"), lit(114), lit(6))
    )
    .with_column(
        'sh_out',
        substring(col("line"), lit(120), lit(13))
    )
    .with_column(
        'first_trade_date',
        substring(col("line"), lit(133), lit(8))
    )
    .with_column(
        'first_exchange_date',
        substring(col("line"), lit(141), lit(8))
    )
    .with_column(
        'dividend',
        substring(col("line"), lit(149), lit(12))
    )
    .with_column(
        'co_name_or_cik',
        substring(col("line"), lit(161), lit(60))
    )
    .drop(col("line"), col("rec_type"))
    .write
    .mode("overwrite")
    .save_as_table(table_name)
)
print(f"{table_name.upper()} table created.")
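Since the record types differ only in their offsets, the slicing logic can also be made data-driven, which is handy for unit-testing the offsets against a raw line before running the Snowpark job. Below is a pure-Python sketch of that idea — the (start, length) pairs mirror the SEC code above, but the spec table and the parse_fixed_width helper are my own and not part of the repository:

```python
# (field name, 1-based start position, length) — offsets mirror
# the SEC with_column/substring calls above.
SEC_FIELDS = [
    ("symbol", 19, 15),
    ("issue_type", 34, 6),
    ("status", 40, 4),
    ("name", 44, 70),
    ("ex_id", 114, 6),
    ("sh_out", 120, 13),
    ("first_trade_date", 133, 8),
    ("first_exchange_date", 141, 8),
    ("dividend", 149, 12),
    ("co_name_or_cik", 161, 60),
]

def parse_fixed_width(line: str, fields) -> dict:
    """Slice a fixed-width record into a dict, trimming the space padding."""
    return {
        name: line[start - 1 : start - 1 + length].strip()
        for name, start, length in fields
    }
```

Run against the sample SEC line shown earlier, this yields symbol "AAAAAAAAAAAAKVD", issue_type "PREF_A", and status "ACTV".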
In the next article, we’ll process the CustomerMgmt.xml file. It demonstrates how to process elements, attributes, and values, and I don’t recommend it for anyone with a weak stomach or heart condition.