<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Benoit de Patoul on Medium]]></title>
        <description><![CDATA[Stories by Benoit de Patoul on Medium]]></description>
        <link>https://medium.com/@benoitdepatoul?source=rss-2430cef64b90------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*bo3QRe6z_Vv3UnOQjvVO-g.jpeg</url>
            <title>Stories by Benoit de Patoul on Medium</title>
            <link>https://medium.com/@benoitdepatoul?source=rss-2430cef64b90------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Sat, 16 May 2026 17:23:31 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@benoitdepatoul/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Trigger concurrent Amazon SageMaker jobs at scale with AWS Step Functions]]></title>
            <link>https://medium.com/@benoitdepatoul/trigger-concurrent-amazon-sagemaker-jobs-at-scale-with-aws-step-functions-1438e8d081c6?source=rss-2430cef64b90------2</link>
            <guid isPermaLink="false">https://medium.com/p/1438e8d081c6</guid>
            <dc:creator><![CDATA[Benoit de Patoul]]></dc:creator>
            <pubDate>Mon, 06 Mar 2023 09:21:22 GMT</pubDate>
            <atom:updated>2023-03-06T09:21:22.014Z</atom:updated>
            <content:encoded><![CDATA[<p>An example about triggering SageMaker jobs or ML pipelines at scale in a controlled manner using Step Functions.</p><p>Typically a Machine Learning (ML) lifecycle within SageMaker (SM) triggers what we call jobs. These jobs allow the user to take advantage over the fully managed SageMaker resources. Sometimes SM users need to trigger different SM jobs or pipelines at scale and in concurrency. It can be from having to trigger multiple simple training jobs to multiple ML pipelines. This can rapidly become challenging and complex due to different limitations/quotas such as the maximum number of jobs, the API throttling limits, the logic to retry failed jobs, etc. As a best practice it is recommended to spread in time the amount of jobs to avoid hitting quotas and peak times. All of this needs to be done in a controlled manner.</p><p>This blog explores and shows an example on how can this be achieved by mainly using Step Functions. This example can be built within your AWS account by deploying the provided Cloud Formation template. The architecture combines all together the <a href="https://aws.amazon.com/blogs/compute/controlling-concurrency-in-distributed-systems-using-aws-step-functions/">blog</a> “Controlling concurrency in distributed systems using AWS Step Functions”, the <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">sample project “Train a machine learning model”</a> from Step Functions and the <a href="https://aws.amazon.com/sqs/">Amazon Simple Queue Service</a> (SQS). This solution uses the concurrency control and retry policies of the <a href="https://aws.amazon.com/blogs/compute/controlling-concurrency-in-distributed-systems-using-aws-step-functions/">blog</a> combined to a Machine Learning (ML) lifecycle that includes its own retry policies, and Amazon SQS that contains the number of tasks yet to be run and its parameters. This architecture can be adapted to your own use case and it is discussed how later in the blog.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/640/1*gNmNOBpzPWnXupJPF-5oIQ.jpeg" /><figcaption>Photo by <a href="https://unsplash.com/@gerandeklerk?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Geran de Klerk</a> on <a href="https://unsplash.com/photos/uYkdJEYNwSM?utm_source=unsplash&amp;utm_medium=referral&amp;utm_content=creditCopyText">Unsplash</a></figcaption></figure><h3>How does the architecture work?</h3><p>The architecture uses a distributed <a href="https://en.wikipedia.org/wiki/Semaphore_(programming)">semaphore</a> to spread and limit the number of concurrent SageMaker workloads, and a SQS queue that contains all the remaining tasks to be performed. Step Functions queries via Lambda the task from the SQS queue and runs the workload when there is available space within the chosen concurrency limit. A task could be anything from just generating or preparing a dataset, to run a whole ML lifecycle. In this blog I run a ML lifecycle <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a>. This architecture has been tested by running 300 tasks with a concurrency of 20. The result was successful by completing all the tasks, respecting the maximum number of concurrent running tasks and retrying any failed task.</p><p>The architecture is made of three state machines from the <a href="https://aws.amazon.com/blogs/compute/controlling-concurrency-in-distributed-systems-using-aws-step-functions/">blog</a>. To understand in details each of those state machines please have a read at the shared link above. The ones we will focus on this blog is the<em> CC-Test-Run100Executions </em>and<em> CC-ConcurrencyControlledStateMachine </em>state machines.</p><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*dI_OwtxfJS-sMCmozfwR3A.jpeg" /></figure><p>The state machine <em>CC-Test-Run100Executions </em>is the state machine used to demonstrate the concurrency control. It has been adapted to retrieve the number of tasks in the SQS queue. The <em>CC-ConcurrencyControlledStateMachine</em> is where you define the maximum number of tasks that can be run concurrently. In this example, the task is the maximum number of ML lifecycle.</p><p>The ML lifecycle task <strong><em>(</em></strong> <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a><strong><em>)</em></strong> is part and defined in the <em>CC-ConcurrencyControlledStateMachine. </em>It generates a dataset, trains a model, saves the model and then applies a batch transform on the data. I have added <a href="https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html">retry policies</a> in case of any failure happening separately in the training job, on saving the model, or in the batch transform. The <em>Generate dataset</em> step contains extra code within the Lambda function to pick a task from the SQS queue. I have also added an extra step at the end of the task that deletes the SQS message once the ML lifecycle is successfully completed.</p><h3>How to use the Cloud Formation template?</h3><p>You can download the Cloud Formation template from this <a href="https://gist.github.com/bpatoul/c0a0849b36d3410c22d69ba291eba85a">link</a>. You need to log in into your AWS account, go to the Cloud Formation service, create a new stack and then upload the template.</p><p>You will see three parameters. The<em> ParameterInstancePrefix</em> and <em>ParameterLockName </em>are related to the Dynamo table. The first one is the table name, and the second one is the partition key name. You do not need to change any of those. Lastly, the <em>MaxConcurrency</em> parameter is the number of maximum concurrent tasks running. To be more precise, for this example is the maximum number of ML lifecycles running at the same time. You can increase this parameter but consider your quotas.</p><p>Once all the resources from the template have been created, the first step is to generate dummy messages in the SQS queue called <em>SQSQueueJobs</em>. Go to Lambda and run/test the function named <em>LambdaMessagesGenerator</em>. This function will generate 100 messages in the SQS queue (you can manually update the variable <em>number</em> to reduce the number of messages). Since this is just an example, the messages do not contain any parameter that will be used by the <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a> task. The number of messages allow the logic to know how many tasks are planned to be run. In real life scenario, the SQS messages would include parameters that are needed for the task to be successful. These parameters could be about the number of instances, the type of instance, the location of the data, etc.</p><p>Now that we sent messages to the SQS queue, go to Step Functions and start the execution of the CC-Test-Run100Executions state machine (<strong>this will incur costs!</strong>). This will trigger the whole logic:</p><ol><li>Query the SQS queue to know the number of tasks to perform</li><li>Trigger the state machine that contains the task definition to be performed following the maximum number of <em>MaxConcurrency </em>parameter. In the example the task is <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a> defined within the <em>CC-ConcurrencyControlledStateMachine </em>state machine.</li><li>Each task queries the SQS queue for the message, run the steps and once successful, deletes the message from the queue.</li></ol><p>Once you started the execution, you can see in the SageMaker console how many jobs are triggered. It will not be more than the number stated in the MaxConcurrency parameter from the cloud formation template. This automated process will continue until all the messages (100) have been deleted. This means that the <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a> task will be triggered 100 times and when it is successful, it will delete the message from SQS queue which will drop the number of messages until reach 0.</p><h3>How to adapt this architecture to your use case?</h3><p>The architecture is an example that you can adapt to your own use case. The first thing you want to do is define the task itself which is within the<em> CC-ConcurrencyControlledStateMachine</em> state machine. In the example of this blog<em>, </em>the task<em> </em>is<em> </em><a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a><em>.</em> You should include reading and deleting the SQS message within the task definition to keep the synchronization with the SQS queue. In the <a href="https://docs.aws.amazon.com/step-functions/latest/dg/sample-train-model.html">“Train a machine learning model”</a>, I modified the Lambda code from the Generate dataset to read the message in the SQS queue. I also added an extra Lambda function (deleteSQS) at the end to delete the message read. The code example of each is shown below.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/e14de16b4a2e3c2274e0429219914507/href">https://medium.com/media/e14de16b4a2e3c2274e0429219914507/href</a></iframe><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/0b0bd99046eb5e6aa16787b21bcf9fd4/href">https://medium.com/media/0b0bd99046eb5e6aa16787b21bcf9fd4/href</a></iframe><p>The second thing you will need to adapt/create is the SQS queue. The SQS queue is there to pass the number of tasks and the task parameters according to your use case. For example, let us say you have a simple SM training job as a task and you want to trigger 25 SM training jobs but each with different training data located in S3. The SQS queue will contain 25 messages which each contains the necessary parameters for the training jobs to be successful. Each message could include the parameters such as the data location, job name, instance type, artifacts location, script location, etc.</p><p>The third thing you might want to add/modify in your state machine logic is the <a href="https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html">retry policies</a>. It is not mandatory but recommended. In the case of SM this is very useful since it can manage the throttling limits when you trigger multiple SM jobs at the same time or any other error that could happen.</p><h3>Conclusion</h3><p>We have seen in this blog an example on how to trigger concurrent Amazon SageMaker jobs at scale and in a controlled manner with AWS Step Functions. While it brings the advantage of control over the workflow, the downside to be aware of this architecture is the Step Functions inline map state limited to 40 parallel iterations at a time. This is translated in the architecture by having a maximum of 40 tasks running at the same time. You can go around this by adapting the State Machine to use the new <a href="https://aws.amazon.com/blogs/aws/step-functions-distributed-map-a-serverless-solution-for-large-scale-parallel-data-processing/">Distributed Map</a> which supports up to 10,000 executions in parallel!</p><p>I have explained about adapting this example to your own use case through:</p><ol><li>Defining the task itself within the <em>CC-ConcurrencyControlledStateMachine</em> state machine.</li><li>Adding SQS commands to collect the message with its parameters and making sure that you delete the message once it has been successfully completed.</li><li>Add retry policies within your task definition.</li></ol><p>While we use in this example StepFunctions to run the ML Pipeline, we could instead have StepFunctions calling an existing <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html">SageMaker Pipeline</a> and then see the ongoing pipeline running within SM Studio. You would then have StepFunctions controlling the workload concurrency and SM Pipelines running and controlling the ML workload. This is specially useful if you are mainly developing and working in SM Studio. SM Pipeline has as well the advantage of being more adapted for <a href="https://aws.amazon.com/blogs/machine-learning/mlops-foundation-roadmap-for-enterprises-with-amazon-sagemaker/">MLOps</a>. As a future step, I have planned to update this blog and add the example of SM Pipelines into the architecture.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=1438e8d081c6" width="1" height="1" alt="">]]></content:encoded>
        </item>
        <item>
            <title><![CDATA[Use Amazon Athena in a processing job with Amazon SageMaker]]></title>
            <link>https://medium.com/@benoitdepatoul/use-amazon-athena-in-a-processing-job-with-amazon-sagemaker-d43272d69a78?source=rss-2430cef64b90------2</link>
            <guid isPermaLink="false">https://medium.com/p/d43272d69a78</guid>
            <category><![CDATA[athena]]></category>
            <category><![CDATA[processing]]></category>
            <category><![CDATA[sagemaker]]></category>
            <dc:creator><![CDATA[Benoit de Patoul]]></dc:creator>
            <pubDate>Thu, 03 Mar 2022 11:09:30 GMT</pubDate>
            <atom:updated>2022-03-07T08:31:32.170Z</atom:updated>
            <content:encoded><![CDATA[<p>A guide on how to configure an Amazon SageMaker processing job in conjunction with Amazon Athena.</p><p><a href="https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html">Amazon SageMaker Processing</a> allows to analyze data and evaluate machine learning models on Amazon SageMaker with a fully managed infrastructure. It downloads the data at the file level to process into the processing container by providing its location in <a href="https://aws.amazon.com/s3/">Amazon Simple Storage Service</a> ( S3).</p><p><a href="https://docs.aws.amazon.com/athena/latest/ug/what-is.html">Amazon Athena</a> is an interactive query service that makes it easy to analyze/query data in Amazon S3 using standard SQL. It is an add-on managed capability in SageMaker Processing thanks to the AthenaDatasetDefinition parameter. It can be used to query data in a processing job adding an extra layer between the user querying the data and the S3. It brings the following advantages:</p><ul><li>filtering data before downloading it into the processing job with a SQL query</li><li><a href="https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html">fine-grained access</a> to databases and tables in the AWS Glue Data Catalog</li><li><a href="https://docs.aws.amazon.com/athena/latest/ug/manage-queries-control-costs-with-workgroups.html">Workgroups</a> to control query access and costs</li></ul><p>There are currently no solutions or examples on the internet about how to use a SageMaker processing job in conjunction with Athena. In this blog we will see how to configure a processing job that uses Athena to query a data source, and uses a data processing script. It covers two ways to do it, by using Boto3 and by using the Amazon SageMaker python SDK. There is a code example available for both ways.</p><p>The workflow is:</p><ol><li>The processing job sends the SQL query to Athena.</li><li>Athena queries the data source registered with the AWS Glue Data Catalog.</li><li>The results of the query are saved in S3.</li><li>The processing job downloads the S3 results into the deployed container to process the data.</li></ol><figure><img alt="" src="https://cdn-images-1.medium.com/max/715/1*BcPxk61ENxU5lMavc5D82w.png" /><figcaption>Architecture and workflow</figcaption></figure><h3>Prerequisites</h3><p>To go through and apply this guide you need to have:</p><ol><li>an existent data source in your Amazon Athena</li><li>access to that data source with the role used by Amazon SageMaker</li><li>the python SDK and the Boto3 installed</li><li>a file containing your processing data code (your script) saved into S3 (if you use Boto3)</li></ol><h3>Boto3</h3><h4>API call configuration</h4><p>Following the SageMaker <a href="https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html">boto3</a> we will use the <a href="https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_processing_job">API call</a> to generate a processing job. We do not need all of the parameters from the API call to generate a basic processing job. The following code shows you how to configure a processing job that uses Amazon Athena to query a database with SQL, apply the file containing the data processing code on the data queried, and finally save the results into S3. Some things are worth to mention for this blog.</p><p>The default parameters and API requirements such as AppSpecification, ProcessingOutputConfig, ProcessingJobName, ProcessingResources and RoleArn remain the same than when given an S3 location as data input. The ProcessingOutputConfig contains the location of the processing job results. The ProcessingJobName is the job name. The ProcessingResources designates the hardware resources. The AppSpecification contains the link to the pre-built Amazon Docker Image from Elastic Container Registry (links to containers can be found in the <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/pre-built-docker-containers-scikit-learn-spark.html">SageMaker documentation</a>) and the location of the script.</p><p>The ProcessingInputs parameter is where we modify the data source to use Athena rather than the default S3. It contains two inputs, the Athena dataset definition and the data processing script. The Athena dataset definition is the parameter that defines with a SQL query the data that will be downloaded into the processing container. To help you fill up properly the Athena dataset definition according to your use case, I have added an image below showing where you can get the different parameters to define the dataset.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/9fdf155bd7111e0e596e858183cbfe6d/href">https://medium.com/media/9fdf155bd7111e0e596e858183cbfe6d/href</a></iframe><figure><img alt="" src="https://cdn-images-1.medium.com/max/1024/1*9ZYAM6-0oBNd3dlInWlb0Q.jpeg" /><figcaption>Athena Dataset Definition</figcaption></figure><h4>Example</h4><p>Let us first set up the context. I have Athena that queries a data source with the configuration shown on the image above. I have a python file located in S3 called ‘preprocessing.py’ containing a simple <a href="https://gist.github.com/bpatoul/0eadf40561a5059b3a11f2c057f2b0e8">data processing script</a> that separates the data into training, validation and testing datasets. The region is us-east-1 and will use the scikit-learn framework. I want the results to be saved in S3. The configured API call example can be found at this <a href="https://gist.github.com/bpatoul/254331c91c4ac764b7d3bec4d6ffec5a">link</a>.</p><h3>Amazon SageMaker Python SDK</h3><h4>API call configuration</h4><p>Amazon SageMaker Python SDK is an open source library for training and deploying ML models on Amazon SageMaker. With the SDK, you can train and deploy models using popular deep learning frameworks, algorithms provided by Amazon, or your own algorithms built into SageMaker-compatible Docker images.</p><p>The code below starts importing the necessary libraries and then <a href="https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/sagemaker.sklearn.html#sagemaker.sklearn.processing.SKLearnProcessor">defining the scikit-learn framework</a> (you could use <a href="https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.spark.processing.PySparkProcessor">spark</a> if you prefer). The next step is to <a href="https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.dataset_definition.inputs.AthenaDatasetDefinition">define the Athena dataset</a> which is then used by the <a href="https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.dataset_definition.inputs.DatasetDefinition">DatasetDefinition</a>. Once you have the dataset configured, you can <a href="https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.processing.ScriptProcessor.run">run the processing job</a> in the scikit-learn container that you previously defined with the processing data script.</p><iframe src="" width="0" height="0" frameborder="0" scrolling="no"><a href="https://medium.com/media/f3fc30a63c2c904fead7f3fd00d8da4a/href">https://medium.com/media/f3fc30a63c2c904fead7f3fd00d8da4a/href</a></iframe><h4>Example</h4><p>We will use the same context and same <a href="https://gist.github.com/bpatoul/0eadf40561a5059b3a11f2c057f2b0e8">script</a> as previously defined in the Boto3 example but now applied to the SageMaker SDK. There is a small add up on this example that could be applied to the Boto3 one. There are three ProcessingOutput (saved in S3) which selects each of the datasets generated by the script: the training, validation and test datasets.</p><p>The Boto3 example saves the results (train.csv, validation.csv and test.csv) including the directories within the ‘/opt/ml/processing/output’ in S3. It will then create 3 directories (train, validation and test) within the S3 location where you asked to save the results. The SageMaker SDK example saves the files (train.csv, validation.csv and test.csv) in S3 without their directories. You can find the code example in this <a href="https://gist.github.com/bpatoul/18d055d89847eeb1d0e27c2eb68c7135">link</a></p><h3>Conclusion</h3><p>We have learned here how to successfully run a processing job with Boto3 and SageMaker SDK by using Amazon Athena as a datasource, a data processing script and a pre-built container from SageMaker.</p><p>When you query data with Athena via a processing job for both cases (SDK and Boto3), the Athena query results are saved in S3 and are after downloaded to the processing job. With time, you might not need this data once you have processed it. You could after each processing job delete the queried data manually, but this can be time consuming. To solve this you might want to implement an object lifecycle rule in your S3 bucket that will delete automatically not used data according to your configuration. Please see the <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html">documentation</a> for more information and examples.</p><img src="https://medium.com/_/stat?event=post.clientViewed&referrerSource=full_rss&postId=d43272d69a78" width="1" height="1" alt="">]]></content:encoded>
        </item>
    </channel>
</rss>