JanusGraph (Cassandra + ES), OLAP bulk ingestion: issues with ES (secondary persistence, used to store indexes) commit and rollback functionality


Ramesh Babu Y <ramesh...@...>
 

We are using JanusGraph with Cassandra + ES. We ingest data in OLAP mode by submitting batch requests, and we call the commit method to commit the transaction and thereby start the actual data ingestion. The problem is this: the Cassandra commit happens first, and if anything goes wrong during that commit, the JanusGraph core classes perform a rollback. In the same core classes, however, no rollback is performed when something goes wrong during the ES commit. Below is the JanusGraph class where we identified the issue.

In the janusgraph-core jar, the method that gets called internally is commit(...) in StandardJanusGraph.class (quoted at the end of this message).

In the code below (the hasSecondaryPersistence block), even if there are errors during the ES commit, nothing is re-thrown: the errors are stored in a Map (indexFailures) and then only written out as log statements. This leads to data inconsistency, because in the same method an exception during the Cassandra commit causes the transaction to be rolled back, so those changes are undone, but for ES this does not happen.

When we look at the latest code for StandardJanusGraph.class, no exception is re-thrown there either, and a comment in the code says this needs to be cleaned up. Does that mean this is not completely implemented?

https://github.com/JanusGraph/janusgraph/blob/6bb2ba926b6cac2669f608f9461177d964ae0be0/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java#L761

Is this an issue in the janusgraph-core jar? Has it already been fixed?

public void commit(Collection<InternalRelation> addedRelations, Collection<InternalRelation> deletedRelations, StandardJanusGraphTx tx) {
    if (!addedRelations.isEmpty() || !deletedRelations.isEmpty()) {
        log.debug("Saving transaction. Added {}, removed {}", addedRelations.size(), deletedRelations.size());
        if (!tx.getConfiguration().hasCommitTime()) {
            tx.getConfiguration().setCommitTime(this.times.getTime());
        }

        Instant txTimestamp = tx.getConfiguration().getCommitTime();
        long transactionId = this.txCounter.incrementAndGet();
        if (!tx.getConfiguration().hasAssignIDsImmediately()) {
            this.idAssigner.assignIDs(addedRelations);
        }

        BackendTransaction mutator = tx.getTxHandle();
        boolean acquireLocks = tx.getConfiguration().hasAcquireLocks();
        boolean hasTxIsolation = this.backend.getStoreFeatures().hasTxIsolation();
        boolean logTransaction = this.config.hasLogTransactions() && !tx.getConfiguration().hasEnabledBatchLoading();
        KCVSLog txLog = logTransaction ? this.backend.getSystemTxLog() : null;
        TransactionLogHeader txLogHeader = new TransactionLogHeader(transactionId, txTimestamp, this.times);

        try {
            if (logTransaction) {
                Preconditions.checkNotNull(txLog, "Transaction log is null");
                txLog.add(txLogHeader.serializeModifications(this.serializer, LogTxStatus.PRECOMMIT, tx, addedRelations, deletedRelations), txLogHeader.getLogKey());
            }

            boolean hasSchemaElements = !Iterables.isEmpty(Iterables.filter(deletedRelations, SCHEMA_FILTER)) || !Iterables.isEmpty(Iterables.filter(addedRelations, SCHEMA_FILTER));
            Preconditions.checkArgument(!hasSchemaElements || !tx.getConfiguration().hasEnabledBatchLoading() && acquireLocks, "Attempting to create schema elements in inconsistent state");
            StandardJanusGraph.ModificationSummary commitSummary;
            if (hasSchemaElements && !hasTxIsolation) {
                BackendTransaction schemaMutator = this.openBackendTransaction(tx);

                try {
                    commitSummary = this.prepareCommit(addedRelations, deletedRelations, SCHEMA_FILTER, schemaMutator, tx, acquireLocks);

                    assert commitSummary.hasModifications && !commitSummary.has2iModifications;
                } catch (Throwable var42) {
                    schemaMutator.rollback();
                    throw var42;
                }

                try {
                    schemaMutator.commit();
                } catch (Throwable var40) {
                    log.error("Could not commit transaction [" + transactionId + "] due to storage exception in system-commit", var40);
                    throw var40;
                }
            }

            commitSummary = this.prepareCommit(addedRelations, deletedRelations, hasTxIsolation ? NO_FILTER : NO_SCHEMA_FILTER, mutator, tx, acquireLocks);
            if (commitSummary.hasModifications) {
                String logTxIdentifier = tx.getConfiguration().getLogIdentifier();
                boolean hasSecondaryPersistence = logTxIdentifier != null || commitSummary.has2iModifications;
                if (logTransaction) {
                    txLog.add(txLogHeader.serializePrimary(this.serializer, hasSecondaryPersistence ? LogTxStatus.PRIMARY_SUCCESS : LogTxStatus.COMPLETE_SUCCESS), txLogHeader.getLogKey(), mutator.getTxLogPersistor());
                }

                try {
                    mutator.commitStorage();
                } catch (Throwable var39) {
                    log.error("Could not commit transaction [" + transactionId + "] due to storage exception in commit", var39);
                    throw var39;
                }

                if (hasSecondaryPersistence) {
                    LogTxStatus status = LogTxStatus.SECONDARY_SUCCESS;
                    Map<String, Throwable> indexFailures = ImmutableMap.of();
                    boolean userlogSuccess = true;

                    try {
                        indexFailures = mutator.commitIndexes();
                        if (!((Map)indexFailures).isEmpty()) {
                            status = LogTxStatus.SECONDARY_FAILURE;
                            Iterator var20 = ((Map)indexFailures).entrySet().iterator();

                            while(var20.hasNext()) {
                                java.util.Map.Entry<String, Throwable> entry = (java.util.Map.Entry)var20.next();
                                log.error("Error while committing index mutations for transaction [" + transactionId + "] on index: " + (String)entry.getKey(), (Throwable)entry.getValue());
                            }
                        }

                        if (logTxIdentifier != null) {
                            try {
                                userlogSuccess = false;
                                Log userLog = this.backend.getUserLog(logTxIdentifier);
                                Future<Message> env = userLog.add(txLogHeader.serializeModifications(this.serializer, LogTxStatus.USER_LOG, tx, addedRelations, deletedRelations));
                                if (env.isDone()) {
                                    try {
                                        env.get();
                                    } catch (ExecutionException var37) {
                                        throw var37.getCause();
                                    }
                                }

                                userlogSuccess = true;
                            } catch (Throwable var38) {
                                status = LogTxStatus.SECONDARY_FAILURE;
                                log.error("Could not user-log committed transaction [" + transactionId + "] to " + logTxIdentifier, var38);
                            }
                        }
                    } finally {
                        if (logTransaction) {
                            try {
                                txLog.add(txLogHeader.serializeSecondary(this.serializer, status, (Map)indexFailures, userlogSuccess), txLogHeader.getLogKey());
                            } catch (Throwable var36) {
                                log.error("Could not tx-log secondary persistence status on transaction [" + transactionId + "]", var36);
                            }
                        }

                    }
                } else {
                    mutator.commitIndexes();
                }
            } else {
                mutator.commit();
            }

        } catch (Throwable var43) {
            log.error("Could not commit transaction [" + transactionId + "] due to exception", var43);

            try {
                mutator.rollback();
            } catch (Throwable var35) {
                log.error("Could not roll-back transaction [" + transactionId + "] after failure due to exception", var35);
            }

            if (var43 instanceof RuntimeException) {
                throw (RuntimeException)var43;
            } else {
                throw new JanusGraphException("Unexpected exception", var43);
            }
        }
    }
}
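
To make the asymmetry easier to discuss, here is a minimal, self-contained sketch of the control flow described above. It is not JanusGraph code: BackendTx, FakeTx, IndexCommitException and the failOnIndexError flag are all hypothetical names invented for illustration. It only models the two behaviours: a primary-storage failure triggers rollback and is re-thrown, while index failures come back as a Map and are merely logged unless the hypothetical flag is set. Note that even in the strict variant the primary write has already been committed by the time the index failure is seen, so re-throwing alone would not undo it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SecondaryCommitSketch {
    // Hypothetical stand-in for a backend transaction; NOT a JanusGraph type.
    interface BackendTx {
        void commitStorage() throws Exception;   // primary store, e.g. Cassandra
        Map<String, Throwable> commitIndexes();  // secondary indexes, e.g. ES; failures are returned, not thrown
        void rollback();
    }

    // Hypothetical exception used only by the strict variant of this sketch.
    static class IndexCommitException extends RuntimeException {
        final Map<String, Throwable> failures;
        IndexCommitException(Map<String, Throwable> failures) {
            super("Index commit failed for: " + failures.keySet());
            this.failures = failures;
        }
    }

    // failOnIndexError = false models the behaviour described in this post (log only);
    // failOnIndexError = true is an assumed alternative that surfaces the failure to the caller.
    static void commit(BackendTx tx, boolean failOnIndexError) {
        try {
            tx.commitStorage();
        } catch (Exception e) {
            tx.rollback(); // primary failure: roll back and re-throw
            throw new RuntimeException("Primary storage commit failed", e);
        }
        Map<String, Throwable> indexFailures = tx.commitIndexes();
        for (Map.Entry<String, Throwable> entry : indexFailures.entrySet()) {
            System.err.println("Index commit failed on " + entry.getKey() + ": " + entry.getValue());
        }
        if (failOnIndexError && !indexFailures.isEmpty()) {
            // The primary write is already committed here; this only reports the inconsistency.
            throw new IndexCommitException(indexFailures);
        }
    }

    // Fake backend whose primary commit succeeds and whose index commit always fails.
    static class FakeTx implements BackendTx {
        boolean storageCommitted = false;
        boolean rolledBack = false;
        public void commitStorage() { storageCommitted = true; }
        public Map<String, Throwable> commitIndexes() {
            Map<String, Throwable> failures = new LinkedHashMap<>();
            failures.put("search", new RuntimeException("ES unavailable"));
            return failures;
        }
        public void rollback() { rolledBack = true; }
    }

    public static void main(String[] args) {
        FakeTx lenient = new FakeTx();
        commit(lenient, false); // returns normally: the caller never learns about the index failure
        System.out.println("lenient: storageCommitted=" + lenient.storageCommitted);

        FakeTx strict = new FakeTx();
        try {
            commit(strict, true);
        } catch (IndexCommitException e) {
            System.out.println("strict: caller sees failures for " + e.failures.keySet());
        }
    }
}
```

In the quoted method, the secondary status (SECONDARY_FAILURE plus the indexFailures map) is written to the transaction log only when logTransaction is true, and logTransaction is false whenever batch loading is enabled, which may be relevant for bulk ingestion.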
