Recommendations for bulk delete of edges
ke...@...
Hi,
I have been using Titan for a while and have recently moved to Janusgraph. Thanks for revitalising the project!
I am using Janusgraph to look at social chat that is generating approximately 7 million messages a day. I want to keep 1 week's worth and when loading today's data, purge the messages that are more than 7 days old. Are there recommendations for performing this kind of bulk delete of edges?
I am using Cassandra as the backend store and I have messages as edges, with a timestamp property. I have a vertex-centric edgeIndex on the timestamp property, and I also have an Elasticsearch index on the same timestamp property. I have been running drops in batches of 10,000 and committing the transaction, trying to use (one of) the indexes to do this at speed.
The 2 ways I have tried are:
g.V().bothE('message').has('timestamp', lt(start_date)).limit(10000).drop().iterate() // uses the edgeIndex
g.E().has('timestamp', lt(start_date)).limit(10000).drop().iterate() // uses the elasticsearch index
Both of these are proving to be very slow, so I am probably missing something?
Thanks for any advice!
Kevin
Robert Dale <rob...@...>
I'm not sure I would use bothE(). Have you tried just outE()?
I would be curious to see whether it's the querying that's slow or the dropping. Try using profile(), or time a count() instead of the drop() if profile() is not available.
Also, what's the frequency of purging?
Assuming the problem is drop speed, I would experiment with finding optimal batch sizes for both g.V().outE() and g.E(). Use no timestamp filter (unless query speed is negligible); you want to isolate batch size, so be sure to use a dataset you can blow away. Obviously 10k is "slow". I would be curious to know how "slow" is slow here. But bisect your way to an optimal size.
Then, find the distribution (histogram) over time that closely corresponds to that batch size, i.e. if you find that the optimal batch size is 5k, look at how many minutes/hours it takes to produce at most 5k items.
Let's say it's 1 minute.
Then, starting at your oldest time, drop edges in that 1-minute period using the timestamp filter (begin < timestamp <= end). Simply keep moving that 1-minute window forward, drop, and repeat until you've reached the timeframe that should be retained.
See how that goes. If that works well, from there you could experiment with parallelism across those chunks. Other possibilities involve hardware (scaling), memory settings, Cassandra, ES, etc.
Robert Dale
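A minimal Gremlin-Groovy sketch of the windowed purge described above, assuming a console session with graph and g already bound, the 'message' edge label from the original post, and the 1-minute example window; the window size and starting point are illustrative values to be tuned per the batch-size experiment:
windowMs = 60 * 1000L                                                    // assumed optimal window (~1 minute of messages)
cutoff   = System.currentTimeMillis() - 7L * 24 * 60 * 60 * 1000        // retain the last 7 days
begin    = g.E().hasLabel('message').values('timestamp').min().next()   // one-off scan for the oldest message time (could also come from bookkeeping)
while (begin < cutoff) {
    end = Math.min(begin + windowMs, cutoff)
    g.E().hasLabel('message').has('timestamp', gt(begin)).has('timestamp', lte(end)).drop().iterate()   // begin < timestamp <= end
    graph.tx().commit()                                                  // one transaction per window
    begin = end
}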
HadoopMarc <m.c.d...@...>
Hi Kevin,
By an "elasticsearch index" you do mean a mixed index, don't you?
Actually, indexing does not help you too much, because your query still hits 1/7 of your edgestore. The way to go here is to have a distributed query. You can do it either with the OLAP facility (e.g. SparkGraphComputer or FulgoraGraphComputer), or you could do it in a poor man's way: just store the edge ids while doing the inserts and use the stored ids a week later for the dropping (in a threaded or otherwise distributed way), without need of an index.
Cheers, Marc
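A minimal sketch of that poor man's approach, assuming the daily load appends each new edge id (e.id().toString()) to a plain file, one per line; the file path and the 10,000 batch size are illustrative, not from the thread:
idFile = new File('/data/edge-ids-2017-03-09.txt')   // ids recorded during the load 7 days ago (assumed path)
batch  = []
idFile.eachLine { id ->
    batch << id
    if (batch.size() == 10000) {
        g.E(batch.toArray()).drop().iterate()         // direct lookup by edge id, no index needed
        graph.tx().commit()
        batch = []
    }
}
if (!batch.isEmpty()) {
    g.E(batch.toArray()).drop().iterate()
    graph.tx().commit()
}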
ke...@...
Thanks Robert,
I will follow the path you have suggested and see what I can achieve. The problem with the g.V() query is that it iterates over all vertices, so it is fast when I first start deleting but gets slower over time, as it restarts the iteration after each batch of 10,000. FYI, here is the profile output for the g.E() query:
gremlin> g.E().has('timestamp', lt(1488693599987)).limit(10000).profile()
==>Traversal Metrics
Step Count Traversers Time (ms) % Dur
=============================================================================================================
JanusGraphStep([timestamp.lt(1488693599987)]) 10000 10000 140285.011 100.00
optimization 43.808
backend-query 20000 2801.056
>TOTAL - - 140285.011 -
Robert Dale <rob...@...>
I'm definitely not a Titan/Janus or ES expert, so I'm not sure how it works. But I imagine if you had an index by (label, timestamp asc), then you would only ever hit the tip of the index with g.E().hasLabel('label').has('timestamp', lt(timestamp)). That should be very fast. Of course, you would have to know all of the edge labels in advance. But again, there's an opportunity for parallelism by label.
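A sketch of such a label-constrained index in JanusGraph, using indexOnly() to restrict a mixed index on timestamp to the 'message' label; the index name and the 'search' backend name are assumptions based on a typical Elasticsearch configuration:
mgmt      = graph.openManagement()
message   = mgmt.getEdgeLabel('message')
timestamp = mgmt.getPropertyKey('timestamp')
mgmt.buildIndex('messageByTimestamp', Edge.class).addKey(timestamp).indexOnly(message).buildMixedIndex('search')   // only index edges with the 'message' label; 'search' is the assumed ES backend name
mgmt.commit()
// The purge query can then stay label-scoped:
// g.E().hasLabel('message').has('timestamp', lt(cutoff)).limit(10000).drop().iterate()
Note that an index created after the fact only covers newly written data unless the existing edges are reindexed.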
On Thursday, March 16, 2017 at 4:44:59 PM UTC-4, k...@... wrote:
Hi Marc,
Thanks for this. Yes, it is a mixed index with Elasticsearch as the backend. I've thought about the poor man's approach and I'm going to give that a try. In fact, I can probably query Elasticsearch directly with this timestamp index and get the list of edge ids from there.
Ted Wilmes <twi...@...>
These are all good suggestions. Not sure if it would be feasible to recreate your db, but if it was, you may be able to use edge TTLs: http://docs.janusgraph.org/0.1.0-SNAPSHOT/advanced-schema.html.
I haven't used them, but you could set a 7-day TTL for the relevant edge label like this:
mgmt = graph.openManagement()
visits = mgmt.makeEdgeLabel('visits').make()   // define the edge label ('visits' here; it would be 'message' in your case)
mgmt.setTTL(visits, Duration.ofDays(7))        // edges with this label are automatically expired 7 days after being written
mgmt.commit()
--Ted