Understanding State Management in Hashmap Data Migrator
by Jhimli Bora and John Aven
When migrating data, what is state management and why is it important? Simply put, it is the management of the state of data movement. It is useful for handling failures, storing history (audit trails), and much more. The state of all data transport is stored in a database; which database is up to you, since hdm provides an extensible API with several out-of-the-box implementations. This state management data is used to manage the control flow end-to-end across a distributed deployment.
Note: This blog post is a part of the Hashmap Data Migrator series. Refer to previous blog posts for more details on the terms: hdm (Hashmap Data Migrator) and Pipeline YAML (a configuration file).
The state management table will track the following values:
- state_id: unique identifier
- run_id: unique identifier for a run
- job_id: unique identifier for a stage (a source and sink pair)
- correlation_id_in: Correlation id linking to a preceding pipeline
- correlation_id_out: Correlation id on persistence
- action: Action performed — sourcing pre-pull | sourcing post-pull | sinking pre-pull | sinking post-pull
- status: Status of the transformation — success | failure | in_progress
- source_name: Name of source
- source_type: Type of source
- sink_name: Name of the sink
- sink_type: Type of sink
- source_entity: Asset being transported
- source_filter: Any filtering applied
- sink_entity: Asset being dumped
- sink_filter: Any filtering applied
- first_record_pulled: The first record pulled in the run. Relevant only to database sources.
- last_record_pulled: The last record pulled in the run. Relevant only to database sources.
- git_sha: Git SHA correlating the state entry to the version of the code executed
- sourcing_start_time: When sourcing started
- sourcing_end_time: When sourcing ended
- sinking_start_time: When sinking started
- sinking_end_time: When sinking ended
- updated_on: When this entry was last updated
- row_count: Number of distinct rows extracted
- created_on: When this entry was created
- manifest_name: Name of the pipeline YAML file
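Taken together, these fields describe one record per state transition. A minimal Python sketch of such a record is below; the field names come from the table above, but the dataclass itself is illustrative and not hdm's actual model:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class StateRecord:
    """One row in the state management table (illustrative sketch only)."""
    state_id: str
    run_id: str
    job_id: str
    manifest_name: str
    action: str = "sourcing pre-pull"    # sourcing/sinking, pre/post pull
    status: str = "in_progress"          # success | failure | in_progress
    correlation_id_in: Optional[str] = None
    correlation_id_out: Optional[str] = None
    source_name: str = ""
    source_type: str = ""
    source_entity: str = ""
    source_filter: Optional[str] = None
    sink_name: str = ""
    sink_type: str = ""
    sink_entity: str = ""
    sink_filter: Optional[str] = None
    first_record_pulled: Optional[str] = None
    last_record_pulled: Optional[str] = None
    git_sha: Optional[str] = None
    row_count: int = 0
    sourcing_start_time: Optional[datetime] = None
    sourcing_end_time: Optional[datetime] = None
    sinking_start_time: Optional[datetime] = None
    sinking_end_time: Optional[datetime] = None
    created_on: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_on: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```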
Architecture
Let's look at the hdm state management architecture:
A stage consists of a source and a sink.
The source sets a new state to record that source processing has started. It then updates the state again to record that processing has completed.
The sink gets the current state set by source and updates it to record that sink processing has started. It then updates the state again to record that processing has completed.
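The source/sink handshake above can be sketched as follows. This uses a toy in-memory state manager; the method names mirror the base-class API described later in this post, but the implementation here is entirely hypothetical:

```python
import uuid


class InMemoryStateManager:
    """Toy stand-in for hdm's StateManager, used only to illustrate the flow."""

    def __init__(self):
        self.states = {}

    def insert_state(self, job_id, action, status):
        # A new state entry marks the start of processing for a stage.
        state_id = str(uuid.uuid4())
        self.states[state_id] = {"job_id": job_id, "action": action, "status": status}
        return state_id

    def update_state(self, state_id, action, status):
        # Subsequent transitions update the same entry.
        self.states[state_id].update(action=action, status=status)

    def get_current_state(self, job_id):
        # The sink looks up the latest state set by the source.
        return next(s for s in reversed(list(self.states.values()))
                    if s["job_id"] == job_id)


# One stage = one source/sink pair sharing a job_id.
sm = InMemoryStateManager()
sid = sm.insert_state("job-1", "sourcing pre-pull", "in_progress")  # source starts
sm.update_state(sid, "sourcing post-pull", "success")               # source done
state = sm.get_current_state("job-1")                               # sink reads state
sm.update_state(sid, "sinking pre-pull", "in_progress")             # sink starts
sm.update_state(sid, "sinking post-pull", "success")                # sink done
```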
Configurations
The hdm state management can be easily extended to other types of database systems with the extensible API. Let's look at the configuration details for MySQL, SQLite, SQL Server, and Azure SQL Server.
Azure SQL Server State Manager
Azure SQL Server as the state management database.

`class AzureSQLServerStateManager(StateManager)`

Configuration in the pipeline YAML:

```yaml
state_manager:
  name: state_manager
  type: AzureSQLServerStateManager
  conf:
    connection: state_manager
```

Consumed API input:
- connection: state_manager | required
- dao: 'azuresqlserver' | preset value in code | required
- format_date: False | preset value in code | required
SQL Server State Manager
SQL Server as the state management database.

`class SQLServerStateManager(StateManager)`

Configuration in the pipeline YAML:

```yaml
state_manager:
  name: state_manager
  type: SQLServerStateManager
  conf:
    connection: state_manager
```

Consumed API input:
- connection: state_manager | required
- dao: 'azuresqlserver' | preset value in code | required
- format_date: False | preset value in code | required
MySQL State Manager
MySQL as the state management database.

`class MySQLStateManager(StateManager)`

Configuration in the pipeline YAML:

```yaml
state_manager:
  name: state_manager
  type: MySQLStateManager
  conf:
    connection: state_manager
```

Consumed API input:
- connection: state_manager | required
- dao: 'mysql' | preset value in code | required
- format_date: True | preset value in code | required
SQLite State Manager
SQLite as the state management database.

`class SqLiteStateManager(StateManager)`

Configuration in the pipeline YAML:

```yaml
state_manager:
  name: state_manager
  type: SqLiteStateManager
  conf:
    connection: state_manager
```

Consumed API input:
- connection: state_manager | required
- dao: 'sqlite' | preset value in code | required
- format_date: True | preset value in code | required
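As the four managers above suggest, each backend is a thin subclass of the base class that presets a DAO name and date-formatting flag. A hedged sketch of what adding another backend might look like follows; the constructor signature and a Postgres backend are assumptions for illustration, not hdm's actual internals:

```python
class StateManager:
    """Simplified stand-in for hdm's base class (illustrative only)."""

    def __init__(self, connection: str, dao: str, format_date: bool):
        self.connection = connection
        self.dao = dao
        self.format_date = format_date


class PostgresStateManager(StateManager):
    """Hypothetical Postgres backend, mirroring the built-in managers."""

    def __init__(self, connection: str):
        # dao and format_date are preset in code, analogous to the
        # "preset value in code" entries in the configs above.
        super().__init__(connection=connection, dao="postgres", format_date=True)
```

The pipeline YAML would then reference the new type the same way the built-in managers are referenced, with only the connection supplied in `conf`.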
State Manager
StateManager is the base class from which all of the state managers above derive.
Methods:
- insert_state: inserts a new state.
  - params: source_entity, source_filter, action, state_id, status, correlation_id_in, correlation_id_out, sink_entity, sink_filter, sourcing_start_time, sourcing_end_time, sinking_start_time, sinking_end_time, record_count, first_record_pulled, last_record_pulled
  - returns: dictionary of state_id, job_id, correlation_id_in, correlation_id_out, source_entity, source_filter, sourcing_start_time, sourcing_end_time, sinking_start_time, sinking_end_time, first_record_pulled, last_record_pulled, record_count, run_id, manifest_name
- update_state: updates an existing state.
  - params: same as insert_state
  - returns: same dictionary as insert_state
- get_current_state: gets the current state.
  - params: job_id (required), entity, entity_filter
  - returns: same dictionary as insert_state
- get_last_record: gets the last_record_pulled value.
  - params: entity
  - returns: last_record_pulled
- get_processing_history: gets the processing history.
  - params: none
  - returns: list of sink_entity for a source_name
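These methods are what make incremental, restartable runs possible. For example, a source can use the value returned by get_last_record to resume where the previous run stopped. A sketch of that idea is below; the query builder, table, and column names are hypothetical and not part of hdm:

```python
def build_incremental_query(table: str, key_column: str, last_record):
    """Resume a pull after the last record recorded in state management."""
    if last_record is None:
        # First run: no prior state, so pull everything.
        return f"SELECT * FROM {table} ORDER BY {key_column}"
    # Subsequent runs: skip everything already pulled in earlier runs.
    return (f"SELECT * FROM {table} "
            f"WHERE {key_column} > {last_record!r} ORDER BY {key_column}")


# Suppose get_last_record('orders') returned 10421 on the previous run:
q = build_incremental_query("orders", "order_id", 10421)
```

This is how state management lets a pipeline avoid reprocessing data that has already been moved.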
Final Thoughts
State management plays an important role in any application. A good state management architecture tracks the flow of information, which helps you recover from failures quickly and avoid reprocessing data that has already been processed.
The hdm state management is easy to extend to different databases using its extensible API.
Please follow the rest of the series and watch for our podcasts and vlogs on using hdm!
Watch a demo of hdm in this Hashmap Megabyte video:
Ready to Accelerate Your Digital Transformation?
If you are considering moving data and analytics products and applications to the cloud or if you would like help and guidance and a few best practices in delivering higher value outcomes in your existing cloud program, then please contact us.
Hashmap, an NTT DATA Company, offers a range of enablement workshops and assessment services, cloud modernization and migration services, and consulting service packages as part of our Cloud service offerings. We would be glad to work through your specific requirements.
Hashmap’s Data & Cloud Migration and Modernization Workshop is an interactive, two-hour experience for you and your team to help understand how to accelerate desired outcomes, reduce risk, and enable modern data readiness. We’ll talk through options and make sure that everyone has a good understanding of what should be prioritized, typical project phases, and how to mitigate risk. Sign up today for our complimentary workshop.
Other Tools and Content You Might Like
Feel free to share on other channels and be sure and keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap’s podcast Hashmap on Tap as well on Spotify, Apple, Google, and other popular streaming apps.
Jhimli Bora is a Cloud and Data Engineer with Hashmap, an NTT DATA Company, providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers. Connect with her on LinkedIn.
John Aven, Ph.D., is the Director of Engineering at Hashmap, an NTT DATA Company, providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers. Be sure and connect with John on LinkedIn and reach out for more perspectives and insight into accelerating your data-driven business outcomes.