Hi Everyone,
I am working on Spark code to fetch all the edges from JanusGraph. The JanusGraph configuration I am using is below:
conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")
conf.setProperty("spark.cassandra.connection.host", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cql")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.port", 9042)
conf.setProperty("janusgraphmr.ioformat.conf.storage.cql.keyspace", "kscan_graph_db_0001")
conf.setProperty("janusgraphmr.ioformat.conf.storage.cql.read-consistency-level", "ONE")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.backend", "elasticsearch")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.port", 9200)
conf.setProperty("janusgraphmr.ioformat.conf.index.search.index-name", "kscan_graph_0001")
conf.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner")
conf.setProperty("cassandra.input.widerows", true)
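In case it is relevant, these are the split/parallelism knobs I have been thinking of tweaking as well. The property names are my best guess from the Cassandra Hadoop ConfigHelper and the Spark docs, so please correct me if they do not apply to CqlInputFormat:
conf.setProperty("cassandra.input.split.size", 1024)  // approx. rows per input split (Cassandra Hadoop ConfigHelper); guessed value
conf.setProperty("spark.master", "local[*]")          // use all local cores instead of the default
conf.setProperty("spark.executor.memory", "2g")       // guessed value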
---------------------------------------------------------------------------------------------------------------------
I am using the Graph of the Gods as a test graph, which has no more than 30 edges, yet it still takes approximately 30 minutes to read the edges and write them to JSON.
Below is a code snippet.
---------------------------
// Defined before first use: a val cannot be forward-referenced in a script
val getEdges: VertexWritable => ArrayBuffer[Edge] = (vertexWritable: VertexWritable) => {
  val edgeList = ArrayBuffer[Edge]()
  val starVertex = vertexWritable.get()
  // Only incoming edges, so each edge is collected exactly once across all vertices
  val edges = starVertex.edges(Direction.IN)
  while (edges.hasNext) {
    edgeList += edges.next()
  }
  edgeList
}

val rdd0: RDD[(NullWritable, VertexWritable)] = spark.sparkContext.newAPIHadoopRDD(hadoopConfiguration, classOf[CqlInputFormat], classOf[NullWritable], classOf[VertexWritable])
// The value is already a VertexWritable, so no cast is needed
val rdd1: RDD[VertexWritable] = rdd0.map { case (_, v) => v }
val rdd2: RDD[Edge] = rdd1.flatMap(getEdges)
val rdd3 = rdd2.filter(filterEdge)
val rdd4 = rdd3.flatMap(getEdgeProperties)
import spark.implicits._
rdd4.toDF.coalesce(2).write.json("/home/work/Data/JG-SPARK-PART3.json")
------------------------------------------------------------
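For comparison, I also considered doing the same read through TinkerPop's OLAP path with SparkGraphComputer instead of a raw newAPIHadoopRDD. This is an untested sketch; "read-cql.properties" is a hypothetical properties file holding the same settings as the configuration above:
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory

val graph = GraphFactory.open("read-cql.properties") // hypothetical file with the settings above
val g = graph.traversal().withComputer(classOf[SparkGraphComputer])
val edgeCount = g.E().count().next() // runs as a Spark job
I do not know whether this would be any faster for my case, but it would at least tell me whether the slowness is in CqlInputFormat itself or in my RDD pipeline.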
Is there any way to speed up the code by tweaking the configuration?
TIA
Rafi