ID block allocation exception while creating edge
Hi All,
I am creating vertices and edges in bulk and getting the below error while creating an edge. Here is the exception log:
Cluster edge creation failed between guest node 20369408030929128 and identifier node 16891904008515712. Exception : org.janusgraph.core.JanusGraphException: ID block allocation on partition(29)-namespace(3) failed with an exception in 12.23 ms
I tried increasing the value of "ids.block-size", but still no luck; I even set it to 1B for testing purposes, but I am still getting the above error. I am creating around 4-5M nodes per hour.
Could you please share some pointers to fix it? Appreciate your help and time.
Thanks,
Anjani
Please show the properties file you use to open JanusGraph.
I assume you also saw the other recommendations in https://docs.janusgraph.org/advanced-topics/bulk-loading/#optimizing-id-allocation
Best wishes, Marc
Thanks for the response, Marc. Below is the method I am using to create the JanusGraph connection:
public JanusGraph createJanusConnection(HashMap<String, Object> janusConfig) {
    JanusGraphFactory.Builder configProps = JanusGraphFactory.build();
    configProps.set(GREMLIN_GRAPH, "org.janusgraph.core.JanusGraphFactory");
    configProps.set(STORAGE_BACKEND, "cql");
    configProps.set(STORAGE_HOSTNAME, janusConfig.get("storage.hostname"));
    configProps.set(STORAGE_CQL_KEYSPACE, janusConfig.get("storage.keyspace"));
    configProps.set(CACHE_DB_CACHE, "false");
    configProps.set(CACHE_DB_CACHE_SIZE, "0.5");
    configProps.set(CACHE_DB_CACHE_TIME, "180000");
    configProps.set(CACHE_DB_CACHE_CLEAN_WAIT, "20");
    configProps.set(STORAGE_CQL_LOCAL_DATACENTER, janusConfig.get("local-datacenter"));
    configProps.set(STORAGE_CQL_WRITE_CONSISTENCY_LEVEL, "LOCAL_ONE");
    configProps.set(STORAGE_CQL_READ_CONSISTENCY_LEVEL, "LOCAL_ONE");
    configProps.set(STORAGE_CQL_SSL_ENABLED, janusConfig.get("cql.ssl.enabled"));
    configProps.set(STORAGE_CQL_SSL_TRUSTSTORE_LOCATION, janusConfig.get("truststore.location"));
    configProps.set(STORAGE_CQL_SSL_TRUSTSTORE_PASSWORD, janusConfig.get("truststore.password"));
    configProps.set(STORAGE_USERNAME, janusConfig.get("cassandra.username"));
    configProps.set(STORAGE_PASSWORD, janusConfig.get("cassandra.password"));
    configProps.set("storage.read-time", "120000");
    configProps.set("storage.write-time", "120000");
    configProps.set("storage.connection-timeout", "120000");
    // added to fix ID block allocation exceptions
    configProps.set("renew-timeout", "240000");
    configProps.set("write-time", "1000");
    configProps.set("read-time", "100");
    configProps.set("renew-percentage", "0.4");
    configProps.set(METRICS_ENABLED, "true");
    configProps.set(METRICS_JMX_ENABLED, "true");
    configProps.set(INDEX_SEARCH_BACKEND, "elasticsearch");
    configProps.set(INDEX_SEARCH_HOSTNAME, janusConfig.get("elasticsearch.hostname"));
    configProps.set(INDEX_SEARCH_ELASTICSEARCH_HTTP_AUTH_TYPE, "basic");
    configProps.set(INDEX_SEARCH_ELASTICSEARCH_HTTP_AUTH_BASIC_USERNAME, janusConfig.get("elasticsearch.username"));
    configProps.set(INDEX_SEARCH_ELASTICSEARCH_HTTP_AUTH_BASIC_PASSWORD, janusConfig.get("elasticsearch.password"));
    configProps.set(INDEX_SEARCH_ELASTICSEARCH_SSL_ENABLED, janusConfig.get("elasticsearch.ssl.enabled"));
    configProps.set(IDS_BLOCK_SIZE, "1000000000");
    configProps.set(IDS_RENEW_PERCENTAGE, "0.3");
    logger.info("JanusGraph config initialization!!");
    return configProps.open();
}
It has been a while since I did this myself. I interpret ids.num-partitions as a stock of reserved ID blocks that can be delegated to a JanusGraph instance. Its default value is not large, so as not to waste ID space.
Actually, the number of parallel tasks is not the number we want. We want ids.num-partitions to be equal to the number of JanusGraph instances, because initially all JanusGraph instances ask for an ID block at the same time. Note that the cores in a Spark executor can share the same JanusGraph instance if you use a singleton object for that.
So, if you have 50 executors with 5 cores each (and use a singleton JanusGraph instance, sketched below), I would try ids.num-partitions = 50.
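A minimal sketch of such a singleton holder, assuming your connection method from above can be called from a static context (class and method names are only illustrative):

import java.util.HashMap;
import org.janusgraph.core.JanusGraph;

// Per-JVM singleton so that all cores in one Spark executor share
// a single JanusGraph instance.
public final class JanusGraphConfig {
    private static volatile JanusGraph graph;

    private JanusGraphConfig() {}

    public static JanusGraph getJanusGraph(HashMap<String, Object> janusConfig) {
        if (graph == null) {
            synchronized (JanusGraphConfig.class) {
                if (graph == null) {
                    // createJanusConnection is the configuration method shown
                    // earlier, assumed callable statically here
                    graph = createJanusConnection(janusConfig);
                }
            }
        }
        return graph;
    }
}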
Best wishes, Marc
I tried setting ids.num-partitions to the number of executors through code, not directly in the JanusGraph global config files, but no luck. I added the below properties, but it didn't help.
configProps.set("ids.renew-timeout", "240000");
configProps.set("ids.renew-percentage", "0.4");
configProps.set("ids.num-partitions", "253");
Thanks,
Anjani
It is still most likely that the modified value of "ids.block-size" somehow does not come through. So, are you sure that:
- all JanusGraph instances are closed before using the new value ("ids.block-size" has GLOBAL_OFFLINE mutability level)? The safest is to have a fresh keyspace and one location for the properties, used for both graph creation and bulk loading.
- sorry for asking: does IDS_BLOCK_SIZE equal "ids.block-size"?
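For reference, a GLOBAL_OFFLINE option like "ids.block-size" can also be changed through the management API, a sketch assuming all other JanusGraph instances are closed first:

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.schema.JanusGraphManagement;

// ids.block-size is GLOBAL_OFFLINE: all other instances must be closed
// before this runs; the new value applies to instances opened afterwards.
JanusGraph graph = JanusGraphFactory.open("janusgraph.properties");
JanusGraphManagement mgmt = graph.openManagement();
mgmt.set("ids.block-size", 1000000);
mgmt.commit();
graph.close();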
Thanks for the response, Marc. Yes, I also think that for some reason the changes are not getting picked up, but I am not able to figure out why.
ids.block-size is updated in the config file of all JanusGraph nodes, and after that all nodes were restarted.
In code I have only one method, which is used to create the JanusGraph instance, and the same instance is passed to the method for node/edge creation.
Yes, IDS_BLOCK_SIZE equals "ids.block-size".
Thanks,
Anjani
Sharing details on how I am creating nodes/edges, to make sure nothing is wrong there that could result in the ID allocation failures.
I am creating one static JanusGraph instance on each Spark worker box, and with that I am creating multiple transactions and commits.
pairRDD.foreachPartition(partIterator -> {
    partIterator.forEachRemaining(tuple -> {
        createNodeAndEdge(tuple, JanusGraphConfig.getJanusGraph(janusConfig));
    });
});

where JanusGraphConfig.getJanusGraph returns the static instance.
In the createNodeAndEdge() method I am creating a GraphTraversalSource from the static JanusGraph instance, creating the node and edge, committing, and then closing the GraphTraversalSource object, as shown below in pseudocode:
void createNodeAndEdge(Tuple2<K, V> pair, JanusGraph janusGraph) {
    // start a new transaction and obtain a traversal source bound to it
    GraphTraversalSource g = janusGraph.buildTransaction().start().traversal();
    try {
        // create node;
        // create edge;
        g.tx().commit();
    } catch (Exception e) {
        g.tx().rollback();
    } finally {
        g.tx().close();
        g.close();
    }
}
Thanks,
Anjani
One thing that does not feel good is that you create and commit a transaction for every row of your dataframe. Although I do not see how this would interfere with ID allocation, best practice is to have partitions of about 10,000 vertices/edges and commit each partition as one batch. In case of an exception, you roll back the transaction and raise your own exception. After that, Spark will retry the partition and your job will still succeed. It is worth a try.
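As a sketch of what I mean, reusing your static-instance helper (the RuntimeException is just one way to make Spark retry the partition):

// One transaction per Spark partition instead of per row.
pairRDD.foreachPartition(partIterator -> {
    JanusGraph janusGraph = JanusGraphConfig.getJanusGraph(janusConfig);
    GraphTraversalSource g = janusGraph.buildTransaction().start().traversal();
    try {
        partIterator.forEachRemaining(tuple -> {
            // create node and edge for this tuple with g
        });
        g.tx().commit();  // one commit for the whole partition
    } catch (Exception e) {
        g.tx().rollback();
        throw new RuntimeException(e);  // Spark will retry the partition
    }
});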
Best wishes, Marc