Re: JanusGraph Spark read edges

"" <rafi1...@...>

Attachments: Screenshot_JGSPARK-LINEAR (1).jpg, Screenshot_JGSPARK-PARALLEL (1).jpg

Hi Marc, 

I tried Spark's persist() method to avoid fetching the data from Cassandra again and again, but it made no difference to the number of partitions or tasks. I have attached two images of the Spark job: one for the spark-shell run, entered command by command, and one for the job run as a jar. In the highlighted part of both images you can see that in the jar run the tasks execute one at a time, so the 257 tasks take almost 30 minutes, while in the spark-shell run 7-8 tasks execute in parallel, so the entire job completes in about 3 minutes. I am not sure what causes this behaviour.
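
One way to narrow down the difference between the two runs is to print, in both the spark-shell session and the jar, the settings that bound how many tasks can run at once. The following is only a sketch: the object name `ParallelismCheck` and the helper `describe` are hypothetical, and the thread does not establish that these settings are the cause.

```scala
import org.apache.spark.sql.SparkSession

object ParallelismCheck {
  // Collects the settings that limit how many tasks Spark can run concurrently.
  def describe(spark: SparkSession): Map[String, String] = {
    val sc = spark.sparkContext
    Map(
      "master" -> sc.master,
      "defaultParallelism" -> sc.defaultParallelism.toString,
      "spark.executor.cores" -> sc.getConf.get("spark.executor.cores", "(unset)"),
      "spark.executor.instances" -> sc.getConf.get("spark.executor.instances", "(unset)")
    )
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the master URL is supplied by spark-submit or spark-shell,
    // so it is deliberately not hard-coded here.
    val spark = SparkSession.builder().appName("parallelism-check").getOrCreate()
    describe(spark).foreach { case (k, v) => println(s"$k = $v") }
    spark.stop()
  }
}
```

If the two runs print different values for `master` or `defaultParallelism`, that would be a natural place to look next.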

On Monday, August 31, 2020 at 5:07:36 PM UTC+5:30 HadoopMarc wrote:
Hi Rafi,
Do these 257 tasks also correspond to the number of partitions (the largest number of tasks in any stage)? That would be strange given the supposedly small number of vertices in the graph.

You can try and add a spark persist() step directly after the reading and before the count. Then you are sure that reading from Cassandra is not repeated.
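
The persist step suggested above could look roughly like the sketch below. It uses a small parallelized collection as a stand-in for the RDD read through CqlInputFormat, so that it runs without a Cassandra cluster; the object name `PersistSketch`, the helper `countTwice`, and the `local[*]` master are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Persists an RDD right after it is created and counts it twice.
  // Only the first action has to compute the RDD; the second can use the cache.
  def countTwice(spark: SparkSession): (Long, Long) = {
    // Stand-in for the RDD produced by newAPIHadoopRDD in the original post.
    val vertices = spark.sparkContext.parallelize(1 to 12, numSlices = 4)
    val cached = vertices.persist(StorageLevel.MEMORY_AND_DISK)
    println(s"partitions = ${cached.getNumPartitions}")
    val first = cached.count()   // materializes the cached partitions
    val second = cached.count()  // reads the cached partitions
    cached.unpersist()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("persist-sketch").master("local[*]").getOrCreate()
    println(countTwice(spark))
    spark.stop()
  }
}
```

Printing `getNumPartitions` at the same point also answers the question of whether the 257 tasks match the number of partitions.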


On Monday, August 31, 2020 at 07:30:15 UTC+2, ra...@... wrote:
Hi Marc,

Thanks for the suggestion. I did some analysis regarding the job and here's what I found.
If I simply run rdd2.count() in spark-shell, it completes within 3 minutes, but the rdd4.write operation takes almost 30 minutes. 257 tasks are created in both cases. However, rdd2.count() seems to execute 7-8 tasks in parallel, while rdd4.write executes its tasks one by one, so all the tasks together take almost 30 minutes. I'm still not sure what the reason might be.

On Friday, August 28, 2020 at 12:25:40 AM UTC+5:30 HadoopMarc wrote:
Hi Rafi,

I agree that this takes far too long; it should rather finish in 10 seconds or so (most of it spent on class loading). I would investigate further what really happens. Look in the Spark web UI to see which tasks take the most time and whether disk spilling occurs. Print/log a simple rdd.count() on line 2 to be sure the problem is in the InputFormat. Make sure that a simple Gremlin OLTP g.V().count() query on this keyspace returns instantly, etc.

HTH,    Marc
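
To make the "print/log a simple rdd.count()" check concrete, a small timing wrapper can be put around each action so the log shows which step is slow. This is a plain-Scala sketch with no Spark dependency; the object name `TimedAction` and the helper `timed` are hypothetical, and the summed range merely stands in for a Spark action.

```scala
object TimedAction {
  // Runs `body`, prints how long it took, and returns its result unchanged.
  def timed[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"$label took $elapsedMs ms")
    result
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload; in the job this would be e.g. timed("rdd0.count")(rdd0.count()).
    val total = timed("stand-in sum")((1 to 1000).sum)
    println(s"result = $total")
  }
}
```

Wrapping the count on the input RDD and the final write separately would show whether the time goes into reading through the InputFormat or into the later stages.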

On Sunday, August 23, 2020 at 10:13:11 UTC+2, ra...@... wrote:
Hi Everyone,

I am working on Spark code to fetch all the edges from JanusGraph. The JanusGraph configuration I am using is below:

conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")
conf.setProperty("", "")
conf.setProperty("", "cql")
conf.setProperty("", "")
conf.setProperty("", 9042)
conf.setProperty("", "kscan_graph_db_0001")
conf.setProperty("", "ONE")
conf.setProperty("", "elasticsearch")
conf.setProperty("", "")
conf.setProperty("", 9200)
conf.setProperty("", "kscan_graph_0001")
conf.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner")
conf.setProperty("cassandra.input.widerows", true)
I am using the Graph of the Gods as a test graph, which has no more than 30 edges, but the job still takes approximately 30 minutes to read the edges and write them to JSON.

Below is a code snippet.
val rdd0: RDD[(NullWritable, VertexWritable)] = spark.sparkContext.newAPIHadoopRDD(hadoopConfiguration, classOf[CqlInputFormat], classOf[NullWritable], classOf[VertexWritable])
val rdd1: RDD[VertexWritable] = rdd0.map { case (x, y) => y.asInstanceOf[VertexWritable] }

val rdd2: RDD[Edge] = rdd1.map(x => getEdges(x)).flatMap(x => x)

val rdd3 = rdd2.filter(x => filterEdge(x))

val rdd4 = rdd3.map(x => getEdgeProperties(x)).flatMap(x => x)

import spark.implicits._

val getEdges: VertexWritable => ArrayBuffer[Edge] = (vertex_wrtble => {
var edge_list = ArrayBuffer[Edge]()
val strvrtx = vertex_wrtble.get()
val edges = strvrtx.edges(Direction.IN)
edge_list +=
Is there any way to speed up the code by tweaking the configuration?


