Failed to find all paths between 2 vertices in a graph with 100 million vertices and 100 million edges using SparkGraphComputer


Roy Yu <7604...@...>
 

The graph has 100 million vertices and 100 million edges.
The graph data is stored in the HBase table MyHBaseTable.

The size of MyHBaseTable is 16.2GB:
root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/
16.2 G   32.4 G   /apps/hbase/data/data/default/MyHBaseTable

MyHBaseTable has 190 regions. One Spark task processes one region, so to avoid Spark OOM errors while loading region data, I used HBaseAdmin to split the HBase regions until the edge data (HBase column family e) of every region was less than 100MB. In the listing below, some regions are larger than 100MB in total because they also contain index data.
root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/MyHBaseTable
3.8 K    7.6 K    /apps/hbase/data/data/default/MyHBaseTable/.tabledesc
0        0        /apps/hbase/data/data/default/MyHBaseTable/.tmp
78.3 M   156.7 M  /apps/hbase/data/data/default/MyHBaseTable/007e9dbf74f5d35862b68d6434f1d6f2
92.2 M   184.3 M  /apps/hbase/data/data/default/MyHBaseTable/077288f4be4c439443bb45b0c2369d5b
102.4 M  204.8 M  /apps/hbase/data/data/default/MyHBaseTable/0782782071e4a7f2d17800d4a0989a7f
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/07e795022e56a969ede48c9c23fbbc7c
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/084e54e61bbcfc2decd14dcbac55bc50
99.7 M   199.4 M  /apps/hbase/data/data/default/MyHBaseTable/0a85ae356b19c605d9a32b9bf513bcbb
431.3 M  862.6 M  /apps/hbase/data/data/default/MyHBaseTable/0b024c812acfa6efaa40e1cca232e192
5.0 K    10.1 K   /apps/hbase/data/data/default/MyHBaseTable/0c2d8e3a6daaa8ab30c399783e343890
...
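The per-region size check described above can be scripted. Below is a minimal Java sketch (the class and method names are hypothetical) that parses `hdfs dfs -du -h` lines like the ones above and reports entries larger than 100 MB. It reads the first size column (one replica); the second column is the disk space consumed including HDFS replication. Pointing the same `du` at each region's `e` column-family directory (for example `.../MyHBaseTable/*/e`, assuming the usual HBase layout of one subdirectory per column family) would check the edge data alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class RegionSizeCheck {

    // Convert a value printed by `hdfs dfs -du -h` (e.g. 78.3 with unit "M") to megabytes.
    public static double toMegabytes(double value, String unit) {
        switch (unit.toUpperCase(Locale.ROOT)) {
            case "":  return value / (1024.0 * 1024.0); // no unit: plain bytes
            case "K": return value / 1024.0;
            case "M": return value;
            case "G": return value * 1024.0;
            case "T": return value * 1024.0 * 1024.0;
            default:  throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Return the paths whose first size column (one replica) exceeds thresholdMb.
    public static List<String> oversized(List<String> duLines, double thresholdMb) {
        List<String> result = new ArrayList<>();
        for (String line : duLines) {
            String[] tok = line.trim().split("\\s+");
            if (tok.length < 3) {
                continue; // not a "size [unit] size [unit] path" line
            }
            double value = Double.parseDouble(tok[0]);
            // -h prints the unit as a separate token, except for sizes in plain bytes
            String unit = tok[1].matches("[KMGT]") ? tok[1] : "";
            String path = tok[tok.length - 1];
            if (toMegabytes(value, unit) > thresholdMb) {
                result.add(path);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A few lines copied from the listing above
        List<String> lines = Arrays.asList(
            "78.3 M   156.7 M  /apps/hbase/data/data/default/MyHBaseTable/007e9dbf74f5d35862b68d6434f1d6f2",
            "102.4 M  204.8 M  /apps/hbase/data/data/default/MyHBaseTable/0782782071e4a7f2d17800d4a0989a7f",
            "431.3 M  862.6 M  /apps/hbase/data/data/default/MyHBaseTable/0b024c812acfa6efaa40e1cca232e192",
            "0        0        /apps/hbase/data/data/default/MyHBaseTable/.tmp");
        // Prints the 0782782071e4... and 0b024c812acf... region paths
        for (String path : oversized(lines, 100.0)) {
            System.out.println(path);
        }
    }
}
```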


The graph properties:
gremlin.graph=org.janusgraph.core.JanusGraphFactory
cluster.max-partitions=16
storage.backend=hbase
storage.hbase.table=MyHBaseTable
storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure
schema.default=none

storage.hostname=master001,master002,master003
storage.port=2181
storage.hbase.region-count=64
storage.write-time=1000000
storage.read-time=100000

ids.block-size=200000
ids.renew-timeout=600000
ids.renew-percentage=0.4
ids.authority.conflict-avoidance-mode=GLOBAL_AUTO

index.search.backend=elasticsearch
index.search.hostname=es001,es002,es003
index.search.elasticsearch.create.ext.index.number_of_shards=15
index.search.elasticsearch.create.ext.index.refresh_interval=-1
index.search.elasticsearch.create.ext.index.translog.sync_interval=5000s
index.search.elasticsearch.create.ext.index.translog.durability=async
index.search.elasticsearch.create.ext.index.number_of_replicas=0
index.search.elasticsearch.create.ext.index.shard.check_on_startup=false


The graph schema:
import org.apache.tinkerpop.gremlin.structure.Edge
import org.apache.tinkerpop.gremlin.structure.Vertex

def defineSchema(graph) {
    def m = graph.openManagement()

    // vertex label
    def node = m.makeVertexLabel("node").make()

    // edge label
    def relation = m.makeEdgeLabel("relation").make()

    // vertex property
    def obj_type_value = m.makePropertyKey("obj_type_value").dataType(String.class).make()

    // edge properties
    def start_time = m.makePropertyKey("start_time").dataType(Date.class).make()
    def end_time = m.makePropertyKey("end_time").dataType(Date.class).make()
    def count = m.makePropertyKey("count").dataType(Integer.class).make()
    def rel_type = m.makePropertyKey("rel_type").dataType(String.class).make()

    // indexes: composite indexes live in HBase, mixed indexes in the "search" (Elasticsearch) backend
    m.buildIndex("MyHBaseTable_obj_type_value_Index", Vertex.class).addKey(obj_type_value).unique().buildCompositeIndex()
    m.buildIndex("MyHBaseTable_rel_type_index", Edge.class).addKey(rel_type).buildCompositeIndex()
    m.buildIndex("MyHBaseTable_count_index", Edge.class).addKey(count).buildMixedIndex("search")
    m.buildIndex("MyHBaseTable_start_time_index", Edge.class).addKey(start_time).buildMixedIndex("search")
    m.buildIndex("MyHBaseTable_end_time_index", Edge.class).addKey(end_time).buildMixedIndex("search")
    m.commit()
}

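The Gremlin script I use to find all paths between the 2 vertices:

import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__
import org.apache.tinkerpop.gremlin.process.traversal.P

def executeScript(graph) {
    // run the traversal as an OLAP job through SparkGraphComputer
    def traversal = graph.traversal().withComputer(SparkGraphComputer.class)
    // from vertex 624453904, repeatedly step to neighbours in both directions, dropping paths that revisit a vertex;
    // stop when vertex 192204064 is reached or after 200 loops, keep only paths ending at 192204064,
    // and return at most 1000 distinct paths
    return traversal.V(624453904).repeat(__.both().simplePath()).until(__.hasId(192204064).or().loops().is(200)).hasId(192204064).path().dedup().limit(1000).toList()
    // disabled alternative: ids of vertices with at least 50000 outgoing edges
    //return traversal.V().where(__.outE().count().is(P.gte(50000))).id().toList()
}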
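To see why this query needs so much memory, here is a minimal single-machine Java sketch of roughly what it computes (hypothetical class and method names; this is not TinkerPop code). It enumerates simple paths from a source to a target with a loop limit, mimicking `repeat(both().simplePath()).until(hasId(target).or().loops().is(maxLoops)).hasId(target).path().dedup()`, and records how many path traversers are still looping after each iteration. As far as I understand TinkerPop, traversers that carry different paths cannot be bulked together, so this count, not the number of vertices, is what has to fit in the executors' TraverserSets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SimplePathSketch {

    public static final class Result {
        // distinct source->target paths, like path().dedup()
        public final Set<List<Integer>> paths = new LinkedHashSet<>();
        // number of traversers still looping after each iteration
        public final List<Integer> liveTraversersPerLoop = new ArrayList<>();
    }

    public static Result allSimplePaths(Map<Integer, List<Integer>> adj,
                                        int source, int target, int maxLoops) {
        Result result = new Result();
        List<List<Integer>> frontier = new ArrayList<>();
        frontier.add(Collections.singletonList(source));
        for (int loop = 1; loop <= maxLoops && !frontier.isEmpty(); loop++) {
            List<List<Integer>> next = new ArrayList<>();
            for (List<Integer> path : frontier) {
                int last = path.get(path.size() - 1);
                // both(): undirected neighbours of the current vertex
                for (int nb : adj.getOrDefault(last, Collections.emptyList())) {
                    if (path.contains(nb)) {
                        continue; // simplePath(): drop paths that revisit a vertex
                    }
                    List<Integer> extended = new ArrayList<>(path);
                    extended.add(nb);
                    if (nb == target) {
                        result.paths.add(extended); // until(hasId(target)) holds: leave the loop
                    } else if (loop < maxLoops) {
                        next.add(extended);         // neither condition holds: loop again
                    }
                    // otherwise loops().is(maxLoops) ends it and hasId(target) drops it
                }
            }
            result.liveTraversersPerLoop.add(next.size());
            frontier = next;
        }
        return result;
    }

    // Undirected complete graph on vertices 0..n-1 (every pair connected).
    public static Map<Integer, List<Integer>> completeGraph(int n) {
        Map<Integer, List<Integer>> adj = new HashMap<>();
        for (int a = 0; a < n; a++) {
            for (int b = 0; b < n; b++) {
                if (a != b) {
                    adj.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
                }
            }
        }
        return adj;
    }

    public static void main(String[] args) {
        Result r = allSimplePaths(completeGraph(6), 0, 5, 200);
        System.out.println("paths=" + r.paths.size());                  // paths=65
        System.out.println("live per loop=" + r.liveTraversersPerLoop); // live per loop=[4, 12, 24, 24, 0]
    }
}
```

Even on 6 densely connected vertices the live count grows factorially before the paths run out of unvisited vertices. In a graph containing hubs (such as vertices with 50000+ out-edges, which the disabled query looks for), a single loop through a hub multiplies the number of traversers by the hub's degree.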

The OLAP Spark graph configuration:
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.graphStorageLevel=DISK_ONLY
gremlin.spark.persistStorageLevel=DISK_ONLY
####################################
# JanusGraph HBase InputFormat configuration
####################################
janusgraphmr.ioformat.conf.storage.backend=hbase
janusgraphmr.ioformat.conf.storage.hostname=master002,master003,master001
janusgraphmr.ioformat.conf.storage.hbase.table=MyHBaseTable
janusgraphmr.ioformat.conf.storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure

####################################
# SparkGraphComputer Configuration #
####################################
spark.master=yarn
spark.submit.deployMode=client
spark.yarn.jars=hdfs://GRAPHOLAP/user/spark/jars/*.jar

# The Spark YARN ApplicationMaster needs this to resolve the classpath it sends to the executors
spark.yarn.appMasterEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.yarn.am.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native
spark.executor.memoryOverhead=5G
spark.driver.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native

# The Spark executors (on the worker nodes) need this to resolve the classpath to run Spark tasks
spark.executorEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
#spark.executorEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/mnt/data_1/log/spark2/gc-spark%p.log
spark.executor.cores=1
spark.executor.memory=80G
spark.executor.instances=3
spark.executor.extraClassPath=/etc/hadoop/conf:/usr/spark/jars:/usr/hdp/current/hbase-client/lib:/usr/janusgraph/0.4.0/lib

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.network.timeout=1000000
spark.rpc.askTimeout=1000000
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7447
spark.maxRemoteBlockSizeFetchToMem=10485760
spark.memory.useLegacyMode=true
spark.shuffle.memoryFraction=0.1
spark.storage.memoryFraction=0.1
spark.memory.fraction=0.1
spark.memory.storageFraction=0.1
spark.shuffle.accurateBlockThreshold=1048576
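Because `spark.memory.useLegacyMode=true`, the executor heap split is, as far as I know, governed by the legacy static memory manager of Spark 2.x (the mode was removed in Spark 3.0), which reads `spark.storage.memoryFraction` and `spark.shuffle.memoryFraction` and ignores `spark.memory.fraction` and `spark.memory.storageFraction`. The Java sketch below does that arithmetic for the 80G executors, assuming the Spark 2.x default safety fractions (0.9 for storage, 0.8 for shuffle); the class and method names are hypothetical. Whatever Spark does not manage is left for user objects, which is presumably where the vertex program's TraverserSet lives.

```java
import java.util.Locale;

public class LegacyMemorySketch {
    // Assumed Spark 2.x defaults of the legacy (static) memory manager
    static final double STORAGE_SAFETY_FRACTION = 0.9; // spark.storage.safetyFraction
    static final double SHUFFLE_SAFETY_FRACTION = 0.8; // spark.shuffle.safetyFraction

    // Memory reserved for cached blocks (spark.storage.memoryFraction)
    public static double storageGb(double heapGb, double storageMemoryFraction) {
        return heapGb * storageMemoryFraction * STORAGE_SAFETY_FRACTION;
    }

    // Memory reserved for shuffle/execution (spark.shuffle.memoryFraction)
    public static double shuffleGb(double heapGb, double shuffleMemoryFraction) {
        return heapGb * shuffleMemoryFraction * SHUFFLE_SAFETY_FRACTION;
    }

    public static void main(String[] args) {
        double heap = 80.0;                      // spark.executor.memory=80G
        double storage = storageGb(heap, 0.1);   // spark.storage.memoryFraction=0.1
        double shuffle = shuffleGb(heap, 0.1);   // spark.shuffle.memoryFraction=0.1
        double other = heap - storage - shuffle; // left for user objects and JVM overhead
        // prints: storage=7.2 GB, shuffle=6.4 GB, everything else=66.4 GB
        System.out.println(String.format(Locale.ROOT,
            "storage=%.1f GB, shuffle=%.1f GB, everything else=%.1f GB", storage, shuffle, other));
    }
}
```

The figures are approximate because Spark starts from `Runtime.maxMemory()`, which is slightly below the configured heap. With `spark.executor.cores=1`, one task at a time gets that remaining ~66 GB, so the OutOfMemoryError below means the traversers of a single partition alone exceeded it.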


The Spark job failed at stage 50:
20/12/30 01:53:00 ERROR executor.Executor: Exception in task 40.0 in stage 50.0 (TID 192084)
java.lang.OutOfMemoryError: Java heap space
        at sun.reflect.generics.repository.ClassRepository.getSuperInterfaces(ClassRepository.java:114)
        at java.lang.Class.getGenericInterfaces(Class.java:913)
        at java.util.HashMap.comparableClassFor(HashMap.java:351)
        at java.util.HashMap$TreeNode.treeify(HashMap.java:1932)
        at java.util.HashMap.treeifyBin(HashMap.java:772)
        at java.util.HashMap.putVal(HashMap.java:644)
        at java.util.HashMap.put(HashMap.java:612)
        at java.util.Collections$SynchronizedMap.put(Collections.java:2588)
        at org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet.add(TraverserSet.java:90)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.lambda$drainStep$4(WorkerExecutor.java:232)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor$$Lambda$86/877696627.accept(Unknown Source)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.drainStep(WorkerExecutor.java:221)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.execute(WorkerExecutor.java:151)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram.execute(TraversalVertexProgram.java:307)
        at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor.lambda$null$4(SparkExecutor.java:118)
        at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor$$Lambda$72/1209554928.apply(Unknown Source)
        at org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils$3.next(IteratorUtils.java:247)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)


From the log, it seems there is so much data that even the 80G executor heap is not enough.
Can anybody help me? Does anybody have an idea how to find all the paths between 2 vertices in a large graph?

Join janusgraph-users@lists.lfaidata.foundation to automatically receive all group messages.