Within the everyday programming tasks that take place on a long-term or large-scale project, there will always be data migration and transformation tasks that require “one-off scripts”.
As an experienced/older (or greyer) engineer, I’m used to “one-off” becoming regularly scheduled. As such I guide the other team members to write anything designated as a “one-off” task in the same fashion as a normal piece of production code — which means:
- Using the best tool (language/libraries) for the task at hand
- Must have unit tests (but not necessarily ‘integrated tests’)
- Must be well documented: the next time this "one-off" script is executed could well be a year (or more) later
Processing CSV files
The unfortunate reality of working in technology is that it is extremely likely you will be required to export data for some product manager/project manager or someone fulfilling a similar role. Since the client will always want to open the exported data in Excel, s-exprs are not considered an acceptable output format 😒.
At the warehouse we needed to be able to provide some stock related data to another part of THG and we needed to validate the data before handing it over (as the ubiquitous CSV file).
The team assigned this task owns the stock and inventory components of our micro-service-based WMS, and decided that their tool of choice should match the language/libraries and code style of their components; so in this case the "one-off" was built using Scala and Akka.
Fit for processing?
The more obvious choices for dealing with slurping and processing files traditionally come from the “dynamic” and/or “scripting” families (Perl, Python, Ruby &c).
However, the team lead's experience and confidence that Scala and Akka would be a good fit for the problem at hand led them to choose a less obvious solution.
Personally I would have reached for Clojure or Rust to perform the same task; however, the team owned the problem, and we empower our teams to solve problems in the manner they deem most appropriate.
Still, using a compiled, strongly-typed language like Scala to do something almost automatically associated with Perl (i.e. the "Practical Extraction and Report Language") did raise some eyebrows! The choice of Scala for this was, however, justified:
- The stock system is built in Scala and Akka
- Our build pipeline and processes are already configured to use Scala
- Having a much simpler system built using the same language and framework as the much more complex stock system, allows newcomers to the team an “on-ramp” where they can understand the language without the complexity of the problem domain
- This project can act as a canary for upgrades to Akka libraries and Scala versions without impact to the core stock system
- The project can act as a test vehicle for specific Akka related techniques (one of which is discussed in detail below) before applying the same to the core stock system
As our input files are stored in AWS S3 (and the output would also be expected to be stored in S3), there were a couple of approaches we could take here:
- Use the AWS CLI to download the files from S3 at the start, kick off the processing of the files and then use AWS CLI to upload the output at the end
- Use Alpakka to connect S3 directly with the core processing logic.
If a traditional scripting approach had been taken, it is probable that the team would have chosen option 1: just call aws s3 cp at the start and then process the files once they have been downloaded. However, given the ability to create a stream of data directly from the S3 bucket, it made sense to explore the second option.
Streaming from S3
The downloadFile method interacts with the core Alpakka library; indeed this code is very similar to the example provided in the documentation, with the addition of logging and error handling. The stream support in Alpakka allows us to define the output from S3.download as a stream Source. From the documentation:
```scala
/**
 * Downloads a S3 Object
 *
 * @param bucket the s3 bucket name
 * @param key the s3 object key
 * @param range [optional] the [[akka.http.scaladsl.model.headers.ByteRange ByteRange]] you want to download
 * @param sse [optional] the server side encryption used on upload
 * @return The source will emit an empty [[scala.Option Option]] if an object can not be found.
 *         Otherwise [[scala.Option Option]] will contain a tuple of object's data and metadata.
 */
```
As we now have a stream Source, we can pass it directly to a stream Sink (in this case Sink.head), which returns a Future of the first value received.
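Setting Akka aside, the shape of this step can be sketched with just the standard library: a source of elements feeding a head-style sink that completes a Future with the first value. The names below are illustrative, not the team's actual code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stdlib-only sketch of the Sink.head idea: consume a (lazy) stream of
// elements and complete a Future with the first one received.
def sinkHead[A](source: LazyList[A]): Future[A] =
  Future(source.head) // the Future fails if the source is empty

val first = Await.result(sinkHead(LazyList("row-1", "row-2")), 1.second)
// first == "row-1"
```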
This code is useless until it's called by a client of some kind; in our case, as the team is using Akka, the client code is an Actor.
The ReconciliationSupervisorActor is created and called from our main code, where we pass in the references to the FileWriterActor and the other actors. The ReconciliationSupervisorActor handles the co-ordination between the S3ReadWriteActor and the ReconcileActor.
Although it is possible to connect the output of the S3ReadWriteActor directly to the processing and validation actor, ReconcileActor, the team thought it would be wiser to store a local copy of the source CSV files, allowing them to debug any issues with either the validation process or the download process independently.
With a local copy of the files cached, the validation actor is asked to start processing: reconcileActor ! Begin. The core logic of the validation is some simple arithmetic and of little interest to us here. However, the way the values being manipulated are kept track of is quite nice.
In Akka systems, there is a core concept of supervision of actors and supervision trees. Monitor or supervisor programs date back to the start of computing on time-sharing systems, and eventually evolved into 'operating systems'. It was only a matter of time before some programming languages codified the concept of supervisors and formalised how to construct reliable software systems using it.
The key to supervision trees is that creating or instantiating an actor within an actor forms a parent & child relationship, where the parent is responsible for handling any errors of the child actor. In the case shown above, the ReconcileActor is the parent of two child actors.
However, we don't want the standard or default supervision strategy here. Instead the team have specified a specific strategy, resumingDecider, which will drop an erroneous element of the stream and continue processing.
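The effect of resumingDecider can be illustrated without Akka: an element that fails is simply dropped, and the rest of the stream is still processed. The two-column line format and names here are assumptions for illustration only.

```scala
import scala.util.Try

// Hypothetical two-column line format: "productId,total"
def parse(line: String): Try[(String, Int)] = Try {
  line.split(',') match {
    case Array(id, total) => (id.trim, total.trim.toInt)
  } // any other shape throws a MatchError, captured by Try
}

// resumingDecider-style behaviour: a failing element is dropped from
// the stream and the remaining elements are still processed.
def processResuming(lines: Seq[String]): Seq[(String, Int)] =
  lines.flatMap(line => parse(line).toOption)

processResuming(Seq("A,10", "not-a-line", "B,5"))
// → Seq(("A", 10), ("B", 5))
```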
The snippet above also showcases Akka streaming: the csvFile acts as a Source and the StockPositionActor as the Sink, and the data (as lines from the file) is consumed by the Sink.
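Stripped of the framework, that data flow is just lines leaving a source and being consumed one at a time by a sink. A minimal dependency-free stand-in (names assumed):

```scala
// Dependency-free stand-in for the csvFile-as-Source pattern: the raw
// text supplies the lines; the sink function consumes each in turn.
def runStream(csvText: String)(sink: String => Unit): Unit =
  csvText.linesIterator.filter(_.nonEmpty).foreach(sink)

val received = scala.collection.mutable.ListBuffer.empty[String]
runStream("sku-1,10\nsku-2,4\n")(received += _)
// received now holds the two data lines, in order
```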
Getting down to Cases
One of the small advantages of Scala over Java is the ability to leverage “case classes” for models instead of having to create a more traditional Java Bean. In Akka, case classes are used to represent the commands/messages that are passed to the actors.
These case classes define what we expect to map the raw files to for processing by the Actors. One thing of interest is CsvSource#Repr[ReportLine]: this uses a facility of Scala called type projection to define a type in terms of an inner type from a different class. This allows the code to be type-safe and keeps the type definitions in the most logical and consistent place.
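A minimal, self-contained illustration of type projection (the class and alias names merely echo the CsvSource#Repr[ReportLine] mentioned above; they are not the project's actual definitions):

```scala
// A class that fixes how its rows are represented internally.
class CsvSource {
  type Repr[T] = Vector[T] // the inner representation type
}

// Code elsewhere can name that inner type via projection: CsvSource#Repr.
def firstRow(rows: CsvSource#Repr[String]): Option[String] = rows.headOption

firstRow(Vector("header", "row1"))
// → Some("header")
```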
The solution requires case classes for the CsvFiles and for the definition of the rows within each CSV. Finally, there is a case class for the command-line arguments, which allows us to parse and validate them using the Scopt library.
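As a sketch of the kind of row model involved, mapping a raw line into a case class might look like the following. The field names come from the MovementLine pattern quoted later in the post; the column order and parsing are assumptions.

```scala
// Case class modelling one row of the movements CSV (field names taken
// from the post; the column layout is an assumption).
case class MovementLine(productId: String, qty: Int, reason: String, messageType: String)

object MovementLine {
  // Returns None for malformed rows instead of throwing.
  def fromCsv(line: String): Option[MovementLine] =
    line.split(',') match {
      case Array(id, qty, reason, msgType) =>
        qty.trim.toIntOption.map(MovementLine(id.trim, _, reason.trim, msgType.trim))
      case _ => None
    }
}

MovementLine.fromCsv("SKU-1,5,DAMAGE,ADJ")
// → Some(MovementLine("SKU-1", 5, "DAMAGE", "ADJ"))
```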
Scala, being a multi-paradigm language allows developers to write code in either a functional or object-oriented style or a mix of both:
Our team have opted to use Scala as a ‘functional-first’ language and have tried, within the Scala code they are responsible for, to eliminate the more procedural and object-oriented practices such as state updates or mutation.
In Akka, actors have an internal mailbox of messages that the framework handles; however, you could also encode state and transitions within an Actor. To avoid manipulating state you can use the elegant become feature of Akka; this is one of those 'best-practice' idioms you don't need until you really need it:
In this example, the standard Akka receive is overridden to call active, passing in an (initially) empty Map. Then, on receiving a message that would normally mutate state (in a conventional object-oriented fashion), the actor replaces its current implementation with a new implementation that is identical, except that the empty Map has been updated. In the example code, this occurs for just two of the cases:
```scala
case line @ ReportLine(productId, total, good, damaged, held, transfer)
case line @ MovementLine(productId, qty, reason, messageType)
```
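The become idiom can be modelled without the framework: a behaviour handles a message and returns the *next* behaviour, carrying the updated immutable Map. This is a simplified sketch; only the ReportLine field names come from the post.

```scala
// Message from the stock report CSV (field names as in the post).
case class ReportLine(productId: String, total: Int, good: Int, damaged: Int, held: Int, transfer: Int)

// Simplified model of context.become: handling a message returns a new
// behaviour holding the updated Map, in place of mutation.
final case class Active(positions: Map[String, Int]) {
  def receive(msg: Any): Active = msg match {
    case ReportLine(id, total, _, _, _, _) =>
      Active(positions.updated(id, positions.getOrElse(id, 0) + total))
    case _ => this // unknown messages leave the behaviour unchanged
  }
}

val finalState = Seq(
  ReportLine("sku-1", 10, 8, 1, 1, 0),
  ReportLine("sku-1", 5, 5, 0, 0, 0),
  ReportLine("sku-2", 3, 3, 0, 0, 0)
).foldLeft(Active(Map.empty[String, Int]))(_.receive(_))
// finalState.positions == Map("sku-1" -> 15, "sku-2" -> 3)
```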
In essence, as each line is processed by this actor, it swaps out the Map of stock positions or quantities for a new Map, calculated by taking the current Map and applying the line of data. Interestingly, this doesn't seem to have an impact on performance. One of the criticisms of functional programming is the amount of copying of data structures required to maintain purity and immutability, but Scala's immutable Map is a persistent data structure, so each 'copy' shares most of its structure with its predecessor.
As all the processing occurs within actors, using context.become allows the code to be essentially purely functional, with all the benefits that entails. This leaves the only state as the input and output files (which, being IO, really must be stateful).
The task was initially to create a “one-off script” to process and validate some tabular data. Taking a longer-term approach to software engineering, the team decided to build the validation logic in the same tech stack as the core technology they were responsible for.
As the problem is easy to define and understand, it is the perfect vehicle for getting new hires and graduate developers started with Scala & Akka, and with how they can be used to solve this (simple) problem, before they dive into the core (more complex) systems.
The team produced a solution that used the features of the technology in the recommended fashion whilst still delivering to the target spec.
Obviously this “one-off” task became something that was to be repeated, or all this additional effort would have been over-engineering of the navel-gazing sort.