rafi ansari <rafi1...@...>
Hi All,
I am currently working on using JanusGraph in batch mode from Spark.
I am facing a problem with filtering edges by label.
Below are the specifications:
Spark = 2.4.5
JanusGraph = 0.5.0
Below is the configuration file for Spark:
conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")conf.setProperty("spark.cassandra.connection.host", "127.0.0.1")conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cql")conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "127.0.0.1")conf.setProperty("janusgraphmr.ioformat.conf.storage.port", 9042)conf.setProperty("janusgraphmr.ioformat.conf.storage.cql.keyspace", "graph_db_1")conf.setProperty("janusgraphmr.ioformat.conf.index.search.backend", "elasticsearch")conf.setProperty("janusgraphmr.ioformat.conf.index.search.hostname", "127.0.0.1")conf.setProperty("janusgraphmr.ioformat.conf.index.search.port", 9200)conf.setProperty("janusgraphmr.ioformat.conf.index.search.index-name", "graph_1")conf.setProperty("cassandra.input.partitioner.class","org.apache.cassandra.dht.Murmur3Partitioner")conf.setProperty("cassandra.input.widerows",true)conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")conf.setProperty("spark.kryo.registrator", "org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator")
Below is the Spark code using newAPIHadoopRDD
val hadoopConfiguration = ConfUtil.makeHadoopConfiguration(conf)
val rdd: RDD[(NullWritable, VertexWritable)] = spark.sparkContext.newAPIHadoopRDD(
  hadoopConfiguration,
  hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[InputFormat[NullWritable, VertexWritable]])
    .asInstanceOf[Class[InputFormat[NullWritable, VertexWritable]]],
  classOf[NullWritable],
  classOf[VertexWritable])
The above lines give an RDD as output.
rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.NullWritable, org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable)]
rdd.map { case (x, y) => y.asInstanceOf[VertexWritable] }

res17: Array[String] = Array(v[8344], v[12440], v[4336], v[4320], v[4136], v[8416], v[8192], v[4248], v[4344], v[8432], v[12528], v[4096])
From res17 above, I am not sure how to filter the edges by label.
TIA
Regards
Rafi
Hi Rafi,
Do you mean that you want to filter the vertices based on whether they have an inEdge or an outEdge with a certain label?
Maybe the storage format is just confusing you. The RDD only contains StarVertex objects. An edge is stored twice: once on its inVertex and once on its outVertex. You can use the edges() method on the StarVertex to get the edges.
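For example, something along these lines should work (an untested sketch: it assumes the rdd value from your snippet, uses the TinkerPop StarVertex/Direction API, and "label1" is just a placeholder edge label):

import org.apache.tinkerpop.gremlin.structure.Direction
import scala.collection.JavaConverters._

// Unwrap the StarVertex from each VertexWritable
val vertices = rdd.map { case (_, vw) => vw.get() }

// Keep only vertices that have at least one edge (in or out) with the given label
val verticesWithLabel = vertices.filter(v => v.edges(Direction.BOTH, "label1").hasNext)

// Or take the matching edges themselves
val matchingEdges = vertices.flatMap(v => v.edges(Direction.BOTH, "label1").asScala)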
HTH, Marc
rafi ansari <rafi1...@...>
Hi Marc
Thank you so much for the hint.
In continuation of my code above, I had to do the following to filter the edges by label (maybe helpful for others):

val vrtxrdd = rdd.map { case (x, y) => y.asInstanceOf[VertexWritable] }
val strvrtxrdd = vrtxrdd.map(x => x.get())   // this gives the StarVertex
val edgesrdd = strvrtxrdd.map(x => x.edges(Direction.BOTH, "label1"))
Next, I will try to add vertices using rdd foreach/map, roughly as sketched below.
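One common pattern for that could be something like the following (a rough, untested sketch: "janusgraph-cql.properties" is a placeholder path to a client-side JanusGraph config, the vertex label is made up, and it assumes every Spark executor can reach the storage backend):

import org.janusgraph.core.JanusGraphFactory

strvrtxrdd.foreachPartition { vertices =>
  // Open one JanusGraph instance per partition, add vertices, commit once.
  val graph = JanusGraphFactory.open("janusgraph-cql.properties")
  try {
    vertices.foreach { v =>
      graph.addVertex("someLabel")   // placeholder label; copy properties from v as needed
    }
    graph.tx().commit()
  } finally {
    graph.close()
  }
}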
Thanks once again.
rafi ansari <rafi1...@...>
Hi Marc
I had one more question. Currently I am using newAPIHadoopRDD to get the VertexWritable objects, as in the sample below. Is this the right approach for working with JanusGraph from Spark, or is there a better approach?
val rdd: RDD[(NullWritable, VertexWritable)] = spark.sparkContext.newAPIHadoopRDD(
  hadoopConfiguration,
  hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[InputFormat[NullWritable, VertexWritable]])
    .asInstanceOf[Class[InputFormat[NullWritable, VertexWritable]]],
  classOf[NullWritable],
  classOf[VertexWritable])
The above lines give an RDD as output.
rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.NullWritable, org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable)]
Regards
Rafi