What’s New in Apache Pulsar 2.7.2
We are excited to see that the Apache Pulsar community has successfully released version 2.7.2! More than 38 contributors provided improvements and bug fixes, delivering 85 commits.
The highlights of this release are as follows:
- Consumers are no longer blocked after receiving multiple retry messages in Docker.
- Consumers using the Key_Shared subscription type on a non-persistent topic now receive the messages that the topic stats report as published.
This blog post walks through the most noteworthy changes, grouped by key functionality. For the complete list, including all enhancements and bug fixes, check out the Pulsar 2.7.2 Release Notes.
Notable bug fixes and enhancements
Pulsar 2.7.2 includes the following changes for the broker, bookie, proxy, Pulsar admin, Pulsar SQL, clients, functions and connectors, and tiered storage.
Broker
- Fix NPEs and thread safety issues in PersistentReplicator. PR-9763
This PR makes the following changes:
· Make the cursor field volatile, since the field is updated asynchronously in another thread.
· Remove the unnecessary synchronization on the openCursorAsync method.
· Add null checks before accessing the cursor field, since statistics might be updated before the cursor is available.
- Fix the issue of a message not being dispatched for the Key_Shared subscription type in a non-persistent topic. PR-9826
Previously, in a non-persistent topic with a Key_Shared subscription, messages were marked as published in the topic stats, but consumers did not receive them. This PR fixes this issue.
- Fix the issue of a consumer being blocked after receiving retry messages. PR-10078
Previously, in the Docker environment, if a consumer enabled the retry feature and set the retry topic in DeadLetterPolicy, the consumer was blocked after receiving multiple retry messages because the hasMessageAvailable check was set to false. This PR fixes this issue.
- Fix the issue of the schema not being added when subscribing to an empty topic without a schema. PR-9853
Previously, when a consumer with a schema subscribed to an empty topic without a schema, the check used isActive, which only determines whether the topic can be deleted. Instead, it should check whether the topic has any connected producers or consumers. With the previous implementation, even if a topic had no active producers or consumers, its subscription list was not empty, so isActive returned true. As a result, the consumer's schema was not attached to the topic and an IncompatibleSchemaException was thrown. This PR checks whether the topic has active producers or consumers instead of whether it can be deleted.
- Fix the schema type check when using the ALWAYS_COMPATIBLE strategy. PR-10367
This PR provides the following enhancements to the schema type check when using the ALWAYS_COMPATIBLE strategy:
· For a non-transitive strategy, it checks only the schema type of the last schema.
· For a transitive strategy, it checks all schema types.
· When getting a schema by schema data, it considers different schema types.
- Fix the issue of 100% CPU usage when deleting a namespace. PR-10337
Previously, when a namespace was deleted, its Policies setting was marked as deleted. This triggered the topic's onPoliciesUpdate and a read of the ZooKeeper Policies node data in checkReplicationAndRetryOnFailure. Because the namespace had been deleted, the ZooKeeper node no longer existed, and the failure to read its data triggered infinite retries. This PR fixes this issue by adding a method to check for non-deleted policies.
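The namespace-deletion fix comes down to not retrying a read whose target is gone for good. Below is a minimal Python sketch of that idea, not the broker's Java code: the in-memory store dict stands in for ZooKeeper, a missing node is modeled as KeyError, and the names read_with_retry, reader, deleted_check, and the max_attempts bound are all hypothetical.

```python
from typing import Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


def read_with_retry(
    read: Callable[[], T],
    is_deleted: Callable[[], bool],
    max_attempts: int = 10,
) -> Tuple[Optional[T], int]:
    """Retry a read, but stop as soon as the target is known to be deleted.

    Returns (value, attempts); value is None when the target was deleted.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return read(), attempts
        except KeyError:
            # A node for a deleted namespace never comes back, so retrying
            # here without this guard is what kept the CPU busy.
            if is_deleted():
                return None, attempts
            # Hypothetical safety bound for transient failures.
            if attempts >= max_attempts:
                raise


# Hypothetical in-memory stand-in for the ZooKeeper Policies nodes.
store = {"public/default": {"deleted": False}}
deleted_namespaces = {"tenant/old-ns"}


def reader(ns: str) -> Callable[[], dict]:
    return lambda: store[ns]


def deleted_check(ns: str) -> Callable[[], bool]:
    return lambda: ns in deleted_namespaces
```

Reading the policies of the deleted tenant/old-ns namespace returns (None, 1) after a single attempt instead of looping, while transient failures on a live namespace are still retried.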
Bookie
- Fall back to PULSAR_GC if BOOKIE_GC is not defined. PR-9621
This PR changes the fallback from PULSAR_MEM to PULSAR_GC if BOOKIE_GC is not defined.
- Fall back to PULSAR_EXTRA_OPTS if BOOKIE_EXTRA_OPTS is not defined. PR-10397
With this PR, -Dio.netty.* system properties are not passed if PULSAR_EXTRA_OPTS or BOOKIE_EXTRA_OPTS is set. This keeps the behavior consistent with PULSAR_EXTRA_OPTS and prevents duplicate properties. This PR also adds -Dio.netty.leakDetectionLevel=disabled (unless BOOKIE_EXTRA_OPTS is set), since PULSAR_EXTRA_OPTS does not include that setting by default.
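The two bookie fallbacks can be summarized as a resolution order over environment variables. The following Python sketch models that order; it is not the actual bin script, and DEFAULT_BOOKIE_GC and DEFAULT_NETTY_OPTS are placeholder values assumed for illustration. Like the shell ${VAR:-default} form, it treats an empty variable the same as an unset one.

```python
from typing import Mapping

# Placeholder defaults for illustration only; the real values live in Pulsar's scripts.
DEFAULT_BOOKIE_GC = "-XX:+UseG1GC"
DEFAULT_NETTY_OPTS = "-Dio.netty.recycler.maxCapacityPerThread=4096"
LEAK_DETECTION_OFF = "-Dio.netty.leakDetectionLevel=disabled"


def resolve_bookie_gc(env: Mapping[str, str]) -> str:
    # PR-9621: BOOKIE_GC wins; otherwise fall back to PULSAR_GC (not PULSAR_MEM).
    return env.get("BOOKIE_GC") or env.get("PULSAR_GC") or DEFAULT_BOOKIE_GC


def resolve_bookie_extra_opts(env: Mapping[str, str]) -> str:
    # PR-10397: an explicit BOOKIE_EXTRA_OPTS is used as-is.
    if env.get("BOOKIE_EXTRA_OPTS"):
        return env["BOOKIE_EXTRA_OPTS"]
    # Fall back to PULSAR_EXTRA_OPTS, adding the leak-detection setting
    # that PULSAR_EXTRA_OPTS does not carry by default.
    if env.get("PULSAR_EXTRA_OPTS"):
        return f"{LEAK_DETECTION_OFF} {env['PULSAR_EXTRA_OPTS']}"
    # Neither is set: only now are the default -Dio.netty.* properties used.
    return f"{LEAK_DETECTION_OFF} {DEFAULT_NETTY_OPTS}"
```

For example, with only PULSAR_EXTRA_OPTS="-Dbar=2" set, the bookie gets "-Dio.netty.leakDetectionLevel=disabled -Dbar=2" and no duplicate default Netty properties.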
Proxy
- Fix the authorization error when using the proxy with the Prefix subscription authentication mode. PR-10226
Previously, when using the Pulsar proxy with the Prefix subscription authentication mode, org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#canConsumeAsync threw an exception, which caused a consumer error. This PR updates the logic of org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#allowTopicOperationAsync to check isSuperUser first and then return isAuthorizedFuture.
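The essence of the proxy fix is evaluation order: the super-user check short-circuits before the regular authorization path runs. Here is a synchronous Python sketch of that ordering rather than the broker's asynchronous Java code. The function names are hypothetical, and the Prefix rule used here (the subscription name must start with the role) is an assumption made for illustration.

```python
from typing import Callable, Set


class AuthorizationError(Exception):
    pass


def can_consume_prefix_mode(role: str, subscription: str) -> bool:
    # Assumed Prefix rule for illustration: the subscription must start with the role.
    if not subscription.startswith(role):
        raise AuthorizationError(f"subscription {subscription!r} must be prefixed with {role!r}")
    return True


def allow_topic_operation(
    role: str,
    subscription: str,
    super_users: Set[str],
    is_authorized: Callable[[str, str], bool],
) -> bool:
    # Check super-user status first, so a role configured as a super user
    # (for example, a proxy role) never reaches the prefix check.
    if role in super_users:
        return True
    # Everyone else goes through the regular authorization path.
    return is_authorized(role, subscription)
```

With "proxy" as a super user, allow_topic_operation("proxy", "my-sub", {"proxy"}, can_consume_prefix_mode) returns True, while a regular role using a non-matching subscription name is still rejected.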
Pulsar admin
- Add a get version command for the Pulsar REST API, pulsar-admin, and pulsar-client. PR-9975
Pulsar SQL
- Fix the issue of BKNoSuchLedgerExistsException. PR-9910
Previously, when using Pulsar SQL to query messages, BKNoSuchLedgerExistsException was thrown if the ZooKeeper ledger root directory was changed. This PR fixes this issue.
Client
Pulsar 2.7.2 includes the following changes for Java, Python, C++, and WebSocket clients.
Java
- Fix the issue that ClientConfigurationData objects are not equal. PR-10091
This PR fixes this issue by reusing AuthenticationDisabled.INSTANCE as the default instead of creating a new instance.
- Fix the issue of AutoConsumeSchema KeyValue encoding. PR-10089
This PR keeps the KeyValueEncodingType when auto-consuming a KeyValue schema.
- Fix OutOfMemoryError when using KeyValue<GenericRecord, GenericRecord>. PR-9981
Previously, a topic with the KeyValue<GenericRecord, GenericRecord> schema could not be consumed due to a problem in HttpLookupService. HttpLookupService downloaded the schema in JSON format, but the KeyValue schema was expected to be encoded in binary form. This PR uses the existing utility functions to convert the JSON representation of the KeyValue schema to the expected format.
- Fix the concurrency issue in the client's producer epoch handling. PR-10436
This PR uses a volatile field for the epoch and AtomicLongFieldUpdater to increment the value.
- Handle the NPE when receiving an ack for a closed producer. PR-8979
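The producer-epoch fix (PR-10436) is about making the epoch's read-modify-write atomic. Python has no volatile fields or AtomicLongFieldUpdater, so this sketch swaps in a threading.Lock to show the same guarantee: concurrent increments are never lost. ProducerEpoch and bump_concurrently are hypothetical names, not Pulsar client classes.

```python
import threading


class ProducerEpoch:
    """Thread-safe epoch counter; a lock stands in for Java's atomic field updater."""

    def __init__(self) -> None:
        self._epoch = 0
        self._lock = threading.Lock()

    def get(self) -> int:
        with self._lock:
            return self._epoch

    def increment(self) -> int:
        # Read-modify-write must be atomic; otherwise two threads could read
        # the same value and both write epoch + 1, losing one increment.
        with self._lock:
            self._epoch += 1
            return self._epoch


def bump_concurrently(epoch: ProducerEpoch, threads: int, per_thread: int) -> int:
    """Increment the epoch from several threads and return the final value."""

    def worker() -> None:
        for _ in range(per_thread):
            epoch.increment()

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return epoch.get()
```

Calling bump_concurrently(ProducerEpoch(), 8, 1000) always ends at 8000. The Java fix gets the same effect without a lock, using compare-and-set on a volatile field.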
- Fix the issue of the batch size not being set when deserializing from a byte array. PR-9855
Previously, batch index acknowledgment was added to the seek method to support more precise seeks using ack sets. However, when a seek was performed with a message ID that had been serialized and deserialized, batchSize was set to zero, which led to a discrepancy between message ID forms and seek results. This PR fixes this issue.
- Fix the issue of a single-topic consumer being unable to close. PR-9849
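The batch-size fix (PR-9855) is a round-trip problem: a field that is written but not restored on deserialization silently becomes 0. This Python sketch shows the buggy and fixed deserializers side by side. BatchMessageId and its fixed-width struct layout are hypothetical; Pulsar's own serialization format is different.

```python
import struct
from dataclasses import dataclass

# Hypothetical fixed-width layout: ledger_id, entry_id, batch_index, batch_size.
_LAYOUT = ">qqii"


@dataclass(frozen=True)
class BatchMessageId:
    ledger_id: int
    entry_id: int
    batch_index: int
    batch_size: int

    def to_bytes(self) -> bytes:
        return struct.pack(_LAYOUT, self.ledger_id, self.entry_id,
                           self.batch_index, self.batch_size)

    @classmethod
    def from_bytes_buggy(cls, data: bytes) -> "BatchMessageId":
        ledger_id, entry_id, batch_index, _ = struct.unpack(_LAYOUT, data)
        # The bug being illustrated: batch_size is dropped and defaults to 0.
        return cls(ledger_id, entry_id, batch_index, 0)

    @classmethod
    def from_bytes(cls, data: bytes) -> "BatchMessageId":
        # The fix: restore every serialized field, including batch_size.
        return cls(*struct.unpack(_LAYOUT, data))
```

Round-tripping BatchMessageId(12, 34, 2, 10) through from_bytes gives back an equal ID, while from_bytes_buggy reports a batch size of 0, which is what leaves an ack-set-based seek without the size of the batch.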
Python
- Support setting the default value when using the Python Avro schema. PR-10265
Previously, the default value for the Python Avro schema could not be set, which prevented the Python schema from being updated. This PR fixes this issue and makes the following changes:
· Add the required field to control whether the field can be set to null.
· Add the required_default field to control whether the schema has a default attribute.
· Add the default field to control the default value of the schema.
- Fix the issue that nested Map or Array in a schema does not work. PR-9548
Previously, the Python client did not handle nested Map or Array types correctly, and the generated schema string was invalid. When the schema() method of Map or Array set the nested type (values for a Map, items for an Array) in the schema string, it handled only the Record type, not Map and Array. This PR fixes the issue and adds 4 tests for Map<Map>, Map<Array>, Array<Array>, and Array<Map> to cover all nested cases that involve Map or Array.
- Add TLS SNI support for the Python and C++ clients. PR-8957
This PR adds TLS SNI support for the C++ and Python clients, so you can connect to brokers through a proxy.
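The nested Map/Array fix (PR-9548) amounts to recursing into every container type when building the Avro type, instead of special-casing records. Here is a minimal Python sketch using hypothetical Primitive, Map, and Array descriptors rather than the pulsar.schema classes. Per the Avro specification, a map's element type goes under "values" and an array's under "items".

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Primitive:
    name: str  # an Avro primitive such as "string", "int", or "long"


@dataclass
class Map:
    values: "FieldType"


@dataclass
class Array:
    items: "FieldType"


FieldType = Union[Primitive, Map, Array]


def avro_type(field: FieldType):
    """Build the Avro type for a field, recursing into nested containers."""
    if isinstance(field, Primitive):
        return field.name
    if isinstance(field, Map):
        # Recurse for any element type, so Map<Map> and Map<Array> work.
        return {"type": "map", "values": avro_type(field.values)}
    if isinstance(field, Array):
        # Same for arrays, covering Array<Array> and Array<Map>.
        return {"type": "array", "items": avro_type(field.items)}
    raise TypeError(f"unsupported field type: {field!r}")
```

For example, avro_type(Map(Map(Primitive("string")))) yields {"type": "map", "values": {"type": "map", "values": "string"}}, a valid nested map type.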
C++
- Fix the issue that the C++ client cannot be built on Windows. PR-10363
This PR puts PULSAR_PUBLIC before the variable type and keeps LIB_NAME as the shared library's name (for example, by removing the dll suffix).
- Fix the issue that a paused zero-queue consumer pre-fetches messages. PR-10036
Previously, zero-queue consumers (consumers whose receiver queue size is 0) pre-fetched messages after pauseMessageListener was called. This was because ConsumerImpl::increaseAvailablePermits did not check the boolean variable messageListenerRunning_, which became false after pauseMessageListener was called. Therefore, after a zero-queue consumer was paused, it still sent the FLOW command to pre-fetch a message into its internal unbounded queue incomingMessages_. This PR fixes this issue and makes the following changes:
· Add a check for messageListenerRunning_ in the increaseAvailablePermits method, making the implementation consistent with the Java client's ConsumerImpl#increaseAvailablePermits. Change the type of availablePermits_ to std::atomic_int.
· Add an increaseAvailablePermits invocation in resumeMessageListener to send the FLOW command after the consumer resumes, since pauseMessageListener no longer prefetches messages.
- Fix the segmentation fault when getting a topic name from the received message ID. PR-10006
Previously, the C++ client supported getting a topic name from both the received message and its message ID. However, for a consumer that subscribed to a non-partitioned topic, getting a topic name from the received message ID caused a segmentation fault. This PR calls setTopicName for every single message when a consumer receives a batch and adds related tests for all types of consumers (including ConsumerImpl, MultiTopicsConsumerImpl, and PartitionedConsumerImpl).
- Fix the issue of SinglePartitionMessageRouter always picking the same partition. PR-9702
Previously, SinglePartitionMessageRouter was supposed to pick a random partition for a given producer and stick with it. The problem was that the C rand() call always used the same seed, so multiple processes deterministically picked the same partition. This PR fixes this issue.
- Reduce the log level for the ack-grouping tracker. PR-10094
Previously, a warning was logged when the ack-grouping tracker tried to send acks while the connection was closed. This PR changes the log level to debug when the connection is not ready for AckGroupingTrackerEnabled::flush.
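The zero-queue consumer fix (PR-10036) is easiest to see as a small state machine: permits may accumulate while the listener is paused, but no FLOW command goes out until it resumes. The following Python toy model is not the C++ client's code. The class, its attributes, and the assumption that each processed message frees exactly one permit are illustrative.

```python
class ZeroQueueConsumerModel:
    """Toy model of the PR-10036 permit logic; not the C++ client's code."""

    def __init__(self) -> None:
        self.listener_running = True
        self.available_permits = 0
        self.flow_commands: list = []  # permit count carried by each FLOW sent

    def increase_available_permits(self, delta: int) -> None:
        self.available_permits += delta
        # The fix: only send FLOW (that is, request messages) while the listener runs.
        if self.listener_running and self.available_permits > 0:
            self.flow_commands.append(self.available_permits)
            self.available_permits = 0

    def message_processed(self) -> None:
        # Assumption: each message handled by the listener frees one permit.
        self.increase_available_permits(1)

    def pause_message_listener(self) -> None:
        self.listener_running = False

    def resume_message_listener(self) -> None:
        self.listener_running = True
        # Pausing no longer prefetches, so flush held-back permits on resume.
        self.increase_available_permits(0)
```

In this model, a message processed while paused leaves flow_commands unchanged and keeps one permit pending; resume_message_listener then sends that permit in a single FLOW.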
WebSocket
- Optimize the URL token param value. PR-10187
This PR removes the Bearer prefix requirement for the token param value of the WebSocket URL.
- Make the browser client support token authentication. PR-9886
Previously, the WebSocket client used the HTTP request header to transport the authentication params, but the browser JavaScript WebSocket client could not add new headers. This PR uses the token query param to transport the authentication token for the browser JavaScript WebSocket client.
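Together, the two WebSocket changes let a browser client pass a bare token in the URL. Here is a small Python helper that builds such a URL. It assumes the v2 consumer endpoint layout (/ws/v2/consumer/persistent/tenant/namespace/topic/subscription) from the Pulsar WebSocket API documentation, and the helper name is hypothetical.

```python
from urllib.parse import quote, urlencode


def websocket_consumer_url(host: str, port: int, tenant: str, namespace: str,
                           topic: str, subscription: str, token: str,
                           persistent: bool = True) -> str:
    """Build a consumer URL that carries the auth token as a query param."""
    domain = "persistent" if persistent else "non-persistent"
    # Escape each path segment separately so "/" inside a name cannot split it.
    path = "/".join(quote(part, safe="")
                    for part in (domain, tenant, namespace, topic, subscription))
    # After PR-10187 the raw token is enough; no "Bearer " prefix is needed.
    query = urlencode({"token": token})
    return f"ws://{host}:{port}/ws/v2/consumer/{path}?{query}"
```

A browser client would pass the resulting URL straight to new WebSocket(url), since it cannot set an Authorization header.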
Function and connector
- Allow customizable function logging. PR-10389
Previously, the function log configuration was packaged in the jar and could not be customized dynamically. This PR moves the function log configuration file to the configuration directory, where it can be customized.
- Pass through record properties from Pulsar sources. PR-9943
- Fix the issue of the time unit in Pulsar Go functions. PR-10160
This PR changes the time unit of the average process latency from ns to ms.
- Fix the issue that the Kinesis sink did not try to resend messages. PR-10420
Previously, when the Kinesis sink connector failed to send a message, it did not retry. In this case, if retainOrdering was enabled, subsequent messages were not sent. This PR adds retry logic to the Kinesis sink connector, so a message that fails to send is retried.
- Fix the issue of null error messages in the onFailure exception in the Kinesis sink. PR-10416
Previously, if the Kinesis producer failed to send a message, the error message in the onFailure exception was null. This PR extracts the UserRecordFailedException to show the real error messages.
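The Kinesis retry fix (PR-10420) matters most with retainOrdering, where a record must be delivered before anything after it. The following synchronous Python sketch shows retry-before-advance. The real connector is asynchronous Java, and send_all_in_order, max_retries, the backoff, and modeling failures as ConnectionError are all assumptions for illustration.

```python
import time
from typing import Callable, List


def send_all_in_order(records: List[str], send: Callable[[str], None],
                      max_retries: int = 3, backoff_s: float = 0.0) -> List[str]:
    """Send records one by one, retrying each failure before moving on.

    Returns the records sent, in order. If a record still fails after
    max_retries retries, the error is raised so later records are never
    sent ahead of it.
    """
    sent: List[str] = []
    for record in records:
        for attempt in range(max_retries + 1):
            try:
                send(record)
                sent.append(record)
                break
            except ConnectionError:
                if attempt == max_retries:
                    raise
                # Exponential backoff between attempts (hypothetical policy).
                time.sleep(backoff_s * (2 ** attempt))
    return sent
```

If "b" fails twice before succeeding, sending ["a", "b", "c"] still delivers "a", "b", "c" in that order. Without the retry, "b" would be lost, and with ordering enforced, "c" would never be sent.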
Tiered storage
- Prevent the class loader leak and restore the offloader directory override. PR-9878
Previously, there was a class loader leak. This PR updates the PulsarService and PulsarConnectorCache classes to use a map from directory strings to offloaders.
- Add logs for the cleanup of offloaded data. PR-9852
Previously, the cleanup of offloaded data lacked logs, making it hard for users to analyze the cause of tiered storage data loss. This PR adds logs for this operation.
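The class loader leak fix (PR-9878) keys loaded offloaders by directory, so each directory is loaded once and reused, while a different (overridden) directory still gets its own entry. Here is a minimal Python sketch of that caching pattern. OffloaderCache and the load callback are hypothetical stand-ins for the Java classes, and the sketch ignores thread safety.

```python
from typing import Callable, Dict


class OffloaderCache:
    """Cache loaded offloaders per directory so each directory loads once."""

    def __init__(self, load: Callable[[str], object]) -> None:
        self._load = load
        self._by_directory: Dict[str, object] = {}

    def get_or_load(self, directory: str) -> object:
        # Reusing the cached entry avoids creating a fresh loader on every call
        # (the leak); a different directory still gets its own offloaders.
        if directory not in self._by_directory:
            self._by_directory[directory] = self._load(directory)
        return self._by_directory[directory]
```

Asking twice for "./offloaders" triggers a single load, and asking for an override directory such as "/opt/custom" loads that directory separately.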
Get involved
To get started, you can download Pulsar directly or spin up a Pulsar cluster on StreamNative Cloud with a free 30-day trial, which ships the Pulsar 2.7.2 changes! We also offer technical consulting and expert training to help get your organization started. As always, we are highly responsive to your feedback. Feel free to contact us with any questions at any time. We look forward to hearing from you, and stay tuned for the next Pulsar release!
About the Author
Yong Zhang is an Apache Pulsar committer. He works as a software engineer at StreamNative.
Yu Liu is an Apache Pulsar committer and a technical writer at StreamNative. You can follow her on Twitter.
This post was originally published on the StreamNative blog.
Want to learn more? See https://streamnative.io/blog. Follow us here on Medium and check out our GitHub.