At the plugin level, there is currently no direct way to get the value of the timestamp.

There is a way to use the value without actually calling predicate.test, though. It’s not the most beautiful approach, but I’m living with it:

Basically, in the getTableLayouts function you can find all the predicate details in the “Constraint” parameter, e.g.:

public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
    KafkaTableHandle handle = convertTableHandle(table);

    // null means "no bound on this side"
    Long startTs = null;
    Long endTs = null;
    Optional<Map<ColumnHandle, Domain>> domains = constraint.getSummary().getDomains();
    // The log calls assume a logger field named "log" on the class.
    log.info("TableLayout");
    if (domains.isPresent()) {
        Map<ColumnHandle, Domain> columnHandleDomainMap = domains.get();
        for (Map.Entry<ColumnHandle, Domain> entry : columnHandleDomainMap.entrySet()) {
            // Only the internal offset-timestamp column is of interest here
            if (entry.getKey() instanceof KafkaColumnHandle && ((KafkaColumnHandle) entry.getKey()).getName().equals(KafkaInternalFieldDescription.OFFSET_TIMESTAMP_FIELD.getName())) {
                // The span is the single range covering every range in the domain
                Range span = entry.getValue().getValues().getRanges().getSpan();
                Marker low = span.getLow();
                Marker high = span.getHigh();
                if (!low.isLowerUnbounded()) {
                    startTs = (Long) low.getValue();
                }
                if (!high.isUpperUnbounded()) {
                    endTs = (Long) high.getValue();
                }
            }
            log.info("K: %s\tV: %s", entry.getKey().toString(), entry.getValue().toString());
        }
    }
    log.info("startTs: %s, endTs: %s", startTs, endTs);
    // Carry the timestamp bounds to the split manager through the layout handle
    ConnectorTableLayout layout = new ConnectorTableLayout(new KafkaTableLayoutHandle(handle, startTs, endTs));
    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}
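One thing worth keeping in mind about getSpan(): as far as I understand it, it collapses all the ranges of the domain into one covering range, so gaps between ranges are lost. That is why the full constraint.getSummary() is still returned as the remaining constraint, so the engine re-applies the exact filter on the rows that come back. Here is a minimal sketch of that collapsing behaviour. TsRange and SpanSketch are hypothetical stand-ins I made up for illustration, not the Presto SPI types:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a timestamp range; NOT the Presto SPI Range class.
final class TsRange {
    final Long low;   // null means unbounded below
    final Long high;  // null means unbounded above

    TsRange(Long low, Long high) {
        this.low = low;
        this.high = high;
    }
}

final class SpanSketch {
    // Collapse several ranges into the smallest single range that covers all of them.
    static TsRange span(List<TsRange> ranges) {
        if (ranges.isEmpty()) {
            throw new IllegalArgumentException("no ranges");
        }
        Long low = ranges.get(0).low;
        Long high = ranges.get(0).high;
        for (TsRange r : ranges) {
            // An unbounded side on any range makes the span unbounded on that side.
            low = (low == null || r.low == null) ? null : Long.valueOf(Math.min(low, r.low));
            high = (high == null || r.high == null) ? null : Long.valueOf(Math.max(high, r.high));
        }
        return new TsRange(low, high);
    }

    public static void main(String[] args) {
        // Models: WHERE ts BETWEEN 100 AND 200 OR ts BETWEEN 500 AND 600
        TsRange s = span(Arrays.asList(new TsRange(100L, 200L), new TsRange(500L, 600L)));
        System.out.println("startTs: " + s.low + ", endTs: " + s.high);
    }
}
```

With the two ranges above this prints "startTs: 100, endTs: 600", so timestamps 201 to 499 would still be read from Kafka and have to be filtered out afterwards by the engine.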

Let me know what you think about this. There may be concerns that are obvious to you but that I am missing. I’m working on a similar project, and I’m very much inspired by your work on the protobuf decoder as well as this predicate pushdown idea :)
