Performing Aggregate Query On Twitter Data

Ganeshwara Hananda
Published in Vaticle
Oct 2, 2017
Image by NASA Goddard Space Flight Center is licensed under CC BY 2.0

Grakn is the database for AI. It is a distributed knowledge base designed specifically to handle complex data in knowledge-oriented systems, a task for which traditional database technologies are not the best fit.

To ensure that their internal knowledge is the most up-to-date and relevant, AI systems are always hungry for newly updated data. Working seamlessly with streaming data is therefore useful for building knowledge-oriented systems. In this series of blog posts, we look at how to stream public tweets into Grakn’s distributed knowledge base and query them.

Continuing Where We Left Off

In my previous post, we covered data insertion aspects, such as defining a schema as well as retrieving and inserting Twitter data. In this post we will look specifically at performing an aggregate query.

Crafting Group Aggregate Query In Graql

We will write a query that counts the number of tweets each user has posted since the program started, using Graql’s aggregate query feature. We use Graql rather than the Java API for this task because it is declarative and therefore much easier to use for complex queries.

Let’s look at how we can build it, step-by-step.

Start by creating a QueryBuilder object which we will use to craft the query in Graql.

QueryBuilder qb = tx.graql();

Now, let’s begin crafting the query. For this tutorial, let’s create a match query where we retrieve both the user and tweet.

We will bind them to vars named user and tweet, respectively. Notice how we deliberately give the vars the same names as their respective entity types. This is not a necessity: in practice, you are free to name them anything you want.

Also, pay attention to how we supply the user-tweet-relationship relationship as part of the condition.

qb.match(
    var("user").isa("user"),
    var("tweet").isa("tweet"),
    var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship"));

The query we’ve just defined will return every user and tweet along with their relationships. We will use it as the basis of the aggregate query.

Let’s do some aggregation over the result here.

We will supply "user" and count() as the arguments to group(), which tells Grakn to group the results by user and count the number of results in each group.

qb.match(
    var("user").isa("user"),
    var("tweet").isa("tweet"),
    var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship")
).aggregate(group("user", count()));

The query will now return the number of tweets each user has posted, which is what we want, as an object of type Map<Concept, Long>.
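As a rough mental model, and not a description of how Grakn evaluates the query internally, the effect of group("user", count()) resembles grouping the matched (user, tweet) pairs by user and counting each group. The plain-Java sketch below illustrates this with the standard Collectors.groupingBy and Collectors.counting. The class GroupCountSketch, the helper pair, and the sample screen names are made up for illustration and are not part of the sample project.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupCountSketch {
    // Hypothetical helper: one (user, tweet) answer from the match query.
    public static Map.Entry<String, String> pair(String user, String tweet) {
        return new AbstractMap.SimpleImmutableEntry<>(user, tweet);
    }

    // Groups the (user, tweet) pairs by user and counts the pairs in each group.
    public static Map<String, Long> countPerUser(List<Map.Entry<String, String>> matches) {
        return matches.stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Assumed sample data standing in for real match results.
        List<Map.Entry<String, String>> matches = Arrays.asList(
                pair("alice", "tweet-1"),
                pair("bob", "tweet-2"),
                pair("alice", "tweet-3"));
        System.out.println(countPerUser(matches));
    }
}
```

The returned map makes no ordering guarantee, so the order in which users are printed may vary between runs.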

To be able to conveniently iterate, we will transform it into the relatively more straightforward type Stream<Map.Entry<String, Long>>, i.e., a stream of pairs of username and tweet count.

// execute query
Map<Concept, Long> result = ((Map<Concept, Long>) q.execute());
// map Map<Concept, Long> into Stream<Map.Entry<String, Long>> before returning
AttributeType screenNameAttributeType = tx.getAttributeType("screen_name");
Stream<Map.Entry<String, Long>> mapped = result.entrySet().stream().map(entry -> {
    Concept key = entry.getKey();
    Long value = entry.getValue();
    String screenName = (String) key.asEntity().attributes(screenNameAttributeType).iterator().next().getValue();
    return new HashMap.SimpleImmutableEntry<>(screenName, value);
});
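The same reshaping step can be tried without a running Grakn instance. The sketch below is only an approximation: it substitutes a hypothetical FakeUser class for Grakn’s Concept and reads the screen name from a plain field instead of looking up the screen_name attribute. The names ResultMappingSketch, FakeUser, and toScreenNameCounts are assumptions made for this example.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

class ResultMappingSketch {
    // Hypothetical stand-in for a Grakn user concept; not part of the Grakn API.
    public static class FakeUser {
        public final String screenName;

        public FakeUser(String screenName) {
            this.screenName = screenName;
        }
    }

    // Replaces each map key with its screen name and keeps the count unchanged.
    public static Stream<Map.Entry<String, Long>> toScreenNameCounts(Map<FakeUser, Long> result) {
        return result.entrySet().stream().map(e -> {
            Map.Entry<String, Long> mapped =
                    new AbstractMap.SimpleImmutableEntry<>(e.getKey().screenName, e.getValue());
            return mapped;
        });
    }

    public static void main(String[] args) {
        // Assumed sample aggregate result.
        Map<FakeUser, Long> result = new HashMap<>();
        result.put(new FakeUser("alice"), 2L);
        result.put(new FakeUser("bob"), 1L);
        toScreenNameCounts(result)
                .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```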

Let’s put them all together in a new function calculateTweetCountPerUser:

public static Stream<Map.Entry<String, Long>> calculateTweetCountPerUser(GraknTx tx) {
    // build query
    QueryBuilder qb = tx.graql();
    AggregateQuery q = qb.match(
        var("user").isa("user"),
        var("tweet").isa("tweet"),
        var().rel("posts", "user").rel("posted_by", "tweet").isa("user-tweet-relationship")
    ).aggregate(group("user", count()));
    // execute query
    Map<Concept, Long> result = ((Map<Concept, Long>) q.execute());
    // map Map<Concept, Long> into Stream<Map.Entry<String, Long>> before returning
    AttributeType screenNameAttributeType = tx.getAttributeType("screen_name");
    Stream<Map.Entry<String, Long>> mapped = result.entrySet().stream().map(entry -> {
        Concept key = entry.getKey();
        Long value = entry.getValue();
        String screenName = (String) key.asEntity().attributes(screenNameAttributeType).iterator().next().getValue();
        return new HashMap.SimpleImmutableEntry<>(screenName, value);
    });
    return mapped;
}

With that done, let’s update the main function like so:

public class Main {
    // Twitter credentials
    private static final String consumerKey = "...";
    private static final String consumerSecret = "...";
    private static final String accessToken = "...";
    private static final String accessTokenSecret = "...";
    // Grakn settings
    private static final String implementation = Grakn.IN_MEMORY;
    private static final String keyspace = "twitter-example";

    public static void main(String[] args) {
        try (GraknSession session = Grakn.session(implementation, keyspace)) {
            withGraknTx(session, tx -> initTweetSchema(tx)); // initialize schema
            listenToTwitterStreamAsync(consumerKey, consumerSecret, accessToken, accessTokenSecret, (screenName, tweet) -> {
                withGraknTx(session, tx -> {
                    insertUserTweet(tx, screenName, tweet); // insert tweet
                    Stream<Map.Entry<String, Long>> result = calculateTweetCountPerUser(tx); // query
                    prettyPrintQueryResult(result); // display
                });
            });
        }
    }

    public static void prettyPrintQueryResult(Stream<Map.Entry<String, Long>> result) {
        System.out.println("------");
        result.forEach(e -> System.out.println("-- user " + e.getKey() + " tweeted " + e.getValue() + " time(s)."));
        System.out.println("------");
    }
}

Notice the two changes introduced here. First, we’ve added a call to our newly made function calculateTweetCountPerUser. Second, we’ve added a pretty-print function, prettyPrintQueryResult, to display the query result in a readable way.
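One optional refinement, which is not part of the original sample, is to sort the entries by tweet count before printing so that the most active users appear first. The sketch below uses the standard Map.Entry.comparingByValue comparator. The class SortedPrintSketch and the methods entry and formatSorted are hypothetical names chosen for this example, and the line format simply mirrors the one used in prettyPrintQueryResult.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SortedPrintSketch {
    // Hypothetical helper that builds one (screen name, tweet count) pair.
    public static Map.Entry<String, Long> entry(String screenName, long count) {
        return new AbstractMap.SimpleImmutableEntry<>(screenName, count);
    }

    // Sorts by tweet count, highest first, and formats one line per user.
    public static List<String> formatSorted(Stream<Map.Entry<String, Long>> result) {
        return result
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .map(e -> "-- user " + e.getKey() + " tweeted " + e.getValue() + " time(s).")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed sample data standing in for the aggregate query result.
        Stream<Map.Entry<String, Long>> result = Stream.of(entry("bob", 1L), entry("alice", 2L));
        System.out.println("------");
        formatSorted(result).forEach(System.out::println);
        System.out.println("------");
    }
}
```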

Running The Application

We’re all set! The only thing left is to run the application:

$ mvn package
$ java -jar target/twitterexample-1.0-SNAPSHOT.jar

Watch the terminal as the application runs. You should see the following text printed every time there’s an incoming tweet:

------
-- user <user-1> tweeted 2 time(s).
-- user <user-2> tweeted 1 time(s).
-- user <user-3> tweeted 1 time(s).
-- user <user-n> tweeted 1 time(s).
------

Wrapping Up

In this post we’ve looked at performing an aggregate query on real-time data. The query groups tweets by user and displays the number of tweets per user, akin to a SELECT with COUNT and a GROUP BY clause in regular SQL.

Aggregate queries are a bread-and-butter tool of data analysis, useful even when working with the sort of highly-connected data which Grakn particularly excels at.

There are other types of supported aggregation functions which you can find in the documentation. They are all expressed with a similar syntax so you shouldn’t have any trouble understanding them.

Thanks for staying tuned! In the next article we will look at how we can work with batch insertion. Batching is an important technique for achieving better throughput on high-volume data.

And don’t forget: we have a working sample which you can just clone and run!

If you enjoyed this article, please hit the clap button below, so others can find it, too. Please get in touch if you’ve any questions or comments, either below, via our Community Slack channel, or via our discussion forum.

Find out more from https://grakn.ai.

Feature Image credit: “NASA Earth’s Light” by NASA Goddard Space Flight Center is licensed under CC BY 2.0
