Understanding State Management in Hashmap Data Migrator

6 min read · Jan 12, 2021

by Jhimli Bora and John Aven

When migrating data, what is state management and why is it important? Simply put, it is the management of the state of data movement. It is useful for handling failures, storing history (audit trails), and much more. The state of all data transport is stored in a database; which database is up to you, since hdm provides an extensible API with several out-of-the-box implementations. This state management data is used to drive the control flow end-to-end across distributed deployments.

Note: This blog post is part of the Hashmap Data Migrator series. Refer to the previous blog posts for more detail on the terms hdm (Hashmap Data Migrator) and Pipeline YAML (a configuration file).

The state management table will track the following values:

  • state_id: unique identifier
  • run_id: unique identifier for a run
  • job_id: unique identifier for a stage (source and sink pair)
  • correlation_id_in: Correlation id linking to a preceding pipeline
  • correlation_id_out: Correlation id on persistence
  • action: Action performed, one of sourcing pre-pull | sourcing post-pull | sinking pre-pull | sinking post-pull
  • status: Status of the action, one of success | failure | in_progress
  • source_name: Name of source
  • source_type: Type of source
  • sink_name: Name of the sink
  • sink_type: Type of sink
  • source_entity: Asset being transported
  • source_filter: Any filtering applied
  • sink_entity: Asset being dumped
  • sink_filter: Any filtering applied
  • first_record_pulled: The first record pulled in the run. Relevant to a database only.
  • last_record_pulled: The last record pulled in the run. Relevant to a database only.
  • git_sha: Git SHA correlating the execution to the exact version of the code that ran
  • sourcing_start_time: When sourcing started
  • sourcing_end_time: When sourcing ended
  • sinking_start_time: When sinking started
  • sinking_end_time: When sinking ended
  • updated_on: When this entry was last updated
  • row_count: Number of distinct rows extracted
  • created_on: When this entry was created
  • manifest_name: Name of the pipeline YAML file
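
To make the schema concrete, below is a minimal sketch of how a subset of this table could be created in SQLite, the simplest of the supported backends. The table name, column types, and DDL are illustrative assumptions, not hdm's actual schema:

import sqlite3

# Illustrative only: a simplified state table covering a subset of the
# columns listed above. The table name and column types are assumptions.
conn = sqlite3.connect("hdm_state.db")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS hdm_state (
        state_id      TEXT PRIMARY KEY,  -- unique identifier for the entry
        run_id        TEXT,              -- unique identifier for a run
        job_id        TEXT,              -- stage (source/sink pair) identifier
        action        TEXT,              -- e.g. 'sourcing pre-pull'
        status        TEXT,              -- success | failure | in_progress
        source_entity TEXT,              -- asset being transported
        sink_entity   TEXT,              -- asset being written
        row_count     INTEGER,           -- number of distinct rows extracted
        created_on    TEXT,              -- when the entry was created
        updated_on    TEXT               -- when the entry was last updated
    )
    """
)
conn.commit()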

Architecture

Let's look at the hdm state management architecture:

A stage consists of a source and a sink.

The source inserts a new state to record that source processing has started, then updates that state to record that sourcing has completed.

The sink gets the current state set by the source and updates it to record that sink processing has started. When sinking completes, it updates the state once more.
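
In code, one stage's state lifecycle looks roughly like the sketch below. The method names (insert_state, update_state, get_current_state) come from the StateManager API described later in this post; run_stage, the source and sink objects, and the entity name are hypothetical stand-ins rather than hdm classes:

def run_stage(state_manager, source, sink):
    # Hypothetical sketch of a stage's state lifecycle, not hdm's actual control flow.
    # Record that sourcing has started.
    state = state_manager.insert_state(
        action="sourcing pre-pull",
        status="in_progress",
        source_entity="orders",  # hypothetical asset name
        source_filter=None,
    )

    records = source.pull()  # hypothetical source call

    # Record that sourcing has completed.
    state_manager.update_state(
        state_id=state["state_id"],
        action="sourcing post-pull",
        status="success",
        record_count=len(records),
    )

    # The sink picks up the state the source left behind.
    current = state_manager.get_current_state(job_id=state["job_id"])
    state_manager.update_state(
        state_id=current["state_id"],
        action="sinking pre-pull",
        status="in_progress",
    )

    sink.push(records)  # hypothetical sink call

    # Record that sinking has completed.
    state_manager.update_state(
        state_id=current["state_id"],
        action="sinking post-pull",
        status="success",
    )

If any step fails, the corresponding state row would be left with a failure (or stale in_progress) status, which is what makes restarting and auditing possible.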

Configurations

The hdm state management can easily be extended to other database systems through its extensible API. Let's look at the configuration details for MySQL, SQLite, SQL Server, and Azure SQL Server. Each configuration has the same shape; only the type (and the preset dao and format_date values) differs.

Azure SQL Server State Manager

Azure SQL Server as the state management database

class AzureSQLServerStateManager(StateManager)

Configuration in pipeline YAML:

state_manager:
  name: state_manager
  type: AzureSQLServerStateManager
  conf:
    connection: state_manager

Consume API input:

connection: state_manager | required
dao: 'azuresqlserver' | preset value in code | required
format_date: False | preset value in code | required

SQL Server State Manager

SQL Server as the state management database

class SQLServerStateManager(StateManager)

Configuration in pipeline YAML:

state_manager:
  name: state_manager
  type: SQLServerStateManager
  conf:
    connection: state_manager

Consume API input:

connection: state_manager | required
dao: 'sqlserver' | preset value in code | required
format_date: False | preset value in code | required

MySQL State Manager

MySQL as the state management database

class MySQLStateManager(StateManager)

Configuration in pipeline YAML:

state_manager:
  name: state_manager
  type: MySQLStateManager
  conf:
    connection: state_manager

Consume API input:

connection: state_manager | required
dao: 'mysql' | preset value in code | required
format_date: True | preset value in code | required

SQLite State Manager

SQLite as the state management database

class SqLiteStateManager(StateManager)

Configuration in pipeline YAML:

state_manager:
  name: state_manager
  type: SqLiteStateManager
  conf:
    connection: state_manager

Consume API input:

connection: state_manager | required
dao: 'sqlite' | preset value in code | required
format_date: True | preset value in code | required

State Manager

This is the state management base class. Every database-specific state manager above derives from it.

Methods:

  • insert_state: inserts a new state.
    params: source_entity, source_filter, action, state_id, status, correlation_id_in, correlation_id_out, sink_entity, sink_filter, sourcing_start_time, sourcing_end_time, sinking_start_time, sinking_end_time, record_count, first_record_pulled, last_record_pulled
    returns: dictionary of state_id, job_id, correlation_id_in, correlation_id_out, source_entity, source_filter, sourcing_start_time, sourcing_end_time, sinking_start_time, sinking_end_time, first_record_pulled, last_record_pulled, record_count, run_id, manifest_name
  • update_state: updates an existing state.
    params: same as insert_state
    returns: the same dictionary as insert_state
  • get_current_state: gets the current state.
    params: job_id (required), entity, entity_filter
    returns: the same dictionary as insert_state
  • get_last_record: gets the last_record_pulled value.
    params: entity
    returns: last_record_pulled
  • get_processing_history: gets the processing history.
    params: none
    returns: list of sink_entity values for a source_name
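
Because every backend above derives from this base class, supporting a new database is mostly a matter of subclassing it. The sketch below shows the general shape for a hypothetical PostgreSQL backend; the import path, constructor behavior, and 'postgres' dao value are assumptions modeled on the configurations above, not part of hdm:

# Hypothetical sketch modeled on the built-in state managers.
from hdm.core.state.state_manager import StateManager  # assumed module path

class PostgresStateManager(StateManager):
    def __init__(self, **kwargs):
        # Preset values, mirroring the dao / format_date presets shown above.
        kwargs["dao"] = "postgres"    # assumed DAO name
        kwargs["format_date"] = True  # assumed preset
        super().__init__(**kwargs)

Such a class would then be referenced from the pipeline YAML with type: PostgresStateManager, exactly like the built-in managers above.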

Final Thoughts

State management plays an important role in any application. A good state management architecture tracks the flow of information, helping you handle failures quickly and avoid reprocessing data that has already been processed.

The hdm state management is easy to extend to different databases using its extensible API.

Please follow the rest of the series and watch for our podcasts and vlogs on using hdm!

Watch a demo of hdm in this Hashmap Megabyte video:

Hashmap Data Migrator (hdm) — Hashmap Megabytes — Ep 9

Ready to Accelerate Your Digital Transformation?

If you are considering moving data and analytics products and applications to the cloud, or if you would like help, guidance, and a few best practices in delivering higher-value outcomes in your existing cloud program, then please contact us.

Hashmap, an NTT DATA Company, offers a range of enablement workshops and assessment services, cloud modernization and migration services, and consulting service packages as part of our Cloud service offerings. We would be glad to work through your specific requirements.

Hashmap’s Data & Cloud Migration and Modernization Workshop is an interactive, two-hour experience for you and your team to help understand how to accelerate desired outcomes, reduce risk, and enable modern data readiness. We’ll talk through options and make sure that everyone has a good understanding of what should be prioritized, typical project phases, and how to mitigate risk. Sign up today for our complimentary workshop.

Other Tools and Content You Might Like

Feel free to share on other channels, and be sure to keep up with all new content from Hashmap here. To listen in on a casual conversation about all things data engineering and the cloud, check out Hashmap’s podcast Hashmap on Tap on Spotify, Apple, Google, and other popular streaming apps.

Jhimli Bora is a Cloud and Data Engineer with Hashmap, an NTT DATA Company, providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers. Connect with her on LinkedIn.

John Aven, Ph.D., is the Director of Engineering at Hashmap, an NTT DATA Company, providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high-value business outcomes for our customers. Be sure to connect with John on LinkedIn and reach out for more perspectives and insight into accelerating your data-driven business outcomes.
