Integrating custom vertex program with janusgraph
This is my first blog and I am very nervous!!
The purpose of writing this blog is to help people like me who are new in this space and have to get information in bits and pieces to get sample vertex program working.
To start with I have used a Vertex Program from blog where I struggled a bit to get it working. The explanation is worth reading in this blog.
Following are the simple steps to follow I hope !!!
- Clone https://github.com/JanusGraph/janusgraph.git and write your vertex program “FavorVertexProgram.java” in local path.
2. Complete code with imports is as follows for FavorVertexProgram.java
// Copyright 2017 JanusGraph Authors
// //
// // Licensed under the Apache License, Version 2.0 (the "License");
// // you may not use this file except in compliance with the License.
// // You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing, software
// // distributed under the License is distributed on an "AS IS" BASIS,
// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// // See the License for the specific language governing permissions and
// // limitations under the License.
//
package org.janusgraph.graphdb.olap.computer;import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.clustering.connected.ConnectedComponentVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.commons.configuration2.ConfigurationUtils;
import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.commons.configuration2.Configuration;import java.util.Set;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Arrays;public class FavorVertexProgram implements VertexProgram<Double> {
public static final String FAVOR = "^favor";
public static final String TOTAL_FAVOR = "^totalFavor";
private static final String VOTE_TO_HALT = "favorVertexProgram.voteToHalt";private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));private MessageScope.Local<?> scope = MessageScope.Local.of(__::outE);
private Set<MessageScope> scopes;
private Configuration configuration;
private String nameOfStartVertrex = null;private FavorVertexProgram() {}@Override
public void loadState(Graph graph, Configuration config) {
configuration = new BaseConfiguration();
if (config != null) {
ConfigurationUtils.copy(config, configuration);
}nameOfStartVertrex = configuration.getString("name");
scopes = new HashSet<>(Collections.singletonList(scope));
}@Override
public void storeState(Configuration config) {
VertexProgram.super.storeState(config);
if (configuration != null) {
ConfigurationUtils.copy(configuration, config);
}
}@Override
public void setup(Memory memory) {
memory.set(VOTE_TO_HALT, true);
}@Override
public void execute(Vertex vertex, Messenger<Double> messenger, Memory memory) {
// through Memory we can check how many times the VertexProgram has been
// executed. that might be useful in initializing some state or controlling
// some flow or determining if it is time to terminate. in this case the first
// pass is used to calculate the "total flavor" for all vertices and to pass
// the calculated current favor forward along to incident edges only for
// the "start vertex" - in the context of the question, this starting vertex
// would be "jane"
if (memory.isInitialIteration()) {
// on the first pass, just initialize the favor and totalFavor properties
boolean startVertex = vertex.value("name").equals(nameOfStartVertrex);
double initialFavor = startVertex ? 1d : 0d;
vertex.property(VertexProperty.Cardinality.single, FAVOR, initialFavor);
vertex.property(VertexProperty.Cardinality.single, TOTAL_FAVOR,
IteratorUtils.stream(vertex.edges(Direction.OUT)).
mapToDouble(e -> e.value("weight")).sum());if (startVertex) {
Iterator<Edge> incidents = vertex.edges(Direction.OUT);// if there are no outgoing edges then from the perspective of
// this VertexProgram, there is no need to process this
// VertexProgram any further. Setting the memory key of VOTE_TO_HALT
// to true will have the effect of saying that this vertex believes
// there is nothing more to do and therefore it's ok to stop
// executing. all vertices must agree to this halting and looking
// above to the MEMORY_COMPUTE_KEYS we can see that the VOTE_TO_HALT
// key is define using an Operate.and which will have the effect of
// ANDing together the votes of all vertices to set this key in Memory.
// As such, any one vertex that sets this key to false will negate
// the termination and only a unanimous setting of true will stop it.
memory.add(VOTE_TO_HALT, !incidents.hasNext());// iterate all the outgoing edges and for each send a message to the
// adjacent vertex with the calculated "proportional favor"
while (incidents.hasNext()) {
Edge incident = incidents.next();
messenger.sendMessage(MessageScope.Global.of(incident.inVertex()),
(double) incident.value("weight") / (double) vertex.value(TOTAL_FAVOR));
}
}
} else {
// after the first iteration of the VertexProgram messages should be passing
// among vertices along outgoing edges. those messages received by the
// current vertex should be checked as they contain the incoming proportional
// favor that needs to be applied to it.
Iterator<Double> messages = messenger.receiveMessages();
boolean hasMessages = messages.hasNext();// assuming there are messages to process, those proportional favors are
// summed together as described in step 2 of the algorithm
if (hasMessages) {
double adjacentFavor = IteratorUtils.reduce(messages, 0.0d, Double::sum);
vertex.property(VertexProperty.Cardinality.single, FAVOR,
(double) vertex.value(FAVOR) + adjacentFavor);// the logic to follow is described above however we must now multiply
// the proportional favor by the adjacent favor before sending the
// message. technically, this could have been done above and this duplicate
// code likely extracted to a function - the adjacentFavor above just
// defaults to "1" and therefore has no effect on the calculation.
Iterator<Edge> incidents = vertex.edges(Direction.OUT);
memory.add(VOTE_TO_HALT, !incidents.hasNext());
while (incidents.hasNext()) {
Edge incident = incidents.next();
messenger.sendMessage(MessageScope.Global.of(incident.inVertex()),
adjacentFavor * ((double) incident.value("weight") / (double) vertex.value(TOTAL_FAVOR)));
}
}
}
}@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return new HashSet<>(Arrays.asList(
VertexComputeKey.of(FAVOR, false),
VertexComputeKey.of(TOTAL_FAVOR, false),
VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false)));
}@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return MEMORY_COMPUTE_KEYS;
}@Override
public boolean terminate(Memory memory) {
boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
if (voteToHalt) {
return true;
} else {
// it is basically always assumed that the program will want to halt, but if message passing occurs, the
// program will want to continue, thus reset false values to true for future iterations
memory.set(VOTE_TO_HALT, true);
return false;
}
}@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return scopes;
}@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}@Override
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
public FavorVertexProgram clone() {
return this;
}@Override
public Features getFeatures() {
return new Features() {
@Override
public boolean requiresLocalMessageScopes() {
return true;
}@Override
public boolean requiresVertexPropertyAddition() {
return true;
}
};
}public static FavorVertexProgram.Builder build() {
return new FavorVertexProgram.Builder();
}// having a builder that constructs the Configuration object for the VertexProgram is helpful
// to ensure that it gets built properly and ensures a more fluent style of usages that is
// common in Gremlinpublic static final class Builder extends AbstractVertexProgramBuilder<FavorVertexProgram.Builder> {private Builder() {
super(FavorVertexProgram.class);
}public FavorVertexProgram.Builder name(final String nameOfStartVertex) {
this.configuration.setProperty("name", nameOfStartVertex);
return this;
}
}
}
3. Now build the code to get janusgraph-core.jar file in target folder
mvn clean install -rf :janusgraph-core -DskipTests=true -X -Dcheckstyle.skip
4. Run following command to get the released janusgraph v0.6.0 binary
wget https://github.com/JanusGraph/janusgraph/releases/download/v0.6.0/janusgraph-0.6.0.zip && unzip janusgraph-0.6.0.zip
5. Now overwite the generated jar file in step3 ie janusgraph-core.jar to janusgraph-0.6.0/lib of step 4.
6. Run the janusgraph and gremlin console. Follow local installation in https://docs.janusgraph.org/getting-started/installation/
7. In the gremlin console
gremlin> graph=TinkerGraph.open()
==>tinkergraph[vertices:0 edges:0]gremlin> g=graph.traversal()
==>graphtraversalsource[tinkergraph[vertices:0 edges:0], standard]gremlin> import org.janusgraph.graphdb.olap.computer.*
==>org....gremlin> g.addV('person').as('1').
property(single, 'name', 'jane').
addV('person').as('2').
property(single, 'name', 'thomas').
addV('person').as('3').
property(single, 'name', 'lisa').
addV('person').as('4').
property(single, 'name', 'wyd').
addV('person').as('5').
property(single, 'name', 'jerryd').
addE('favor').from('1').to('2').
property('weight', 10.0d).addE('favor').
from('1').to('3').property('weight', 20.0d).
addE('favor').from('3').to('2').
property('weight', 90.0d).addE('favor').
from('2').to('4').property('weight', 50.0d).
addE('favor').from('2').to('5').
property('weight', 90.0d).addE('favor').
from('3').to('5').property('weight', 100.0d).iterate()gremlin> rg=graph.computer().program(FavorVertexprogram.build().name("jane").create()).submit().get().graph().traversal()gremlin> rg.V().elementMap()
==>{id=0, label=person, ^favor=1.0, name=jane, ^totalFavor=30.0}
==>{id=2, label=person, ^favor=0.6491228070175439, name=thomas, ^totalFavor=140.0}
==>{id=4, label=person, ^favor=0.6666666666666666, name=lisa, ^totalFavor=190.0}
==>{id=6, label=person, ^favor=0.23182957393483708, name=wyd, ^totalFavor=0.0}
==>{id=8, label=person, ^favor=0.768170426065163, name=jerryd, ^totalFavor=0.0}
If you like my blog, please press a👏 button :)