A Tale of Graphs and Darkness

J Paul Daigle
Published in Perplexinomicon
Mar 23, 2018 · 12 min read

TL/DR

  1. Review of the previous two posts.
  2. Small world networks have short distances between similarly connected nodes.
  3. You can extract a small world of data from a small world network with a breadth-first search.
  4. Indexes are your friend.
  5. The Postgres COPY command exports data as well as it imports data.

Example Code

Introduction

This is part 2 of a 2-part sub-series, and the third post in an N-part series about microservices.

In the first post of the series, we downloaded three tab-separated files from the Internet Movie Database and loaded them into a single Postgres database, in three tables: title_basics, name_basics, and title_principals. We showed a little bit of how these tables relate to each other. For example, we can use these three tables to find information about movies (titles) and actors (names) by way of the title_principals table. So if we want to learn the names of the principal actors in the movie "Thor: The Dark World":

data_imdb=# select primary_name from name_basics
data_imdb-# where nconst in(
data_imdb(# select p.nconst from title_principals p,
data_imdb(# title_basics t
data_imdb(# where t.primary_title ilike('thor%dark%')
data_imdb(# and t.title_type='movie'
data_imdb(# and p.tconst=t.tconst
data_imdb(# and p.category ilike('act%'));
primary_name
-------------------
Natalie Portman
Stellan Skarsgård
Tom Hiddleston
Chris Hemsworth
(4 rows)

We also created a Phoenix application called DataMunger with three Schemas, TitlePrincipal, TitleBasic, and NameBasic. Using these Schemas and Ecto, we can find all the movies in which Natalie Portman played a principal role:

iex(2)> q = Ecto.Query.from(
...(2)> n in DataMunger.NameBasic,
...(2)> where: ilike(n.primary_name, "Natalie Portman"),
...(2)> select: n.nconst)
iex(4)> titles_from_nconst = fn(n) ->
...(4)> Ecto.Query.from(
...(4)> p in DataMunger.TitlePrincipal,
...(4)> where: p.nconst == ^n and
...(4)> ilike(p.category, "act%"),
...(4)> select: p.tconst) |>
...(4)> DataMunger.ImdbRepo.all() end
iex(8)> q |>
...(8)> DataMunger.ImdbRepo.one() |>
...(8)> titles_from_nconst.() |>
...(8)> Enum.map(fn(p) ->
...(8)> Ecto.Query.from(
...(8)> t in DataMunger.TitleBasic,
...(8)> where: t.tconst == ^p,
...(8)> select: t.primary_title) |>
...(8)> DataMunger.ImdbRepo.all() end)
[["Leon: The Professional"], [...], ...]

In the second post, we created three new Schemas, Actor, Movie, and MovieActor, with matching database tables, and we created changesets to convert from one repo to the other.

In a future post, we’ll be looking into the problem of finding the same relationships when the three tables are split across three microservices. To do this, we’re going to split our database into three databases, and launch three services on Heroku to act as API backends. We can’t do that as things stand, because I don’t want to pay to host several million rows in a Postgres database on Heroku. In fact, I’d like to stay on the free tier, so we’re going to need to lose almost all of the data.

We can’t just truncate the tables, because we want to maintain relationships.

title      title_name    name       title_name    title
-----      ----------    ----       ----------    -----
t1 ------> t1:n1 ------> n1 ------> n1:t1 ------> t1
|                        |--------> n1:t2 ------> t2
|                        |--------> n1:t3 ------> t3
|                        |--------> n1:t4 ------> t4
|                        \--------> n1:t5 ------> t5
|--------> t1:n4 ------> n4 ------> n4:t1 ------> t1
|                        |--------> n4:t2 ------> t2
|                        |--------> n4:t4 ------> t4
|                        \--------> n4:tf ------> tf
\--------> t1:nb ------> nb ------> nb:t1 ------> t1
                         |--------> nb:t4 ------> t4
                         \--------> nb:t5 ------> t5

Not reached from t1: names n2, n3, and n5 through na; titles t6 through te.

In the figure, t1 has 3 principal actors, n1, n4, and nb. Between the three actors, we have 6 movies, which would lead us to select even more actors, and so on. But if we only selected, say, 9 rows from each database, we would end up with titles in the title selection that didn't show up in the title_name selection, or title_name selections that didn't show up in names. So it's important that we select data algorithmically when trying to extract a useful subset of the data.
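To make the constraint concrete, here is a minimal sketch of a consistency check on a hand-picked selection. The SubsetCheck module and its dangling/3 function are hypothetical names invented for this illustration, and the data is a toy selection loosely based on the figure, not anything from the IMDb tables.

```elixir
defmodule SubsetCheck do
  # Returns the join rows whose title or name is missing from the selection.
  # (Hypothetical helper, not part of DataMunger.)
  def dangling(titles, names, title_names) do
    title_set = MapSet.new(titles)
    name_set = MapSet.new(names)

    Enum.reject(title_names, fn {t, n} ->
      MapSet.member?(title_set, t) and MapSet.member?(name_set, n)
    end)
  end
end

# A naive "first few rows of each table" selection.
titles = ["t1", "t2", "t3"]
names = ["n1", "n2", "n3"]
title_names = [{"t1", "n1"}, {"t1", "n4"}, {"t1", "nb"}]

IO.inspect(SubsetCheck.dangling(titles, names, title_names))
# prints [{"t1", "n4"}, {"t1", "nb"}]
```

Two of the three join rows point at names that were never selected, which is exactly the kind of broken relationship a naive row limit produces.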

In the rest of this post, we’ll be using Ecto to get a subset of the data that meets these constraints.

Getting the Data Subset

There are a couple of approaches we could use, and probably there’s an optimal algorithm for getting just exactly the data we want, but this is a case where the intuitive approach will probably do. Looking at the figure, it seems that by just following the arrows from name -> title_principal -> title -> title_principal -> name and so on, we should be able to rapidly expand our selection of names and titles until we have a nicely connected selection across all three databases. In this post, we'll write queries to extract the records we want from the initial IMDb database, transform them, load them into our new databases, and then export those databases to CSV files.

Extracting the Data

Extracting the data means solving the problem of getting a sparse, connected data set of fewer than 10000 rows for the largest database. We’ll start in Postgres and explore the data. The queries we use to do this will be refined and converted into Ecto when we have a better idea what we want to do.

Just as a reminder, we’re dealing with six tables here:

The IMDb data is loaded through ImdbRepo into three tables: name_basics, title_basics, and title_principals. title_basics contains information about video productions: movies, documentaries, whatever. Each title_basics record has a tconst identifier and other information, such as title, release date, and title_type. name_basics has the same sort of information about people: name, nconst, birth_year. title_principals is a join table. It keeps the tconst field and the nconst field from title_basics and name_basics, as well as information about what job the person had on the film. The tables we want to load into are the actors, movies, and movie_actors tables in the Repo.

Q for Query

Let’s start by asking a simple question: How many movies has Natalie Portman starred in? We’ll start with getting a universal key:

data_imdb=# select nconst from name_basics
data_imdb-# where primary_name
data_imdb-# ilike('natalie portman');
nconst
-----------
nm0000204

Then use that universal key to answer the question:

data_imdb=# select count(*) from title_principals p
data_imdb-# join title_basics t
data_imdb-# on (p.tconst=t.tconst)
data_imdb-# where nconst='nm0000204' and
data_imdb-# category ilike('act%') and
data_imdb-# title_type='movie';
count
-------
36
(1 row)

That’s a fair number of movies. How many costars does that get us?

data_imdb=# select count(distinct p.nconst)
data_imdb-# from title_principals p where
data_imdb-# p.tconst in
data_imdb-# (select p.tconst from title_principals p
data_imdb(# join title_basics t
data_imdb(# on (p.tconst=t.tconst)
data_imdb(# where nconst='nm0000204' and
data_imdb(# category ilike('act%') and
data_imdb(# title_type='movie');
count
-------
276
(1 row)

So, if we want to limit our extraction to 10,000 rows, we won’t be able to go 2 degrees from the source before passing our limit, because 276 squared is over 76,000. If we include all the movies for all the actors, we’re going to end up with a network that has a small number of extremely tightly coupled nodes, which isn’t really the same shape as the IMDb database.

In the IMDb data, yes, there are 276 actors who are costars of Natalie Portman, but there are 1.6 million actors who are not. But for most of these 1.6 million actors, there is a fairly short path to Natalie Portman, or to any other actor. If an actor has 100 costars in a career, and each of these has 100 costars, that’s 10,000 actors who have co-starred with a co-star of the first actor, and 1,000,000 who have co-starred with a co-star of a co-star. A network like this, where most nodes are not neighbors, but the path length between any two nodes is short, is called a “small world” network. This is the property that we want to maintain in our own data. One thing we could try is limiting the number of costars by limiting the number of movies:

data_imdb=# select count(distinct p.nconst)
data_imdb-# from title_principals p where
data_imdb-# p.tconst in
data_imdb-# (select p.tconst from title_principals p
data_imdb(# join title_basics t
data_imdb(# on (p.tconst=t.tconst)
data_imdb(# where nconst='nm0000204' and
data_imdb(# category ilike('act%') and
data_imdb(# title_type='movie' limit 3);
count
-------
22
(1 row)

22 cubed is 10,648, so theoretically we won’t even be able to get 3 degrees from the source before passing our limit. However, there’s probably a good deal of overlap (that’s also a characteristic of small-world networks), so 3 seems like a reasonable starting place for limiting movies per actor.
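The back-of-the-envelope bounds used so far can be checked with a few lines of Elixir. This is only a sketch: the Reach module is a hypothetical name, Integer.pow/2 assumes Elixir 1.12 or later, and the bound assumes no two paths ever lead to the same node, which overstates the real count in a network with overlap.

```elixir
defmodule Reach do
  # Upper bound on the nodes reached at exactly `degrees` hops when every
  # node has `branching` neighbours and no two paths overlap.
  def upper_bound(branching, degrees), do: Integer.pow(branching, degrees)
end

IO.inspect(Reach.upper_bound(276, 2)) # prints 76176
IO.inspect(Reach.upper_bound(22, 3))  # prints 10648
IO.inspect(Reach.upper_bound(100, 3)) # prints 1000000
```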

At this point we’re passing my ability to write SQL, so let’s see if we can do some exploring in Ecto. A query for finding three movies for an actor:

iex(15)> tfa = fn(a) -> Ecto.Query.from(                 
...(15)> p in DataMunger.TitlePrincipal,
...(15)> join: t in DataMunger.TitleBasic,
...(15)> on: p.tconst == t.tconst,
...(15)> where: p.nconst == ^a and
...(15)> t.title_type=="movie" and
...(15)> ilike(p.category, "act%"),
...(15)> select: p.tconst,
...(15)> limit: 3) |>
...(15)> DataMunger.ImdbRepo.all() end
#Function<6.52032458/1 in :erl_eval.expr/5>
iex(16)> tfa.("nm0000204")
["tt2798920", "tt2180351", "tt0947798"]

A query for actors in a movie:

iex(20)> aft = fn(t) -> Ecto.Query.from(
...(20)> p in DataMunger.TitlePrincipal,
...(20)> where: p.tconst == ^t and
...(20)> ilike(p.category, "act%"),
...(20)> select: p.nconst)
...(20)> |> DataMunger.ImdbRepo.all() end
#Function<6.52032458/1 in :erl_eval.expr/5>
iex(21)> aft.("tt2798920")
["nm0000204", "nm0000492", "nm1935086", "nm0938950"]

The movies for actor tfa/1 query takes about 5 seconds to return, and the aft/1 query takes half a second. We'll be happier if we add a couple of indexes.

imdb# create index on title_principals 
imdb# (tconst, nconst, category);
imdb# create index on title_principals
imdb# (tconst, category);
imdb# create index on title_basics
imdb# (tconst, title_type);
imdb# create index on name_basics
imdb# (nconst);

On my machine, that drops both queries into the 1–2 millisecond range. The last index on name_basics we won't need until the end, but we might as well add it now.

Then we can write a function to chain our two queries together.

iex(81)> xs_from_ys = fn(vals, f) ->
...(81)> Enum.flat_map(vals, fn(v) ->
...(81)> f.(v) end) |> Enum.uniq end
#Function<12.52032458/2 in :erl_eval.expr/5>
iex(82)> aft.("tt0110413") |> xs_from_ys.(tfa)
["tt0085426", "tt0095250", "tt0099789", "tt0091954",
"tt0093776", "tt0096294", "tt0080605", "tt0081145",
"tt0082177", "tt0110413", "tt0120915", "tt0121765"]
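If you want to try the chaining idea without a database, the sketch below swaps the two Ecto queries for in-memory stand-ins. The principals list is toy data loosely based on the figure, and these tfa and aft closures are assumptions made for the illustration; only xs_from_ys mirrors the function defined in the session.

```elixir
# Toy join rows: {title, name}.
principals = [
  {"t1", "n1"}, {"t1", "n4"}, {"t1", "nb"},
  {"t2", "n1"}, {"t2", "n4"},
  {"t3", "n1"}
]

# In-memory stand-ins for the two queries (no database, no limit of 3).
tfa = fn n -> for {t, ^n} <- principals, do: t end
aft = fn t -> for {^t, n} <- principals, do: n end

# Apply f to every value, flatten the results, and drop duplicates.
xs_from_ys = fn vals, f -> vals |> Enum.flat_map(f) |> Enum.uniq() end

IO.inspect(aft.("t3") |> xs_from_ys.(tfa))
# prints ["t1", "t2", "t3"]
IO.inspect(aft.("t3") |> xs_from_ys.(tfa) |> xs_from_ys.(aft))
# prints ["n1", "n4", "nb"]
```

Each extra pair of calls moves one more step out from the starting title, which is the same shape as the longer pipelines against the real tables.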

So now we can find how many movies and actors we have at various distances from our seed actor.

iex(103)> tfa.("nm0000204") |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> Enum.count()
2927
iex(103)> tfa.("nm0000204") |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> xs_from_ys.(tfa) |> xs_from_ys.(aft) |>
...(103)> Enum.count()
12325

This pattern of moving from movie to actor to movie to actor and so on should look familiar. We’re basically recreating the logic from our ASCII figure above.

After playing around with this a bit, it seems like we can safely go about 4 degrees away from the seed if we limit the number of movies to three. This is imprecise, but it is a strategy that should build a sparse, connected, small-world-like network.

The Professional (Query)

We’ll clean up our queries and add them to a GraphTraverse module, starting with a test:

defmodule DataMunger.GraphTraverseTest do
  use ExUnit.Case, async: true
  alias DataMunger.GraphTraverse
  alias DataMunger.ImdbRepo
  alias DataMunger.TitleBasic
  alias DataMunger.TitlePrincipal

  describe "movies for actor" do
    test "given a name, finds 3 movie tconsts" do
      ["nm0000203", "nm0000702", "nm0000204", "nm0000102"]
      |> Enum.each(fn(n) ->
        GraphTraverse.movies_for_actor(n)
        |> Enum.map(fn(m) ->
          ImdbRepo.get(TitleBasic, m)
        end)
        |> Enum.map(fn(mov) ->
          assert mov.title_type == "movie"
        end)
        |> (&(assert Enum.count(&1) == 3)).()
      end)
    end
  end
end

Our testing strategy is basically to call back to TitleBasic to make sure we got the right number of movies. We should also check that the movies we got back are directly related to each name as an actor in the TitlePrincipal table. We won't show that test here, but it is in the example code.

We can pass these tests with the following code:

defmodule DataMunger.GraphTraverse do
  alias DataMunger.TitlePrincipal
  alias DataMunger.TitleBasic
  alias DataMunger.ImdbRepo
  import Ecto.Query, only: [from: 2]

  def movies_for_actor(actor, count \\ 3) do
    from(p in TitlePrincipal,
      join: t in TitleBasic,
      on: p.tconst == t.tconst,
      where: p.nconst == ^actor and
        t.title_type == "movie" and
        ilike(p.category, "act%"),
      select: p.tconst,
      limit: ^count)
    |> ImdbRepo.all()
  end
end

The code and tests for the complementary actors_for_movie function and the principals_from_movies function are very similar, so we won't show them here. We are interested in the code that puts these together. We'll test the functionality of finding many movies and actors from a single actor. In test/data_munger/graph_traverse_test.exs:

describe "gets actors and movies from one actor" do
  test "returns uniq collections" do
    {actors, movies} =
      ["nm0000204"]
      |> GraphTraverse.movies_and_actors([], 2)

    actors
    |> Enum.each(fn(a) ->
      assert String.match?(a, ~r/nm[0-9]{7}/)
    end)

    assert Enum.count(Enum.uniq(actors)) == Enum.count(actors)
    assert Enum.count(actors) > 18

    movies
    |> Enum.each(fn(m) ->
      assert String.match?(m, ~r/tt[0-9]{7}/)
    end)

    assert Enum.count(Enum.uniq(movies)) == Enum.count(movies)
    assert Enum.count(movies) > 6
  end
end

The API we’re assuming here is that the movies_and_actors function is going to take three arguments: actors, movies, and a depth. The idea is to find all the costars of the first set of actors, then the costars of all the costars (for three movies each), out to a certain search depth into the graph. In /lib/data_munger/graph_traverse.ex:

def movies_and_actors(actors, movies, 0) do
  {actors, movies}
end

def movies_and_actors(actors, _movies, depth) do
  new_movies =
    actors
    |> Enum.flat_map(fn(a) -> movies_for_actor(a) end)
    |> Enum.uniq()

  new_actors =
    new_movies
    |> Enum.flat_map(fn(m) -> actors_for_movie(m) end)
    |> Enum.uniq()

  movies_and_actors(new_actors, new_movies, depth - 1)
end

A bit of explanation: we’re defining this as a recursive function, using the first pattern (actors, movies, 0) as the terminating condition, and the second pattern (actors, _movies, depth) is the recurrent procedure. In the recurring procedure, we use the list of actors to find 3 movies for each actor, and then we get all the actors for each of those movies. We go to the next recurrence with this new group of actors and movies. The depth variable keeps track of how deep we've gone into the recursion, ensuring that we'll exit when that value hits zero.

This solution could be improved in a couple of ways. The most obvious is that we don’t use the results of previous searches to limit the current search, so we make a lot of repetitive calls to the database. In fact, we only carry the movies from one recursion to the next so that we can return them when we hit 0. If we were concerned with performance, we’d have to use a technique such as memoization to avoid making repetitive calls, but since we aren’t, the naive version of the algorithm is fine. It’s worth noting in any case, because recursion is used a lot in Elixir code and it’s good to be aware of when you are falling into a common recursion pitfall.
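For readers who do care about the repeated calls, here is one way the traversal could avoid them. This is a sketch, not the code from the example repository: it tracks visited sets rather than memoizing, the VisitedTraverse module and walk/4 are hypothetical names, and the two lookup functions are passed in so the example runs against in-memory maps instead of the database.

```elixir
defmodule VisitedTraverse do
  # movies_for and actors_for are caller-supplied lookups (assumptions; in
  # the post they would wrap movies_for_actor and actors_for_movie).
  def walk(seeds, movies_for, actors_for, depth) do
    do_walk(seeds, MapSet.new(seeds), MapSet.new(), movies_for, actors_for, depth)
  end

  # Stop when the depth is used up or there is nothing new to expand.
  defp do_walk(frontier, actors, movies, _mf, _af, depth)
       when depth == 0 or frontier == [] do
    {MapSet.to_list(actors), MapSet.to_list(movies)}
  end

  defp do_walk(frontier, actors, movies, mf, af, depth) do
    # Only actors first seen in the previous round are looked up.
    new_movies =
      frontier
      |> Enum.flat_map(mf)
      |> Enum.uniq()
      |> Enum.reject(&MapSet.member?(movies, &1))

    # Only movies first seen in this round are looked up.
    new_actors =
      new_movies
      |> Enum.flat_map(af)
      |> Enum.uniq()
      |> Enum.reject(&MapSet.member?(actors, &1))

    do_walk(
      new_actors,
      MapSet.union(actors, MapSet.new(new_actors)),
      MapSet.union(movies, MapSet.new(new_movies)),
      mf,
      af,
      depth - 1
    )
  end
end

# Toy data standing in for the two tables.
films = %{"n1" => ["t1", "t2"], "n4" => ["t1", "t4"], "n5" => ["t4"]}
cast = %{"t1" => ["n1", "n4"], "t2" => ["n1"], "t4" => ["n4", "n5"]}
mf = fn n -> Map.get(films, n, []) end
af = fn t -> Map.get(cast, t, []) end

{actors, movies} = VisitedTraverse.walk(["n1"], mf, af, 2)
IO.inspect({Enum.sort(actors), Enum.sort(movies)})
# prints {["n1", "n4", "n5"], ["t1", "t2", "t4"]}
```

Each actor and each movie is looked up at most once, at the cost of carrying two extra sets through the recursion.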

We can improve the result slightly by adding at least one other seed actor. With two seeds, we extract our sparse subgraph:

iex(326)> DataMunger.GraphTraverse.movies_and_actors(
...(326)> ["nm0000204", "nm0000702"], [], 4)
{["nm0427136", "nm0301556", "nm0473218", ...],
["tt0084627", "tt0085426", "tt0275909", ...]}
iex(327)> {actors, movies} = v()
{["nm0427136", "nm0301556", "nm0473218", ...],
["tt0084627", "tt0085426", "tt0275909", ...]}
iex(329)> princs = movies |>
...(329)> DataMunger.GraphTraverse.principals_from_movies()
[714013, 714015, 714014, ...]
iex(330)> Enum.count(actors)
4787
iex(331)> Enum.count(movies)
1966
iex(332)> Enum.count(princs)
8137

It’s not perfect, but it’s fine.

Extract, Transform, Load

We’re now just about done. All that’s left is using the results from the functions in GraphTraverse to load data into the schemas we created in the last post. We'll look in depth at just title_principal -> movie_actor. Again, we start with a test (test/data_munger/etl_test.exs):

defmodule DataMunger.EtlTest do
  use ExUnit.Case, async: true
  require Ecto.Query
  alias Ecto.Query
  alias DataMunger.Etl
  alias DataMunger.Repo
  alias DataMunger.ImdbRepo
  alias DataMunger.Movie
  alias DataMunger.TitlePrincipal
  alias DataMunger.Actor
  alias DataMunger.MovieActor

  setup do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(DataMunger.Repo)
  end

  test "adds the actor-movies" do
    DataMunger.Etl.load_from_seed(["nm0000204"], 1)

    ids = [937006, 937007, 937009, 937008, 1024551,
           1024552, 1024554, 1024553, 1029911,
           1029913, 1029914, 1029912]

    title_principals =
      Query.from(
        p in TitlePrincipal,
        where: p.id in ^ids,
        select: {p.nconst, p.tconst})
      |> ImdbRepo.all()

    movie_actors =
      Query.from(
        ma in MovieActor,
        select: {ma.nconst, ma.tconst})
      |> Repo.all()

    assert title_principals -- movie_actors == []
    assert movie_actors -- title_principals == []
    assert Enum.count(movie_actors) == 12
  end
end

So obviously there’s some cheating going on here. I’m counting on the movies always being returned from the database in the same order, and I checked beforehand to see which ids I would get back from a call to GraphTraverse.movies_and_actors/3. But essentially, I want to know that I'm creating records that match on the relevant transformed fields (nconst and tconst). We put all of this together in data_munger/lib/data_munger/etl.ex:

defmodule DataMunger.Etl do
  alias DataMunger.GraphTraverse
  alias DataMunger.TitlePrincipal
  alias DataMunger.MovieActor
  alias DataMunger.Repo
  alias DataMunger.ImdbRepo
  import Ecto.Query, only: [from: 2]

  def load_from_seed(seeds, depth) do
    # The actor list is not needed for the movie_actors load, so it is ignored here.
    {_names, movies} = GraphTraverse.movies_and_actors(seeds, [], depth)
    principals = GraphTraverse.principals_from_movies(movies)

    # Read the selected rows from the IMDb repo and insert each into the new repo.
    from(p in TitlePrincipal,
      where: p.id in ^principals,
      select: p)
    |> ImdbRepo.all()
    |> Enum.each(fn(p) ->
      movie_actor_from_title_principal(p)
    end)
  end

  defp movie_actor_from_title_principal(principal) do
    %MovieActor{}
    |> MovieActor.changeset(principal)
    |> Repo.insert()
  end
end

Similar strategies will load the actors and movies tables. With this code written, we're ready to actually execute the ETL in development.

data_imdb=# \c data_munger_dev
You are now connected to database "data_munger_dev" as
data_munger_dev=# select count(*) from actors;
count
-------
0
(1 row)

data_munger_dev=# select count(*) from movies;
count
-------
0
(1 row)

data_munger_dev=# select count(*) from movie_actors;
count
-------
0
(1 row)

So at this point, if you haven’t created the indexes we created earlier and you want to run this step, definitely create the indexes! Otherwise this will absolutely not run in a reasonable period of time:

iex(1)> DataMunger.Etl.load_from_seed(["nm0000204", "nm0000702"], 4)

If we check our work in psql:

data_munger_dev=# select count(*) from actors;
count
-------
4787
(1 row)

data_munger_dev=# select count(*) from movies;
count
-------
1966
(1 row)

data_munger_dev=# select count(*) from movie_actors;
count
-------
8137
(1 row)

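Now that we have the data loaded into separate tables and the data set shrunk down to the size we wanted, we can export each table. The <filepath> here has to be an absolute filepath. Note that COPY writes tab-delimited text by default; adding with (format csv) after the path produces CSV instead.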

data_munger_dev=# copy actors to <filepath>;
COPY 4787
data_munger_dev=# copy movies to <filepath>;
COPY 1966
data_munger_dev=# copy movie_actors to <filepath>;
COPY 8137

And finally, we’re done! Next time, we’ll look at creating our three Phoenix services and launching them to Heroku. Code from this post here.
