ID allocation when using Spark to read data from JanusGraph (OLAP)


"ra...@gmail.com" <rafi1...@...>
 

Hi everyone

I am using Spark to read JanusGraph data for analytics purposes. I provide the configuration as described for gremlin-hadoop, and I use newAPIHadoopRDD to fetch the data from JanusGraph. Sample code and configuration are below:

conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")

val rdd0: RDD[(NullWritable, VertexWritable)] = spark.sparkContext.newAPIHadoopRDD(hadoopConfiguration, classOf[CqlInputFormat], classOf[NullWritable], classOf[VertexWritable])
val rdd1: RDD[VertexWritable] = rdd0.map { case (x, y) => y.asInstanceOf[VertexWritable] }
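As a quick sanity check on the result I do something like the following (a minimal sketch, assuming TinkerPop's VertexWritable.get() returns the star vertex):

// Sketch: count the vertices and print a few labels to verify the read works.
println(s"vertex count: ${rdd1.count()}")
rdd1.take(5).foreach(vw => println(vw.get().label()))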

My questions are:
1) Is a new ID block allocated every time I run the Spark job to fetch the data? I do not want to waste JanusGraph IDs just for OLAP reads.
2) If the answer is yes, how do I make sure that a new ID pool is not allocated on every run of the Spark job?

Thanks

Rafi



HadoopMarc <bi...@...>
 

Hi Rafi,

Can you summarize what you have learnt so far, because this does not seem well documented? Do you see warnings? Have you calculated whether you could actually exhaust the id ranges?

These configuration options may be of interest:
storage.read-only    Maybe an instance with this option does not allocate ids? (a rough, untested sketch of passing it through follows below)
ids.block-size       Seems of little use, because its mutability level is GLOBAL_OFFLINE
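If storage.read-only does help, I would guess (untested) that it can be forwarded to the JanusGraph instance that CqlInputFormat opens via the same janusgraphmr.ioformat.conf prefix you already use for the other storage options:

// Untested sketch: pass storage.read-only through to the input-format graph.
conf.setProperty("janusgraphmr.ioformat.conf.storage.read-only", true)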

Best wishes,       Marc



"ra...@gmail.com" <rafi1...@...>
 


Hi Marc,

Thanks for suggesting the two options.
I have not seen any warnings, but since I'll be running these Spark jobs a lot, I raised the query just to be sure I am not wasting IDs on OLAP jobs. For clarification, below is the set of configurations I use for these jobs:

conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")
conf.setProperty("spark.cassandra.connection.host", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cql")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.port", 9042)
conf.setProperty("janusgraphmr.ioformat.conf.storage.cql.keyspace", "graph_db_01")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.backend", "elasticsearch")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.port", 9200)
conf.setProperty("janusgraphmr.ioformat.conf.index.search.index-name", "graph_01")
conf.setProperty("cassandra.input.partitioner.class","org.apache.cassandra.dht.Murmur3Partitioner")
conf.setProperty("cassandra.input.widerows",true)
conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.setProperty("spark.kryo.registrator", "org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator")

I looked at storage.read-only, and it looks like the right option to stop ID allocation.
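To check its effect outside Spark, I might try something like the following rough sketch (not yet verified to actually skip id-block allocation; keyspace and host as in my config above):

import org.janusgraph.core.JanusGraphFactory

// Open a plain OLTP JanusGraph instance read-only and watch whether an id block gets claimed.
val graph = JanusGraphFactory.build()
  .set("storage.backend", "cql")
  .set("storage.hostname", "127.0.0.1")
  .set("storage.cql.keyspace", "graph_db_01")
  .set("storage.read-only", true)
  .open()
graph.close()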

Regards 
Rafi



"ra...@gmail.com" <rafi1...@...>
 

In addition to the above, I have one more question: is there a way of knowing whether an ID pool is actually being allocated every time I run the Spark job? Any leads would be very helpful.

Regards

Rafi



HadoopMarc <bi...@...>
 

Hi Rafi,
You will have to search the source code.

The code below shows that the VertexIDAssigner class is always preloaded at runtime, irrespective of the read-only property.
https://github.com/JanusGraph/janusgraph/blob/d5ca0b05dfd356824b8e0f507dff99e76755f400/janusgraph-core/src/main/java/org/janusgraph/core/util/ReflectiveConfigOptionLoader.java

From there you have to search further in VertexIDAssigner.

In my understanding, only one JanusGraph instance at a time can allocate an ID block (that is why id allocation can be a bottleneck during bulk loading), and the used id blocks are persisted in a system table. So you will have to find out when this id allocation takes place: at startup of the JanusGraph instance (as part of VertexIDAssigner) or only when the need arises. Also check whether any debug logs are written when id allocation takes place.
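A minimal sketch of what watching for that could look like from the Spark driver, assuming log4j is on the classpath (it normally is) and that the id-assigner classes log at debug level when a block is claimed:

import org.apache.log4j.{Level, Logger}

// Turn on debug logging for the id-assigner package before the job touches the graph,
// so any id-block allocation should show up in the driver or executor logs.
Logger.getLogger("org.janusgraph.graphdb.database.idassigner").setLevel(Level.DEBUG)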

HTH,    Marc
