A Scalable, Reliable Webhook Dispatcher Powered by Kafka
A webhook is a web request that sends data to a receiver, where no data is expected in response. It is a one-way push of data, typically used to notify a system that one or more events have occurred. A webhook is usually an HTTP POST with a JSON payload that expects a 200 OK response to acknowledge receipt of the payload. Any response payload with the 200 OK is typically ignored.
At Hootsuite we send webhooks to notify partners when a message scheduled for publication to a social network changes state. If a customer has configured an external service to be part of the message review flow, that service is notified when a message is created, scheduled, requires approval, is approved and is subsequently sent. The notification payload contains the message id and its new state. For example, a service offering compliance review of message content, upon receiving a PENDING_APPROVAL notification indicating that the service is next up to approve the content, would fetch the message content using our REST API and then approve or reject it via another API call. Message content is versioned and any approval or rejection indicates the version to which it applies.
A webhook dispatcher’s job is to ensure that HTTPS POSTs and their payloads, such as event notifications in JSON, are correctly and securely received by the webhook receiver URL. As API integrations become more prevalent, webhook-like mechanisms become critical for efficiently and quickly synchronizing data with customers and partners. Alternatives such as querying or polling for changes pose problems for optimal request routing and can place unnecessary load on systems. Optimal routing of a polling request may involve finding a server that is caching the requestor’s next events in RAM (e.g. via a shard-aware load balancer).
While the PENDING_APPROVAL use case above tolerates some notification re-ordering because messages are versioned, we wanted to support event notification producers that require order preservation of their notification payloads. Imagine a light controller service sending a webhook to turn a light on and then another a minute later to turn it off. Re-ordering those events would leave the light on.
From a webhook receiver’s perspective (e.g. a partner’s), the reliability of a webhook dispatcher is paramount in many business contexts. For example, a dropped notification that a scheduled message is pending review would stop the publication of a potentially time-critical marketing message. Untrustworthy APIs are fairly easy to detect during initial integration testing and will be rejected for business-critical uses. Low-latency delivery of notifications is also important in some contexts, for example when a human is monitoring the progress of a task.
Desirable per-receiver delivery features include:
- At-least-once delivery of each event notification — i.e. no message loss.
- Overall notification ordering that preserves individual event producer ordering
- A single total ordering of event notifications with a sequence number per event notification. The receiver simply needs to durably retain the last received sequence number and ensure notifications are stored or fully processed before acknowledgement.
- Prevent user/partner/customer-provided webhook URLs from probing internal infrastructure.
- Require HTTPS webhook URLs for privacy and data integrity.
- Authenticate the webhook dispatcher to the receiver — HMAC on webhook payloads
- Authenticate the webhook receiver to the dispatcher — HTTPS GET an HMAC over a challenge nonce and receiver-provided nonce before POSTing
- Prevent leakage of internal network details by removing headers like x-forwarded-for from the POST, some of which may also confuse the receiver.
- Prevent resource exhaustion from misbehaved webhook receivers by discarding response headers and bodies that are excessively long — we’re just looking for a 200 OK.
- Event notification payload size — a few kB
- Number of webhook receivers (URLs) — millions
- Number of events per minute per destination — less than 10, some in 1000s
- Unreliable webhook receivers — buffering, retry/backoff timers, order-preservation
- Network and DNS outages — buffering for all destinations
- Coordination of the per-receiver state retained by the dispatcher: next sequence number, buffered notifications, last retry time, etc.
- Dispatcher process or node failure must not lose notification events
- Resource limitation aware throttling — RAM, file descriptors, source ports
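The two URL-related requirements in the list above (HTTPS only, and no probing of internal infrastructure) can be illustrated with a small validation step at configuration time. This is a minimal sketch, not the dispatcher's actual code: `validateWebhookURL` and `isInternal` are hypothetical names, and the check only covers IP-literal hosts. A hostname can still resolve to an internal address, so a connect-time control such as an egress proxy is still needed.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// validateWebhookURL is a hypothetical helper: it accepts only https URLs and
// rejects IP-literal hosts that fall in internal address ranges.
func validateWebhookURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse webhook URL: %w", err)
	}
	if u.Scheme != "https" {
		return errors.New("webhook URL must use https")
	}
	host := u.Hostname()
	if host == "" {
		return errors.New("webhook URL has no host")
	}
	// Hostnames pass this check; they must be re-checked when resolved.
	if ip := net.ParseIP(host); ip != nil && isInternal(ip) {
		return errors.New("webhook URL points at an internal address")
	}
	return nil
}

// isInternal reports whether ip is loopback, private, link-local or unspecified.
func isInternal(ip net.IP) bool {
	return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() ||
		ip.IsLinkLocalMulticast() || ip.IsUnspecified()
}

func main() {
	for _, raw := range []string{
		"https://partner.example/hooks",
		"http://partner.example/hooks",
		"https://10.0.0.5/admin",
		"https://[::1]:8443/hooks",
	} {
		fmt.Printf("%-32s %v\n", raw, validateWebhookURL(raw))
	}
}
```

Rejecting bad URLs when they are configured gives partners immediate feedback, but it does not replace filtering at connection time.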
There are three APIs for the Webhooks Dispatcher:
- The webhook receiver’s API: an HTTP POST, headers and payload format
- An internal API to request sending of a webhook
- An internal API to observe and manage the state for webhook receivers
Webhook Receiver API
The webhook receiver API is a generic means to POST a list of event notifications, each consisting of a JSON object with a sequence number, a type identifier string and an object containing the value of the identified type. X-Hootsuite-Timestamp and X-Hootsuite-Signature headers are included; the signature header contains an SHA-512 HMAC over the timestamp and body, using the partner’s key as the secret. Figure 1 shows an example of this format as used for message review.
Specific event notification types such as “com.hootsuite.messages.event.v1” shown above add semantics to the webhook. This type of event is used for message review and is described by the finite state machine in Figure 2, whose states are described in Table 1. In conjunction with the message review related notifications, a partner uses our message publishing API (not shown) to fetch the content of messages under review and subsequently approve or reject them. Message versioning makes it clear what is being approved or rejected, which is important if someone chooses to edit a message while a partner system is deciding to approve or reject it.
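The signature scheme can be sketched as follows. The text only states that the HMAC is SHA-512 over the timestamp and body, so the exact byte layout (timestamp immediately followed by the body) and the hex encoding used here are assumptions, as are the function names `sign` and `verify`.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// sign computes an HMAC-SHA512 over timestamp||body (assumed layout) and
// returns it hex-encoded (assumed encoding).
func sign(secret []byte, timestamp string, body []byte) string {
	mac := hmac.New(sha512.New, secret)
	mac.Write([]byte(timestamp))
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the HMAC on the receiver side and compares it in
// constant time with the signature taken from the request header.
func verify(secret []byte, timestamp string, body []byte, signature string) bool {
	got, err := hex.DecodeString(signature)
	if err != nil {
		return false
	}
	mac := hmac.New(sha512.New, secret)
	mac.Write([]byte(timestamp))
	mac.Write(body)
	return hmac.Equal(mac.Sum(nil), got)
}

func main() {
	secret := []byte("partner-secret") // hypothetical partner key
	timestamp := "1500000000"          // hypothetical X-Hootsuite-Timestamp value
	body := []byte(`[{"seq_no":1,"type":"ping","data":{}}]`) // hypothetical payload

	sig := sign(secret, timestamp, body)
	fmt.Println("X-Hootsuite-Signature:", sig)
	fmt.Println("valid:", verify(secret, timestamp, body, sig))
}
```

Including the timestamp in the signed data lets a receiver reject replays of old requests by also checking that the timestamp is recent.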
Internal Webhook Sending API
To request sending of a webhook to a specific receiver, the event notification producer writes to the webhook request Kafka topic, in the partition associated with the receiver ID (for now we use the webhook URL as the receiver ID). Producers who care about the ordering of their event notifications can choose to wait for Kafka’s acknowledgements to ensure that their requests are on disk at three Kafka brokers. We may add a REST version of this by adding it to the API described in the next section.
Internal API to Manage Webhook Receiver State
The Webhook Dispatcher offers an internal REST API for:
- getting the last response from a webhook receiver (the 200 OK, headers and body)
- deleting a receiver’s state (handy for testing)
- Testing a webhook receiver by sending a “ping” event notification type
See the implementation section below for how these requests are routed to the dispatcher process responsible for the given receiver.
Developer User Interface
To help our partners quickly debug webhook integrations we offer a web browser user interface that shows the last webhook response from their URL. This also helps reduce any question about the reliability of our webhook dispatcher. There is also a UI for configuring the webhook URL associated with their application (not shown).
Some systems add webhook dispatch in an unplanned way where each event-producing subsystem implements their own, often with limited retry support for unavailable receivers and minimal fault tolerance. In some cases an egress web proxy, such as Squid, is used to prevent probing of internal infrastructure by user-specified webhook URLs.
Ad hoc webhook dispatcher implementations may be justified by the perceived complexity and cost of building systems that support persisting per-destination state in a scalable/sharded way that is also highly available. Maintaining producer ordering of events may also require some form of leader election for the dispatcher managing state for a set of receivers.
The key to Hootsuite’s webhook dispatcher is the observation that Kafka, already deployed in our system, provides both a fault-tolerant, sharded persistence mechanism (topic partitions) and leader election (per partition, via consumer groups). Kafka is a leading open source pub/sub messaging system and can persist weeks of messages using inexpensive storage. Writes to a topic partition can be replicated to three brokers and the producer can choose to wait for full acknowledgment. Topic “consumer groups” provide an elastic, fault-tolerant way to (re)assign exactly one process in the consumer group as responsible for consuming a given topic partition.
The design uses two topics with the same number of partitions. One topic is used to issue webhook requests, which are published into the partition selected by the hash of the receiver URL modulo the number of partitions. The second topic is a Kafka log-compacted topic for storing the state of each webhook receiver in the associated request partition. Upon failure of a webhook dispatcher, the Kafka brokers rebalance the consumer group. Any dispatcher process assigned a new request partition loads the state of the receivers in that partition from the same partition number of the log-compacted topic and restarts any associated timers before processing new requests to receivers in that partition. The dispatcher is written in Go, which offers very lightweight threads (goroutines) and an efficient timer implementation. The number of partitions on these topics is set at topic creation time and must be considered carefully as it limits scale-out; there can be at most N webhook dispatcher processes servicing topics with N partitions.
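The partition choice can be sketched as a hash of the receiver ID taken modulo the partition count. FNV-1a is used below purely as a stand-in; the hash must be whatever the producers' Kafka partitioner actually uses, and `partitionFor` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a receiver ID (the webhook URL) to a partition number.
// The same number is used for both the request topic and the state topic.
// FNV-1a is an assumption; the real hash is the producer partitioner's.
func partitionFor(receiverID string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(receiverID))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	const numPartitions = 64 // hypothetical; fixed at topic creation time
	for _, url := range []string{
		"https://partner-a.example/hooks",
		"https://partner-b.example/hooks",
	} {
		fmt.Printf("%s -> partition %d\n", url, partitionFor(url, numPartitions))
	}
}
```

Because the mapping depends on the partition count, changing the number of partitions later would move receivers between partitions, which is one reason the count has to be chosen carefully up front.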
Concurrency in the dispatcher is somewhat novel. Kafka brokers can store the last processed offset per consumer group-partition pair, as set by the dispatcher process. To allow more than one outstanding dispatch per partition we exploited the at-least-once delivery semantics so multiple goroutines per partition (each for different receivers) can succeed or fail independently. The request partition offset is updated only when all requests prior to the new offset value have been stored in the log compacted topic or sent — a Tetris-like algorithm. This, along with a sequence number per event notification in the body (not per POST), allows batching of notification delivery and state updates of dirty receivers to the log compacted partition. Should a failure occur, any re-read requests from the request topic whose receiver state was not completely updated resend the webhook to the receiver with the identical sequence number, allowing duplicates to be discarded at the receiver.
The first prototype used the state topic like a write-ahead log (WAL), so every webhook send required at least two writes: one to record that we were trying to send it with the given sequence number(s) and a subsequent one to indicate that it had succeeded. During load testing we found this to be quite IO intensive and opted for the model described above. With that model it is more likely that a receiver will get the same message twice, but even with the WAL approach we couldn’t guarantee exactly-once delivery (e.g. when the 200 OK is lost due to a connection failure).
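On the receiving side, handling duplicates only requires remembering the last sequence number seen. The sketch below is a hypothetical receiver, not part of the dispatcher: the `notification` and `receiver` types are invented for illustration, and `lastSeq` is kept in memory here although a real receiver would store it durably before acknowledging.

```go
package main

import "fmt"

// notification is a hypothetical, trimmed-down event notification.
type notification struct {
	Seq  int64
	Type string
}

// receiver remembers the last sequence number it has fully processed.
type receiver struct {
	lastSeq int64
}

// accept filters a delivered batch: notifications at or below lastSeq are
// duplicates from a resend and are skipped. It also counts sequence numbers
// that were skipped over, i.e. notifications the dispatcher discarded.
func (r *receiver) accept(batch []notification) (fresh []notification, lost int64) {
	for _, n := range batch {
		if n.Seq <= r.lastSeq {
			continue // duplicate
		}
		lost += n.Seq - r.lastSeq - 1
		fresh = append(fresh, n)
		r.lastSeq = n.Seq
	}
	return fresh, lost
}

func main() {
	r := &receiver{lastSeq: 0}
	fresh, lost := r.accept([]notification{{Seq: 1}, {Seq: 2}})
	fmt.Println(len(fresh), lost)
	// A resend after a lost 200 OK repeats seq 2 alongside the new seq 3.
	fresh, lost = r.accept([]notification{{Seq: 2}, {Seq: 3}})
	fmt.Println(len(fresh), lost)
}
```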
A fairly simple solution was adopted for webhook receivers who are offline for a couple of days: discard any message older than two days. Once a webhook receiver is considered offline, we periodically send it throw-away ping events without sequence numbers. Only when it starts answering pings do we continue sending any buffered webhook requests. If any buffered notifications were discarded before the receiver comes back online, it will become aware of the number of lost notifications from the size of the gap in sequence numbers.
This reduces RAM pressure but may not be sufficiently aggressive in network outage situations where messages for all receivers are being buffered. In those cases we may institute a maximum on the number of buffered messages per receiver so resource planning is simplified. Alternatively, we can shut down the dispatchers until the outage is corrected, relying on the Kafka request topic to buffer requests (on disk at the brokers). Flow control is another option: stop reading from the request topic while resources are exhausted. By itself, however, this allows flaky, high-volume receivers to ruin the party for others. Adding more servers to the consumer group or replacing the servers with larger ones is another option, and failover by consumer group re-balancing makes this fairly simple. For now, we simply monitor the RAM pressure of the dispatchers and will shut them down or move to larger machines.
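The two-day discard policy for offline receivers can be sketched as a trim of the front of a per-receiver buffer. The `buffered` type and `dropStale` function are hypothetical, timestamps are plain Unix seconds to keep the example small, and the buffer is assumed to be ordered oldest first.

```go
package main

import (
	"fmt"
	"time"
)

// maxAgeSeconds is the two-day retention limit mentioned in the text.
const maxAgeSeconds = 2 * 24 * 60 * 60

// buffered is a hypothetical buffered notification for an offline receiver.
type buffered struct {
	Seq        int64
	EnqueuedAt int64 // Unix seconds
}

// dropStale removes notifications older than two days from the front of buf
// (assumed oldest first) and reports how many were dropped. The receiver
// later sees the loss as a gap in sequence numbers.
func dropStale(buf []buffered, nowUnix int64) (kept []buffered, dropped int) {
	cutoff := nowUnix - maxAgeSeconds
	for dropped < len(buf) && buf[dropped].EnqueuedAt < cutoff {
		dropped++
	}
	return buf[dropped:], dropped
}

func main() {
	now := time.Now().Unix()
	buf := []buffered{
		{Seq: 7, EnqueuedAt: now - 3*24*60*60}, // three days old
		{Seq: 8, EnqueuedAt: now - 60*60},      // one hour old
	}
	kept, dropped := dropStale(buf, now)
	fmt.Printf("kept %d, dropped %d\n", len(kept), dropped)
}
```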
To avoid exhausting file descriptors we limit the maximum number of concurrent POSTs with a semaphore set sufficiently below the maximum number of file descriptors available. We use Squid as an egress web proxy, which means all the webhook posting connections will be to Squid. Squid does not support HTTP/2 yet, so we still need a large number of concurrent connections to Squid, one for each webhook in flight.
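In Go, a counting semaphore of this kind is commonly built from a buffered channel. The following is a minimal sketch of that pattern rather than the dispatcher's code: `limiter` and `peakConcurrency` are hypothetical names, and a short sleep stands in for the HTTP POST.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter caps the number of concurrently running tasks, using a buffered
// channel as a counting semaphore.
type limiter struct{ slots chan struct{} }

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// do blocks until a slot is free, runs task, then releases the slot.
func (l *limiter) do(task func()) {
	l.slots <- struct{}{}
	defer func() { <-l.slots }()
	task()
}

// peakConcurrency pushes n simulated POSTs through a limiter of size max and
// returns the highest number observed in flight at the same time.
func peakConcurrency(max, n int) int64 {
	l := newLimiter(max)
	var inFlight, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.do(func() {
				cur := atomic.AddInt64(&inFlight, 1)
				for { // raise peak to cur if cur is a new maximum
					p := atomic.LoadInt64(&peak)
					if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) {
						break
					}
				}
				time.Sleep(time.Millisecond) // stand-in for the HTTP POST
				atomic.AddInt64(&inFlight, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak in flight:", peakConcurrency(3, 20))
}
```

Goroutines waiting for a slot hold no socket, so the limit bounds file descriptor use without bounding the number of pending sends.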
An event producer creates a webhook request by publishing a message into the webhook request topic (see Figure 5). The producer uses the webhook receiver ID (URL) as the key on the message with the structure defined in Table 2. The key ensures that the request is put in the correct partition of the topic and that the consumer of that partition is guaranteed to see all requests for the receivers that hash into that partition. Kafka’s consumer group mechanism ensures that at most one consumer process in that group is responsible for a partition, so it can be the ‘master’ of all state related to receivers in that partition, such as next sequence number, buffered messages, etc. Note that producing an event can be tricky. For example, the producer might not receive the acknowledgement from the broker due to connection loss and send the event again. To allow detection and discarding of duplicate events, each webhook request may include a UUID set by the producer, and the dispatcher keeps a small window of the most recent UUIDs to detect duplicates. The most recent versions of Kafka now offer exactly-once delivery of messages to the broker, so duplicate detection will eventually be removed from our code. Periodically the receiver state that has changed (sequence numbers, buffered messages, etc.) is written to the log-compacted state topic. See Table 3 for a description of the information stored per receiver.
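The small window of recent UUIDs can be sketched as a bounded first-in, first-out set. The `dedupeWindow` type is hypothetical, and the window size and eviction policy used by the real dispatcher are not stated in the text.

```go
package main

import "fmt"

// dedupeWindow remembers the most recent request UUIDs, up to size entries.
type dedupeWindow struct {
	size  int
	order []string            // UUIDs in arrival order, oldest first
	seen  map[string]struct{} // the same UUIDs, for constant-time lookup
}

// newDedupeWindow creates a window; size must be at least 1.
func newDedupeWindow(size int) *dedupeWindow {
	return &dedupeWindow{size: size, seen: make(map[string]struct{}, size)}
}

// seenBefore reports whether id is already in the window. If it is not, id is
// recorded and the oldest entry is evicted once the window is full.
func (w *dedupeWindow) seenBefore(id string) bool {
	if _, ok := w.seen[id]; ok {
		return true
	}
	if len(w.order) == w.size {
		oldest := w.order[0]
		w.order = w.order[1:]
		delete(w.seen, oldest)
	}
	w.order = append(w.order, id)
	w.seen[id] = struct{}{}
	return false
}

func main() {
	w := newDedupeWindow(2)
	// The producer retries "req-1" after missing the broker acknowledgement.
	for _, id := range []string{"req-1", "req-1", "req-2"} {
		fmt.Printf("%s duplicate=%v\n", id, w.seenBefore(id))
	}
}
```

A bounded window is enough here because producer retries follow the original request closely; an old UUID that has been evicted is simply treated as new.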
The Webhook Dispatcher offers a REST API which allows getting the last received response, deleting a receiver and pinging a receiver. In order to handle these requests, they must be processed by the Webhook Dispatcher node that is the ‘master’ of the receiver’s partition. We used the user_data field that a consumer group member can share with the Kafka brokers to hold the host and port of the node. Any node receiving a REST request will see if it’s the master of that receiver, and if not, it will proxy the request to the host and port of the node that is. The user_data is shared with the entire consumer group during partition assignment and re-balancing. See Figure 6.
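The routing decision can be sketched as a lookup from receiver to partition to owning node. Everything named here is an assumption made for illustration: the `member` type models what a node learns from the group's user_data during assignment, and the FNV-1a hash in `partitionFor` stands in for whatever partitioner the producers use.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// member is a hypothetical view of one consumer group member: the host:port
// it advertised through user_data and the partitions it currently owns.
type member struct {
	Addr       string
	Partitions []int
}

// partitionFor must match the producers' partitioner; FNV-1a is a stand-in.
func partitionFor(receiverID string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(receiverID))
	return int(h.Sum32() % uint32(numPartitions))
}

// ownerOf returns the address of the node that owns the receiver's partition.
func ownerOf(receiverID string, numPartitions int, members []member) (string, bool) {
	p := partitionFor(receiverID, numPartitions)
	for _, m := range members {
		for _, owned := range m.Partitions {
			if owned == p {
				return m.Addr, true
			}
		}
	}
	return "", false // no known owner, e.g. during a rebalance
}

// route decides whether a REST request is served locally or proxied.
func route(self, receiverID string, numPartitions int, members []member) string {
	owner, ok := ownerOf(receiverID, numPartitions, members)
	switch {
	case !ok:
		return "unavailable"
	case owner == self:
		return "local"
	default:
		return "proxy:" + owner
	}
}

func main() {
	members := []member{
		{Addr: "10.0.1.10:8080", Partitions: []int{0, 1}},
		{Addr: "10.0.1.11:8080", Partitions: []int{2, 3}},
	}
	fmt.Println(route("10.0.1.10:8080", "https://partner.example/hooks", 4, members))
}
```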
This solution provides an elastic, horizontally scalable, fault-tolerant webhook dispatcher that re-uses our existing infrastructure and needs very little management. The re-use of trusted, existing multi-availability-zone infrastructure (Kafka, Squid) reduced risk and allowed the dispatcher to be implemented quickly. Partners using this API soon come to trust it, perhaps after they’ve debugged their code using the web UI showing their response to the last delivered webhook.