Serverless Log Data Ingestion Pipelines

Austin Bennett
Sep 15, 2020


Manage your data, not your infrastructure, and I like protobuf for real-time analytics!

Components:

  • Global HTTP(S) Load Balancer with Serverless NEGs
  • gRPC in Cloud Run Container (as API/Endpoint)
  • Pub/Sub Message Queue
  • GCP Cloud Functions
  • BigQuery

Serverless NEGs:

NEG stands for Network Endpoint Group. I have had a great experience with serverless NEGs (even as they were first rolled out in Alpha). They allow us to use a Global HTTP(S) Load Balancer as our point of ingress and route to various serverless services behind it. Currently, I have only used them to route based on path. There are a bunch of enhancements on the roadmap here, so it is something to follow as updates arrive.

Open Source Alternative: There are a ton of ways to do routing, which for many might be unneeded. The reason for this setup is to have a truly global endpoint that can reroute traffic to alternate regions if there are any issues.

gRPC endpoints:

Much has been written about the benefits of gRPC. I’m most interested in gRPC because of its tight coupling with Protocol Buffers, which are fantastic to leverage as a way to enforce data contracts.

Why gRPC for us: we have had applications send us data with misspelled fields and/or incorrect data types. By having the API enforce protocol buffers, that simply cannot happen in a way that breaks anything downstream (which minimizes the places that require defensive programming).
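
To make that concrete, here is a minimal sketch (in Python, mirroring the Cloud Function code further below) of what protobuf-level enforcement looks like when JSON arrives: json_format.Parse rejects unknown fields and mismatched types up front, instead of letting them leak into the warehouse. The message and field names here are hypothetical placeholders for your own generated code.

from google.protobuf import json_format
from pb import generated_pb2  # hypothetical generated module, as in the Cloud Function below

# ProtobufMessage and the field name below are placeholders for your own schema.
msg = generated_pb2.ProtobufMessage()
try:
    # A misspelled field (or a wrong type) raises ParseError instead of
    # silently slipping through to the message queue and the warehouse.
    json_format.Parse('{"user_idd": "123"}', msg)
except json_format.ParseError as err:
    print('rejected at the edge:', err)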

Improvements/Future Directions: Cloud Run (fully managed/serverless) currently only supports unary gRPC calls. For some use cases, streaming calls would be more ideal. Also, the current solution was custom coded; I’d like to extend it into a more repeatable, automated module that deploys code based on a few parameters and the .proto definition. For clients that are unable to work with protobuf, there is also gRPC-Gateway, which transforms an HTTP POST of JSON into a protobuf; this is another pattern we had initial success with last year. Alternately, I have been meaning to explore KrakenD, specifically for its gRPC-Gateway support as well as its interoperability with Pub/Sub.

Much documentation exists on writing gRPC services. The code here is an example of how to parse an HTTP POST JSON object into a protobuf. This can be a great first step, so your pipelines carry typed, clean, structured data, and so you can evolve your frontend (APIs) to accept gRPC services once client teams are ready, without having to change your message queues or your database/warehouse insert logic. The following code takes the JSON, parses just the pieces desired into the protobuf, and publishes that proto to the specified Pub/Sub topic.

package example

// A generated .pb.go file would also have this same package name.
import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/pubsub"
	"github.com/golang/protobuf/proto"
)

// GCP_PROJECT and TOPIC are user-set environment variables.
var projectID = os.Getenv("GCP_PROJECT")
var TopicID = os.Getenv("TOPIC")

// client is a global Pub/Sub client, initialized once per instance.
var client *pubsub.Client

func init() {
	// err is pre-declared to avoid shadowing client.
	var err error
	// client is initialized with context.Background() because it should
	// persist between function invocations.
	// I was sneaky: this example actually works in Cloud Functions ;-)
	// so you needn't worry about containers if you are not ready for them.
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
}

// Message2PubSub is the HTTP entry point. ProtobufMessage comes from a
// generated .pb.go file with the same package name.
func Message2PubSub(w http.ResponseWriter, r *http.Request) {
	p := ProtobufMessage{}
	PublishMessage(w, r, &p)
}

// PublishMessage publishes a message to Pub/Sub. PublishMessage
// only works with topics that already exist.
func PublishMessage(w http.ResponseWriter, r *http.Request, p proto.Message) {
	// Read the request body.
	data, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request", http.StatusBadRequest)
		return
	}
	fmt.Println(string(data))
	// Parse the JSON request body into the protobuf message.
	if err := json.Unmarshal(data, &p); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}
	// Marshal the proto to its wire format (pbo = proto bytes out).
	pbo, err := proto.Marshal(p)
	if err != nil {
		log.Printf("proto.Marshal: %v", err)
		http.Error(w, "Error encoding message", http.StatusInternalServerError)
		return
	}
	m := &pubsub.Message{
		Data: pbo,
	}
	// r.Context() is used because the publish only needs to live for this invocation.
	id, err := client.Topic(TopicID).Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", TopicID, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	// Probably don't need to return the message ID, but it is handy for debugging.
	fmt.Fprintf(w, "Published msg: %v", id)
}

Open Source Alternative: As mentioned above, I still need to explore KrakenD. This could also easily be hosted with Knative on Kubernetes (Cloud Run is built on Knative), at which point it would be possible to do streaming gRPC (GCP says that streaming gRPC will eventually be available for fully managed Cloud Run, but it is not available yet).

Pub/Sub:

Any data ingestion pipeline these days is going to rely upon a message queue. While Kafka is nice and has been around, Pulsar is looking promising, and Pub/Sub is well integrated into the GCP ecosystem and works great for our current needs. A notable feature lacking from historical message queues is push subscriptions, which let the message queue push messages out (and properly check/acknowledge them) to the next desired place. In this architecture, we rely on Cloud Functions as the push target. Pull-based subscriptions are the way things have commonly been done (I like push where it is appropriate). Generally, I’m not writing much here, as I assume message queues are already familiar territory for most readers.
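
For reference, deploying a Cloud Function with a Pub/Sub trigger creates its push subscription for you. If you instead want Pub/Sub to push to an HTTPS endpoint you own (a Cloud Run service, say), a sketch along these lines, with hypothetical project, topic, and endpoint names and a recent google-cloud-pubsub client, is roughly what it takes:

from google.cloud import pubsub_v1

# Hypothetical names; substitute your own project, topic, and HTTPS endpoint.
project_id = "my-project"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, "ingest-topic")
subscription_path = subscriber.subscription_path(project_id, "ingest-push-sub")

# Pub/Sub POSTs each message to the endpoint and retries delivery until the
# endpoint acknowledges it with a success response.
push_config = pubsub_v1.types.PushConfig(push_endpoint="https://ingest.example.com/push")

subscriber.create_subscription(
    request={
        "name": subscription_path,
        "topic": topic_path,
        "push_config": push_config,
    }
)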

Open Source Alternatives: Leading candidates would be (as already mentioned) Pulsar and Kafka.

Cloud Functions:

This falls squarely into the FaaS category. Pick your runtime (I use Go for this use case) and write the little bit of code you need. Depending on your levels of concurrency (and familiarity with containers), it might make sense for cost optimization to use something more like Cloud Run (or another option with more knobs to turn).

The code here is dead simple: read the message from the queue (the push delivery invokes the function) and insert the data into the table. I use Python here to demonstrate protobuf interoperability across languages.

import base64
import os

from pb import generated_pb2
from google.protobuf.json_format import MessageToDict
from google.cloud import bigquery
from google.api_core import retry

BQ_DATASET = os.getenv('BQ_DATASET')
BQ_TABLE = os.getenv('BQ_TABLE')

BQ = bigquery.Client()

table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)

# Often you will have all the different messages relevant to the
# protobuf service, not just the single example below.
PROTO = generated_pb2.ProtobufMessage()


def ProtobufMessageInsert(event, context):
    proto = PROTO
    InsertData(event, context, proto)
# Could have many such entry points, one per message type.


def InsertData(event, context, proto):
    # Pub/Sub delivers the message payload base64-encoded.
    pubsub_message = base64.b64decode(event['data'])
    proto.ParseFromString(pubsub_message)
    record = MessageToDict(proto, preserving_proto_field_name=True)
    errors = BQ.insert_rows_json(table,
                                 json_rows=[record],
                                 retry=retry.Retry(deadline=30))
    if errors != []:
        print(errors)

Open Source Alternative: I would personally be looking at the already mentioned Knative on Kubernetes. For pure functions, OpenWhisk is also a solid offering. As something to take data off a message queue and write it somewhere else, Apache Beam is incredible. I think it should be a fundamental data tool of any organization; it is not overkill for a startup, and it seems the sensible tool for data movement and analytics in a hybrid/multi-cloud organization. Beam is also billed as serverless when run on Dataflow, but the real point is portable data pipelines: keep your business logic largely the same as you swap out the underlying compute (Kinesis/Flink, Spark, or GCP Dataflow) in different environments, giving you a common framework a company can rally behind that works for data processing on-premise and in any public cloud.
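
As a flavor of what a Beam version of the Cloud Function above might look like, here is a minimal streaming sketch using the Python SDK, with hypothetical project, subscription, and table names (and assuming the table already exists): it reads protobuf payloads from Pub/Sub, converts them to dict rows the same way the function does, and streams them into BigQuery.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.protobuf.json_format import MessageToDict
from pb import generated_pb2  # hypothetical generated module, as above


def proto_to_row(data: bytes) -> dict:
    # Same conversion as in the Cloud Function: proto bytes -> dict row.
    msg = generated_pb2.ProtobufMessage()
    msg.ParseFromString(data)
    return MessageToDict(msg, preserving_proto_field_name=True)


# Hypothetical resource names; the target table is assumed to already exist.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            subscription="projects/my-project/subscriptions/ingest-sub")
        | "ProtoToRow" >> beam.Map(proto_to_row)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            "my-project:my_dataset.my_table",
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )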

BigQuery:

As someone historically consuming data who got into managing it, I can’t say enough great things about BigQuery. For a large enterprise, the ease of sharing data across the business would be the largest standout. Also notable: it is truly serverless (the query layer as well as the storage layer), we can stream data into it and have it immediately available, and we can update/delete records. There are a ton of additional benefits. This tool was the initial attraction, but these tools together are a knockout combination.
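
As a small illustration of the streaming point, with the Python client a freshly streamed row is typically visible to queries within seconds (the table and field names here are hypothetical):

from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.my_dataset.my_table"  # hypothetical

# Stream a row in...
client.insert_rows_json(table_id, [{"event_type": "page_view"}])

# ...and it is queryable moments later.
for row in client.query(f"SELECT COUNT(*) AS n FROM `{table_id}`").result():
    print(row.n)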

Summary:

My inclination is to have the simplest backend possible so that the data is straightforward and easy to reason about (naturally, that puts a heavier burden on the client application sending data). Following the patterns above, you get a quick feedback loop for any testing, so there is no need for both batch ingest into a data warehouse and a separate system like Elastic for lower-latency data needs. And for real-time analytics, these are foundational components that can’t be beat.
