Call queue is full on /0.0.0.0:60020, too many items queued? (HBase)


Here is my problem:

We are using Cloudera CDH 5.7.0 with Java 1.8.0_74, and we have Spark 1.6.0, JanusGraph 0.1.1, and HBase 1.2.0.

I am trying to load 200 GB of graph data, and for that I run the following code in the Gremlin shell:

// define the schema of the graph that will be written to HBase
:load data/call-janusgraph-schema-groovy
writeGraphPath='conf/my-janusgraph-hbase.properties'
writeGraph=JanusGraphFactory.open(writeGraphPath)
defineCallSchema(writeGraph)
writeGraph.close()

// open the input data as a HadoopGraph and sanity-check it
readGraph=GraphFactory.open('conf/hadoop-graph/hadoop-call-script.properties')
gRead=readGraph.traversal()
gRead.V().valueMap()

// so far so good, everything works perfectly

// run the bulk load with SparkGraphComputer
blvp=BulkLoaderVertexProgram.build().bulkLoader(OneTimeBulkLoader).intermediateBatchSize(10000).writeGraph(writeGraphPath).create(readGraph)
readGraph.compute(SparkGraphComputer).workers(512).program(blvp).submit().get()

This starts the Spark job. Stage 0 runs smoothly, but at Stage 1 I get an exception:

org.hbase.async.CallQueueTooBigException: Call queue is full on /0.0.0.0:60020, too many items queued ?
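
As far as I understand, the queue in that message is the region server's RPC call queue. For reference, these are the hbase-site.xml settings that I believe size that queue (the values are just my understanding of the HBase 1.2 defaults, not anything I have tuned on our cluster):

<!-- hbase-site.xml on the region servers; values shown are, I believe, the HBase 1.2 defaults -->
<property>
  <name>hbase.regionserver.handler.count</name>
  <value>30</value>
</property>
<property>
  <!-- maximum number of queued calls; the default is 10 per handler, I believe -->
  <name>hbase.ipc.server.max.callqueue.length</name>
  <value>300</value>
</property>
<property>
  <!-- maximum total size in bytes of queued calls (1 GB) -->
  <name>hbase.ipc.server.max.callqueue.size</name>
  <value>1073741824</value>
</property>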

However, Spark retries the failed tasks, Stage 1 completes, and Stage 2 then finishes flawlessly. Since Spark keeps the earlier results in memory, Stages 3 and 4 are skipped and Stage 5 starts. Stage 5 hits the same CallQueueTooBigException errors, but once again Spark recovers from them.

My problem is that this stage (Stage 5) takes far too long to execute. On my last run it had been going for 14 hours when I killed the Spark job. That seems really odd for such a small input (200 GB): this cluster normally loads 3 TB into HBase in about an hour when I bulk-load via MapReduce. I tried increasing the number of workers:

readGraph.compute(SparkGraphComputer).workers(1024).program(blvp).submit().get()

but this time the CallQueueTooBigException errors were so frequent that the Spark job could not recover from them at all.
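
For completeness, this is the kind of lower-pressure run I could try instead (smaller intermediate batches and fewer workers, so the region servers see less concurrent write load); the numbers below are just guesses on my part, not something I have verified:

// hypothetical lower-pressure variant: smaller commit batches, fewer workers
blvp=BulkLoaderVertexProgram.build().
    bulkLoader(OneTimeBulkLoader).
    intermediateBatchSize(1000).    // was 10000
    writeGraph(writeGraphPath).
    create(readGraph)
readGraph.compute(SparkGraphComputer).workers(128).program(blvp).submit().get()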

Is there any way that I can decrease the runtime of the job?


Below is some extra material that may help track down the source of the problem.

Here is how I start the Gremlin shell:

#!/bin/bash

export JAVA_HOME=/mnt/hdfs/jdk.1.8.0_74
export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
export YARN_HOME=/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/hadoop-yarn
export YARN_CONF_DIR=$HADOOP_CONF_DIR
export SPARK_HOME=/opt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/spark
export SPARK_CONF_DIR=$SPARK_HOME/conf


GREMLINHOME=/mnt/hdfs/janusgraph-0.1.1-hadoop2

export CLASSPATH=$YARN_HOME/*:$YARN_CONF_DIR:$SPARK_HOME/lib/*:$SPARK_CONF_DIR:$CLASSPATH

cd $GREMLINHOME
export GREMLIN_LOG_LEVEL=info
exec $GREMLINHOME/bin/gremlin.sh "$@"




and here is my conf/hadoop-graph/hadoop-call-script.properties file:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.inputLocation=/user/hive/warehouse/tablex/000000_0
gremlin.hadoop.scriptInputFormat.script=/user/me/janus/script-input-call.groovy
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true

spark.driver.maxResultSize=8192
spark.yarn.executor.memoryOverhead=5000
spark.executor.cores=1
spark.executor.instances=1024
spark.master=yarn-client
spark.executor.memory=20g
spark.driver.memory=20g
spark.serializer=org.apache.spark.serializer.JavaSerializer


conf/my-janusgraph-hbase.properties:

gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=hbase
storage.batch-loading=true
storage.hbase.region-count=1024
cluster.max-partitions=1024
cluster.partition=true

ids.block-size=10000
storage.buffer-size=10000
storage.transactions=false
ids.num-partitions=1024

storage.hbase.table=myjanus
storage.hostname=x.x.x.x
cache.db-cache=true
cache.db-cache-clean-wait=20
cache.db-cache-time=180000
cache.db-cache-size=0.5



Thanks in advance,
Ali