How to publish data to AWS Marketplace using Teradata Vantage

Data Sharing with AWS Data Exchange and Teradata VantageCloud

Sebastian
Teradata
22 min read · Jan 6, 2023


Hello data folks! In a previous story, we reviewed how to consume data from AWS Data Exchange (ADX) using Vantage. You may also be interested in sharing data from Vantage to ADX. This story will help you do just that.

Note: The procedure offered in this guide has been implemented and tested by Teradata. However, it is offered on an as-is basis. Neither AWS nor Teradata provides validation of Teradata Vantage with AWS Data Exchange. This guide includes content from both AWS and Teradata product documentation.

Overview

This article describes the process of writing a data set to Amazon S3 using the Vantage NOS write feature, creating a revision of the data set in AWS Data Exchange, and then publishing the data set to the subscriber via a private offer. On the subscriber side, an Amazon CloudWatch (EventBridge) event rule is set up to alert the subscriber to the new data set, and auto-export is configured to deliver the newly published data set to the subscriber’s S3 bucket.

About AWS Data Exchange

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. Today, AWS Data Exchange contains data products from a broad range of domains, including financial services, healthcare, life sciences, geospatial, weather, and mapping. As a provider, you can use the AWS Data Exchange API to publish data from Teradata Vantage so that it can be consumed by the subscribers of your data product.

About Teradata Vantage

Teradata Vantage is the cloud data analytics platform that unifies everything — data lakes, data warehouses, analytics, and new data sources and types. Leading the way with multi-cloud environments and priced for flexibility, Vantage leverages 100 percent of a business’s data, regardless of scale, volume, or complexity.

Vantage combines descriptive, predictive, prescriptive analytics, autonomous decision-making, ML functions, and visualization tools into a unified, integrated platform that uncovers real-time business intelligence at scale, no matter where the data resides.

Vantage enables companies to start small and elastically scale compute or storage, paying only for what they use, harnessing low-cost object stores, and integrating their analytic workloads.

Vantage supports R, Python, Teradata Studio, and any other SQL-based tools. You can deploy Vantage across public clouds, on-premises, on optimized or commodity infrastructure, or as-a-service.

Teradata Vantage Native Object Store (NOS) can be used to explore data in external object stores, like Amazon S3, using standard SQL. No special object storage-side compute infrastructure is required to use NOS. You can explore data located in an S3 bucket by simply creating a NOS table definition that points to your bucket. With NOS, you can quickly import data from S3 or even join it with other tables in the database.

Alternatively, the Teradata Parallel Transporter (TPT) utility can be used to import data from S3 to Teradata Vantage in a bulk fashion. Once loaded, data can be efficiently queried within Vantage.

Prerequisites

You are expected to be familiar with the AWS Data Exchange service, Amazon CloudWatch, Teradata Vantage, and the Teradata Vantage NOS Write feature.

You will need the following accounts and systems:

For The Data Provider

For The Data Subscriber

Setting Up a Data Provider

Once you have met the prerequisites, follow these steps:

1. Prepare

2. Publish the initial data set

3. Publish data set revisions to your product

Prepare

This step loads sample data into the S3 bucket (e.g. tdadxdemo) using WRITE_NOS. If you already have sample data in the S3 bucket, skip to the next section — Publish the Initial Data Set.

Setting Up Test User and Privileges for WRITE_NOS

1. Log on to the Vantage database as an administrative user (e.g. dbc) with the required privileges to create users and grant privileges.

2. Create a user, if not already done. For example:

CREATE USER nos_usr FROM dbc AS PERMANENT=30e8 PASSWORD=user_password;

3. Grant the necessary privileges to the user. The exact privileges required depend on which queries the user runs; to run the examples in this guide, the user needs the following:

GRANT CREATE TABLE on nos_usr to nos_usr;
GRANT EXECUTE FUNCTION on TD_SYSFNLIB.READ_NOS to nos_usr;
GRANT EXECUTE FUNCTION on TD_SYSFNLIB.WRITE_NOS to nos_usr;
GRANT CREATE AUTHORIZATION on nos_usr to nos_usr;
GRANT CREATE FUNCTION ON nos_usr to nos_usr;

Write Product Asset to the S3 Bucket

Teradata Vantage WRITE_NOS allows you to extract selected or all columns from a database’s tables or derived results and write to external object storage, in this case, Amazon S3.

WRITE_NOS stores data in Parquet format.

To use WRITE_NOS, you need the EXECUTE FUNCTION privilege on TD_SYSFNLIB.WRITE_NOS in the Vantage Advanced SQL Engine, granted in the previous step.

Log on to the database as the new user (e.g. nos_usr) and write to the S3 bucket using WRITE_NOS. The WRITE_NOS syntax is:
WRITE_NOS (
ON { [ database_name.]table_name | (subquery) }
[ PARTITION BY column_name [ ,...] ORDER BY column_name [ ,...] |
HASH BY column_name [ ,...] LOCAL ORDER BY column_name [ ,...] |
LOCAL ORDER BY column_name [ ,...]]
USING
LOCATION ('external_storage_path')
[ AUTHORIZATION ( { [DatabaseName.]AuthorizationObjectName |
'{"Access_ID":"access_id", "Access_Key":"secret_key", "Session_Token":"session_token"}' } ) ]
STOREDAS ('PARQUET')
[ NAMING ({ 'DISCRETE' | 'RANGE' }) ]
[ MANIFESTFILE ('manifest_name') ]
[ MANIFESTONLY ('TRUE') ]
[ OVERWRITE ({ 'TRUE' | 'FALSE' }) ]
[ INCLUDE_ORDERING ({ 'TRUE' | 'FALSE' }) ]
[ INCLUDE_HASHBY ({ 'TRUE' | 'FALSE' }) ]
[ MAXOBJECTSIZE ('max_object_size') ]
[ COMPRESSION ( { 'GZIP' | 'SNAPPY' } ) ]
)

See the Teradata Vantage documentation for more detail.

For example, table network_events is defined as follows:

CREATE SET TABLE network_events ,FALLBACK ,
NO BEFORE JOURNAL,
NO AFTER JOURNAL,
CHECKSUM = DEFAULT,
DEFAULT MERGEBLOCKRATIO,
MAP = TD_MAP1
(
Destination_Country_ID VARCHAR(4) CHARACTER SET LATIN CASESPECIFIC,
Roaming_Indicator VARCHAR(1) CHARACTER SET LATIN CASESPECIFIC,
Roaming_Operator_ID INTEGER,
Customer_Type_ID VARCHAR(4) CHARACTER SET LATIN CASESPECIFIC,
Event_Count INTEGER,
Charged_Product_Id VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
Charge_Amt DECIMAL(5,0),
Used_Duration_in_Sec INTEGER,
Used_Volume_in_Bytes BIGINT,
Charged_Duration_in_secs INTEGER,
Charged_Volume_in_Bytes BIGINT,
Free_Duration_in_secs INTEGER,
Free_Volume_in_Bytes BIGINT,
Zero_Rated_Indicator VARCHAR(1) CHARACTER SET LATIN CASESPECIFIC,
Served_Device_Type_ID INTEGER,
Network_Event_Type_ID INTEGER,
Network_Event_Sub_Type_ID INTEGER,
Subscription_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC NOT NULL,
Customer_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC NOT NULL,
Account_ID VARCHAR(20) CHARACTER SET LATIN CASESPECIFIC NOT NULL,
Network_Event_Date DATE FORMAT 'YY/MM/DD',
Subscription_Rate_Plan_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
SPECIAL_NUM_NM INTEGER,
Event_Load_Date DATE FORMAT 'YY/MM/DD',
LOAD_TS VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
INSERT_BATCH_NUM INTEGER,
Tariff_Id INTEGER,
Charge_category VARCHAR(20) CHARACTER SET LATIN CASESPECIFIC,
SERVICE_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
CUSTOMER_NODE_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
SERVICE_TYPE_CD_ID VARCHAR(5) CHARACTER SET LATIN CASESPECIFIC,
PROMO_AMT DECIMAL(5,0),
RADIO_TYPE_ACCESS_ID VARCHAR(10) CHARACTER SET LATIN CASESPECIFIC,
SERVICE_KEY VARCHAR(20) CHARACTER SET LATIN CASESPECIFIC,
SERIAL_NUMBER VARCHAR(30) CHARACTER SET LATIN CASESPECIFIC,
CURRENCY_ID INTEGER,
CHARGED_PRDCT_INSTC_ID INTEGER,
THROTTLE_FLG CHAR(1) CHARACTER SET LATIN CASESPECIFIC)
PRIMARY INDEX (Customer_ID, Account_ID);

To retrieve data from network_events and write to bucket tdadxdemo, use the following:

SELECT * FROM WRITE_NOS (
ON (
SELECT
Destination_Country_ID,
Roaming_Indicator,
Roaming_Operator_ID,
Customer_Type_ID,
Event_Count,
Charged_Product_Id,
Charge_Amt,
Used_Duration_in_Sec,
Used_Volume_in_Bytes,
Charged_Duration_in_secs,
Charged_Volume_in_Bytes,
Free_Duration_in_secs,
Free_Volume_in_Bytes,
Zero_Rated_Indicator,
Served_Device_Type_ID,
Network_Event_Type_ID,
Network_Event_Sub_Type_ID,
Subscription_ID,
Customer_ID,
Account_ID,
Network_Event_Date,
Subscription_Rate_Plan_ID,
SPECIAL_NUM_NM,
Event_Load_Date,
LOAD_TS,
INSERT_BATCH_NUM,
Tariff_Id,
Charge_category,
SERVICE_ID,
CUSTOMER_NODE_ID,
SERVICE_TYPE_CD_ID,
PROMO_AMT,
RADIO_TYPE_ACCESS_ID,
SERVICE_KEY,
SERIAL_NUMBER,
CURRENCY_ID,
CHARGED_PRDCT_INSTC_ID,
THROTTLE_FLG,
Year(Event_Load_Date) TheYear,
Month(Event_Load_Date) TheMonth
FROM network_events WHERE TheYear = '2015'
)
PARTITION BY TheYear, TheMonth ORDER BY TheYear, TheMonth
USING
LOCATION ('/s3/tdadxdemo.s3.amazonaws.com/')
AUTHORIZATION('{"Access_ID":"AKIAR…………",
"Access_Key":"fBuG…………"}')
COMPRESSION('SNAPPY')
NAMING('DISCRETE')
INCLUDE_ORDERING('FALSE')
STOREDAS('PARQUET')
) AS d;

Publish the Initial Data Set

This step creates and publishes the new product. A private product is used in this article.

Create Data Set

To publish your data, you need to create the data set first.

  • Choose Create data set
  • Make sure Amazon S3 object is selected for Select data set type. Fill in the Data set name (e.g. tdadxdemodata) and Description, then choose Create data set
  • At the Step 2: Create revision within data set page, choose Create revision
  • At the Create revision page, enter a comment (optional) and choose Create revision
  • At the Step 3: Add assets to revision page, choose Import from Amazon S3

At the Import from Amazon S3 page, choose the bucket from the Source drop-down list, check the folder where the files are located (or your files if there is no folder), and click Add selected. The files from the folder will show up under Files added. Click Import assets.

The import job will show up in the Jobs tile. Once it finishes, the state of the job changes to Completed, and the files chosen in the previous step are listed in the Imported assets tile. (If you prefer to script these console steps, see the boto3 sketch after this list.)

  • Choose Step 4 Finalize revision.
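The console steps above can also be scripted with the AWS Data Exchange API. The following is a minimal sketch using boto3; the region, data set name, bucket, and object key are assumptions drawn from the earlier examples, so adjust them to your environment:

import time
import boto3

# Minimal sketch: create a data set, a revision, and import a Parquet asset from S3.
dx = boto3.client('dataexchange', region_name='us-east-1')  # region is an assumption

# 1. Create the data set (equivalent to "Create data set" in the console)
data_set = dx.create_data_set(
    AssetType='S3_SNAPSHOT',
    Name='tdadxdemodata',
    Description='Network events exported from Teradata Vantage via WRITE_NOS'
)

# 2. Create a revision within the data set
revision = dx.create_revision(DataSetId=data_set['Id'], Comment='Initial revision')

# 3. Import an asset from the S3 bucket written by WRITE_NOS (example key is a placeholder)
job = dx.create_job(
    Type='IMPORT_ASSETS_FROM_S3',
    Details={'ImportAssetsFromS3': {
        'DataSetId': data_set['Id'],
        'RevisionId': revision['Id'],
        'AssetSources': [{'Bucket': 'tdadxdemo', 'Key': '2018/01/object_35_59_1.parquet'}]
    }}
)
dx.start_job(JobId=job['Id'])

# 4. Wait for the import to finish, then finalize the revision
while dx.get_job(JobId=job['Id'])['State'] not in ('COMPLETED', 'ERROR'):
    time.sleep(2)
dx.update_revision(DataSetId=data_set['Id'], RevisionId=revision['Id'], Finalized=True)

The same sequence (create a revision, import assets, finalize) is what the Lambda function later in this article automates for each new partition.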

Create Product

Back at the Data Exchange Owned data sets page, click the checkbox next to the new data set you created, and click Create product from data set(s).

At the Product visibility page, set Product visibility to Private. Make sure not to publish sensitive data, and check the first checkbox under Sensitive information: No personal data that is not otherwise publicly available, and no sensitive categories of information (for example, stock data, weather patterns, public company filings). Click Next.

At the Define product page, do the following:

  • Fill in product name (e.g., Daily Network Events Data)
  • Add a company logo image file
  • Give either an email address or web address for support information
  • Choose a Product category (optional)
  • Give the product a short description and a long description
  • Choose Next

On the Add data page, the data set you created should already be highlighted and selected. If not, click the checkbox next to your data set and click Add selected. Choose the Select revision access rules option to indicate whether you want to include historical revisions and/or future revisions. Click Next.

At the Add custom offer page, do the following:

  • Offer type is Private offer for this example
  • Enter the subscriber’s AWS ID and description
  • Enter the duration and price for the offer
  • Enter the Offer expiration date
  • Leave Tax settings as default unless you have different needs
  • Leave Data subscription agreement (DSA) as default unless you have your own DSA file
  • No refund policy is needed if it's a free product; otherwise, enter a Refund policy
  • Check whether or not you’d like the subscribers to auto-renew the offer
  • Choose Next

Review the information entered. Make corrections if needed. Click Publish.

The product will be in “Awaiting approval” status for a day. Once it’s approved by AWS, it’s ready to use.

Publishing Data Set Revisions to your Product

A Lambda function will be created to automatically read the new data files from the bucket, create a revision for the data files, and publish the revision to the product. This function is triggered by the S3 bucket PUT event.

Create an IAM Role

An IAM role needs to be created to give the Lambda function permission to access the S3 bucket and the Data Exchange service. The steps for role creation can be found in the AWS IAM documentation. Make sure the role policy allows the function to:

  • Get, put, and list objects in your S3 bucket
  • CreateRevision, UpdateRevision, CreateJob, StartJob, and GetJob in Data Exchange
  • DescribeEntity, StartChangeSet, and DescribeChangeSet in the Marketplace Catalog API
  • Create a CloudWatch log group
  • Create log streams in the log group
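If you script the role setup, the inline policy might look roughly like the sketch below. The role name, policy name, bucket name, and exact action list are assumptions; check them against the current IAM action reference and scope the resources more tightly for production use.

import json
import boto3

iam = boto3.client('iam')

# Hypothetical inline policy covering the permissions listed above
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # S3 access to the provider bucket
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::tdadxdemo", "arn:aws:s3:::tdadxdemo/*"]
        },
        {   # AWS Data Exchange revision/job operations
            "Effect": "Allow",
            "Action": ["dataexchange:CreateRevision", "dataexchange:UpdateRevision",
                       "dataexchange:CreateJob", "dataexchange:StartJob", "dataexchange:GetJob"],
            "Resource": "*"
        },
        {   # Marketplace Catalog operations used to publish the revision
            "Effect": "Allow",
            "Action": ["aws-marketplace:DescribeEntity", "aws-marketplace:StartChangeSet",
                       "aws-marketplace:DescribeChangeSet"],
            "Resource": "*"
        },
        {   # CloudWatch Logs for the Lambda function itself
            "Effect": "Allow",
            "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "*"
        }
    ]
}

iam.put_role_policy(
    RoleName='tdadxdemo-lambda-role',        # assumed role name
    PolicyName='tdadxdemo-publish-policy',   # assumed policy name
    PolicyDocument=json.dumps(policy)
)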

Creating a Lambda Function

On the Lambda console, click Create a function.

On the Create function section, choose Author from scratch, and do the following:

  • For Function name, enter a name of your choice
  • For Runtime, choose Python 3.7
  • For Permissions, select Use an existing role, and select the Lambda role you created earlier from the Existing role dropdown
  • Choose Create function

Configuring Your Lambda Function

You can now configure your Lambda function. You first need to configure the function to be triggered when new files upload to the S3 bucket. Complete the following steps:

  • On the Lambda console, choose Functions
  • Select the newly created function
  • On the function configuration page, in the Function overview section, choose Add trigger
  • On the Add trigger page, under Trigger configuration, choose S3
  • From the drop-down, select the bucket you created as a part of the prerequisites
  • Under Event type, choose PUT
  • Leave Prefix and Suffix blank
  • Check the acknowledgment, and choose Add

You now can add code to the Lambda function.

  • Select the Code tab and double-click lambda_function.py under your function name in the Code source section
  • Replace the contents of lambda_function.py with the following code, and modify it as necessary.
import os
import boto3
import time
import datetime
import json

# Include the Lambda layer extracted location
os.environ['AWS_DATA_PATH'] = '/opt/'
region = os.environ['AWS_REGION']

# Read in environment variables
try:
    data_set_id = os.environ['DATA_SET_ID']
except KeyError:
    raise Exception("DATA_SET_ID environment variable must be defined!")

try:
    product_id = os.environ['PRODUCT_ID']
except KeyError:
    raise Exception("PRODUCT_ID environment variable must be defined!")

try:
    bucket_name = os.environ['BUCKET_NAME']
except KeyError:
    raise Exception("BUCKET_NAME environment variable must be defined!")

try:
    file_name = os.environ['FILE_NAME']
except KeyError:
    raise Exception("FILE_NAME environment variable must be defined!")


def lambda_handler(event, context):
    # Set up the boto3 clients needed
    dataexchange = boto3.client(
        service_name='dataexchange',
        region_name=region
    )
    marketplace_catalog = boto3.client(
        service_name='marketplace-catalog',
        region_name=region
    )

    s3_resource = boto3.resource('s3')

    # Temp local file that records which partitions have already been processed
    local_file = "/tmp/" + file_name
    local_file_content = []
    # List of files to be imported from S3
    AssetSource = []

    # Open the local temp file, creating it if it does not exist
    try:
        f = open(local_file, "r")
    except IOError:
        f = open(local_file, "w")

    # Read the file content into local_file_content
    if os.path.getsize(local_file) != 0:
        for line in f:
            local_file_content.append(line.rstrip())

    f.close()

    # Parse the S3 details from the triggering event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    s3_bucket = s3_resource.Bucket(bucket_name)

    # Retrieve the first-level partition name (partitioned by year) from the new file
    s3_file_path = object_key[:4]

    # If the new file has not been processed yet
    if s3_file_path not in local_file_content:
        # Wait for all the file loads to finish
        time.sleep(8)

        # Walk through the entire bucket to retrieve all the files for the year
        for file in s3_bucket.objects.all():
            year = file.key[:4]

            # Match the new file path
            if year == s3_file_path:
                thisAssetSource = {
                    'Bucket': bucket_name,
                    'Key': file.key
                }
                AssetSource.append(thisAssetSource)

        # Create a revision under the data set provided by the environment variable
        revision_creation_time = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        create_revision_response = dataexchange.create_revision(
            DataSetId=data_set_id,
            Comment='Revision created on ' + revision_creation_time
        )
        revision_id = create_revision_response['Id']

        # Create an import job under the revision to import the files
        s3_import = dataexchange.create_job(
            Type='IMPORT_ASSETS_FROM_S3',
            Details={
                'ImportAssetsFromS3': {
                    'DataSetId': data_set_id,
                    'RevisionId': revision_id,
                    'AssetSources': AssetSource
                }
            }
        )

        # Get the job id and start the job
        job_id = s3_import['Id']
        start_created_job = dataexchange.start_job(JobId=job_id)

        # Poll the job status until it reaches the COMPLETED state
        job_status = ''

        while job_status != 'COMPLETED':
            get_job_status = dataexchange.get_job(JobId=job_id)
            job_status = get_job_status['State']
            print('Job Status ' + job_status)

            if job_status == 'ERROR':
                job_errors = get_job_status['Errors']
                raise Exception('JobId: {} failed with error:{}'.format(job_id, job_errors))

            time.sleep(.5)

        # Finalize the revision
        revision_finalize_time = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        finalize_revision = dataexchange.update_revision(
            DataSetId=data_set_id,
            RevisionId=revision_id,
            Finalized=True,
            Comment='Revision finalized on ' + revision_finalize_time
        )

        # Get the metadata about the product
        describe_entity = marketplace_catalog.describe_entity(Catalog='AWSMarketplace', EntityId=product_id)

        # Use the output to get the entity type, entity id, data set ARN and revision ARN
        entity_type = describe_entity['EntityType']
        entity_id = describe_entity['EntityIdentifier']
        data_set_arn = (json.loads(describe_entity['Details']))['DataSets'][0]['DataSetArn']
        revision_arn = create_revision_response['Arn']

        # Add the newly finalized revision to the existing product
        start_change_set = marketplace_catalog.start_change_set(
            Catalog='AWSMarketplace',
            ChangeSetName="Adding revision to my Product",
            ChangeSet=[
                {
                    "ChangeType": "AddRevisions",
                    "Entity": {
                        "Identifier": entity_id,
                        "Type": entity_type
                    },
                    "Details": json.dumps({
                        "DataSetArn": data_set_arn,
                        "RevisionArns": [revision_arn]
                    })
                }
            ]
        )

        # Poll the change set status until it reaches the SUCCEEDED state
        changeset_id = start_change_set['ChangeSetId']

        change_set_status = ''

        while change_set_status != 'SUCCEEDED':
            describe_change_set = marketplace_catalog.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=changeset_id
            )
            change_set_status = describe_change_set['Status']
            print('Change Set Status ' + change_set_status)

            if change_set_status == 'FAILED':
                print(describe_change_set)
                failurereason = describe_change_set['FailureDescription']
                raise Exception('ChangeSetID: {} failed with error:\n{}'.format(changeset_id, failurereason))
            time.sleep(1)

        # Add the processed partition to the temp local file so it is not processed again
        f = open(local_file, "a")
        print('add new entry to file: ' + s3_file_path)
        f.write(s3_file_path + '\n')
        f.close()

    return ('Your data has been published successfully')

For this example, test data is written to the S3 bucket partitioned by year and month. The following is a list of sample data files in the bucket:

2018/01/object_35_59_1.parquet
2018/02/object_35_69_1.parquet
2018/03/object_36_77_1.parquet
2018/04/object_35_69_1.parquet
2018/05/object_36_85_1.parquet
2018/06/object_35_50_1.parquet
2018/07/object_36_89_1.parquet
2018/08/object_34_31_1.parquet
2018/09/object_35_68_1.parquet
2018/10/object_34_42_1.parquet
2018/11/object_33_9_1.parquet
2018/12/object_34_46_1.parquet

The sample code provided does the following:

  • Open a temp file to store the 1st level partition information (year in this case)
  • If the file is new, go through the bucket, gather all the files in the same partition, and add them to a list
  • Create a revision for the list from the previous step
  • Create an import job to import all the files from S3 to Data Exchange
  • Finalize the revision
  • Create a change set to add the newly finalized revision to an existing product
  • Add the partition info to the temp file so the files won’t get processed again

Note: The sample code provided is designed specifically to use year partition to identify data files to be processed. If you are using a different partition, change the code accordingly.

Next, set up the configuration for the Lambda function. Choose the Configuration tab under Function overview and click General configuration. Edit the Timeout and change it to 1 minute.

Then click Environment variables. Add the environment variables BUCKET_NAME, DATA_SET_ID, FILE_NAME (the local temporary file name), and PRODUCT_ID.

You can retrieve DATA_SET_ID and PRODUCT_ID from the AWS Data Exchange console. The product ID is available on the product page, and the data set ID is available on the data set’s page.

Product Page
Dataset Page
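Alternatively, both IDs can be looked up programmatically. The sketch below assumes boto3 and, in particular, that 'DataProduct' is the correct entity type string for the Marketplace Catalog call, so verify it against the current API documentation:

import boto3

# List owned AWS Data Exchange data sets to find DATA_SET_ID
dx = boto3.client('dataexchange')
for ds in dx.list_data_sets(Origin='OWNED')['DataSets']:
    print(ds['Id'], ds['Name'])

# List Marketplace data products owned by this account to find PRODUCT_ID
catalog = boto3.client('marketplace-catalog')
resp = catalog.list_entities(Catalog='AWSMarketplace', EntityType='DataProduct')  # entity type is an assumption
for entity in resp['EntitySummaryList']:
    print(entity['EntityId'], entity.get('Name'))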

Finally, click Concurrency in the Configuration tab, then click Edit.

At the Edit concurrency page, select Reserve concurrency and change the number to 1. Click Save.
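These same settings (timeout, environment variables, and reserved concurrency) can also be applied with a short boto3 script; the function name and IDs below are placeholders:

import boto3

lambda_client = boto3.client('lambda')
function_name = 'tdadxdemo-publisher'   # assumed function name

# Timeout of 60 seconds plus the four environment variables used by the code above
lambda_client.update_function_configuration(
    FunctionName=function_name,
    Timeout=60,
    Environment={'Variables': {
        'BUCKET_NAME': 'tdadxdemo',
        'DATA_SET_ID': '<your data set id>',
        'PRODUCT_ID': '<your product id>',
        'FILE_NAME': 'processed_partitions.txt'   # local temp file name; any name works
    }}
)

# Reserve a concurrency of 1 so revisions are published one at a time
lambda_client.put_function_concurrency(
    FunctionName=function_name,
    ReservedConcurrentExecutions=1
)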

The Lambda function is now ready and the setup for the data provider is finished.

Write a New Asset to the S3 Bucket

New assets can then be created to generate data set revisions for consumers. To generate new data from network_events and write it to the bucket tdadxdemo, let’s use the same SQL statement that generated the initial data set, changing the year in the WHERE clause:

SELECT * FROM WRITE_NOS (
ON (
SELECT
Destination_Country_ID,
Roaming_Indicator,
Roaming_Operator_ID,
Customer_Type_ID,
Event_Count,
Charged_Product_Id,
Charge_Amt,
Used_Duration_in_Sec,
Used_Volume_in_Bytes,
Charged_Duration_in_secs,
Charged_Volume_in_Bytes,
Free_Duration_in_secs,
Free_Volume_in_Bytes,
Zero_Rated_Indicator,
Served_Device_Type_ID,
Network_Event_Type_ID,
Network_Event_Sub_Type_ID,
Subscription_ID,
Customer_ID,
Account_ID,
Network_Event_Date,
Subscription_Rate_Plan_ID,
SPECIAL_NUM_NM,
Event_Load_Date,
LOAD_TS,
INSERT_BATCH_NUM,
Tariff_Id,
Charge_category,
SERVICE_ID,
CUSTOMER_NODE_ID,
SERVICE_TYPE_CD_ID,
PROMO_AMT,
RADIO_TYPE_ACCESS_ID,
SERVICE_KEY,
SERIAL_NUMBER,
CURRENCY_ID,
CHARGED_PRDCT_INSTC_ID,
THROTTLE_FLG,
Year(Event_Load_Date) TheYear,
Month(Event_Load_Date) TheMonth
FROM network_events WHERE TheYear = '2016'
)
PARTITION BY TheYear, TheMonth ORDER BY TheYear, TheMonth
USING
LOCATION ('/s3/tdadxdemo.s3.amazonaws.com/')
AUTHORIZATION('{"Access_ID":"AKIAR…………",
"Access_Key":"fBuG…………"}')
COMPRESSION('SNAPPY')
NAMING('DISCRETE')
INCLUDE_ORDERING('FALSE')
STOREDAS('PARQUET')
) AS d;

Once the new data lands in the S3 bucket, the Lambda function is triggered and automatically creates an asset revision against the product’s data set.
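Before pointing real WRITE_NOS output at the bucket, you can smoke-test the handler locally with a synthetic S3 PUT event. This sketch assumes the Lambda code above is saved as lambda_function.py next to it, that your AWS credentials are configured, and that you run it against a test data set, since it will create a real revision:

# test_lambda_locally.py -- rough local smoke test for the handler above
import os

# The handler reads these at import time, so set them before importing
os.environ.setdefault('DATA_SET_ID', '<your data set id>')
os.environ.setdefault('PRODUCT_ID', '<your product id>')
os.environ.setdefault('BUCKET_NAME', 'tdadxdemo')
os.environ.setdefault('FILE_NAME', 'processed_partitions.txt')
os.environ.setdefault('AWS_REGION', 'us-east-1')

import lambda_function

# Minimal S3 PUT event containing just the fields the handler actually reads
event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "tdadxdemo"},
            "object": {"key": "2016/01/object_35_59_1.parquet"}
        }
    }]
}

print(lambda_function.lambda_handler(event, None))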

Setting Up a Data Subscriber

Let’s review the steps to subscribe to the data product.

Accepting A Private Product Offer

This step subscribes to a private product offer.

My Product Offers

When invited to a private data exchange, you will receive a notification in your AWS Data Exchange console.

  • To subscribe to the private product offer, choose Continue to subscribe in the upper right-hand corner of the product tile.
  • Review the subscription terms, Data Subscription Agreement (DSA), and refund policy. When ready to proceed, choose Subscribe at the bottom of the screen.

Automating Data Set Revisions For Consumption

You can set up auto-export of the data set to an S3 bucket while the subscription is being processed.

At the Set up exports — optional window:

  • Use Simple for the destination option and choose the destination bucket from the drop-down list.
  • Choose the bucket and folder destination for the data set from the Select Amazon S3 bucket folder destination drop-down list.
  • Choose the auto-export revision option. In this example, we'll use the On option, which requires setting up a bucket policy for the destination bucket. Expand Bucket resource policy statement and add the statement to the bucket policy of your destination bucket.
  • Encryption is optional.
  • Click Export.

Note: auto-export can also be set up later. Go to Entitled data under My subscriptions in the left menu, expand your data subscription, and click the data set name. Choose Add auto-export job destination and follow the steps described above.
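Under the hood, this auto-export is an AWS Data Exchange event action, so it can also be created with the CreateEventAction API. The sketch below is an outline with placeholder IDs and an assumed key pattern; confirm the request shape in the current boto3 documentation before relying on it:

import boto3

dx = boto3.client('dataexchange')

# Auto-export every newly published revision of the entitled data set to the subscriber bucket
dx.create_event_action(
    Event={'RevisionPublished': {'DataSetId': '<entitled data set id>'}},
    Action={'ExportRevisionToS3': {
        'RevisionDestination': {
            'Bucket': '<subscriber-destination-bucket>',
            'KeyPattern': '${Revision.CreatedAt}/${Asset.Name}'   # assumed key pattern
        }
    }}
)

The destination bucket still needs the bucket resource policy statement described above.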

Setting Up Email Notification

For the subscriber to be notified of new data sets, set up a CloudWatch rule to send a notification to the subscriber's email.

Set Up an SNS Topic and Subscription

We will use the Amazon SNS service for email notifications.

  • Go to the Amazon SNS console, click Topics in the left menu, and click Create topic
  • Choose Standard as the Type. Enter a name (e.g. newEventData) for the topic and leave everything else as default. Choose Create topic
  • At the Topics window, click Create subscription
  • Pick the topic you just created (e.g. newEventData) from the Topic ARN drop-down list, and choose Email as the Protocol. The Endpoint is the subscriber's email address. Choose Create subscription

You should receive an email from AWS Notifications <no-reply@sns.amazonaws.com> once the setup is complete, notifying you of the subscription. Click the Confirm subscription link in the email to confirm.

Sample email
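For reference, the topic and email subscription can also be created with two boto3 calls; the topic name and email address below are placeholders:

import boto3

sns = boto3.client('sns')

# Create the topic and subscribe the subscriber's email address to it
topic = sns.create_topic(Name='newEventData')
sns.subscribe(
    TopicArn=topic['TopicArn'],
    Protocol='email',
    Endpoint='subscriber@example.com'   # placeholder email address
)
# The endpoint owner still has to click the confirmation link in the email.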

Set Up a CloudWatch (EventBridge) Rule

  • Browse to the AWS EventBridge console
  • Click Create rule from the Get started tile with the EventBridge Rule selected
  • At the Define rule detail window, name your rule (e.g. newEventDataAlert). Click Next.
  • At the Build event pattern window, choose AWS events or EventBridge partner events as the Event source
  • Scroll down to Event pattern and use the following configuration:
    o Event source: AWS services
    o AWS service: Data Exchange
    o Event type: Revision Published to Data Set, from Any resource
    o Event pattern: use the default
  • Click Next
  • At the Select target(s) window, choose AWS service, SNS topic as the target, and pick the topic name from the drop-down list (e.g. newEventData)
  • Expand Additional settings and use Input transformer to Configure target input
  • Click Configure input transformer
  • Scroll down to Target input transformer, then copy and paste the following into Input path:
{"event-type":"$.detail-type","RevisionIds":"$.detail.RevisionIds","resources":"$.resources","source":"$.source","time":"$.time"}
  • Copy and paste the following into Template:
"<event-type> by <source> at <time> with <RevisionIds> from source <resources> ."
  • Click Confirm
  • Click Next
  • At Configure tags window, add tag(s) that are applicable for you, click Next
  • Review the information. If everything is correct, click Create rule.
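The equivalent rule can also be created with the EventBridge API. In the sketch below, the event pattern's detail-type string is an assumption (check it against the pattern the console generates), and note that the SNS topic's access policy must also allow events.amazonaws.com to publish to it, which the console normally configures for you:

import json
import boto3

events = boto3.client('events')
sns = boto3.client('sns')

topic_arn = sns.create_topic(Name='newEventData')['TopicArn']  # reuses the topic created earlier

# Rule that fires when AWS Data Exchange publishes a revision to any data set
events.put_rule(
    Name='newEventDataAlert',
    EventPattern=json.dumps({
        "source": ["aws.dataexchange"],
        "detail-type": ["Revision Published To Data Set"]   # assumed detail-type string
    })
)

# SNS target with the same input transformer used in the console steps above
events.put_targets(
    Rule='newEventDataAlert',
    Targets=[{
        'Id': 'sns-email',
        'Arn': topic_arn,
        'InputTransformer': {
            'InputPathsMap': {
                'event-type': '$.detail-type',
                'RevisionIds': '$.detail.RevisionIds',
                'resources': '$.resources',
                'source': '$.source',
                'time': '$.time'
            },
            'InputTemplate': '"<event-type> by <source> at <time> with <RevisionIds> from source <resources> ."'
        }
    }]
)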

Configure NOS access to S3

Native Object Store (NOS) can directly read data in Amazon S3, which allows you to explore and analyze data in S3 without explicitly loading the data.

NOS is only available with Vantage 2.0 (version 17.0) or later.

Create a foreign table definition

A foreign table definition allows data in S3 to be easily referenced within the Advanced SQL Engine and makes the data available in a structured, relational format.

NOS supports data in CSV, JSON, and Parquet formats.

To create a foreign table, log in to your Vantage system with Teradata Studio.

Create an AUTHORIZATION object to access your S3 bucket with the following SQL command.

CREATE AUTHORIZATION DefAuth_S3
AS DEFINER TRUSTED
USER 'A*****' /* AccessKeyId */
PASSWORD '*****' /* SecretAccessKey */
;

Replace the string for USER with your AWS access key.

Replace the string for PASSWORD with your AWS secret access key.

Create a foreign table definition for the Parquet file on S3 with the following SQL command.

CREATE FOREIGN TABLE <foreign table name>
, EXTERNAL SECURITY DEFINER TRUSTED DefAuth_S3
(
Location VARCHAR(2048) CHARACTER SET UNICODE CASESPECIFIC
, col1 DOUBLE PRECISION FORMAT '-ZZZ9.99'
, col2 BIGINT
, col3 VARCHAR(16) CHARACTER SET UNICODE CASESPECIFIC
, …
, coln FLOAT
)
USING (
LOCATION ('<YOUR-STORAGE-ACCOUNT>')
STOREDAS ('PARQUET')
)
NO PRIMARY INDEX
PARTITION BY COLUMN ;

For example, using the network events data the subscriber received, the CREATE FOREIGN TABLE statement is as follows:

CREATE FOREIGN TABLE network_events_parquet
,EXTERNAL SECURITY DEFINER TRUSTED DefAuth_S3
(
Location VARCHAR(2048) CHARACTER SET UNICODE CASESPECIFIC,
Destination_Country_ID BIGINT,
Roaming_Indicator VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC,
Roaming_Operator_ID VARCHAR(5) CHARACTER SET UNICODE NOT CASESPECIFIC,
Customer_Type_ID BIGINT,
Event_Count BIGINT,
Charged_Product_Id BIGINT,
Charge_Amt FLOAT,
Used_Duration_in_Sec BIGINT,
Used_Volume_in_Bytes BIGINT,
Charged_Duration_in_secs FLOAT,
Charged_Volume_in_Bytes FLOAT,
Free_Duration_in_secs FLOAT,
Free_Volume_in_Bytes FLOAT,
Zero_Rated_Indicator VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC,
Served_Device_Type_ID FLOAT,
Network_Event_Type_ID BIGINT,
Network_Event_Sub_Type_ID BIGINT,
Subscription_ID BIGINT,
Customer_ID BIGINT,
Account_ID BIGINT,
Network_Event_Date VARCHAR(10) CHARACTER SET UNICODE NOT CASESPECIFIC,
Subscription_Rate_Plan_ID BIGINT,
SPECIAL_NUM_NM VARCHAR(28) CHARACTER SET UNICODE NOT CASESPECIFIC,
Event_Load_Date VARCHAR(10) CHARACTER SET UNICODE NOT CASESPECIFIC,
LOAD_TS VARCHAR(26) CHARACTER SET UNICODE NOT CASESPECIFIC,
INSERT_BATCH_NUM BIGINT,
Tariff_Id BIGINT,
Charge_category VARCHAR(13) CHARACTER SET UNICODE NOT CASESPECIFIC,
SERVICE_ID BIGINT,
CUSTOMER_NODE_ID BIGINT,
SERVICE_TYPE_CD_ID BIGINT,
PROMO_AMT FLOAT,
RADIO_TYPE_ACCESS_ID BIGINT,
SERVICE_KEY BIGINT,
SERIAL_NUMBER BIGINT,
CURRENCY_ID BIGINT,
CHARGED_PRDCT_INSTC_ID BIGINT,
THROTTLE_FLG VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC)
USING
(
LOCATION ('/s3/s3.amazonaws.com/qgeast')
STOREDAS ('PARQUET')
)
NO PRIMARY INDEX
PARTITION BY COLUMN;

The LOCATION requires a top-level name, which is the bucket name (qgeast in the example above). You will need to replace this with your own bucket name.

Foreign tables are always defined as No Primary Index (NoPI) tables.

Query the dataset in S3

Run the following SQL command to query the dataset.

SELECT * FROM network_events_parquet SAMPLE 10;

The foreign table contains Location and columns from the original data. Location is the address in the object store system.

Sample Results

Previewing the Parquet Schema using READ_NOS

READ_NOS can be used to view the schema of your Parquet data.

Run the following command to explore the schema of network events data.

SELECT * FROM READ_NOS (
USING
LOCATION ('/s3/s3.amazonaws.com/qgeast')
ACCESS_ID ('AK****')
ACCESS_KEY ('******')
RETURNTYPE ('NOSREAD_PARQUET_SCHEMA')
)
AS d;

The LOCATION requires a top-level name, which is the bucket name (qgeast in the example above). You will need to replace this with your own bucket name.

Replace the string for ACCESS_ID with your AWS access key.

Replace the string for ACCESS_KEY with your AWS secret access key.

Sample Results

Load data from S3 into Vantage (optional)

Having a persistent copy of the S3 data can be useful when repetitive access to the same data is expected. NOS does not automatically make a persistent copy of the S3 data. Each time you reference a foreign table, Vantage will fetch the data from S3. (Some data may be cached, but this depends on the size of the data in S3 and other active workloads in Vantage.)

In addition, you may be charged network fees for data transferred from S3. If you will be referencing the data in S3 multiple times, you may reduce your cost by loading it into Vantage, even temporarily.

You can select among the approaches below to load the data into Vantage.

Create the table and load the data in a single statement

You can use a single statement to both create the table and load the data. You can choose the desired attributes from the foreign table payload and what they will be called in the relational table.

A CREATE TABLE AS … WITH DATA statement can be used with the foreign table definition as the source table.

Run the following SQL command to create the relational table and load the data.

CREATE MULTISET TABLE network_events AS (
SELECT CAST(Destination_Country_ID AS BIGINT) Destination_Country_ID,
CAST(Roaming_Indicator AS VARCHAR(1)) Roaming_Indicator,
CAST(Roaming_Operator_ID AS VARCHAR(5)) Roaming_Operator_ID,
CAST(Customer_Type_ID AS BIGINT) Customer_Type_ID,
CAST(Event_Count AS BIGINT) Event_Count,
CAST(Charged_Product_Id AS BIGINT) Charged_Product_Id,
CAST(Charge_Amt AS FLOAT) Charge_Amt,
CAST(Used_Duration_in_Sec AS BIGINT) Used_Duration_in_Sec,
CAST(Used_Volume_in_Bytes AS BIGINT) Used_Volume_in_Bytes,
CAST(Charged_Duration_in_secs AS FLOAT) Charged_Duration_in_secs,
CAST(Charged_Volume_in_Bytes AS FLOAT) Charged_Volume_in_Bytes,
CAST(Free_Duration_in_secs AS FLOAT) Free_Duration_in_secs,
CAST(Free_Volume_in_Bytes AS FLOAT) Free_Volume_in_Bytes,
CAST(Zero_Rated_Indicator AS VARCHAR(1)) Zero_Rated_Indicator,
CAST(Served_Device_Type_ID AS FLOAT) Served_Device_Type_ID,
CAST(Network_Event_Type_ID AS BIGINT) Network_Event_Type_ID,
CAST(Network_Event_Sub_Type_ID AS BIGINT) Network_Event_Sub_Type_ID,
CAST(Subscription_ID AS BIGINT) Subscription_ID,
CAST(Customer_ID AS BIGINT) Customer_ID,
CAST(Account_ID AS BIGINT) Account_ID,
CAST(Network_Event_Date AS VARCHAR(10)) Network_Event_Date,
CAST(Subscription_Rate_Plan_ID AS BIGINT) Subscription_Rate_Plan_ID,
CAST(SPECIAL_NUM_NM AS VARCHAR(28)) SPECIAL_NUM_NM,
CAST(Event_Load_Date AS VARCHAR(10)) Event_Load_Date,
CAST(LOAD_TS AS VARCHAR(26)) LOAD_TS,
CAST(INSERT_BATCH_NUM AS BIGINT) INSERT_BATCH_NUM,
CAST(Tariff_Id AS BIGINT) Tariff_Id,
CAST(Charge_category AS VARCHAR(13)) Charge_category,
CAST(SERVICE_ID AS BIGINT) SERVICE_ID,
CAST(CUSTOMER_NODE_ID AS BIGINT) CUSTOMER_NODE_ID,
CAST(SERVICE_TYPE_CD_ID AS BIGINT) SERVICE_TYPE_CD_ID,
CAST(PROMO_AMT AS FLOAT) PROMO_AMT,
CAST(RADIO_TYPE_ACCESS_ID AS BIGINT) RADIO_TYPE_ACCESS_ID,
CAST(SERVICE_KEY AS BIGINT) SERVICE_KEY,
CAST(SERIAL_NUMBER AS BIGINT) SERIAL_NUMBER,
CAST(CURRENCY_ID AS BIGINT) CURRENCY_ID,
CAST(CHARGED_PRDCT_INSTC_ID AS BIGINT) CHARGED_PRDCT_INSTC_ID,
CAST(THROTTLE_FLG AS VARCHAR(1)) THROTTLE_FLG
FROM network_events_parquet
) WITH DATA
NO PRIMARY INDEX
;

Run the following SQL command to validate the contents of the table.

SELECT * FROM network_events SAMPLE 10;
Sample Results

Create the table and load the data in multiple statements

You can also use multiple statements to first create the relational table and then load the data. An advantage of this choice is that you can perform multiple loads, possibly selecting different data or loading in smaller increments if the object is very large.

Run the following SQL command to create the relational table.

CREATE TABLE network_events
(
Destination_Country_ID BIGINT,
Roaming_Indicator VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC,
Roaming_Operator_ID VARCHAR(5) CHARACTER SET UNICODE NOT CASESPECIFIC,
Customer_Type_ID BIGINT,
Event_Count BIGINT,
Charged_Product_Id BIGINT,
Charge_Amt FLOAT,
Used_Duration_in_Sec BIGINT,
Used_Volume_in_Bytes BIGINT,
Charged_Duration_in_secs FLOAT,
Charged_Volume_in_Bytes FLOAT,
Free_Duration_in_secs FLOAT,
Free_Volume_in_Bytes FLOAT,
Zero_Rated_Indicator VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC,
Served_Device_Type_ID FLOAT,
Network_Event_Type_ID BIGINT,
Network_Event_Sub_Type_ID BIGINT,
Subscription_ID BIGINT,
Customer_ID BIGINT,
Account_ID BIGINT,
Network_Event_Date VARCHAR(10) CHARACTER SET UNICODE NOT CASESPECIFIC,
Subscription_Rate_Plan_ID BIGINT,
SPECIAL_NUM_NM VARCHAR(28) CHARACTER SET UNICODE NOT CASESPECIFIC,
Event_Load_Date VARCHAR(10) CHARACTER SET UNICODE NOT CASESPECIFIC,
LOAD_TS VARCHAR(26) CHARACTER SET UNICODE NOT CASESPECIFIC,
INSERT_BATCH_NUM BIGINT,
Tariff_Id BIGINT,
Charge_category VARCHAR(13) CHARACTER SET UNICODE NOT CASESPECIFIC,
SERVICE_ID BIGINT,
CUSTOMER_NODE_ID BIGINT,
SERVICE_TYPE_CD_ID BIGINT,
PROMO_AMT FLOAT,
RADIO_TYPE_ACCESS_ID BIGINT,
SERVICE_KEY BIGINT,
SERIAL_NUMBER BIGINT,
CURRENCY_ID BIGINT,
CHARGED_PRDCT_INSTC_ID BIGINT,
THROTTLE_FLG VARCHAR(1) CHARACTER SET UNICODE NOT CASESPECIFIC
)
PRIMARY INDEX (Customer_ID, Account_ID);

Run the following SQL to load the data into the table.

INSERT INTO network_events
SELECT CAST(Destination_Country_ID AS BIGINT) Destination_Country_ID,
CAST(Roaming_Indicator AS VARCHAR(1)) Roaming_Indicator,
CAST(Roaming_Operator_ID AS VARCHAR(5)) Roaming_Operator_ID,
CAST(Customer_Type_ID AS BIGINT) Customer_Type_ID,
CAST(Event_Count AS BIGINT) Event_Count,
CAST(Charged_Product_Id AS BIGINT) Charged_Product_Id,
CAST(Charge_Amt AS FLOAT) Charge_Amt,
CAST(Used_Duration_in_Sec AS BIGINT) Used_Duration_in_Sec,
CAST(Used_Volume_in_Bytes AS BIGINT) Used_Volume_in_Bytes,
CAST(Charged_Duration_in_secs AS FLOAT) Charged_Duration_in_secs,
CAST(Charged_Volume_in_Bytes AS FLOAT) Charged_Volume_in_Bytes,
CAST(Free_Duration_in_secs AS FLOAT) Free_Duration_in_secs,
CAST(Free_Volume_in_Bytes AS FLOAT) Free_Volume_in_Bytes,
CAST(Zero_Rated_Indicator AS VARCHAR(1)) Zero_Rated_Indicator,
CAST(Served_Device_Type_ID AS FLOAT) Served_Device_Type_ID,
CAST(Network_Event_Type_ID AS BIGINT) Network_Event_Type_ID,
CAST(Network_Event_Sub_Type_ID AS BIGINT) Network_Event_Sub_Type_ID,
CAST(Subscription_ID AS BIGINT) Subscription_ID,
CAST(Customer_ID AS BIGINT) Customer_ID,
CAST(Account_ID AS BIGINT) Account_ID,
CAST(Network_Event_Date AS VARCHAR(10)) Network_Event_Date,
CAST(Subscription_Rate_Plan_ID AS BIGINT) Subscription_Rate_Plan_ID,
CAST(SPECIAL_NUM_NM AS VARCHAR(28)) SPECIAL_NUM_NM,
CAST(Event_Load_Date AS VARCHAR(10)) Event_Load_Date,
CAST(LOAD_TS AS VARCHAR(26)) LOAD_TS,
CAST(INSERT_BATCH_NUM AS BIGINT) INSERT_BATCH_NUM,
CAST(Tariff_Id AS BIGINT) Tariff_Id,
CAST(Charge_category AS VARCHAR(13)) Charge_category,
CAST(SERVICE_ID AS BIGINT) SERVICE_ID,
CAST(CUSTOMER_NODE_ID AS BIGINT) CUSTOMER_NODE_ID,
CAST(SERVICE_TYPE_CD_ID AS BIGINT) SERVICE_TYPE_CD_ID,
CAST(PROMO_AMT AS FLOAT) PROMO_AMT,
CAST(RADIO_TYPE_ACCESS_ID AS BIGINT) RADIO_TYPE_ACCESS_ID,
CAST(SERVICE_KEY AS BIGINT) SERVICE_KEY,
CAST(SERIAL_NUMBER AS BIGINT) SERIAL_NUMBER,
CAST(CURRENCY_ID AS BIGINT) CURRENCY_ID,
CAST(CHARGED_PRDCT_INSTC_ID AS BIGINT) CHARGED_PRDCT_INSTC_ID,
CAST(THROTTLE_FLG AS VARCHAR(1)) THROTTLE_FLG
FROM network_events_parquet;

Run the following SQL command to validate the contents of the table.

SELECT * FROM network_events SAMPLE 10;
Sample Results

Further Information

Don’t forget to go to teradata.com to find more information about Teradata products and follow us on Medium to get notifications on news and important developer content. Stay in touch, as improvements to this workflow are coming soon!

And remember…. Always have fun, and happy coding!!
