ETL Dataflow Batch: BigQuery to MS SQL

Baskar Gopal
Walmart Global Tech Blog
May 16, 2024

Google Dataflow offers pre-built templates for moving data between data sources without writing code, which helps data engineers quickly set up ETL jobs.

The pre-built templates can be found at the link below.

https://cloud.google.com/dataflow/docs/guides/templates/provided-templates

However, there isn't a template for moving data directly from BigQuery to MS SQL Server over a JDBC driver, even though it's a common use case.

Data engineers need to transfer data to SQL Server for reasons like API access, batch processing, and reporting when providing feeds and data to downstream systems.

Here is a quick proof of concept we tried and got working.

Architecture

Technologies:

  • Apache Beam
  • JDK 17
  • MS Azure SQL
  • Auth — ActiveDirectoryPrincipal

How to run the Dataflow pipeline locally using Eclipse

pom.xml

Profile:
<profile>
  <id>direct</id>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>
</profile>


Dependencies required for Azure AD auth:
<dependency>
  <groupId>com.microsoft.sqlserver</groupId>
  <artifactId>mssql-jdbc</artifactId>
  <version>11.2.3.jre17</version>
</dependency>
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>msal4j</artifactId>
  <version>1.14.3</version>
</dependency>
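
With these dependencies in place, the connection can authenticate with an Azure AD service principal. Below is a minimal sketch of the DataSource; the server, database, client ID, and secret are placeholders, and the exact authentication value may vary with your mssql-jdbc version:

import com.microsoft.sqlserver.jdbc.SQLServerDataSource;

// Sketch only: Azure SQL DataSource using Azure AD service principal auth
SQLServerDataSource ds = new SQLServerDataSource();
ds.setServerName("yourserver.database.windows.net");
ds.setDatabaseName("yourdb");
ds.setAuthentication("ActiveDirectoryServicePrincipal");
ds.setUser(clientId);         // Azure AD application (client) ID
ds.setPassword(clientSecret); // Azure AD client secret

This ds is what the JdbcIO write step shown later wraps via JdbcIO.DataSourceConfiguration.create(ds).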

  • You can refer to the Dataflow sample project: https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-java
  • Set the profile to direct
  • Set exec.mainClass to your pipeline class
  • In exec.args, pass the temp location, which is required for BQ download/processing (see the sample command below)
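
For example, a local run with the direct profile might look like the following (the main class and bucket are placeholders):

mvn compile exec:java \
  -Dexec.mainClass=com.example.BigQueryToSQL \
  -Dexec.args="--runner=DirectRunner --tempLocation=gs://BUCKET_NAME/temp" \
  -P direct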

Business logic:

ETL:

Extraction — BQ

Transformation — Beam

Load — SQL

  1. Get the BQ table rows with schema
  2. Create the SQL DataSource connection by passing the following parameters (see the connection sketch above)
    - JDBC URL, username, password
  3. Check whether the SQL table exists.
  4. If not, create the SQL table using the BQ table schema
  5. Create insert queries using the BQ TableRow data (a sketch of this step follows the GitHub link below)
GitHub — https://github.com/baskarrepo/dataflow/blob/main/BigQueryToSQL.java
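
Assuming schema is the Beam Schema of the source table (for example, fetched from the BigQuery table metadata before pipeline construction), a naive sketch of steps 4 and 5 could look like this, with every column mapped to NVARCHAR(MAX); the GitHub code linked above does the real type mapping:

// Sketch: derive CREATE TABLE and a parameterized INSERT from the Beam schema
StringBuilder createSql = new StringBuilder("CREATE TABLE demotable (");
StringBuilder insertSql = new StringBuilder("INSERT INTO demotable (");
StringBuilder params = new StringBuilder();
for (int i = 0; i < schema.getFieldCount(); i++) {
    String sep = (i == 0) ? "" : ", ";
    createSql.append(sep).append(schema.nameOf(i)).append(" NVARCHAR(MAX)");
    insertSql.append(sep).append(schema.nameOf(i));
    params.append(sep).append("?");
}
createSql.append(")");
insertSql.append(") VALUES (").append(params).append(")");
String insertquery = insertSql.toString(); // used by JdbcIO.write() below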

/* Sample code to read BQ */
PCollection<TableRow> tableRowPCollection = pipeline.apply("Read from BigQuery",
    BigQueryIO.readTableRowsWithSchema()
        .fromQuery("SELECT * FROM projectid.datatable.demotable limit 100000")
        .withoutValidation()
        .usingStandardSql()
        .withMethod(TypedRead.Method.DIRECT_READ));

/* Sample code to write to MS SQL */
tableRowPCollection
    // Identity map: each TableRow passes through unchanged; add field-level transformations here if needed
    .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
        .via((TableRow row) -> {
            return row;
        }))
    // ds is the SQL Server DataSource and insertquery the generated INSERT statement (see the sketches above)
    .apply(JdbcIO.<TableRow>write()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ds))
        .withStatement(insertquery)
        .withAutoSharding()
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<TableRow>() {
            public void setParameters(TableRow element, PreparedStatement query) throws SQLException {
                // Bind each BQ field to its positional parameter; all values are written as strings
                for (int i = 0; i < schema.getFieldCount(); i++) {
                    query.setString(i + 1, (String) element.get(schema.nameOf(i)));
                }
            }
        }));

Notes:

  • Use Google Secret Manager to store secrets (a retrieval sketch follows this list).
  • Use Log4j to log statements.
  • Logs can be viewed under Job logs in the Dataflow console.
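
A minimal sketch of reading a secret (such as the SQL password) from Secret Manager; the project, secret name, and version are placeholders:

import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretVersionName;

// Sketch: fetch a secret value instead of hard-coding credentials
try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
    SecretVersionName name = SecretVersionName.of("my-project", "sql-password", "latest");
    String sqlPassword = client.accessSecretVersion(name).getPayload().getData().toStringUtf8();
    // pass sqlPassword to the DataSource shown earlier
}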

Performance:

Total records - 100k
Total columns - 100
Total size - 250 MB

Processing time - 8 mins

Dataflow Template creation

mvn compile exec:java \
  -Dexec.mainClass=com.example.myclass \
  -Dexec.args="--runner=DataflowRunner \
    --project=PROJECT_ID \
    --stagingLocation=gs://BUCKET_NAME/staging \
    --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region=REGION" \
  -P dataflow-runner

Once the template is created, go to the GCP Console UI and use it to set up a new job for moving the data from BQ to SQL, and schedule it as needed.
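
The same template can also be launched from the command line; the job name and locations below are placeholders:

gcloud dataflow jobs run bq-to-sql-job \
  --gcs-location=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
  --region=REGION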
