JanusGraph REINDEXING with MapReduce job


Debasish Kanhar <d.k...@...>
 

Thanks @pluradj for helping out. The problem we faced while using JanusGraph is that the data in our system was loaded one time, as a batch job, quite early on. We have a dataset of around 1.8 billion nodes. The initial data load was quite slow and made use of only a few selective composite indexes at the time. So when we started building microservices around multiple queries, not all of them were quick enough, i.e. their response times were huge and unacceptable.
Upon profiling, we could see that JanusGraph and the query itself did not make use of a proper index: per our understanding, steps like hasNot() or has(property, null) are not answered by index queries, which makes the responses slower. A way to avoid that is to create extra properties and default them to a single sentinel value. For example, instead of has(property, null), you can run a one-time traversal that replaces the missing value with a default such as -999 and then query with g.V().has("property", -999); such queries will make use of the index and optimize the traversal time as well (see the sketch below).
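A minimal Gremlin Console sketch of that backfill, following the coalesce() pattern from the post. The label "vertex", key "property", and sentinel -999 are the illustrative names used above; note the default must be wrapped in constant() so that coalesce() receives a traversal:

```groovy
// One-time backfill: where "property" is absent, default it to -999
g.V().has("nodelabel", "vertex").as("a").
  property("property",
           coalesce(select("a").values("property"), constant(-999))).
  iterate()

// Afterwards this lookup can be answered from a composite index
g.V().has("property", -999)
```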
Now, if we follow such a process, we may sometimes need to create additional properties that were not defined as part of the initial schema. For example, if we want to query the vertices that do not have a particular type of edge, the query falls back to the issue mentioned previously and will not be fast. The workaround is to create a property such as edge_count_on_vertex and store the edge count in it. If it is 0, the vertex does not have that edge, which is the same as filtering with not(__.bothE("edgeLabel")) in the traversal (a sketch follows this paragraph).
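A hedged sketch of that workaround, reusing the property()-with-a-traversal pattern from the previous example; edge_count_on_vertex is the property named above, while the vertex label and edge label are placeholders:

```groovy
// Persist each vertex's count of "edgeLabel" edges so the
// "has no such edge" check becomes an indexable equality lookup
g.V().has("nodelabel", "vertex").as("a").
  property("edge_count_on_vertex",
           select("a").bothE("edgeLabel").count()).
  iterate()

// With a composite index on edge_count_on_vertex, this replaces the
// unindexable not(__.bothE("edgeLabel")) filter (count() yields a Long)
g.V().has("edge_count_on_vertex", 0L)
```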
Since this requires creating a new property, that new property needs to be indexed and the data reindexed as well. Ideally we want the schema defined a priori and to avoid reindexing as much as possible, but in our case that was not possible, hence we had to reindex our data. So the issue we faced was related to reindexing, and the above is the background to the problem.
The following is a snippet of the conversation between me and @pluradj:
So, reindexing is an expensive job. Our data size is around 1.8 billion vertices (this is for the Lighthouse project as well). I know the traditional way of reindexing using the following steps:
CREATE INDEX:
```groovy
// Create an index
mgmt = graph.openManagement()

deletedOn = mgmt.getPropertyKey("deletedOn")
expirationDate = mgmt.getPropertyKey("expirationDate")
vertexLabel = mgmt.getPropertyKey("vertexLabel")
videoMixedIndex = mgmt.buildIndex('byVideoCombo1Mixed_2', Vertex.class).
    addKey(deletedOn).addKey(expirationDate).addKey(vertexLabel).
    buildMixedIndex("search")
mgmt.commit()

graph.tx().rollback()
```

REINDEX:
```groovy
// Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byVideoCombo1Mixed_2').call()
// Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byVideoCombo1Mixed_2"), SchemaAction.REINDEX).get()
mgmt.commit()
```
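One step worth adding to the snippet above (my addition, not part of the original conversation): after updateIndex(..., REINDEX) returns, you can confirm the index has reached ENABLED before relying on it in queries:

```groovy
// Block until the reindexed index reports ENABLED
ManagementSystem.awaitGraphIndexStatus(graph, 'byVideoCombo1Mixed_2').
    status(SchemaStatus.ENABLED).call()
```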
I think this uses JanusGraphManagement to do the reindexing on a single machine; per the docs it spawns a single-machine OLAP job, so as expected it will be really slow for the scale of data we are talking about.
I think there is also a way to reindex the data using a MapReduce job, right? How do we do that? I believe this was added in newer versions. Per the docs (https://docs.janusgraph.org/index-management/index-reindexing/) we can do the following:
```groovy
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
mr.updateIndex(mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime"), SchemaAction.REINDEX).get()
mgmt.commit()
```
But the Gremlin Console throws an exception when I run `mr = new MapReduceIndexManagement(graph)`.

I'm using JanusGraph 0.3.2:

```
gremlin> mr = new MapReduceIndexManagement(graph)
groovysh_evaluate: 3: unable to resolve class MapReduceIndexManagement
```
JASON:
You need to install the Hadoop-Gremlin plugin into the Gremlin Console (http://tinkerpop.apache.org/docs/3.3.3/reference/#_installing_hadoop_gremlin):
`:plugin use tinkerpop.hadoop`
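For context, a sketch of the console session this implies; the :install line is only needed when the plugin is not already bundled with the distribution, and the explicit import (the class lives in org.janusgraph.hadoop) is shown in case it is not auto-imported:

```
gremlin> :install org.apache.tinkerpop hadoop-gremlin 3.3.3
gremlin> :plugin use tinkerpop.hadoop
gremlin> import org.janusgraph.hadoop.MapReduceIndexManagement
gremlin> mr = new MapReduceIndexManagement(graph)
```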
Well, using that didn't reduce the reindexing time much either, but there was a certain reduction in the reindexing step. Also, as a workaround, we restricted the index to the vertexLabel key to reduce the scope of the reindexing and make it finish in a reasonable amount of time.
Hope this helps someone else who needs help with reindexing.


priyanka...@...
 

I am trying to reindex data using a MapReduce job. I have checked many places online but could not find any solution for this.

But I am getting the below error:


```
10:42:45 WARN  org.apache.hadoop.mapreduce.JobSubmitter  - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
10:42:50 DEBUG org.janusgraph.diskstorage.hbase.HBaseStoreManager  - Substituted default CF name "edgestore" with short form "e" to reduce HBase KeyValue size
10:42:50 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper  - Unable to create ZooKeeper Connection
java.net.UnknownHostException: [abc.com: invalid IPv6 address
        at java.net.InetAddress.getAllByName(InetAddress.java:1147)
        at java.net.InetAddress.getAllByName(InetAddress.java:1127)
        at org.apache.hadoop.hbase.shaded.org.apache.zookeeper.client.StaticHostProvider.<init>(StaticHostProvider.java:61)
        at org.apache.hadoop.hbase.shaded.org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:445)
        at org.apache.hadoop.hbase.shaded.org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:380)
        at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.checkZk(RecoverableZooKeeper.java:141)
        at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.<init>(RecoverableZooKeeper.java:128)
        at org.apache.hadoop.hbase.zookeeper.ZKUtil.connect(ZKUtil.java:137)
        at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:185)
        at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:153)
        at org.apache.hadoop.hbase.client.ZooKeeperKeepAliveConnection.<init>(ZooKeeperKeepAliveConnection.java:43)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveZooKeeperWatcher(ConnectionManager.java:1690)
        at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:104)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:905)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:648)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:185)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:241)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:239)
        at org.janusgraph.hadoop.formats.hbase.HBaseBinaryInputFormat.getSplits(HBaseBinaryInputFormat.java:58
```
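Not a confirmed fix, but the trace itself is suggestive: ZooKeeper treats a host beginning with [ as an IPv6 literal, so the [abc.com in the UnknownHostException hints that the quorum was supplied as a bracketed list rather than a plain comma-separated string. A hypothetical JanusGraph-over-HBase properties fragment with the expected form (hostnames are placeholders):

```properties
# storage.hostname feeds hbase.zookeeper.quorum and must be a bare
# comma-separated host list, with no surrounding brackets
storage.backend=hbase
storage.hostname=zk1.example.com,zk2.example.com,zk3.example.com
```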

