Failed to find all paths between 2 vertices upon the graph having 100 million vertices and 100 million edges using SparkGraphComputer
Roy Yu <7604...@...>
Hi Marc
My graph has 100 million edges, not 100 edges; sorry for my miswriting. From your advice I think I need to do two things. First, I need to dig into ConnectedComponentVertexProgram and work out how to write my own VertexProgram. Second, I need to implement the path-finding logic in that VertexProgram, about which I have no idea. Since the number of paths between 2 vertices on a graph with 100 million edges could easily explode, I have neither the memory nor even the disk to store all the results. Could you give your solution in more detail?

On Saturday, January 2, 2021 at 6:21:06 PM UTC+8 HadoopMarc wrote:
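For concreteness, a minimal sketch of the kind of custom VertexProgram discussed in this thread could look roughly like the following. It is modeled on the message-passing pattern of ConnectedComponentVertexProgram; the class name, the predecessor property key, and the hard-coded source/target ids are illustrative assumptions, not code from this thread, and it has not been tested against this graph.

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;

public class TwoVertexPathVertexProgram implements VertexProgram<Long> {

    // vertex property holding the ids of the neighbours that first reached this vertex
    public static final String PREDECESSORS = "twoVertexPath.predecessors";

    private static final MessageScope.Local<?> SCOPE = MessageScope.Local.of(__::bothE);
    private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(SCOPE));

    // hard-coded for the sketch; a real program would read these from its Configuration
    private final Object sourceId = 624453904L;
    private final Object targetId = 192204064L;

    @Override
    public void setup(final Memory memory) { }

    @Override
    public void execute(final Vertex vertex, final Messenger<Long> messenger, final Memory memory) {
        if (memory.isInitialIteration()) {
            // only the source vertex starts the expansion
            if (vertex.id().equals(sourceId)) {
                messenger.sendMessage(SCOPE, (Long) vertex.id());   // JanusGraph ids are longs
            }
        } else {
            // ids of neighbours that reached this vertex in the previous hop
            final Set<Long> predecessors = new HashSet<>();
            final Iterator<Long> messages = messenger.receiveMessages();
            while (messages.hasNext()) {
                predecessors.add(messages.next());
            }
            // record predecessors only the first time the frontier arrives here, so each
            // vertex stores a bounded set of ids instead of every path that reaches it
            if (!predecessors.isEmpty() && !vertex.property(PREDECESSORS).isPresent()) {
                vertex.property(VertexProperty.Cardinality.single, PREDECESSORS, predecessors);
                if (!vertex.id().equals(targetId)) {
                    messenger.sendMessage(SCOPE, (Long) vertex.id());
                }
            }
        }
    }

    @Override
    public boolean terminate(final Memory memory) {
        // a real implementation would also halt once the frontier is empty or the target is reached
        return memory.getIteration() >= 200;
    }

    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return MESSAGE_SCOPES;
    }

    @Override
    public Set<VertexComputeKey> getVertexComputeKeys() {
        return Collections.singleton(VertexComputeKey.of(PREDECESSORS, false));
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    @Override
    public TwoVertexPathVertexProgram clone() {
        return this;
    }
}

The design point is that each vertex keeps only the set of neighbour ids that first reached it, not whole paths, so per-vertex memory stays bounded; individual paths can then be reconstructed afterwards by walking the predecessor sets backwards from the target. Enumerating every simple path up to 200 hops would still explode regardless of the framework.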
HadoopMarc <bi...@...>
Hi Roy,

Nice to see you back here, still going strong! I guess the TraversalVertexProgram used for OLAP traversals is not well suited to your use case. You must realize that 200 stages in an OLAP traversal is fairly extreme. I assume your edge count is 100 million and not 100. So, the number of paths between two vertices could easily explode, and the storage of the associated Java objects (the Traversers in the stack trace) could grow beyond 80 GB. It would be relatively easy to write your own VertexProgram for this simple traversal (you can take the ConnectedComponentVertexProgram as an example). See also the explanation in the corresponding recipe. This will give you far more control over data structures and their memory usage.

Best wishes, Marc

On Saturday, January 2, 2021 at 06:53:08 UTC+1, Roy Yu wrote:
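Assuming a custom program along the lines of the sketch earlier in this thread exists, it would be submitted through SparkGraphComputer roughly as follows; the properties file name and the TwoVertexPathVertexProgram class are placeholders, not an existing JanusGraph or TinkerPop API.

import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        // the same HadoopGraph/HBaseInputFormat properties used for the OLAP traversal (placeholder file name)
        Graph graph = GraphFactory.open("read-hbase-olap.properties");
        ComputerResult result = graph.compute(SparkGraphComputer.class)
                .program(new TwoVertexPathVertexProgram())   // hypothetical program from the sketch above
                .submit()
                .get();
        // the result graph now carries the per-vertex predecessor sets;
        // paths can be reconstructed afterwards by walking back from the target vertex
        System.out.println("iterations: " + result.memory().getIteration());
        graph.close();
    }
}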
Roy Yu <7604...@...>
The graph has 100 million vertices and 100 edges. Graph data is stored in the HBase table MyHBaseTable. The size of MyHBaseTable is 16.2 GB:

root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/
16.2 G  32.4 G  /apps/hbase/data/data/default/MyHBaseTable

MyHBaseTable has 190 regions, and the edge data (HBase column family e) of every region is less than 100 MB. One Spark task processes one region, so to avoid a Spark OOM while loading region data I use HBaseAdmin to split HBase regions and make sure the edge data (column family e) of every region stays below 100 MB (a sketch of this step appears after the schema below). The region 077288f4be4c439443bb45b0c2369d5b below is larger than 100 MB because it also holds index data.

root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/MyHBaseTable
3.8 K    7.6 K    /apps/hbase/data/data/default/MyHBaseTable/.tabledesc
0        0        /apps/hbase/data/data/default/MyHBaseTable/.tmp
78.3 M   156.7 M  /apps/hbase/data/data/default/MyHBaseTable/007e9dbf74f5d35862b68d6434f1d6f2
92.2 M   184.3 M  /apps/hbase/data/data/default/MyHBaseTable/077288f4be4c439443bb45b0c2369d5b
102.4 M  204.8 M  /apps/hbase/data/data/default/MyHBaseTable/0782782071e4a7f2d17800d4a0989a7f
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/07e795022e56a969ede48c9c23fbbc7c
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/084e54e61bbcfc2decd14dcbac55bc50
99.7 M   199.4 M  /apps/hbase/data/data/default/MyHBaseTable/0a85ae356b19c605d9a32b9bf513bcbb
431.3 M  862.6 M  /apps/hbase/data/data/default/MyHBaseTable/0b024c812acfa6efaa40e1cca232e192
5.0 K    10.1 K   /apps/hbase/data/data/default/MyHBaseTable/0c2d8e3a6daaa8ab30c399783e343890
...

The properties of the graph:

gremlin.graph=org.janusgraph.core.JanusGraphFactory
cluster.max-partitions=16
storage.backend=hbase
storage.hbase.table=MyHBaseTable
storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure
schema.default=none
storage.hostname=master001,master002,master003
storage.port=2181
storage.hbase.region-count=64
storage.write-time=1000000
storage.read-time=100000
ids.block-size=200000
ids.renew-timeout=600000
ids.renew-percentage=0.4
ids.authority.conflict-avoidance-mode=GLOBAL_AUTO
index.search.backend=elasticsearch
index.search.hostname=es001,es002,es003
index.search.elasticsearch.create.ext.index.number_of_shards=15
index.search.elasticsearch.create.ext.index.refresh_interval=-1
index.search.elasticsearch.create.ext.index.translog.sync_interval=5000s
index.search.elasticsearch.create.ext.index.translog.durability=async
index.search.elasticsearch.create.ext.index.number_of_replicas=0
index.search.elasticsearch.create.ext.index.shard.check_on_startup=false

The schema of the graph:

def defineSchema(graph) {
    m = graph.openManagement()
    node = m.makeVertexLabel("node").make()
    relation = m.makeEdgeLabel("relation").make()
    obj_type_value = m.makePropertyKey("obj_type_value").dataType(String.class).make()
    // edge props
    start_time = m.makePropertyKey("start_time").dataType(Date.class).make()
    end_time = m.makePropertyKey("end_time").dataType(Date.class).make()
    count = m.makePropertyKey("count").dataType(Integer.class).make()
    rel_type = m.makePropertyKey("rel_type").dataType(String.class).make()
    // index
    m.buildIndex("MyHBaseTable_obj_type_value_Index", Vertex.class).addKey(obj_type_value).unique().buildCompositeIndex()
    m.buildIndex("MyHBaseTable_rel_type_index", Edge.class).addKey(rel_type).buildCompositeIndex()
    m.buildIndex("MyHBaseTable_count_index", Edge.class).addKey(count).buildMixedIndex("search")
    m.buildIndex("MyHBaseTable_start_time_index", Edge.class).addKey(start_time).buildMixedIndex("search")
    m.buildIndex("MyHBaseTable_end_time_index", Edge.class).addKey(end_time).buildMixedIndex("search")
    m.commit()
}
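As an aside, the region pre-splitting mentioned before the schema can be expressed with the standard HBase Admin API roughly as follows; the split key is a placeholder and the connection settings are copied from the properties above. This is an untested sketch, not the exact procedure used in this post.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class SplitLargeRegions {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master001,master002,master003");
        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // split an oversized region at an explicit row key (placeholder value);
            // repeat until every region's 'e' column family stays under ~100 MB
            admin.split(TableName.valueOf("MyHBaseTable"), Bytes.toBytes("placeholder-split-key"));
        }
    }
}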
The Gremlin I use to find all paths between 2 vertices:

import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.P;

def executeScript(graph) {
    traversal = graph.traversal().withComputer(SparkGraphComputer.class);
    return traversal.V(624453904).repeat(__.both().simplePath()).until(__.hasId(192204064).or().loops().is(200)).hasId(192204064).path().dedup().limit(1000).toList()
    //return traversal.V().where(__.outE().count().is(P.gte(50000))).id().toList()
};
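For comparison, an editorial sketch (not part of the original post): the same traversal with a much smaller hop limit can be run OLTP-style directly against JanusGraph as a sanity check. The properties file name and the 6-hop bound are arbitrary assumptions.

import java.util.List;
import org.apache.tinkerpop.gremlin.process.traversal.Path;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.*;

public class BoundedPathCheck {
    public static void main(String[] args) throws Exception {
        // placeholder properties file pointing at the same HBase-backed JanusGraph
        JanusGraph graph = JanusGraphFactory.open("janusgraph-hbase.properties");
        GraphTraversalSource g = graph.traversal();
        List<Path> paths = g.V(624453904L)
                .repeat(both().simplePath())
                .until(hasId(192204064L).or().loops().is(6))   // 6 hops instead of 200
                .hasId(192204064L)
                .path()
                .dedup()
                .limit(1000)
                .toList();
        System.out.println("paths found: " + paths.size());
        graph.close();
    }
}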
The OLAP Spark graph conf:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.graphStorageLevel=DISK_ONLY
gremlin.spark.persistStorageLevel=DISK_ONLY

####################################
# JanusGraph HBase InputFormat configuration
####################################
janusgraphmr.ioformat.conf.storage.backend=hbase
janusgraphmr.ioformat.conf.storage.hostname=master002,master003,master001
janusgraphmr.ioformat.conf.storage.hbase.table=MyHBaseTable
janusgraphmr.ioformat.conf.storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure

####################################
# SparkGraphComputer Configuration #
####################################
spark.master=yarn
spark.submit.deployMode=client
spark.yarn.jars=hdfs://GRAPHOLAP/user/spark/jars/*.jar
# the Spark YARN ApplicationManager needs this to resolve the classpath it sends to the executors
spark.yarn.appMasterEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.yarn.am.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native
spark.executor.memoryOverhead=5G
spark.driver.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native
# the Spark executors (on the worker nodes) need this to resolve the classpath to run Spark tasks
spark.executorEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
#spark.executorEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/mnt/data_1/log/spark2/gc-spark%p.log
spark.executor.cores=1
spark.executor.memory=80G
spark.executor.instances=3
spark.executor.extraClassPath=/etc/hadoop/conf:/usr/spark/jars:/usr/hdp/current/hbase-client/lib:/usr/janusgraph/0.4.0/lib
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.network.timeout=1000000
spark.rpc.askTimeout=1000000
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7447
spark.maxRemoteBlockSizeFetchToMem=10485760
spark.memory.useLegacyMode=true
spark.shuffle.memoryFraction=0.1
spark.storage.memoryFraction=0.1
spark.memory.fraction=0.1
spark.memory.storageFraction=0.1
spark.shuffle.accurateBlockThreshold=1048576

The Spark job failed at stage 50:

20/12/30 01:53:00 ERROR executor.Executor: Exception in task 40.0 in stage 50.0 (TID 192084)
java.lang.OutOfMemoryError: Java heap space
    at sun.reflect.generics.repository.ClassRepository.getSuperInterfaces(ClassRepository.java:114)
    at java.lang.Class.getGenericInterfaces(Class.java:913)
    at java.util.HashMap.comparableClassFor(HashMap.java:351)
    at java.util.HashMap$TreeNode.treeify(HashMap.java:1932)
    at java.util.HashMap.treeifyBin(HashMap.java:772)
    at java.util.HashMap.putVal(HashMap.java:644)
    at java.util.HashMap.put(HashMap.java:612)
    at java.util.Collections$SynchronizedMap.put(Collections.java:2588)
    at org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet.add(TraverserSet.java:90)
    at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.lambda$drainStep$4(WorkerExecutor.java:232)
    at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor$$Lambda$86/877696627.accept(Unknown Source)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.drainStep(WorkerExecutor.java:221)
    at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.execute(WorkerExecutor.java:151)
    at org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram.execute(TraversalVertexProgram.java:307)
    at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor.lambda$null$4(SparkExecutor.java:118)
    at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor$$Lambda$72/1209554928.apply(Unknown Source)
    at org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils$3.next(IteratorUtils.java:247)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

From the log it seems there is so much data that even the 80 GB executor heap is not enough. Can anybody help me? Does anybody have an idea how to find all paths between 2 vertices on a large graph?
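To see why TraverserSet.add is where the heap runs out, here is a rough editorial estimate of how partial paths accumulate; the branching factor and per-Traverser size are guesses for illustration, not measurements from this job.

public class PathExplosionEstimate {
    public static void main(String[] args) {
        final double branchingFactor = 3.0;     // assumed average degree along the explored paths
        final double bytesPerTraverser = 500.0; // rough guess for one Traverser plus its path history
        double traversers = 1.0;
        for (int hop = 1; hop <= 30; hop++) {
            traversers *= branchingFactor;
            System.out.printf("hop %2d: ~%.2e partial paths, ~%.2e bytes%n",
                    hop, traversers, traversers * bytesPerTraverser);
        }
        // under these assumptions the total passes 80 GB around hop 18,
        // far short of the 200 hops allowed in the query
    }
}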