DATAFLOW for Google Cloud Professional Data Exam

raigon jolly
54 min read · Aug 22, 2020

https://cloud.google.com/blog/products/data-analytics/cloud-batch-and-stream-processing-for-analytics

Programming model for Apache Beam

Apache Beam is an open source, unified model for defining both batch- and streaming-data parallel-processing pipelines. The Apache Beam programming model simplifies the mechanics of large-scale data processing. Using one of the Apache Beam SDKs, you build a program that defines the pipeline. Then, one of Apache Beam’s supported distributed processing backends, such as Dataflow, executes the pipeline. This model lets you concentrate on the logical composition of your data processing job, rather than the physical orchestration of parallel processing. You can focus on what you need your job to do instead of exactly how that job gets executed.

The Apache Beam model provides useful abstractions that insulate you from low-level details of distributed processing, such as coordinating individual workers, sharding datasets, and other such tasks. Dataflow fully manages these low-level details.

Concepts

This section contains summaries of fundamental concepts. On the Apache Beam website, the Apache Beam Programming Guide walks you through the basic concepts of building pipelines using the Apache Beam SDKs.

Basic concepts

Pipelines

A pipeline encapsulates the entire series of computations involved in reading input data, transforming that data, and writing output data. The input source and output sink can be the same or of different types, allowing you to convert data from one format to another. Apache Beam programs start by constructing a Pipeline object, and then using that object as the basis for creating the pipeline's datasets. Each pipeline represents a single, repeatable job.

PCollection

A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline. A PCollection can hold a dataset of a fixed size or an unbounded dataset from a continuously updating data source.

Transforms

A transform represents a processing operation that transforms data. A transform takes one or more PCollections as input, performs an operation that you specify on each element in that collection, and produces one or more PCollections as output. A transform can perform nearly any kind of processing operation, including performing mathematical computations on data, converting data from one format to another, grouping data together, reading and writing data, filtering data to output only the elements you want, or combining data elements into single values.

ParDo

ParDo is the core parallel processing operation in the Apache Beam SDKs, invoking a user-specified function on each of the elements of the input PCollection. ParDo collects the zero or more output elements into an output PCollection. The ParDo transform processes elements independently and possibly in parallel.
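
For example, a minimal sketch (assuming the Apache Beam SDK for Python; the element format and class name are illustrative) of a DoFn applied with ParDo:

import apache_beam as beam

class SplitWords(beam.DoFn):
    # Illustrative DoFn: emits zero or more words for each input line.
    def process(self, element):
        # Each yielded value becomes one element of the output PCollection.
        for word in element.split():
            yield word

with beam.Pipeline() as p:
    words = (
        p
        | 'CreateLines' >> beam.Create(['to be or not to be'])
        | 'SplitWords' >> beam.ParDo(SplitWords()))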

Pipeline I/O

Apache Beam I/O connectors let you read data into your pipeline and write output data from your pipeline. An I/O connector consists of a source and a sink. All Apache Beam sources and sinks are transforms that let your pipeline work with data from several different data storage formats. You can also write a custom I/O connector.

Aggregation

Aggregation is the process of computing some value from multiple input elements. The primary computational pattern for aggregation in Apache Beam is to group all elements with a common key and window, and then combine each group of elements using an associative and commutative operation.

User-defined functions (UDFs)

Some operations within Apache Beam allow executing user-defined code as a way of configuring the transform. For ParDo, user-defined code specifies the operation to apply to every element, and for Combine, it specifies how values should be combined. A pipeline might contain UDFs written in a different language than the language of your runner, or UDFs written in multiple languages.

Runner

Runners are the software that accepts a pipeline and executes it. Most runners are translators or adapters to massively parallel big-data processing systems. Other runners exist for local testing and debugging.
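
Putting these basic concepts together, here is a hedged sketch of a small batch pipeline that reads from a source, applies transforms and an aggregation, and writes to a sink. The Cloud Storage paths are placeholders and the word-count logic is purely illustrative.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # runner, project, region, etc. come from command-line flags
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input/*.txt')      # placeholder path
     | 'ExtractWords' >> beam.FlatMap(lambda line: line.split())
     | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
     | 'CountPerWord' >> beam.CombinePerKey(sum)                         # aggregation by key
     | 'FormatResults' >> beam.Map(lambda kv: '%s: %d' % (kv[0], kv[1]))
     | 'Write' >> beam.io.WriteToText('gs://my-bucket/output/counts'))   # placeholder path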

Advanced concepts

Event time

The time a data event occurs, determined by the timestamp on the data element itself. This contrasts with the time the actual data element gets processed at any stage in the pipeline.

Windowing

Windowing enables grouping operations over unbounded collections by dividing the collection into windows of finite collections according to the timestamps of the individual elements. A windowing function tells the runner how to assign elements to an initial window, and how to merge windows of grouped elements. Apache Beam lets you define different kinds of windows or use the predefined windowing functions.
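
For example, a streaming pipeline might assign elements to one-minute fixed windows before aggregating. A minimal sketch, assuming events is an unbounded PCollection of (key, value) pairs with event-time timestamps:

import apache_beam as beam
from apache_beam.transforms import window

windowed_sums = (
    events                                                         # assumed unbounded PCollection
    | 'FixedWindows' >> beam.WindowInto(window.FixedWindows(60))   # 60-second event-time windows
    | 'SumPerKey' >> beam.CombinePerKey(sum))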

Watermarks

Apache Beam tracks a watermark, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Apache Beam tracks a watermark because data is not guaranteed to arrive in a pipeline in time order or at predictable intervals. In addition, there are no guarantees that data events will appear in the pipeline in the same order that they were generated.

Trigger

Triggers determine when to emit aggregated results as data arrives. For bounded data, results are emitted after all of the input has been processed. For unbounded data, results are emitted when the watermark passes the end of the window, indicating that the system believes all input data for that window has been processed. Apache Beam provides several predefined triggers and lets you combine them.
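
A hedged sketch of combining a windowing function with a trigger: fire speculative early results, fire on time when the watermark passes the end of the window, and fire again for late data. The window size, lateness, and input collection (events) are assumptions.

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark
from apache_beam.utils.timestamp import Duration

triggered_sums = (
    events                                            # assumed unbounded keyed PCollection
    | beam.WindowInto(
        window.FixedWindows(300),                     # 5-minute windows
        trigger=AfterWatermark(
            early=AfterProcessingTime(30),            # speculative firings every 30 seconds
            late=AfterCount(1)),                      # one more firing per late element
        accumulation_mode=AccumulationMode.ACCUMULATING,
        allowed_lateness=Duration(seconds=600))       # accept data up to 10 minutes late
    | beam.CombinePerKey(sum))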

Pipeline lifecycle: from pipeline code to Dataflow job

When you run your Dataflow pipeline, Dataflow creates an execution graph from the code that constructs your Pipeline object, including all of the transforms and their associated processing functions (such as DoFns). This phase is called Graph Construction Time and runs locally on the computer where the pipeline is run.

During graph construction, Apache Beam locally executes the code from the main entry point of the pipeline code, stopping at the calls to a source, sink, or transform step and turning these calls into nodes of the graph. As a consequence, a piece of code in a pipeline’s entry point (Java’s main() method or the top level of a Python script) executes locally on the machine that runs the pipeline, while the same code declared in a method of a DoFn object executes in the Dataflow workers.
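
A hedged sketch of where code runs (file layout and names are illustrative): the top level of run() executes on the launching machine at graph construction time, while the body of the DoFn’s process() method executes on the Dataflow workers.

import logging
import apache_beam as beam

class ParseEvent(beam.DoFn):
    def process(self, element):
        # Runs on Dataflow workers, once per element, after the job is submitted.
        yield element.strip().lower()

def run(argv=None):
    # Runs locally, at graph construction time, before the job is submitted.
    logging.info('Constructing the pipeline graph...')
    with beam.Pipeline(argv=argv) as p:
        (p
         | 'Read' >> beam.io.ReadFromText('gs://my-bucket/events/*.txt')  # placeholder path
         | 'Parse' >> beam.ParDo(ParseEvent()))

if __name__ == '__main__':
    run()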

Also during graph construction, Apache Beam validates that any resources referenced by the pipeline (like Cloud Storage buckets, BigQuery tables, and Pub/Sub Topics or Subscriptions) actually exist and are accessible. The validation is done through standard API calls to the respective services, so it’s vital that the user account used to run a pipeline has proper connectivity to the necessary services and is authorized to call their APIs. Before submitting the pipeline to the Dataflow service, Apache Beam also checks for other errors, and ensures that the pipeline graph doesn’t contain any illegal operations.

The execution graph is then translated into JSON format, and the JSON execution graph is transmitted to the Dataflow service endpoint.

Note: Graph construction also happens when you execute your pipeline locally, but the graph is not translated to JSON or transmitted to the service. Instead, the graph is run locally on the same machine where you launched your Dataflow program. See the documentation on configuring for local execution for more details.

The Dataflow service then validates the JSON execution graph. When the graph is validated, it becomes a job on the Dataflow service. You’ll be able to see your job, its execution graph, status, and log information by using the Dataflow Monitoring Interface.

Execution graph

Dataflow builds a graph of steps that represents your pipeline, based on the transforms and data you used when you constructed your Pipeline object. This is the pipeline execution graph.

The WordCount example, included with the Apache Beam SDKs, contains a series of transforms to read, extract, count, format, and write the individual words in a collection of text, along with an occurrence count for each word. The following diagram shows how the transforms in the WordCount pipeline are expanded into an execution graph:

The execution graph often differs from the order in which you specified your transforms when you constructed the pipeline. This is because the Dataflow service performs various optimizations and fusions on the execution graph before it runs on managed cloud resources. The Dataflow service respects data dependencies when executing your pipeline; however, steps without data dependencies between them may be executed in any order.

You can see the unoptimized execution graph that Dataflow has generated for your pipeline when you select your job in the Dataflow Monitoring Interface.

Parallelization and distribution

The Dataflow service automatically parallelizes and distributes the processing logic in your pipeline to the workers you’ve allotted to perform your job. Dataflow uses the abstractions in the programming model to represent parallel processing functions; for example, your ParDo transforms cause Dataflow to automatically distribute your processing code (represented by DoFns) to multiple workers to be run in parallel.

Structuring your user code

You can think of your DoFn code as small, independent entities: there can potentially be many instances running on different machines, each with no knowledge of the others. As such, pure functions (functions that do not depend on hidden or external state, that have no observable side effects, and are deterministic) are ideal code for the parallel and distributed nature of DoFns.

The pure function model is not strictly rigid, however; state information or external initialization data can be valid for DoFn and other function objects, so long as your code does not depend on things that the Dataflow service does not guarantee. When structuring your ParDo transforms and creating your DoFns, keep the following guidelines in mind:

  • The Dataflow service guarantees that every element in your input PCollection is processed by a DoFn instance exactly once.
  • The Dataflow service does not guarantee how many times a DoFn will be invoked.
  • The Dataflow service does not guarantee exactly how the distributed elements are grouped — that is, it does not guarantee which (if any) elements are processed together.
  • The Dataflow service does not guarantee the exact number of DoFn instances that will be created over the course of a pipeline.
  • The Dataflow service is fault-tolerant, and may retry your code multiple times in the case of worker issues. The Dataflow service may create backup copies of your code, and can have issues with manual side effects (such as if your code relies upon or creates temporary files with non-unique names).
  • The Dataflow service serializes element processing per DoFn instance. Your code does not need to be strictly thread-safe; however, any state shared between multiple DoFn instances must be thread-safe.

See Requirements for User-Provided Functions in the programming model documentation for more information about building your user code.
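
As an illustration of these guidelines, the sketch below contrasts a “pure” DoFn with an anti-pattern that relies on shared state; the record format is an assumption.

import hashlib
import apache_beam as beam

class FingerprintRecord(beam.DoFn):
    # A "pure" DoFn: its output depends only on the input element, it keeps no
    # shared mutable state and has no side effects, so Dataflow can retry it
    # or run extra copies of it safely.
    def process(self, element):
        digest = hashlib.sha1(element.encode('utf-8')).hexdigest()
        yield (digest, element)

# Anti-pattern sketch: a module-level counter is NOT reliable, because many
# DoFn instances run on many workers and bundles may be retried.
# processed_count = 0
# class CountingDoFn(beam.DoFn):
#     def process(self, element):
#         global processed_count
#         processed_count += 1              # value is meaningless under retries
#         yield (element, processed_count)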

Error and exception handling

Your pipeline may throw exceptions while processing data. Some of these errors are transient (e.g., temporary difficulty accessing an external service), but some are permanent, such as errors caused by corrupt or unparseable input data, or null pointers during computation.

Dataflow processes elements in arbitrary bundles, and retries the complete bundle when an error is thrown for any element in that bundle. When running in batch mode, bundles including a failing item are retried 4 times. The pipeline will fail completely when a single bundle has failed 4 times. When running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall.

Note: When processing in batch mode, you might see a large number of individual failures before a pipeline job fails completely (which happens when any given bundle fails after four retry attempts). For example, if your pipeline attempts to process 100 bundles, Dataflow could theoretically generate several hundred individual failures until a single bundle reaches the 4-failure condition for exit.
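
One common way to keep a permanently bad record from failing a batch job or stalling a streaming job is to catch the exception inside the DoFn and route the offending element to a dead-letter output for later inspection. A hedged sketch, assuming lines is a PCollection of JSON strings and the tag names are illustrative:

import json
import apache_beam as beam
from apache_beam import pvalue

class ParseJson(beam.DoFn):
    DEAD_LETTER_TAG = 'dead_letter'

    def process(self, element):
        try:
            yield json.loads(element)
        except (ValueError, TypeError):
            # Send unparseable records to a side output instead of raising, so
            # one bad element cannot cause the bundle to be retried forever.
            yield pvalue.TaggedOutput(self.DEAD_LETTER_TAG, element)

results = lines | 'Parse' >> beam.ParDo(ParseJson()).with_outputs(
    ParseJson.DEAD_LETTER_TAG, main='parsed')
parsed, dead_letters = results.parsed, results.dead_letter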

Fusion optimization

Once the JSON form of your pipeline’s execution graph has been validated, the Dataflow service may modify the graph to perform optimizations. Such optimizations can include fusing multiple steps or transforms in your pipeline’s execution graph into single steps. Fusing steps prevents the Dataflow service from needing to materialize every intermediate PCollection in your pipeline, which can be costly in terms of memory and processing overhead.

While all the transforms you’ve specified in your pipeline construction are executed on the service, they may be executed in a different order, or as part of a larger fused transform to ensure the most efficient execution of your pipeline. The Dataflow service respects data dependencies between the steps in the execution graph, but otherwise steps may be executed in any order.

Fusion example

The following diagram shows how the execution graph from the WordCount example included with the Apache Beam SDK for Java might be optimized and fused by the Dataflow service for efficient execution:

Preventing fusion

There are a few cases in your pipeline where you may want to prevent the Dataflow service from performing fusion optimizations. These are cases in which the Dataflow service might incorrectly guess the optimal way to fuse operations in the pipeline, which could limit the Dataflow service’s ability to make use of all available workers.

For example, one case in which fusion can limit Dataflow’s ability to optimize worker usage is a “high fan-out” ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. If the Dataflow service fuses these ParDo operations together, parallelism in this step is limited to at most the number of items in the input collection, even though the intermediate PCollection contains many more elements.

You can prevent such a fusion by adding an operation to your pipeline that forces the Dataflow service to materialize your intermediate PCollection. Consider using one of the following operations (a minimal sketch follows the list):

  • You can insert a GroupByKey and ungroup after your first ParDo. The Dataflow service never fuses ParDo operations across an aggregation.
  • You can pass your intermediate PCollection as a side input to another ParDo. The Dataflow service always materializes side inputs.
  • You can insert a Reshuffle step. Reshuffle prevents fusion, checkpoints the data, and performs deduplication of records. Reshuffle is supported by Dataflow even though it is marked deprecated in the Apache Beam documentation.
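
For example, a high fan-out ParDo might be followed by a Reshuffle to force materialization of the intermediate PCollection before the expensive downstream step. In this sketch, small_input, GenerateManyElements, and ProcessElement are placeholders:

import apache_beam as beam

expanded = (
    small_input                                               # few elements (assumed)
    | 'HighFanOut' >> beam.ParDo(GenerateManyElements())      # many outputs per input element
    | 'PreventFusion' >> beam.Reshuffle()                     # materializes the intermediate PCollection
    | 'ExpensiveStep' >> beam.ParDo(ProcessElement()))        # can now be spread across all workers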

Combine optimization

Aggregation operations are an important concept in large-scale data processing. Aggregation brings together data that’s conceptually far apart, making it extremely useful for correlating. The Dataflow programming model represents aggregation operations as the GroupByKey, CoGroupByKey, and Combine transforms.

Dataflow’s aggregation operations combine data across the entire data set, including data that may be spread across multiple workers. During such aggregation operations, it’s often most efficient to combine as much data locally as possible before combining data across instances. When you apply a GroupByKey or other aggregating transform, the Dataflow service automatically performs partial combining locally before the main grouping operation.

Note: Because the Dataflow service automatically performs partial local combining, it is strongly recommended that you do not attempt to make this optimization by hand in your pipeline code.

When performing partial or multi-level combining, the Dataflow service makes different decisions based on whether your pipeline is working with batch or streaming data. For bounded data, the service favors efficiency and will perform as much local combining as possible. For unbounded data, the service favors lower latency, and may not perform partial combining (as it may increase latency).

Autotuning features

The Dataflow service contains several autotuning features that can further dynamically optimize your Dataflow job while it is running. These features include Autoscaling and Dynamic Work Rebalancing.

Autoscaling

With autoscaling enabled, the Dataflow service automatically chooses the appropriate number of worker instances required to run your job. The Dataflow service may also dynamically re-allocate more workers or fewer workers during runtime to account for the characteristics of your job. Certain parts of your pipeline may be computationally heavier than others, and the Dataflow service may automatically spin up additional workers during these phases of your job (and shut them down when they’re no longer needed).

Python:

Autoscaling is enabled by default on all batch Dataflow jobs created using the Apache Beam SDK for Python version 0.5.1 or higher. You can disable autoscaling by explicitly specifying the flag --autoscaling_algorithm=NONE when you run your pipeline; if so, note that the Dataflow service sets the number of workers based on the --num_workers option, which defaults to 3.

Note: With autoscaling enabled, the Dataflow service does not allow user control of the exact number of worker instances allocated to your job. You may still cap the number of workers by specifying the --max_num_workers option when you run your pipeline.
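
For reference, a hedged example of launching a batch pipeline with autoscaling disabled and a fixed worker count; the script name, project, and bucket are placeholders:

python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp \
  --autoscaling_algorithm=NONE \
  --num_workers=5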

Dataflow scales based on the parallelism of a pipeline. The parallelism of a pipeline is an estimate of the number of threads needed to most efficiently process data at any given time.

The parallelism is calculated every few minutes unless the bandwidth of an external service is too low. When the parallelism increases, Dataflow scales up and adds workers. When the parallelism decreases, Dataflow scales down and removes workers.

The following summarizes when autoscaling increases or decreases the number of workers in batch and streaming pipelines:

Scaling up

Batch pipelines: If the remaining work takes longer than spinning up new workers, and the current workers are utilizing, on average, more than 5% of their CPUs, Dataflow may scale up. Sources with a small amount of data, un-splittable data (like compressed files), or data processed by I/O modules that don’t split data may limit the number of new workers. Sinks configured to write to a fixed number of shards, like a Cloud Storage destination writing to existing files, may also limit the number of new workers.

Streaming pipelines: If a streaming pipeline is backlogged and workers are utilizing, on average, more than 20% of their CPUs, Dataflow may scale up. Backlogs are cleared within approximately 150 seconds, given the current throughput per worker.

Scaling down

Batch pipelines: If the remaining work takes less time than spinning up new workers, and the current workers are utilizing, on average, more than 5% of their CPUs, Dataflow may scale down.

Streaming pipelines: If a streaming pipeline backlog is lower than 20 seconds and workers are utilizing, on average, less than 80% of their CPUs, Dataflow may scale down. After scaling down, the new number of workers utilize, on average, less than 75% of their CPUs.

No autoscaling

Batch pipelines: If I/O takes longer than data processing, or workers are utilizing, on average, less than 5% of their CPUs, the parallelism isn’t recalculated.

Streaming pipelines: If workers are utilizing, on average, less than 20% of their CPUs, the parallelism isn’t recalculated.

Batch autoscaling

For batch pipelines, Dataflow automatically chooses the number of workers based on both the amount of work in each stage of your pipeline and the current throughput at that stage. Dataflow determines how much data is being processed by the current set of workers and extrapolates how much time the rest of the work takes to process.

Note: The number of workers is sublinear in the amount of work. For instance, a job with twice the work has fewer than twice the workers.

If your pipeline uses a custom data source that you’ve implemented, there are a few methods you can implement that provide more information to the Dataflow service’s autoscaling algorithm and potentially improve performance (a sketch follows the list):

Python:

  • In your BoundedSource subclass, implement the method estimate_size. The Dataflow service uses estimate_size when calculating the initial number of workers to use for your pipeline.
  • In your RangeTracker subclass, implement the method fraction_consumed. The Dataflow service uses fraction_consumed to track read progress and converge on the correct number of workers to use during a read.
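
A hedged sketch of a custom bounded source that reports its size and uses OffsetRangeTracker (which implements try_claim, try_split, position_at_fraction, and fraction_consumed); it is modeled on the simple counting-source pattern, and the assumed 8 bytes per record is illustrative.

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker

class CountingSource(iobase.BoundedSource):
    # Emits the integers [0, count); each record is assumed to be ~8 bytes.
    def __init__(self, count):
        self._count = count

    def estimate_size(self):
        # Used by the Dataflow service to choose the initial number of workers.
        return self._count * 8

    def get_range_tracker(self, start_position, stop_position):
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        return OffsetRangeTracker(start_position, stop_position)

    def read(self, range_tracker):
        for i in range(range_tracker.start_position(), range_tracker.stop_position()):
            if not range_tracker.try_claim(i):
                return
            yield i

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        bundle_start = start_position
        while bundle_start < stop_position:
            bundle_stop = min(bundle_start + desired_bundle_size, stop_position)
            yield iobase.SourceBundle(
                weight=bundle_stop - bundle_start,
                source=self,
                start_position=bundle_start,
                stop_position=bundle_stop)
            bundle_start = bundle_stop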

Streaming autoscaling

Note: Streaming autoscaling is generally available for pipelines that use Streaming Engine. For pipelines that do not use Streaming Engine, streaming autoscaling is available in beta.

For more information about launch stage definitions, see the Product launch stages page.

Streaming autoscaling allows the Dataflow service to adaptively change the number of workers used to execute your streaming pipeline in response to changes in load and resource utilization. Streaming autoscaling is a free feature and is designed to reduce the costs of the resources used when executing streaming pipelines.

Without autoscaling, you choose a fixed number of workers by specifying numWorkers or num_workers to execute your pipeline. As the input workload varies over time, this number can become either too high or too low. Provisioning too many workers results in unnecessary extra cost, and provisioning too few workers results in higher latency for processed data. By enabling autoscaling, resources are used only as they are needed.

The objective of autoscaling streaming pipelines is to minimize backlog while maximizing worker utilization and throughput, and quickly react to spikes in load. By enabling autoscaling, you don’t have to choose between provisioning for peak load and fresh results. Workers are added as CPU utilization and backlog increase and are removed as these metrics come down. This way, you’re paying only for what you need, and the job is processed as efficiently as possible.

Python:

Enable streaming autoscaling

To enable autoscaling, set the following execution parameters when you start your pipeline:

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

For streaming jobs not using Streaming Engine, the minimum number of workers is 1/15th of the --max_num_workers value, rounded up.

Streaming pipelines are deployed with a fixed pool of Persistent Disks, equal in number to --max_num_workers. Take this into account when you specify --max_num_workers, and ensure this value is a sufficient number of disks for your pipeline.

Note: If you’ve reached a scaling limit and want to raise --max_num_workers, you must submit a new job with a higher --max_num_workers value.

If you want to update a streaming autoscaling job that’s not using Streaming Engine, make sure --max_num_workers remains the same (see the section on manually scaling streaming pipelines). Not specifying the --autoscaling_algorithm pipeline option in the Update command disables autoscaling for the updated job.

Usage and pricing

Compute Engine usage is based on the average number of workers, while Persistent Disk usage is based on the exact value of --max_num_workers. Persistent Disks are redistributed such that each worker gets an equal number of attached disks.

For example, if you set --max_num_workers=15, you pay for between 1 and 15 Compute Engine instances and exactly 15 Persistent Disks.

Manually scaling a streaming pipeline

Until autoscaling is generally available in streaming mode, there is a workaround you can use to manually scale the number of workers running your streaming pipeline by using Dataflow’s Update feature.

Python:

To scale your streaming pipeline during execution, ensure that you set the following execution parameters when you start your pipeline:

  • Set --max_num_workers equal to the maximum number of workers you want available to your pipeline.
  • Set --num_workers equal to the initial number of workers you want your pipeline to use when it starts running.

Once your pipeline is running, you can Update your pipeline and specify a new number of workers using the --num_workers parameter. The value you set for the new --num_workers must be between N and --max_num_workers, where N is equal to --max_num_workers / 15.
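
A hedged example of re-launching the same pipeline with a new worker count; the script name, project, and job name are placeholders, and --max_num_workers must match the value used when the job was first launched:

python my_streaming_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --streaming \
  --update \
  --job_name=my-streaming-job \
  --num_workers=10 \
  --max_num_workers=15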

Updating your pipeline replaces your running job with a new job, using the new number of workers, while preserving all state information associated with the previous job.

Note: Your pipeline’s maximum scaling range depends on the number of Persistent Disks deployed when the pipeline starts. The Dataflow service deploys one persistent disk per worker at the maximum number of workers. Deploying extra persistent disks by setting --max_num_workers to a higher value than --num_workers provides some benefits to your pipeline—specifically, it allows you the flexibility to scale your pipeline to a larger number of workers after startup, and might provide improved performance. However, your pipeline might also incur additional cost for the extra Persistent Disks. Take note of the cost and quota implications of the additional Persistent Disk resources when planning your streaming pipeline and setting the scaling range.

Note: You cannot change the scaling range of a pipeline by using the Update feature. If you need to scale further, you must start a new pipeline and specify a higher value for --max_num_workers as the ceiling of your desired scaling range.

Dynamic Work Rebalancing

The Dataflow service’s Dynamic Work Rebalancing feature allows the service to dynamically re-partition work based on runtime conditions. These conditions might include:

  • Imbalances in work assignments
  • Workers taking longer than expected to finish
  • Workers finishing faster than expected

The Dataflow service automatically detects these conditions and can dynamically reassign work to unused or underused workers to decrease your job’s overall processing time.

Limitations

Dynamic Work Rebalancing only happens when the Dataflow service is processing some input data in parallel: when reading data from an external input source, when working with a materialized intermediate PCollection, or when working with the result of an aggregation like GroupByKey. If a large number of steps in your job are fused, there are fewer intermediate PCollections in your job and Dynamic Work Rebalancing will be limited to the number of elements in the source materialized PCollection. If you want to ensure that Dynamic Work Rebalancing can be applied to a particular PCollection in your pipeline, you can prevent fusion in a few different ways to ensure dynamic parallelism.

Dynamic Work Rebalancing cannot re-parallelize data finer than a single record. If your data contains individual records that cause large delays in processing time, they may still delay your job, since Dataflow cannot subdivide and redistribute an individual “hot” record to multiple workers.

Python:

If you’ve set a fixed number of shards for your pipeline’s final output (for example, by writing data using beam.io.WriteToText(..., num_shards=...)), Dataflow will limit parallelization based on the number of shards that you've chosen.

The fixed-shards limitation can be considered temporary, and may be subject to change in future releases of the Dataflow service.

Working with Custom Data Sources

Python:

If your pipeline uses a custom data source that you provide, your RangeTracker must implement try_claim, try_split, position_at_fraction, and fraction_consumed to allow your source to work with the Dynamic Work Rebalancing feature.

See the API reference information on RangeTracker for more information.

Resource usage and management

The Dataflow service fully manages resources in Google Cloud on a per-job basis. This includes spinning up and shutting down Compute Engine instances (occasionally referred to as workers or VMs) and accessing your project’s Cloud Storage buckets for both I/O and temporary file staging. However, if your pipeline interacts with Google Cloud data storage technologies like BigQuery and Pub/Sub, you must manage the resources and quota for those services.

Dataflow uses a user-provided location in Cloud Storage specifically for staging files. This location is under your control, and you should ensure that it remains available as long as any job is reading from it. You can reuse the same staging location for multiple job runs, as the SDK’s built-in caching can speed up the start time for your jobs.

Caution: Manually altering Dataflow-managed Compute Engine resources associated with a Dataflow job is an unsupported operation. You should not attempt to manually stop, delete, or otherwise control the Compute Engine instances that Dataflow has created to run your job. In addition, you should not alter any persistent disk resources associated with your Dataflow job.

Jobs

You may run up to 25 concurrent Dataflow jobs per Google Cloud project; however, this limit can be increased by contacting Google Cloud Support. For more information, see Quotas.

The Dataflow service is currently limited to processing JSON job requests that are 20 MB in size or smaller. The size of the job request is specifically tied to the JSON representation of your pipeline; a larger pipeline means a larger request.

To estimate the size of your pipeline’s JSON request, run your pipeline with the following option:

Python:

--dataflow_job_file=<path to output file>

This command writes a JSON representation of your job to a file. The size of the serialized file is a good estimate of the size of the request; the actual size will be slightly larger due to some additional information included in the request.
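
A hedged example of dumping the job specification to a local file and then checking its size; the script name and paths are placeholders:

python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp \
  --dataflow_job_file=/tmp/dataflow_job_spec.json

ls -lh /tmp/dataflow_job_spec.json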

For more information, see the troubleshooting page for “413 Request Entity Too Large” / “The size of serialized JSON representation of the pipeline exceeds the allowable limit”.

In addition, your job’s graph size must not exceed 10 MB. For more information, see the troubleshooting page for “The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.”.

Workers

The Dataflow service currently allows a maximum of 1000 Compute Engine instances per job. The default machine type is n1-standard-1 for batch jobs and n1-standard-4 for streaming jobs, so when using the default machine types, the Dataflow service can allocate up to 4000 cores per job. If you need more cores for your job, you can select a larger machine type.

Note: The Dataflow managed service now deploys Compute Engine virtual machines associated with Dataflow jobs using Managed Instance Groups. A Managed Instance Group creates multiple Compute Engine instances from a common template and allows you to control and manage them as a group. That way, you don’t have to individually control each instance associated with your pipeline.

You should not attempt to manage or otherwise interact directly with your Compute Engine Managed Instance Group; the Dataflow service will take care of that for you. Manually altering any Compute Engine resources associated with your Dataflow job is an unsupported operation.

You can use any of the available Compute Engine machine type families as well as custom machine types. For best results, use n1 machine types. Shared core machine types, such as f1 and g1 series workers, are not supported under the Dataflow Service Level Agreement.

Dataflow bills by the number of vCPUs and GB of memory in workers. Billing is independent of the machine type family. You can specify a machine type for your pipeline by setting the appropriate execution parameter at pipeline creation time.

Python:

To change the machine type, set the --worker_machine_type option.

Note: The Dataflow service currently does not support jobs with only preemptible virtual machines. Instead, if you would like to save processing costs, consider using the FlexRS batch processing mode that uses a combination of preemptible and non-preemptible resources.

Resource quota

The Dataflow service checks to ensure that your Google Cloud project has the Compute Engine resource quota required to run your job, both to start the job and scale to the maximum number of worker instances. Your job will fail to start if there is not enough resource quota available.

If your Dataflow job deploys Compute Engine virtual machines as a Managed Instance Group, you’ll need to ensure your project satisfies some additional quota requirements. Specifically, your project will need one of the following types of quota for each concurrent Dataflow job that you want to run:

  • One Instance Group per job
  • One Managed Instance Group per job
  • One Instance Template per job

Caution: Manually changing your Dataflow job’s Instance Template or Managed Instance Group is not recommended or supported. Use Dataflow’s pipeline configuration options instead.

Dataflow’s Autoscaling feature is limited by your project’s available Compute Engine quota. If your job has sufficient quota when it starts, but another job uses the remainder of your project’s available quota, the first job will run but not be able to fully scale.

However, the Dataflow service does not manage quota increases for jobs that exceed the resource quotas in your project. You are responsible for making any necessary requests for additional resource quota, for which you can use the Google Cloud Console.

Persistent disk resources

The Dataflow service is currently limited to 15 persistent disks per worker instance when running a streaming job. Each persistent disk is local to an individual Compute Engine virtual machine. Your job may not have more workers than persistent disks; a 1:1 ratio between workers and disks is the minimum resource allotment.

For jobs running on worker VMs, the default size of each persistent disk is 250 GB in batch mode and 400 GB in streaming mode. Jobs using Streaming Engine or Dataflow Shuffle run on the Dataflow service backend and use smaller disks.

Locations

By default, the Dataflow service deploys Compute Engine resources in the us-central1-f zone of the us-central1 region. You can override this setting by specifying the --region parameter. If you need to use a specific zone for your resources, use the --zone parameter when you create your pipeline. However, we recommend that you only specify the region, and leave the zone unspecified. This allows the Dataflow service to automatically select the best zone within the region based on the available zone capacity at the time of the job creation request. For more information, see the regional endpoints documentation.

Streaming Engine

Currently, the Dataflow pipeline runner executes the steps of your streaming pipeline entirely on worker virtual machines, consuming worker CPU, memory, and Persistent Disk storage. Dataflow’s Streaming Engine moves pipeline execution out of the worker VMs and into the Dataflow service backend.

Benefits of Streaming Engine

The Streaming Engine model has the following benefits:

  • A reduction in consumed CPU, memory, and Persistent Disk storage resources on the worker VMs. Streaming Engine works best with smaller worker machine types (n1-standard-2 instead of n1-standard-4) and does not require Persistent Disk beyond a small worker boot disk, leading to less resource and quota consumption.
  • More responsive autoscaling in response to variations in incoming data volume. Streaming Engine offers smoother, more granular scaling of workers.
  • Improved supportability, since you don’t need to redeploy your pipelines to apply service updates.

Most of the reduction in worker resources comes from offloading the work to the Dataflow service. For that reason, there is a charge associated with the use of Streaming Engine. However, the total bill for Dataflow pipelines using Streaming Engine is expected to be approximately the same compared to the total cost of Dataflow pipelines that do not use this option.

Using Streaming Engine

Streaming Engine is currently available for streaming pipelines in the following regions. It will become available in additional regions in the future.

  • us-west1 (Oregon)
  • us-central1 (Iowa)
  • us-east1 (South Carolina)
  • us-east4 (Northern Virginia)
  • northamerica-northeast1 (Montréal)
  • europe-west2 (London)
  • europe-west1 (Belgium)
  • europe-west4 (Netherlands)
  • europe-west3 (Frankfurt)
  • asia-southeast1 (Singapore)
  • asia-east1 (Taiwan)
  • asia-northeast1 (Tokyo)
  • australia-southeast1 (Sydney)

Note: Updating an already-running pipeline to use Streaming Engine is not currently supported.

If your pipeline is already running in production and you would like to use Streaming Engine, you need to stop your pipeline using the Dataflow Drain option. Then, specify the Streaming Engine parameter and rerun your pipeline.

Python:

Note: Streaming Engine requires the Apache Beam SDK for Python, version 2.16.0 or higher.

To use Streaming Engine for your streaming pipelines, specify the following parameter:

--enable_streaming_engine

If you use Dataflow Streaming Engine for your pipeline, do not specify the --zone parameter. Instead, specify the --region parameter and set the value to one of the regions where Streaming Engine is currently available. Dataflow auto-selects the zone in the region you specified. If you do specify the --zone parameter and set it to a zone outside of the available regions, Dataflow reports an error.

Streaming Engine works best with smaller worker machine types, so we recommend that you set --machine_type=n1-standard-2. You can also set --disk_size_gb=30 because Streaming Engine only needs space for the worker boot image and local logs. These values are the default values.
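
Putting those flags together, a hedged launch command for a Streaming Engine pipeline; the script, project, bucket, and worker counts are placeholders:

python my_streaming_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=europe-west1 \
  --temp_location=gs://my-bucket/temp \
  --streaming \
  --enable_streaming_engine \
  --machine_type=n1-standard-2 \
  --disk_size_gb=30 \
  --autoscaling_algorithm=THROUGHPUT_BASED \
  --max_num_workers=10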

Dataflow Shuffle

Dataflow Shuffle is the base operation behind Dataflow transforms such as GroupByKey, CoGroupByKey, and Combine. The Dataflow Shuffle operation partitions and groups data by key in a scalable, efficient, fault-tolerant manner. Currently, Dataflow uses a shuffle implementation that runs entirely on worker virtual machines and consumes worker CPU, memory, and Persistent Disk storage. The service-based Dataflow Shuffle feature, available for batch pipelines only, moves the shuffle operation out of the worker VMs and into the Dataflow service backend.

Benefits of Dataflow Shuffle

The service-based Dataflow Shuffle has the following benefits:

  • Faster execution time of batch pipelines for the majority of pipeline job types.
  • A reduction in consumed CPU, memory, and Persistent Disk storage resources on the worker VMs.
  • Better autoscaling since VMs no longer hold any shuffle data and can therefore be scaled down earlier.
  • Better fault tolerance; an unhealthy VM holding Dataflow Shuffle data will not cause the entire job to fail, as would happen if not using the feature.

Most of the reduction in worker resources comes from offloading the shuffle work to the Dataflow service. For that reason, there is a charge associated with the use of Dataflow Shuffle. However, the total bill for Dataflow pipelines using the service-based Dataflow implementation is expected to be less than or equal to the cost of Dataflow pipelines that do not use this option.

For the majority of pipeline job types, Dataflow Shuffle is expected to execute faster than the shuffle implementation running on worker VMs. However, the execution times might vary from run to run. If you are running a pipeline that has important deadlines, we recommend allocating sufficient buffer time before the deadline. In addition, consider requesting a bigger quota for Shuffle.

Disk considerations

When using the service-based Dataflow Shuffle feature, you do not need to attach large Persistent Disks to your worker VMs. Dataflow automatically attaches a small 25 GB boot disk. However, due to this small disk size, there are important considerations to be aware of when using Dataflow Shuffle:

  • A worker VM uses part of the 25 GB of disk space for the operating system, binaries, logs, and containers. Jobs that use a significant amount of disk and exceed the remaining disk capacity may fail when you use Dataflow Shuffle.
  • Jobs that use a lot of disk I/O may be slow due to the performance of the small disk. For more information about performance differences between disk sizes, see the Compute Engine Persistent Disk Performance page.

If any of these considerations apply to your job, you can use pipeline options to specify a larger disk size.

Using Dataflow Shuffle

Service-based Dataflow Shuffle is currently available in the following regions:

  • us-west1 (Oregon)
  • us-central1 (Iowa)
  • us-east1 (South Carolina)
  • us-east4 (Northern Virginia)
  • northamerica-northeast1 (Montréal)
  • europe-west2 (London)
  • europe-west1 (Belgium)
  • europe-west4 (Netherlands)
  • europe-west3 (Frankfurt)
  • asia-southeast1 (Singapore)
  • asia-east1 (Taiwan)
  • asia-northeast1 (Tokyo)
  • australia-southeast1 (Sydney)

Dataflow Shuffle will become available in additional regions in the future.

Note: Performance differences in the asia-northeast1 (Tokyo) region: We recommend using Dataflow Shuffle with large datasets (greater than 1 TB) when you run pipelines in the asia-northeast1 (Tokyo) region. Using Shuffle with smaller datasets in the asia-northeast1 (Tokyo) region does not give you the same performance advantages as Shuffle in other regions.

Python:

Note: To use Dataflow Shuffle, you must have Apache Beam SDK for Python version 2.1.0 or higher.

To use the service-based Dataflow Shuffle in your batch pipelines, specify the following parameter:
--experiments=shuffle_mode=service

If you use Dataflow Shuffle for your pipeline, do not specify the --zone parameter. Instead, specify the --region parameter and set the value to one of the regions where Shuffle is currently available. Dataflow autoselects the zone in the region you specified. If you do specify the --zone parameter and set it to a zone outside of the available regions, Dataflow reports an error.

Dataflow Flexible Resource Scheduling

Dataflow FlexRS reduces batch processing costs by using advanced scheduling techniques, the Dataflow Shuffle service, and a combination of preemptible virtual machine (VM) instances and regular VMs. By running preemptible VMs and regular VMs in parallel, Dataflow improves the user experience if Compute Engine stops preemptible VM instances during a system event. FlexRS helps to ensure that the pipeline continues to make progress and that you do not lose previous work when Compute Engine preempts your preemptible VMs. For more information about FlexRS, see Using Flexible Resource Scheduling in Dataflow.

Dataflow Runner v2

The current production Dataflow runner utilizes language-specific workers when running Apache Beam pipelines. To improve scalability, generality, extensibility, and efficiency, the Dataflow runner is moving to a more services-based architecture. These changes include a more efficient and portable worker architecture packaged together with the Shuffle Service and Streaming Engine.

The new Dataflow runner, Dataflow Runner v2, is available for Python streaming pipelines. You are encouraged to try out Dataflow Runner v2 with your current workload before it is enabled by default on all new pipelines. You do not have to make any changes to your pipeline code to take advantage of this new architecture.

Note: Dataflow Runner v2 requires the Apache Beam SDK for Python, version 2.21.0 or higher.

Benefits of using Dataflow Runner v2

Starting with Python streaming pipelines, new features will be available on Dataflow Runner v2 only. In addition, the improved efficiency of the Dataflow Runner v2 architecture could lead to performance improvements in your Dataflow jobs.

While using Dataflow Runner v2, you might notice a reduction in your bill. The billing model for Dataflow Runner v2 is not final yet, so your bill might increase back to near current levels as the new runner is enabled across all pipelines.

Using Dataflow Runner v2

Dataflow Runner v2 is available in regions that have Dataflow regional endpoints.

Python:

Note: To use Dataflow Runner v2, you must have the Apache Beam SDK for Python, version 2.21.0 or higher.

Dataflow Runner v2 requires Streaming Engine. To enable them both, specify the following parameter:
--experiments=use_runner_v2

Debugging Dataflow Runner v2 jobs

To debug jobs using Dataflow Runner v2, you should follow standard debugging steps; however, be aware of the following when using Dataflow Runner v2:

  • Dataflow Runner v2 jobs run two types of processes on the worker VM — SDK process and the runner harness process. Depending on the pipeline and VM type, there might be one or more SDK processes, but there is only one runner harness process per VM.
  • SDK processes run user code and other language-specific functions, while the runner harness process manages everything else.
  • The runner harness process waits for all SDK processes to connect to it before starting to request work from Dataflow.
  • Jobs might be delayed if the worker VM downloads and installs dependencies during the SDK process startup. If there are issues in an SDK process, such as starting up or installing libraries, the worker reports its status as unhealthy.
  • Worker VM logs — available through the Logs Viewer or the Dataflow monitoring interface — include logs from the runner harness process as well as logs from the SDK processes.
  • To diagnose problems in your user code, examine the worker logs from the SDK processes. If you find any errors in the runner harness logs, please contact Support to file a bug.

Updating an existing pipeline

https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline

The Apache Beam SDKs provide a way to update an ongoing streaming job on the Dataflow managed service with new pipeline code.

There are various reasons why you might want to update your existing Dataflow job:

  • You want to enhance or otherwise improve your pipeline code.
  • You want to fix bugs in your pipeline code.
  • You want to update your pipeline to handle changes in data format, or to account for version or other changes in your data source.

When you update your job, the Dataflow service performs a compatibility check between your currently-running job and your potential replacement job. The compatibility check ensures that things like intermediate state information and buffered data can be transferred from your prior job to your replacement job.

You can use the Update feature to scale a streaming Apache Beam pipeline to use a different number of workers. See Manual Scaling in Streaming Mode for instructions and restrictions.

The update process and its effects

When you update a job on the Dataflow service, you replace the existing job with a new job that runs your updated pipeline code. The Dataflow service retains the job name, but runs the replacement job with an updated Job ID.

The replacement job preserves any intermediate state data from the prior job, as well as any buffered data records or metadata currently “in-flight” from the prior job. For example, some records in your pipeline might be buffered while waiting for a window to resolve.

In-flight data

“In-flight” data will still be processed by the transforms in your new pipeline. However, additional transforms that you add in your replacement pipeline code may or may not take effect, depending on where the records are buffered. For example, let’s say your existing pipeline has the following transforms:

Python:

(p
 | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
 | 'Format' >> FormatStrings())

You can replace your job with new pipeline code, as follows:

Python:

(p
 | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
 | 'Remove' >> RemoveStringsStartingWithA()
 | 'Format' >> FormatStrings())

Even though you’ve added a transform to filter out strings that begin with the letter “A”, the next transform (FormatStrings) may still see buffered or in-flight strings that begin with "A" that were transferred over from the prior job.

Changing windowing

You can change windowing and trigger strategies for the PCollections in your replacement pipeline, but use caution. Changing the windowing or trigger strategies will not affect data that is already buffered or otherwise in-flight.

We recommend that you attempt only smaller changes to your pipeline’s windowing, such as changing the duration of fixed- or sliding-time windows. Making major changes to windowing or triggers, like changing the windowing algorithm, might have unpredictable results on your pipeline output.

Launching your replacement job

To update your job, you’ll need to launch a new job to replace the ongoing job. When you launch your replacement job, you’ll need to set the following pipeline options to perform the update process in addition to the job’s regular options:

Python:

  • Pass the --update option.
  • Set the --job_name option in PipelineOptions to the same name as the job you want to update.
  • If any transform names in your pipeline have changed, you must supply a transform mapping and pass it using the --transform_name_mapping option.

Caution: To facilitate the mapping between transforms in your prior pipeline and your replacement pipeline, give explicit names to every transform in your pipelines.

By default, Dataflow generates a warning if you do not explicitly name your transforms. You can increase this warning to an error.

Specifying your replacement job name

Python:

When you launch your replacement job, the value you pass for the --job_name option must match exactly the name of the job you want to replace.

To find the correct job name value, select your prior job in the Dataflow Monitoring Interface and find the Job Name field in the Summary tab:

Alternatively, you can query a list of existing jobs by using the Dataflow Command-line Interface. Enter the command gcloud beta dataflow jobs list into your shell or terminal window to obtain a list of Dataflow jobs in your Google Cloud project, and find the NAME field for the job you want to replace:

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674 windowedwordcount-johndoe-0729000214 Streaming 2015-07-28 17:02:28 Running

Creating the transform mapping

Python:

If your replacement pipeline has changed any transform names from those in your prior pipeline, the Dataflow service requires a transform mapping. The transform mapping maps the named transforms in your prior pipeline code to names in your replacement pipeline code. You can pass the mapping by using the --transform_name_mapping command-line option, using the following general format:

--transform_name_mapping={"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

You only need to provide mapping entries in --transform_name_mapping for transform names that have changed between your prior pipeline and your replacement pipeline.

Note: When you run with --transform_name_mapping, you may need to escape the quotations as appropriate for your shell. For example, in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Determining transform names

The transform name in each instance in the map is the name you supplied when you applied the transform in your pipeline. For example:

Python:

| 'FormatResults' >> beam.ParDo(MyDoFn())

You can also obtain the transform names for your prior job by examining the job’s execution graph in the Dataflow Monitoring Interface:

Naming in composite transforms

Transform names are hierarchical, based on the transform hierarchy in your pipeline. If your pipeline has a composite transform, the nested transforms are named in terms of their containing transform. For example, suppose your pipeline contains a composite transform named CountWidgets, which contains an inner transform named Parse. The inner transform's full name will be CountWidgets/Parse, and you must specify that full name in your transform mapping.

If your new pipeline maps a composite transform to a different name, all of the nested transforms are also automatically renamed. You’ll need to specify the changed names for the inner transforms in your transform mapping.
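
A hedged sketch of a composite transform whose inner steps receive hierarchical names; CountWidgets and Parse mirror the example above, while ParseWidget and lines are assumed placeholders:

import apache_beam as beam

class CountWidgets(beam.PTransform):
    # Composite transform: its inner steps appear in the job graph as
    # 'CountWidgets/Parse' and 'CountWidgets/CountPerKind'.
    def expand(self, pcoll):
        return (
            pcoll
            | 'Parse' >> beam.ParDo(ParseWidget())            # full name: CountWidgets/Parse
            | 'CountPerKind' >> beam.combiners.Count.PerElement())

widget_counts = lines | 'CountWidgets' >> CountWidgets()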

Refactoring the transform hierarchy

If your replacement pipeline uses a different transform hierarchy than your prior pipeline (e.g. because you refactored your composite transforms, or your pipeline depends on a composite transform from a library that changed), you’ll need to explicitly declare the mapping.

For example, let’s suppose your prior pipeline applied a composite transform, CountWidgets, that contained an inner transform named Parse. Now, let's say your replacement pipeline refactors CountWidgets, and nests Parse inside another transform named Scan. For your update to succeed, you must explicitly map the prior pipeline's complete transform name (CountWidgets/Parse) to the new pipeline's transform name (CountWidgets/Scan/Parse):

Python:

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

If you delete a transform entirely in your replacement pipeline, you must provide a null mapping. Suppose our replacement pipeline removes the CountWidgets/Parse transform entirely:

--transform_name_mapping={"CountWidgets/Parse":""}

Job compatibility check

When you launch your replacement job, the Dataflow service performs a compatibility check between your replacement job and your prior job. If the compatibility check passes, your prior job will be stopped. Your replacement job will then launch on the Dataflow service while retaining the same job name. If the compatibility check fails, your prior job will continue running on the Dataflow service and your replacement job will return an error.

Python:

Note: The Dataflow service currently has a limitation in that the error returned from a failed update attempt is only visible in your console or terminal if you use blocking execution. The current workaround consists of the following steps:

  1. Use pipeline.run().wait_until_finish() in your pipeline code.
  2. Run your replacement pipeline program with the --update option.
  3. Wait for the replacement job to successfully pass the compatibility check.
  4. Exit the blocking runner process by typing Ctrl+C.

Alternately, you can monitor your replacement job’s state in the Dataflow Monitoring Interface. If your job has started successfully, it also passed the compatibility check.

The compatibility check ensures that the Dataflow service can transfer intermediate state data from the steps in your prior job to your replacement job, as specified by the transform mapping that you provide. The compatibility check also ensures that your pipeline’s PCollections are using the same Coders. Changing a Coder can cause the compatibility check to fail because any in-flight data or buffered records may not be correctly serialized in the replacement pipeline.

Preventing compatibility breaks

Certain differences between your prior pipeline and your replacement pipeline can cause the compatibility check to fail. These differences include:

  • Changing the pipeline graph without providing a mapping. When you update a job, the Dataflow service attempts to match the transforms in your prior job to the transforms in the replacement job in order to transfer intermediate state data for each step. If you’ve renamed or removed any steps, you’ll need to provide a transform mapping so that Dataflow can match state data accordingly.
  • Changing the side inputs for a step. Adding side inputs to or removing them from a transform in your replacement pipeline will cause the compatibility check to fail.
  • Changing the Coder for a step. When you update a job, the Dataflow service preserves any data records currently buffered (for example, while windowing is resolving) and handles them in the replacement job. If the replacement job uses different or incompatible data encoding, the Dataflow service will not be able to serialize or deserialize these records.

Caution: The Dataflow service currently cannot guarantee that changing a coder in your prior pipeline to an incompatible coder will cause the compatibility check to fail. It is recommended that you do not attempt to make backwards-incompatible changes to Coders when updating your pipeline. If your pipeline update succeeds but you encounter issues or errors in the resulting data, ensure that your replacement pipeline uses data encoding that is at least compatible with your prior job.

  • You’ve removed a “stateful” operation from your pipeline. Your replacement job might fail Dataflow’s compatibility check if you remove certain stateful operations from your pipeline. The Dataflow service can fuse multiple steps together for efficiency. If you’ve removed a state-dependent operation from within a fused step, the check will fail. Stateful operations include:
      • Transforms that produce or consume side inputs.
      • I/O reads.
      • Transforms that use keyed state.
      • Transforms that have window merging.
  • You’re attempting to run your replacement job in a different geographic zone. You must run your replacement job in the same zone in which you ran your prior job.

https://cloud.google.com/dataflow/docs/guides/using-command-line-intf

Using the Cloud Dataflow command-line interface

When you execute your pipeline using the Dataflow managed service, you can obtain information about your Dataflow job by using the Dataflow command-line interface. The Dataflow command-line interface is part of the gcloud command-line tool in the Cloud SDK.

NOTE: If you’d rather view and interact with your Dataflow jobs using the web-based UI, use the Dataflow monitoring interface.

Installing the Cloud Dataflow command-line component

To use the Dataflow Command-line Interface, you’ll first need to install the gcloud tool.

Running the available commands

You interact with the Dataflow command-line interface by running the available commands. To run a command, type the following command into your shell or terminal:

gcloud dataflow

The Dataflow command-line interface has three major subcommands: jobs, logs, and metrics.

Note: You can see the complete list of all currently available Dataflow commands in the Google Cloud SDK documentation. Alternatively, type gcloud dataflow -h in your shell or terminal to print help information on available commands and flags.

To see help information for any specific command, append --help to the command when you enter it into your shell or terminal.

Jobs commands

The jobs subcommands group lets you view and interact with the Dataflow jobs in your Google Cloud project. You can use these commands to view a list of your jobs, cancel a job, display a description of a specific job, and more. For example, to view a list of all your Dataflow jobs, type the following command into your shell or terminal:

gcloud dataflow jobs list

The gcloud tool returns a list of your current jobs, as follows:

ID                                        NAME                                    TYPE   CREATION_TIME        STATE   REGION
2015-06-03_16_39_22-4020553808241078833 wordcount-janedoe-0603233849 Batch 2015-06-03 16:39:22 Done us-central1
2015-06-03_16_38_28-4363652261786938862 wordcount-johndoe-0603233820 Batch 2015-06-03 16:38:28 Done us-central1
2015-05-21_16_24_11-17823098268333533078 bigquerytornadoes-johndoe-0521232402 Batch 2015-05-21 16:24:11 Done europe-west1
2015-05-21_13_38_06-16409850040969261121 bigquerytornadoes-johndoe-0521203801 Batch 2015-05-21 13:38:06 Done us-central1
2015-05-21_13_17_18-18349574013243942260 bigquerytornadoes-johndoe-0521201710 Batch 2015-05-21 13:17:18 Done europe-west1
2015-05-21_12_49_37-9791290545307959963 wordcount-johndoe-0521194928 Batch 2015-05-21 12:49:37 Done us-central1
2015-05-20_15_54_51-15905022415025455887 wordcount-johndoe-0520225444 Batch 2015-05-20 15:54:51 Failed us-central1
2015-05-20_15_47_02-14774624590029708464 wordcount-johndoe-0520224637 Batch 2015-05-20 15:47:02 Done us-central1

Using the job ID, you can run the describe command to display more information about a job.

export JOBID=<X>
gcloud dataflow jobs describe $JOBID

For example, if you run the command for job ID 2015-02-09_11_39_40-15635991037808002875, the gcloud tool returns the following information:

createTime: '2015-02-09T19:39:41.140Z'
currentState: JOB_STATE_DONE
currentStateTime: '2015-02-09T19:56:39.510Z'
id: 2015-02-09_11_39_40-15635991037808002875
name: tfidf-bchambers-0209193926
projectId: google.com:clouddfe
type: JOB_TYPE_BATCH

You can run the command with the --format=json option to format the result into JSON.

gcloud --format=json dataflow jobs describe $JOBID

The gcloud tool returns the following formatted information:

{
"createTime": "2015-02-09T19:39:41.140Z",
"currentState": "JOB_STATE_DONE",
"currentStateTime": "2015-02-09T19:56:39.510Z",
"id": "2015-02-09_11_39_40-15635991037808002875",
"name": "tfidf-bchambers-0209193926",
"projectId": "google.com:clouddfe",
"type": "JOB_TYPE_BATCH"
}

For a complete list of jobs commands, see the gcloud dataflow jobs command in the Cloud SDK documentation.

Logs commands

The logs commands display log entries for jobs run on the Dataflow Service.

For example, you can use the list command to print the logs that provide information about what your job is doing.

export JOBID=<X>
gcloud dataflow logs list $JOBID

For job ID 2015-02-09_11_39_40-15635991037808002875, the gcloud tool returns:

Listed 0 items.

In this example, no logs showed up at the default severity (Warning). You can include the BASIC logs by running the list command with the --importance=detailed option.

gcloud dataflow logs list $JOBID --importance=detailed

The gcloud tool prints out the following logs:

d 2016-08-29T09:33:28 2015-02-09_11_39_40-15635991037808002875_00000156d72606f7 (39b2a31f5e883423): Starting worker pool synchronously
d 2016-08-29T09:33:28 2015-02-09_11_39_40-15635991037808002875_00000156d7260871 (39b2a31f5e883ce9): Worker pool is running
d 2016-08-29T09:33:28 2015-02-09_11_39_40-15635991037808002875_00000156d7260874 (39b2a31f5e883b77): Executing operation Count.PerElement/Sum.PerKey/GroupByKey/GroupByKeyOnly…
...

For a complete list of logs commands, see the gcloud dataflow logs command in the Cloud SDK documentation.

Metrics commands

The metrics commands allow you to view the metrics for a given Dataflow job.

Note: The metrics command names are subject to change, and certain metrics are subject to deletion.

You can use the list command to get information about the steps in your job.

gcloud dataflow metrics list $JOBID

For this command, the gcloud tool returns:

---
name:
  name: s09-s14-start-msecs
  origin: dataflow/v1b3
scalar: 137
updateTime: '2016-08-29T16:35:50.007Z'
---
name:
  context:
    output_user_name: WordCount.CountWords/Count.PerElement/Init-out0
  name: ElementCount
  origin: dataflow/v1b3
scalar: 26181
updateTime: '2016-08-29T16:35:50.007Z'
---
name:
  context:
    step: s2
  name: emptyLines
  origin: user
scalar: 1080
updateTime: '2016-08-29T16:35:50.007Z'
...

You can use the gcloud dataflow metrics list command to obtain tentative metrics while your job is running or shortly after it finishes. To view tentative metrics, run the command with the --tentative flag. A metric marked tentative is updated frequently as worker instances process your pipeline's data, and it may decrease if a worker experiences an error. Tentative metrics become committed values when a worker finishes its work and commits the results.

For a complete list of metrics commands, see the gcloud dataflow metrics command in the Cloud SDK documentation.

Using commands with regional endpoints

The Dataflow command-line interface supports regional endpoints since gcloud tool version 176. Use the --region option with any command to specify the regional endpoint that manages your job.

For example, gcloud dataflow jobs list will list jobs from all regions, but gcloud dataflow jobs list --region=europe-west1 will only list jobs managed from europe-west1.

Note: The --region option is required to obtain job information from a regional endpoint. If you do not specify a regional endpoint, us-central1 will be used as the default endpoint.

Stopping a running pipeline

If you need to stop a running Dataflow job, you can do so by issuing a command using either the Dataflow Monitoring Interface or the Dataflow Command-line Interface. There are two possible commands you can issue to stop your job: Cancel and Drain.

Cancel

Using the Cancel option to stop your job tells the Dataflow service to cancel your job immediately. The service halts all data ingestion and processing as soon as possible and immediately begins cleaning up the Google Cloud resources attached to your job, which can include shutting down Compute Engine worker instances and closing active connections to I/O sources or sinks.

Because Cancel immediately halts processing, you may lose any “in-flight” data. “In-flight” data refers to data that has been read but is still being processed by your pipeline. Data written from your pipeline to an output sink before you issued the Cancel command may still be accessible on your output sink.

If data loss is not a concern, use the Cancel option to stop your job so that the Google Cloud resources associated with it are shut down as soon as possible.

Drain

Note: Draining batch pipelines is not supported.

Using the Drain option to stop your job tells the Dataflow service to finish your job in its current state. Your job stops ingesting new data from input sources soon after receiving the drain request (typically within a few minutes). However, the Dataflow service preserves any existing resources, such as worker instances, to finish processing and writing any buffered data in your pipeline. When all pending processing and write operations are complete, the Dataflow service cleans up the Google Cloud resources associated with your job.

Note: Your pipeline continues to incur the cost of maintaining any associated Google Cloud resources until all processing and writing has completed.

If you want to prevent data loss as you bring down your pipelines, use the Drain option to stop your job.

Effects of draining a job

When you issue the Drain command, Dataflow immediately closes any in-process windows and fires all triggers. The system does not wait for any outstanding time-based windows to finish. For example, if your pipeline is ten minutes into a two-hour window when you issue the Drain command, Dataflow won’t wait for the remainder of the window to finish. It closes the window immediately with partial results.

Dataflow causes open windows to close by advancing the system watermark to infinity. This functionality also works with custom data sources. When draining a pipeline that uses a custom data source class, Dataflow stops issuing requests for new data, advances the system watermark to infinity, and calls your source’s finalize() method on the last checkpoint.

In the detailed view of your pipeline’s transforms, you can see the effects of an in-process Drain command.

Note: You can Update a pipeline that is currently being Drained.

As a Drain can take a significant amount of time to complete, such as when your pipeline has a large amount of buffered data, you may still Cancel a job that is currently Draining.

Custom metrics

Any metric you define in your Apache Beam pipeline is reported by Dataflow to Monitoring as a custom metric. There are three types of Apache Beam pipeline metrics: Counter, Distribution, and Gauge. Dataflow currently only reports Counter and Distribution to Monitoring. Distribution is reported as four sub-metrics suffixed with _MAX, _MIN, _MEAN, and _COUNT. Dataflow does not support creating a histogram from Distribution metrics.

Dataflow reports incremental updates to Monitoring approximately every 30 seconds. All user metrics are exported as a double data type to avoid conflicts. Custom metrics in Dataflow appear in Monitoring as custom.googleapis.com/dataflow/metric-name and are limited to 500 metrics per project.
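To make this concrete, here is a minimal Python sketch (not from the original article) of defining a Counter and a Distribution metric inside a DoFn; the namespace and metric names are illustrative:

import apache_beam as beam
from apache_beam.metrics import Metrics

class CountEmptyLines(beam.DoFn):
  def __init__(self):
    # Counter: a single monotonically increasing value.
    self.empty_lines = Metrics.counter('examples', 'emptyLines')
    # Distribution: surfaced in Monitoring as _MIN, _MAX, _MEAN and _COUNT sub-metrics.
    self.line_lengths = Metrics.distribution('examples', 'lineLengths')

  def process(self, line):
    self.line_lengths.update(len(line))
    if not line.strip():
      self.empty_lines.inc()
    yield line

A user counter like emptyLines is what appears with origin: user in the gcloud dataflow metrics list output shown earlier.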

Custom metrics reported to Monitoring incur charges based on Cloud Monitoring pricing.

BEAM:

https://www.udemy.com/course/apache-beam-a-hands-on-course-to-build-big-data-pipelines/learn/lecture/16131701#questions

Core Beam transforms

Beam provides the following core transforms, each of which represents a different processing paradigm:

  • ParDo
  • GroupByKey
  • CoGroupByKey
  • Combine
  • Flatten
  • Partition
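As a quick, hedged orientation (not part of the original article), the following Python sketch exercises a few of these core transforms on made-up data:

import apache_beam as beam

with beam.Pipeline() as p:
  scores = p | 'Create' >> beam.Create([('alice', 3), ('bob', 5), ('alice', 7)])

  # GroupByKey: gather all values for each key, e.g. ('alice', <iterable of 3, 7>).
  grouped = scores | 'Group' >> beam.GroupByKey()

  # Combine (per key): reduce each key's values to one value, e.g. ('alice', 10).
  totals = scores | 'Sum' >> beam.CombinePerKey(sum)

  # Partition: split one PCollection into a fixed number of PCollections.
  parts = scores | 'Split' >> beam.Partition(lambda kv, n: 0 if kv[1] < 5 else 1, 2)

  # Flatten: merge PCollections of the same type back together.
  merged = (parts[0], parts[1]) | 'Merge' >> beam.Flatten()

  merged | 'Print' >> beam.Map(print)  # the DirectRunner prints the merged elements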

State and timer

11. State and Timers

Beam’s windowing and triggering facilities provide a powerful abstraction for grouping and aggregating unbounded input data based on timestamps. However there are aggregation use cases for which developers may require a higher degree of control than provided by windows and triggers. Beam provides an API for manually managing per-key state, allowing for fine-grained control over aggregations.

Beam’s state API models state per key. To use the state API, you start out with a keyed PCollection, which in Java is modeled as a PCollection<KV<K, V>>. A ParDo processing this PCollection can now declare state variables. Inside the ParDo these state variables can be used to write or update state for the current key or to read previous state written for that key. State is always fully scoped only to the current processing key.

Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This means that the first time a key is seen for a given window any state reads will return empty, and that a runner can garbage collect state when a window is completed. It’s also often useful to use Beam’s windowed aggregations prior to the stateful operator. For example, using a combiner to preaggregate data, and then storing aggregated data inside of state. Merging windows are not currently supported when using state and timers.

Sometimes stateful processing is used to implement state-machine-style processing inside a DoFn. When doing this, remember that the elements in the input PCollection have no guaranteed order, and ensure that the program logic is resilient to this. Unit tests written using the DirectRunner shuffle the order of element processing and are recommended for testing correctness.

In Python, a DoFn declares the state it will access by creating StateSpec class member variables, one for each state. Each StateSpec is initialized with a name; this name is unique to the ParDo in the graph and has no relation to other nodes in the graph. A DoFn can declare multiple state variables.

11.1 Types of state

Beam provides several types of state:

ValueState

A ValueState is a scalar state value. For each key in the input, a ValueState will store a typed value that can be read and modified inside the DoFn’s @ProcessElement or @OnTimer methods. If the type of the ValueState has a coder registered, then Beam will automatically infer the coder for the state value. Otherwise, a coder can be explicitly specified when creating the ValueState. For example, the following ParDo creates a single state variable that accumulates the number of elements seen.
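The code sample referred to above did not survive the copy into this article. A hedged Python reconstruction is shown below; in the Python SDK the value-state concept is exposed as ReadModifyWriteStateSpec, and the names are illustrative:

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class CountPerKeyFn(beam.DoFn):
  # One state cell per key and window, holding the number of elements seen.
  COUNT_STATE = ReadModifyWriteStateSpec('num_elements', VarIntCoder())

  def process(self, element, count=beam.DoFn.StateParam(COUNT_STATE)):
    current = count.read() or 0   # empty the first time a key is seen in a window
    count.write(current + 1)
    yield element

# The DoFn must be applied to a keyed PCollection of (key, value) tuples:
# counted = keyed_pcoll | beam.ParDo(CountPerKeyFn())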

Beam also allows explicitly specifying a coder for ValueState values. For example:
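In the Python SDK the coder is passed directly when the StateSpec is constructed, so the explicit form looks like the following hedged one-liner (a state cell holding strings; names are illustrative):

from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

# The second argument is the explicit coder used to serialize the stored value.
LAST_SEEN_STATE = ReadModifyWriteStateSpec('last_seen', StrUtf8Coder())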

CombiningState

CombiningState allows you to create a state object that is updated using a Beam combiner. For example, the previous ValueState example could be rewritten to use CombiningState:
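A hedged Python version of the same element counter, using CombiningValueStateSpec so that updates are folded in by a combiner rather than by read-modify-write (names are illustrative):

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import CombiningValueStateSpec

class CombiningCountFn(beam.DoFn):
  # The combiner (CountCombineFn) maintains the running count inside the state cell.
  COUNT_STATE = CombiningValueStateSpec('num_elements', VarIntCoder(),
                                        beam.combiners.CountCombineFn())

  def process(self, element, count=beam.DoFn.StateParam(COUNT_STATE)):
    count.add(1)   # no explicit read/write; the combiner folds the update in
    yield element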

BagState

A common use case for state is to accumulate multiple elements. BagState allows for accumulating an unordered set of elements. This allows for addition of elements to the collection without requiring the reading of the entire collection first, which is an efficiency gain. In addition, runners that support paged reads can allow individual bags larger than available memory.
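A hedged Python sketch of a per-key buffer built on BagState (string values and illustrative names; the 'flush' marker is just an example trigger for emitting):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec

class BufferPerKeyFn(beam.DoFn):
  # An unordered, append-only collection of values per key and window.
  BUFFER_STATE = BagStateSpec('buffer', StrUtf8Coder())

  def process(self, element, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
    key, value = element
    buffer.add(value)                   # appending does not read the whole bag
    if value == 'flush':                # illustrative condition for emitting
      yield key, list(buffer.read())    # read() returns an iterable of all values
      buffer.clear()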

11.2 Deferred state reads

When a DoFn contains multiple state specifications, reading each one in order can be slow. Calling the read() function on a state can cause the runner to perform a blocking read, and performing multiple blocking reads in sequence adds latency to element processing. If you know that a state will always be read, you can annotate it as @AlwaysFetched (in the Java SDK), and the runner can then prefetch all of the states necessary.

If however there are code paths in which the states are not fetched, then annotating with @AlwaysFetched will add unnecessary fetching for those paths. In this case, the readLater method allows the runner to know that the state will be read in the future, allowing multiple state reads to be batched together.

11.3 Timers

Beam provides a per-key timer callback API. This allows for delayed processing of data stored using the state API. Timers can be set to callback at either an event-time or a processing-time timestamp. Every timer is identified with a TimerId. A given timer for a key can only be set for a single timestamp. Calling set on a timer overwrites the previous firing time for that key’s timer.

11.3.1 Event-time timers

Event-time timers fire when the input watermark for the DoFn passes the time at which the timer is set, meaning that the runner believes that there are no more elements to be processed with timestamps before the timer timestamp. This allows for event-time aggregations.
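A hedged Python sketch of an event-time timer: it (re)arms a timer one minute past each element's event timestamp, and the callback fires once the watermark passes that time (names and the 60-second offset are illustrative):

import apache_beam as beam
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration

class ExpiringFn(beam.DoFn):
  # WATERMARK = event time; the timer fires when the input watermark passes its timestamp.
  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

  def process(self, element,
              timestamp=beam.DoFn.TimestampParam,
              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
    expiry_timer.set(timestamp + Duration(seconds=60))
    yield element

  @on_timer(EXPIRY_TIMER)
  def on_expiry(self):
    # Runs once the watermark passes the time set above; emit results or clear state here.
    pass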

11.3.2 Processing-time timers

Processing-time timers fire when the real wall-clock time passes the timer's timestamp. This is often used to create larger batches of data before processing. It can also be used to schedule events that should occur at a specific time. Just like event-time timers, processing-time timers are per key: each key has a separate copy of the timer.

While processing-time timers can be set to an absolute timestamp, it is very common to set them to an offset relative to the current time. In Java, the Timer.offset and Timer.setRelative methods can be used to accomplish this.

11.3.3 Dynamic timer tags

Beam also supports dynamically setting a timer tag using TimerMap. This allows for setting multiple different timers in a DoFn and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A timer with a specific tag can only be set to a single timestamp, so setting the timer again has the effect of overwriting the previous expiration time for the timer with that tag. Each TimerMap is identified with a timer family id, and timers in different timer families are independent.

11.3.4 Timer output timestamps

By default, event-time timers will hold the output watermark of the ParDo to the timestamp of the timer. This means that if a timer is set to 12pm, any windowed aggregations or event-time timers later in the pipeline graph that finish
after 12pm will not expire. The timestamp of the timer is also the default output timestamp for the timer callback. This means that any elements output from the onTimer method will have a timestamp equal to the timestamp of the timer firing. For processing-time timers, the default output timestamp and watermark hold is the value of the input watermark at the time the timer was set.

In some cases, a DoFn needs to output timestamps earlier than the timer expiration time, and therefore also needs to hold its output watermark to those timestamps. For example, consider a pipeline that temporarily batches records into state and sets a timer to drain the state. Such code may appear correct, but will not work properly.

The problem is that the ParDo buffers elements while nothing prevents the watermark from advancing past the timestamps of those elements, so they might all be dropped as late data. To prevent this, an output timestamp must be set on the timer so that the watermark is held at the timestamp of the earliest buffered element (the Beam programming guide's timer examples show how to set an output timestamp on a timer).

11.4 Garbage collecting state

Per-key state needs to be garbage collected, or eventually the increasing size of state may negatively impact performance. There are two common strategies for garbage collecting state.

11.4.1 Using windows for garbage collection

All state and timers for a key are scoped to the window they are in. This means that, depending on the timestamp of the input element, the ParDo will see different values for the state depending on the window that element falls into. In addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that window. (Note: if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to pass the end of the window plus the allowed lateness before garbage collecting state.) This can be used as a garbage-collection strategy.

For example, given the following:
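The referenced snippet is missing from this copy; a hedged Python sketch is shown below, putting daily fixed windows in front of a stateful ParDo (keyed_events is a placeholder for a keyed, timestamped input, and CountPerKeyFn is the counting DoFn sketched earlier, though any stateful DoFn works):

import apache_beam as beam
from apache_beam.transforms import window

keyed_events = ...   # keyed, timestamped (key, value) elements

per_day_counts = (
    keyed_events
    | 'DailyWindows' >> beam.WindowInto(window.FixedWindows(24 * 60 * 60))
    | 'StatefulCount' >> beam.ParDo(CountPerKeyFn()))

# State is scoped per key *and* per window, so once the watermark passes the end
# of a day's window (plus any allowed lateness) the runner can garbage collect
# every state cell for that day.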

This ParDo stores state per day. Once the pipeline is done processing data for a given day, all the state for that day is garbage collected.

11.4.2 Using timers for garbage collection

In some cases, it is difficult to find a windowing strategy that models the desired garbage-collection strategy. For example, a common desire is to garbage collect state for a key once no activity has been seen on the key for some time. This can be done by updating a timer that garbage collects state. For example
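The referenced code is also missing here; the hedged Python sketch below clears a key's state after one hour of event-time inactivity by re-arming a garbage-collection timer on every element (the TTL, the state cell, and the names are illustrative):

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration

class TtlStateFn(beam.DoFn):
  BUFFER = BagStateSpec('buffer', StrUtf8Coder())
  GC_TIMER = TimerSpec('gc', TimeDomain.WATERMARK)

  def process(self, element,
              timestamp=beam.DoFn.TimestampParam,
              buffer=beam.DoFn.StateParam(BUFFER),
              gc_timer=beam.DoFn.TimerParam(GC_TIMER)):
    buffer.add(element[1])
    # Each new element pushes the garbage-collection deadline out by one hour.
    gc_timer.set(timestamp + Duration(seconds=3600))
    yield element

  @on_timer(GC_TIMER)
  def expire(self, buffer=beam.DoFn.StateParam(BUFFER)):
    # No activity for this key within the TTL: drop its state.
    buffer.clear()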

11.5 State and timers examples

The following are some example uses of state and timers.

11.5.1. Joining clicks and views

In this example, the pipeline is processing data from an e-commerce site’s home page. There are two input streams: a stream of views, representing suggested product links displayed to the user on the home page, and a stream of clicks, representing actual user clicks on these links. The goal of the pipeline is to join click events with view events, outputting a new joined event that contains information from both events. Each link has a unique identifier that is present in both the view event and the click event.

Many view events will never be followed up with clicks. This pipeline will wait one hour for a click, after which it will give up on this join. While every click event should have a view event, some small number of view events may be lost and never make it to the Beam pipeline; the pipeline will similarly wait one hour after seeing a click event, and give up if the view event does not arrive in that time. Input events are not ordered — it is possible to see the click event before the view event. The one hour join timeout should be based on event time, not on processing time.
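The accompanying code is not included in this article. Below is a hedged Python sketch of the join under the rules above; it assumes the two streams have already been mapped to (link_id, ('view', event)) and (link_id, ('click', event)) tuples and flattened into one keyed stream:

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)
from apache_beam.utils.timestamp import Duration

class JoinClicksAndViews(beam.DoFn):
  VIEW_STATE = ReadModifyWriteStateSpec('view', PickleCoder())
  CLICK_STATE = ReadModifyWriteStateSpec('click', PickleCoder())
  # Event-time timeout: give up on the join one hour after an unmatched event.
  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

  def process(self, element,
              timestamp=beam.DoFn.TimestampParam,
              view_state=beam.DoFn.StateParam(VIEW_STATE),
              click_state=beam.DoFn.StateParam(CLICK_STATE),
              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
    link_id, (kind, event) = element
    if kind == 'view':
      view_state.write(event)
    else:
      click_state.write(event)

    view, click = view_state.read(), click_state.read()
    if view is not None and click is not None:
      # Both sides arrived, in either order: emit the joined event and reset.
      yield link_id, {'view': view, 'click': click}
      view_state.clear()
      click_state.clear()
    else:
      # Wait at most one hour of event time for the other side.
      expiry_timer.set(timestamp + Duration(seconds=3600))

  @on_timer(EXPIRY_TIMER)
  def give_up(self,
              view_state=beam.DoFn.StateParam(VIEW_STATE),
              click_state=beam.DoFn.StateParam(CLICK_STATE)):
    # The other half never arrived within the timeout: discard the buffered event.
    view_state.clear()
    click_state.clear()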

11.5.2 Batching RPCs

In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests — multiple events for the same user can be batched in a single RPC call. Since this RPC service also imposes rate limits, we want to batch ten seconds worth of events together in order to reduce the number of calls.
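Again, no code made it into this copy; a hedged Python sketch follows. It buffers (user, event) elements per key, arms a processing-time timer only when a new batch starts, and flushes roughly every ten seconds (send_rpc_batch is a placeholder for the external call, and the wall-clock timer arithmetic is an assumption of this sketch):

import apache_beam as beam
from apache_beam.coders import PickleCoder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    BagStateSpec, ReadModifyWriteStateSpec, TimerSpec, on_timer)
from apache_beam.utils.timestamp import Duration, Timestamp

def send_rpc_batch(events):
  """Placeholder for the external batch RPC call."""
  pass

class BatchRpcFn(beam.DoFn):
  BUFFER = BagStateSpec('buffer', PickleCoder())
  TIMER_ARMED = ReadModifyWriteStateSpec('timer_armed', VarIntCoder())
  FLUSH_TIMER = TimerSpec('flush', TimeDomain.REAL_TIME)

  def process(self, element,
              buffer=beam.DoFn.StateParam(BUFFER),
              timer_armed=beam.DoFn.StateParam(TIMER_ARMED),
              flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER)):
    buffer.add(element[1])
    if not timer_armed.read():
      # Arm the flush timer once per batch, ~10 seconds of wall-clock time ahead.
      flush_timer.set(Timestamp.now() + Duration(seconds=10))
      timer_armed.write(1)

  @on_timer(FLUSH_TIMER)
  def flush(self,
            buffer=beam.DoFn.StateParam(BUFFER),
            timer_armed=beam.DoFn.StateParam(TIMER_ARMED)):
    batch = list(buffer.read())
    if batch:
      send_rpc_batch(batch)   # one RPC for up to ten seconds' worth of events
    buffer.clear()
    timer_armed.clear()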

Coders

https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety

Data encoding and type safety

When Beam runners execute your pipeline, they often need to materialize the intermediate data in your PCollections, which requires converting elements to and from byte strings. The Beam SDKs use objects called Coders to describe how the elements of a given PCollection may be encoded and decoded.

Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. Such parsing or formatting should typically be done explicitly, using transforms such as ParDo or MapElements.

In the Beam SDK for Python, the type Coder provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the apache_beam.coders package.

Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.
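To make the coder/type distinction concrete, a small Python illustration (not in the original text):

from apache_beam.coders import StrUtf8Coder, VarIntCoder

# A coder converts elements to and from byte strings.
int_coder = VarIntCoder()
encoded = int_coder.encode(42)            # b'*', the varint encoding of 42
assert int_coder.decode(encoded) == 42

# A different coder produces a different byte representation for a related value.
text_coder = StrUtf8Coder()
assert text_coder.decode(text_coder.encode('42')) == '42'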

7.1. Specifying coders

The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it; however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.

Beam SDKs use a variety of mechanisms when attempting to automatically infer the Coder for a PCollection.

The Beam SDK for Python has a CoderRegistry that represents a mapping of Python types to the default coder that should be used for PCollections of each type.

By default, the Beam SDK for Python automatically infers the Coder for the elements of an output PCollection using the typehints from the transform’s function object, such as DoFn. In the case of ParDo, for example a DoFn with the typehints @beam.typehints.with_input_types(int) and @beam.typehints.with_output_types(str) accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default Coder for the output PCollection (in the default pipeline CoderRegistry, this is BytesCoder).
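A short hedged sketch of that pattern, with the typehint decorators applied to a DoFn:

import apache_beam as beam

@beam.typehints.with_input_types(int)
@beam.typehints.with_output_types(str)
class IntToString(beam.DoFn):
  def process(self, element):
    yield str(element)

# Because the declared output type is str, Beam can fall back on the default
# coder registered for str when it materializes the output PCollection.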

NOTE: If you create your PCollection from in-memory data by using the Create transform, you cannot rely on coder inference and default coders. Create does not have access to any typing information for its arguments, and may not be able to infer a coder if the argument list contains a value whose exact run-time class doesn’t have a default coder registered.

7.2. Default coders and the CoderRegistry

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.

CoderRegistry contains a default mapping of coders to standard types for any pipeline you create with the Beam SDKs. The following tables show the standard mappings for the Java and Python SDKs:

Java SDK default coders:

  • Double: DoubleCoder
  • Instant: InstantCoder
  • Integer: VarIntCoder
  • Iterable: IterableCoder
  • KV: KvCoder
  • List: ListCoder
  • Map: MapCoder
  • Long: VarLongCoder
  • String: StringUtf8Coder
  • TableRow: TableRowJsonCoder
  • Void: VoidCoder
  • byte[]: ByteArrayCoder
  • TimestampedValue: TimestampedValueCoder

Python SDK default coders:

  • int: VarIntCoder
  • float: FloatCoder
  • str: BytesCoder
  • bytes: StrUtf8Coder
  • Tuple: TupleCoder

7.2.1. Looking up a default coder

You can use coders.registry to access the pipeline's CoderRegistry, and the method CoderRegistry.get_coder to look up (or set) the default Coder for a Python type.
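For example (a minimal illustration, not from the original text):

from apache_beam import coders

# Look up the default coder that the registry will use for Python ints.
print(coders.registry.get_coder(int))   # e.g. VarIntCoder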

7.2.2. Setting the default coder for a type

To set the default Coder for a Python type for a particular pipeline, you obtain and modify the pipeline’s CoderRegistry. You use the method coders.registry to get the CoderRegistry object, and then use the method CoderRegistry.register_coder to register a new Coder for the target type.

The following example code demonstrates how to set a default Coder, in this case BigEndianIntegerCoder, for int values for a pipeline.

Python:
apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder)

7.2.3. Annotating a custom data type with a default coder

The Beam SDK for Python does not support annotating data types with a default coder. If you would like to set a default coder, use the method described in the previous section, Setting the default coder for a type.

PUBSUB:

https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub

The Dataflow runner uses a different, private implementation of PubsubIO. This implementation takes advantage of Google Cloud-internal APIs and services to offer three main advantages: low-latency watermarks, high watermark accuracy (and therefore data completeness), and efficient deduplication.

The Dataflow runner’s implementation of PubsubIO automatically acknowledges messages once they have been written to persistent storage, either to Shuffle or to a sink. Messages are therefore only acknowledged when Dataflow can guarantee that there will be no data loss if some component crashes or a connection is lost.

4.4. Side inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.

4.4.1. Passing side inputs to ParDo

Python:
# Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.
# Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their
# actual values. For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable
# of the actual elements of pcoll being passed into each process invocation. In this example, side inputs are
# passed to a FlatMap transform as extra arguments and consumed by filter_using_length.
import apache_beam as beam
from apache_beam import pvalue

words = ...

# Callable takes additional arguments.
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
  if lower_bound <= len(word) <= upper_bound:
    yield word

# Construct a deferred side input.
avg_word_len = (
    words
    | beam.Map(len)
    | beam.CombineGlobally(beam.combiners.MeanCombineFn()))

# Call with explicit side inputs.
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)

# A single deferred side input.
larger_than_average = (
    words | 'large' >> beam.FlatMap(
        filter_using_length, lower_bound=pvalue.AsSingleton(avg_word_len)))

# Mix and match.
small_but_nontrivial = words | beam.FlatMap(
    filter_using_length,
    lower_bound=2,
    upper_bound=pvalue.AsSingleton(avg_word_len))

# We can also pass side inputs to a ParDo transform, which will get passed to its process method.
# The first two arguments for the process method would be self and element.
class FilterUsingLength(beam.DoFn):
  def process(self, element, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(element) <= upper_bound:
      yield element

small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)...

4.4.2. Side inputs and windowing

A windowed PCollection may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a PCollectionView of a windowed PCollection, the PCollectionView represents a single entity per window (one singleton per window, one list per window, etc.).

Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.

For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.
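A hedged Python sketch of that setup: a per-minute main input consuming a per-hour mean as a singleton side input (the events collection and the threshold logic are placeholders):

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms import window

events = ...   # timestamped (user, value) elements

minute_events = events | 'MinuteWindows' >> beam.WindowInto(window.FixedWindows(60))

hourly_mean = (
    events
    | 'HourWindows' >> beam.WindowInto(window.FixedWindows(3600))
    | beam.Map(lambda kv: kv[1])
    | beam.CombineGlobally(beam.combiners.MeanCombineFn()).without_defaults())

# Each one-minute main-input window is projected onto the hour-long side-input
# window that contains it, so the callable sees that hour's mean.
above_average = minute_events | beam.Filter(
    lambda kv, mean: kv[1] > mean,
    mean=pvalue.AsSingleton(hourly_mean))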

If the main input element exists in more than one window, then processElement gets called multiple times, once for each window. Each call to processElement projects the “current” window for the main input element, and thus might provide a different view of the side input each time.

If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

Future reference

https://cloud.google.com/blog/products/gcp/after-lambda-exactly-once-processing-in-cloud-dataflow-part-2-ensuring-low-latency
