Ingesting Musical Content At Scale Using DDEX: Part Two

Will Button
Published in Aaptiv Engineering
Apr 4, 2018

Parsing DDEX Electronic Resource Notification (ERN) Messages

Recap

In the last post, I introduced the Digital Data Exchange (DDEX) standard as the format used in the music industry to distribute products digitally. I also described the goals and objectives we had in consuming DDEX resources to provide audio content to our users.

In this post, I’ll go into the technical details of how we implemented it.

Workflow

New Message Received

Our workflow starts when we receive an Electronic Resource Notification (ERN) message from a music provider. This is done via AWS S3. Each provider has a dedicated bucket, accessible only by them.

We use a trigger to fire the appropriate Lambda function when the message is received.

The trigger is configured so that files placed in the incoming folder with the .xml extension invoke the Lambda function.
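
For readers who prefer configuration as code, here is a minimal sketch of what an equivalent trigger could look like as an S3 notification configuration. The function ARN and bucket name are placeholders, and the incoming/ prefix is an assumption based on the folder name mentioned above; this is not a copy of our actual setup.

```python
def build_notification_config(lambda_arn, prefix="incoming/", suffix=".xml"):
    """Return an S3 notification configuration that invokes a Lambda
    function for new objects whose key matches the prefix and suffix."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": prefix},
                            {"Name": "suffix", "Value": suffix},
                        ]
                    }
                },
            }
        ]
    }


# Hypothetical ARN, for illustration only.
config = build_notification_config(
    "arn:aws:lambda:us-east-1:123456789012:function:process-ern"
)

# With boto3 installed, the configuration could be applied like this:
#   boto3.client("s3").put_bucket_notification_configuration(
#       Bucket="provider-bucket", NotificationConfiguration=config)
```

Keeping the filter rules in a small function makes it easy to create the same trigger for each provider bucket.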

Accessing the Message

When the Lambda function is triggered, a parameter is passed to the function. This parameter contains the name of the S3 bucket and the key name (a.k.a. file name) of the file that triggered the function. We use this information to download the file to the Lambda function, where it can be read.

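import boto3
from urllib.parse import unquote_plus

s3_client = boto3.client('s3')

def process(event, context):
    """
    Lambda handler to process new ERN messages
    """
    # get s3 file
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record['s3']['object']['key'])
        file_path = '/tmp/{0}'.format(key.split("/")[-1])
        s3_client.download_file(bucket, key, file_path)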

You’ll notice that we download the file to the /tmp/ directory. This is the only writeable location within a Lambda function; the rest of the filesystem is read-only. It’s also limited to 512 MB (you’ll see why that’s important to know later).
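
To make the shape of that parameter concrete, here is a small sketch that runs without AWS. The sample event is trimmed down to the fields the handler reads, and the bucket and file names are made up. It also decodes the key, since S3 URL-encodes object keys in event notifications (a space arrives as a plus sign).

```python
from urllib.parse import unquote_plus

# Trimmed-down, made-up example of the event S3 passes to the handler.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "provider-bucket"},
                "object": {"key": "incoming/New+Release+001.xml"},
            }
        }
    ]
}


def local_paths(event, tmp_dir="/tmp"):
    """Yield (bucket, key, local_path) for every record in an S3 event."""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])
        file_name = key.split("/")[-1]
        yield bucket, key, "{0}/{1}".format(tmp_dir, file_name)


targets = list(local_paths(sample_event))
print(targets)
```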

Parsing the Message

Now that we have a message, it’s time to parse it and see what’s inside.

There are three primary messages we expect and act upon:

  • OriginalMessage: A brand new message containing new music.
  • UpdateMessage: Indicates a change has been made to an audio asset we already have. For example, a record label changes names or the distribution details have changed.
  • TakedownMessage: Used to notify us that we can no longer use this piece of music or resource.

To parse the message, we used ElementTree. It’s simple to use, feature-rich, and part of the Python standard library. Our needs were pretty simple, so I didn’t see any reason to look further.

Here’s the kicker: any given message from the sender contains a ton of information. A typical message contains the following:

  • Message Header: This includes the identifier for the sender, the identifier for the intended recipient, and the unique message id.
  • Resource List: Each resource represented in the message is listed here. For a full album, there is an XML element for the album, for each track on the album, and the cover art or images. Individual elements include the unique identifier for the album or track, the musicians and the role they played (singer, guitar, bass, etc), and regional elements indicating what the title of the track is. This allows the same song to have different names in different parts of the world. Other information included here is the audio format, duration, and MD5 hash for the file.
  • Release List: Each release for each resource is listed here, and it can get quite complicated. For any given song, it may be released as a single, as part of an album, or as a preview. It can also be released in different parts of the world on different days, requiring a separate release element for each instance. The schema specification also introduces relational XML here, meaning that the resource being described in this release is identified by a unique identifier element that points to the Resource List element in the file.
  • Deal List: Finally, we get to the deal list. The deal describes how you are authorized to use the music, and when you are allowed to do so. The music industry is vast, so you can expect to find deal information that describes when a music store can display the CD on their shelf and what the retail price is. Some deals allow you to make the music available for download, while others allow you to stream the audio. Again we see the relational XML here that references an individual release from the same ERN message. This section is arguably the most important: it describes how you are allowed to use the resource and when you are allowed to do so. It also drives the monthly reports you’ll need to provide back to the sender describing how you used it. Those reports are used to calculate the royalties you owe to the music provider in exchange for playing their music.
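
The reference pattern described in the list above (deals point at releases, releases point at resources) can be sketched with ElementTree as follows. The XML here is a heavily simplified, made-up sample: real ERN messages are namespaced and nest titles and usage terms much more deeply, so treat the element layout as an assumption that only illustrates how the references are followed.

```python
import xml.etree.ElementTree as ET

# Simplified, made-up sample; not schema-accurate.
SAMPLE = """
<NewReleaseMessage>
  <ResourceList>
    <SoundRecording>
      <ResourceReference>A1</ResourceReference>
      <Title>Track One</Title>
    </SoundRecording>
    <SoundRecording>
      <ResourceReference>A2</ResourceReference>
      <Title>Track Two</Title>
    </SoundRecording>
  </ResourceList>
  <ReleaseList>
    <Release>
      <ReleaseReference>R1</ReleaseReference>
      <ReleaseResourceReferenceList>
        <ReleaseResourceReference>A2</ReleaseResourceReference>
      </ReleaseResourceReferenceList>
    </Release>
  </ReleaseList>
  <DealList>
    <ReleaseDeal>
      <DealReleaseReference>R1</DealReleaseReference>
      <UseType>Stream</UseType>
    </ReleaseDeal>
  </DealList>
</NewReleaseMessage>
"""


def resolve_deals(xml_text):
    """Return (use_type, track_title) pairs by following the references."""
    root = ET.fromstring(xml_text.strip())
    # Index resources and releases by their reference identifiers.
    resources = {
        r.findtext("ResourceReference"): r.findtext("Title")
        for r in root.find("ResourceList")
    }
    releases = {
        rel.findtext("ReleaseReference"): [
            ref.text for ref in rel.iter("ReleaseResourceReference")
        ]
        for rel in root.find("ReleaseList")
    }
    resolved = []
    for deal in root.find("DealList"):
        release_ref = deal.findtext("DealReleaseReference")
        for resource_ref in releases.get(release_ref, []):
            resolved.append((deal.findtext("UseType"), resources[resource_ref]))
    return resolved


deals = resolve_deals(SAMPLE)
print(deals)
```

Building the two lookup dictionaries first means each deal is resolved with simple key lookups instead of repeated searches through the tree.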

As an audio fitness company, we use music to create playlists that are used by our trainers as the soundtrack for your guided workout. This means we’ll probably never play an album. We’ll also never sell a physical CD (or cassette, album, 8-track, or download). In other words, some deals don’t apply to us.

With all of that new knowledge, I felt like I had enough information to start building out our workflow.

Implementation

Preserve the integrity of the ingested content

The first step is to transform the XML message into a usable format in its entirety. This ensures that we always have the complete message, which may be needed for troubleshooting or for validating that we took the correct action based on the message contents.

I used SQLAlchemy to create a database schema. There are a lot of ORM haters, and in some cases they are justified. In this case, the only required action is saving the message to a database. In my experience, the problem most people have with ORMs can be traced to performing complex SQL queries (which many times can be traced to poor schema design). I felt confident we could avoid that here, and using an ORM gives me a nice Python class object that fully represents the ERN message.

In hindsight, I really wish I’d used MongoDB for this instead of Postgres. From a relational perspective, this looks pretty straightforward, and it is. After multiple integrations though, I’ve learned that every sender adds fields unique to them, or uses a different data format.

Guess what?

If I didn’t code for it, it doesn’t get captured.

If I had used MongoDB though, I could have just converted the XML to JSON and saved it to the db. Additionally, Mongo has excellent query capabilities and in most cases does an exceptional job of querying across sparse data sets.

Lesson learned.
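
The XML-to-JSON idea mentioned above could look roughly like the sketch below. It is not code we ran in production, just one way to turn an arbitrary element into a JSON-friendly structure so that sender-specific fields survive without being coded for. The "@" and "#text" key conventions and the SenderExtra element are illustrative choices.

```python
import json
import xml.etree.ElementTree as ET


def element_to_dict(element):
    """Recursively convert an element into plain dicts, lists, and strings."""
    children = list(element)
    if not children and not element.attrib:
        return (element.text or "").strip()
    # Attributes are kept under "@name" keys.
    node = {"@" + name: value for name, value in element.attrib.items()}
    text = (element.text or "").strip()
    if text:
        node["#text"] = text
    for child in children:
        value = element_to_dict(child)
        if child.tag in node:
            # Repeated tags become a list so no sibling is lost.
            if not isinstance(node[child.tag], list):
                node[child.tag] = [node[child.tag]]
            node[child.tag].append(value)
        else:
            node[child.tag] = value
    return node


# Made-up header with a field that a fixed schema would not anticipate.
sample = ET.fromstring(
    "<MessageHeader>"
    "<MessageId>12345</MessageId>"
    "<SenderExtra Vendor='acme'>custom value</SenderExtra>"
    "</MessageHeader>"
)
document = {sample.tag: element_to_dict(sample)}
print(json.dumps(document, indent=2))
```

For namespaced messages the tags would carry the namespace in braces, so a real version would probably strip or map those before saving.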

Verify required data

Some XML elements are required, for example the MessageId. It’s an ID unique to the sender that allows us to reference an individual ERN message. It seems logical to assume this field will always be included, but anyone who has spent significant time working with third-party data can attest otherwise.

message_id = message_header.find('MessageId')
if message_id is not None:
    ern.message_id = message_id.text
else:
    raise Exception("Missing MessageId")

The .find() method checks whether the element exists in the XML; if it does, we retrieve its value using .text.

Similar code is executed for each XML element that is required. Once complete, we can process the XML message, confident that we have the required data for successful ingestion.
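
Since the same check repeats for every required element, it can be folded into a small helper. The sketch below is one way to do that; the list of required paths, the exception name, and the sample identifiers are assumptions for illustration rather than our actual list.

```python
import xml.etree.ElementTree as ET

# Assumed set of required paths, relative to the message header.
REQUIRED_PATHS = ["MessageId", "MessageSender/PartyId", "MessageRecipient/PartyId"]


class MissingElementError(Exception):
    """Raised when a required element is absent or empty."""


def require_text(parent, path):
    """Return the stripped text at `path`, or raise if missing or empty."""
    element = parent.find(path)
    if element is None or not (element.text or "").strip():
        raise MissingElementError("Missing {0}".format(path))
    return element.text.strip()


def validate_header(header):
    """Return a dict of every required value, failing on the first gap."""
    return {path: require_text(header, path) for path in REQUIRED_PATHS}


header = ET.fromstring(
    "<MessageHeader>"
    "<MessageId>12345</MessageId>"
    "<MessageSender><PartyId>SENDER-ID</PartyId></MessageSender>"
    "<MessageRecipient><PartyId>RECIPIENT-ID</PartyId></MessageRecipient>"
    "</MessageHeader>"
)
values = validate_header(header)
print(values)
```

A dedicated exception type also makes it easier to tell a malformed message apart from an unexpected failure when reading the logs.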

Download and store the resources

The next step is to iterate through the ResourceList. This provides all of the resources (audio tracks, album artwork, etc) contained in this message. Each one of these must be downloaded from the Sender.

<TechnicalSoundRecordingDetails>
  <TechnicalResourceDetailsReference>T5</TechnicalResourceDetailsReference>
  <AudioCodecType>FLAC</AudioCodecType>
  <NumberOfChannels>2</NumberOfChannels>
  <SamplingRate UnitOfMeasure="kHz">44.1</SamplingRate>
  <BitsPerSample>16</BitsPerSample>
  <IsPreview>false</IsPreview>
  <File>
    <URL>https://s3.amazonaws.com/XXXXXX/XXXX.flac</URL>
    <HashSum>
      <HashSum>014a089377bb23c80c693d10065f03a9</HashSum>
      <HashSumAlgorithmType>MD5</HashSumAlgorithmType>
    </HashSum>
  </File>
</TechnicalSoundRecordingDetails>
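
As a rough sketch, the values needed for the download can be pulled out of that element with ElementTree path lookups. The snippet reuses the sample above without namespaces; the function name and the keys of the returned dictionary are illustrative.

```python
import xml.etree.ElementTree as ET

DETAILS = """
<TechnicalSoundRecordingDetails>
  <TechnicalResourceDetailsReference>T5</TechnicalResourceDetailsReference>
  <AudioCodecType>FLAC</AudioCodecType>
  <File>
    <URL>https://s3.amazonaws.com/XXXXXX/XXXX.flac</URL>
    <HashSum>
      <HashSum>014a089377bb23c80c693d10065f03a9</HashSum>
      <HashSumAlgorithmType>MD5</HashSumAlgorithmType>
    </HashSum>
  </File>
</TechnicalSoundRecordingDetails>
"""


def file_info(details):
    """Collect the download URL and checksum details for one resource."""
    return {
        "url": details.findtext("File/URL"),
        "hash": details.findtext("File/HashSum/HashSum"),
        "algorithm": details.findtext("File/HashSum/HashSumAlgorithmType"),
        "codec": details.findtext("AudioCodecType"),
    }


info = file_info(ET.fromstring(DETAILS.strip()))
print(info)
```

Note that findtext returns None for a missing element, so the required-field checks from the previous section still apply before downloading.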

Because we are using AWS Lambda, two problems presented themselves. First, at the time of writing, each Lambda function has a maximum runtime of 5 minutes. Second, the Lambda function operates in a read-only file system, with the exception of a 512 MB /tmp directory.

import requests

# get_hash and exceptions are defined elsewhere (not shown in this excerpt)
def download_resource(url, hashsum):  # wrapper name is illustrative
    track = requests.get(url, stream=True)
    if track.ok:
        # save under /tmp using the file name from the URL, minus any query string
        file_name = '/tmp/' + url.split('/')[-1].split('?')[0]
        with open(file_name, 'wb') as f:
            for chunk in track.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        if get_hash(file_name) == hashsum:
            return file_name
        else:
            raise Exception("Hash did not match downloaded file")
    else:
        if 400 <= track.status_code < 500:
            raise exceptions.File400Exception("Error accessing file")
        elif track.status_code >= 500:
            raise exceptions.File500Exception("HTTP 5XX encountered downloading file")

Using the Python requests library, we first use track.ok to verify that the request succeeded (requests treats any status code below 400 as ok). Next, we download the file and save it to the /tmp folder, then verify that its MD5 hash matches the value provided in the XML message. This ensures the file downloaded completely, without corruption.
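
The get_hash helper isn’t shown in this post. A minimal version, assuming it simply returns the hex MD5 digest of a file, might look like the sketch below. Reading in chunks keeps memory use flat even for large audio files.

```python
import hashlib
import os
import tempfile


def get_hash(file_name, chunk_size=8192):
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(file_name, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Usage: hash a small temporary file with known contents.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello")
checksum = get_hash(tmp.name)
os.remove(tmp.name)
print(checksum)
```

hexdigest() returns lowercase hex, so if a sender supplies uppercase hashes it would make sense to lowercase both sides before comparing.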

Once downloaded and verified, we use the boto3 library to upload the file to our own S3 bucket.

res = upload.Bucket(UPLOAD_BUCKET).put_object(Key=file_name.split("/")[-1], Body=data, ServerSideEncryption='aws:kms')

Notice that we specify ServerSideEncryption as well so that the files in our S3 bucket are encrypted.

While this works, we ran into problems at scale. Some XML messages were huge, containing nearly 100 audio tracks. Each one of those had to be downloaded, verified, then uploaded to our S3 bucket. In some cases, this caused the Lambda function to time out before completing. Since the function didn’t exit successfully, it re-triggers, starting the process all over again…

at Track 1. 🤦‍♂

To overcome this, I added a quick check to see if the downloaded file already existed in our final S3 bucket:

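import boto3
from botocore.exceptions import ClientError

def object_exists(bucket, key):  # wrapper name is illustrative
    client = boto3.client('s3')
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        raise  # anything other than "not found" is a real error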

This meant that files previously downloaded could be skipped, allowing the function to focus on the remaining files in the ERN message.
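
Put together, the retry-friendly loop looks something like the sketch below. The storage check and the download/verify/upload step are passed in as plain callables so the example runs without AWS; in practice they would wrap the existence check above and the download code from earlier. All names and URLs here are illustrative.

```python
def ingest_resources(resources, already_stored, fetch_and_store):
    """Process each (key, url) pair, skipping keys that are already stored.

    `already_stored` and `fetch_and_store` are injected callables standing
    in for the S3 existence check and the download/verify/upload steps.
    """
    processed, skipped = [], []
    for key, url in resources:
        if already_stored(key):
            skipped.append(key)
            continue
        fetch_and_store(key, url)
        processed.append(key)
    return processed, skipped


# Simulate a retry where the first track was uploaded before the timeout.
stored = {"track-01.flac"}
resources = [
    ("track-01.flac", "https://example.com/track-01.flac"),
    ("track-02.flac", "https://example.com/track-02.flac"),
]
processed, skipped = ingest_resources(
    resources,
    already_stored=lambda key: key in stored,
    fetch_and_store=lambda key, url: stored.add(key),
)
print(processed, skipped)
```

Because every step is safe to repeat, a function that times out halfway simply picks up where it left off on the next invocation.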

Next I hit the second issue: the 512 MB limit. With each ERN message triggering its own Lambda function, you wouldn’t think stale data would be a problem (at least I didn’t). Turns out it is. When a Lambda function finishes executing and another trigger fires within a certain window, the same execution environment, including its /tmp directory, can be reused.

This means that the /tmp space may already have files in it. Once you reach the 512 MB capacity, the functions will fail due to lack of disk space. As a result, I added

os.remove(file_name)

immediately after uploading the file to our S3 bucket to ensure that no lingering files remained.
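
A slightly more defensive variation, which is not what I shipped but is worth considering, is to remove the file in a finally block so it is deleted even when the upload raises, and to clear leftovers from an earlier invocation at the start of the handler. The helper names below are illustrative, and the example uses a temporary directory in place of /tmp.

```python
import os
import tempfile


def with_cleanup(file_name, action):
    """Run action(file_name) and delete the file afterwards, even on error."""
    try:
        return action(file_name)
    finally:
        if os.path.exists(file_name):
            os.remove(file_name)


def clear_directory(directory):
    """Delete leftover regular files, returning how many were removed."""
    removed = 0
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if os.path.isfile(path):
            os.remove(path)
            removed += 1
    return removed


# Usage against a temporary directory standing in for /tmp.
work_dir = tempfile.mkdtemp()
stale = os.path.join(work_dir, "stale.flac")
with open(stale, "wb") as f:
    f.write(b"left over from an earlier invocation")
removed_count = clear_directory(work_dir)

fresh = os.path.join(work_dir, "fresh.flac")
with open(fresh, "wb") as f:
    f.write(b"audio bytes")
size = with_cleanup(fresh, os.path.getsize)  # stands in for the upload step
os.rmdir(work_dir)
print(removed_count, size)
```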

Save the parsed message to the master database

At this point, we’ve

  • validated our required fields
  • downloaded the resources identified in the message
  • saved the files to our encrypted S3 bucket

It’s only then that I write the message to the database. This ensures that a message has been completely and successfully processed before it is written to the database. Because I used SQLAlchemy as the ORM and parsed the ERN message into a Python object, saving is as simple as

session.add(ern)
session.commit()

Tell someone you’re done

Now that we’ve successfully ingested the new content, we need to notify some folks. First, we need to notify the sender that we’ve successfully ingested their message. We do this by posting a message to a web service provided by the sender.

response = requests.post(WS_URL, data=request, auth=(WS_USER, WS_PWD), headers=headers)

The web service URL, username, and password are all environment variables. This means I can set them per function and never have to store credentials in the code. The payload to the web service is an XML file with an element <Status>SuccessfullyIngestedByDistributor</Status>.
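
Here is a rough sketch of how such a payload could be assembled and how the settings are read from the environment. Only the Status element and its value come from the real payload; the root element name, the MessageId child, and the fallback values are placeholders, since each sender defines its own acknowledgement format.

```python
import os
import xml.etree.ElementTree as ET


def build_ack(message_id, status="SuccessfullyIngestedByDistributor"):
    """Build a minimal acknowledgement document as a string."""
    root = ET.Element("Acknowledgement")  # hypothetical root element name
    ET.SubElement(root, "MessageId").text = message_id  # hypothetical child
    ET.SubElement(root, "Status").text = status
    return ET.tostring(root, encoding="unicode")


# Settings come from environment variables; the fallbacks are placeholders
# so the sketch runs outside Lambda.
WS_URL = os.environ.get("WS_URL", "https://example.com/ack")
WS_USER = os.environ.get("WS_USER", "user")
WS_PWD = os.environ.get("WS_PWD", "password")

payload = build_ack("12345")
print(payload)
```

The resulting string is what would be passed as the data argument of the requests.post call shown above.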

Remember, we received a lot of information about each audio track. Some of it we don’t care about, for instance all the contributors to a given audio track. Having to sort through that information across millions of songs would lead to a poor user experience. Instead, I take the parts of the data that are relevant to our audio team and post it as a message on an SQS queue. This triggers another Lambda function to save the SQS message to a different database, searchable by our audio team.

This may seem like an unnecessary step, but it actually provides us with a lot of flexibility and scalability. The process of ingesting ERN messages is decoupled from the process of adding new music to the database used by our audio engineering team. This allows a failure in either process to be isolated to its own function, preventing a scenario where a failure to update the audio engineers’ database blocks an acknowledgement to the ERN sender. For scalability, it allows either process to operate as fast as it can and use the SQS queue as a buffer between them.
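
As an illustration of the hand-off, the sketch below trims a parsed track down to the fields the audio team might search on and serializes it as the queue message body. The field names and sample values are assumptions, not our actual schema.

```python
import json

# Assumed subset of fields that matter to the audio team.
AUDIO_TEAM_FIELDS = ("isrc", "title", "artist", "duration", "s3_key")


def build_queue_message(track):
    """Serialize only the relevant fields of a track as a JSON string."""
    return json.dumps(
        {field: track.get(field) for field in AUDIO_TEAM_FIELDS},
        sort_keys=True,
    )


# Made-up track metadata, including data the audio team does not need.
track = {
    "isrc": "XX0000000001",
    "title": "Track One",
    "artist": "Example Artist",
    "duration": "PT3M30S",
    "s3_key": "track-01.flac",
    "contributors": ["guitar", "bass", "drums"],
}
body = build_queue_message(track)
print(body)

# With boto3 installed, the message could be posted like this:
#   boto3.client("sqs").send_message(QueueUrl=QUEUE_URL, MessageBody=body)
```

Keeping the message small and self-describing means the consuming function needs no knowledge of the ERN format at all.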

Up Next

In the next post, we’ll take a look at the different types of testing implemented to ensure things work as expected and allow us to confidently deploy changes without fear of breaking things.
