Skip to content

Commit

Permalink
dangling transaction fixes in unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
s4ke committed Jul 28, 2015
1 parent bfacdd7 commit 82273b5
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ public void postInsert(DescriptorEvent event) {
session = ((UnitOfWork) session).getParent();
}
Transaction tx = EclipseLinkUpdateSource.this.transactions.get( session );
EclipseLinkUpdateSource.this.indexUpdater.index( entity, tx );
if(tx == null) {
tx = new Transaction();
try {
EclipseLinkUpdateSource.this.indexUpdater.index( entity, tx );
tx.commit();
} catch(Exception e) {
tx.rollback();
}
} else {
EclipseLinkUpdateSource.this.indexUpdater.index( entity, tx );
}
}
}

Expand All @@ -105,7 +115,17 @@ public void postUpdate(DescriptorEvent event) {
session = ((UnitOfWork) session).getParent();
}
Transaction tx = EclipseLinkUpdateSource.this.transactions.get( session );
EclipseLinkUpdateSource.this.indexUpdater.update( entity, tx );
if(tx == null) {
tx = new Transaction();
try {
EclipseLinkUpdateSource.this.indexUpdater.update( entity, tx );
tx.commit();
} catch(Exception e) {
tx.rollback();
}
} else {
EclipseLinkUpdateSource.this.indexUpdater.update( entity, tx );
}
}
}

Expand All @@ -127,39 +147,57 @@ public void postDelete(DescriptorEvent event) {
}
final Session session = tmp;
Transaction tx = EclipseLinkUpdateSource.this.transactions.get( session );
List<Class<?>> inIndexOf = EclipseLinkUpdateSource.this.containedInIndexOf.get( entityClass );
if ( inIndexOf.size() > 0 ) {
//hack, but works
RehashedTypeMetadata metadata = EclipseLinkUpdateSource.this.rehashedTypeMetadataPerIndexRoot.get(
inIndexOf.get( 0 )
);
XProperty idProperty = metadata.getIdPropertyAccessorForType().get( entityClass );
Object id = idProperty.invoke( entity );
EclipseLinkUpdateSource.this.indexUpdater.delete(
entityClass, inIndexOf, id, new EntityProvider() {

@Override
public Object get(Class<?> entityClass, Object id, Map<String, String> hints) {
ReadObjectQuery nativeQuery = new ReadObjectQuery();
nativeQuery.setReferenceClass( entityClass );
nativeQuery.setSelectionId( id );
nativeQuery.setCacheUsage( ObjectLevelReadQuery.DoNotCheckCache );
Object original = session.executeQuery( nativeQuery );
return original;
}

@Override
public List getBatch(Class<?> entityClass, List<Object> id, Map<String, String> hints) {
throw new AssertionFailure( "normally not used in IndexUpdater" );
}

@Override
public void close() throws IOException {
//no-op
}

}, tx
);
boolean createdOwnTx = false;
if(tx == null) {
tx = new Transaction();
createdOwnTx = true;
}
try {
List<Class<?>> inIndexOf = EclipseLinkUpdateSource.this.containedInIndexOf.get( entityClass );
if ( inIndexOf.size() > 0 ) {
//hack, but works
RehashedTypeMetadata metadata = EclipseLinkUpdateSource.this.rehashedTypeMetadataPerIndexRoot.get(
inIndexOf.get( 0 )
);
XProperty idProperty = metadata.getIdPropertyAccessorForType().get( entityClass );
Object id = idProperty.invoke( entity );
EclipseLinkUpdateSource.this.indexUpdater.delete(
entityClass, inIndexOf, id, new EntityProvider() {

@Override
public Object get(Class<?> entityClass, Object id, Map<String, String> hints) {
ReadObjectQuery nativeQuery = new ReadObjectQuery();
nativeQuery.setReferenceClass( entityClass );
nativeQuery.setSelectionId( id );
nativeQuery.setCacheUsage( ObjectLevelReadQuery.DoNotCheckCache );
Object original = session.executeQuery( nativeQuery );
return original;
}

@Override
public List getBatch(
Class<?> entityClass,
List<Object> id,
Map<String, String> hints) {
throw new AssertionFailure( "normally not used in IndexUpdater" );
}

@Override
public void close() throws IOException {
//no-op
}

}, tx
);
}
if(createdOwnTx) {
tx.commit();
}
} catch(Exception e) {
if(createdOwnTx) {
tx.rollback();
}
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
import javax.transaction.Synchronization;
import javax.transaction.TransactionManager;
import java.io.Serializable;
import java.util.logging.Logger;

import org.hibernate.HibernateException;
import org.hibernate.action.spi.AfterTransactionCompletionProcess;
import org.hibernate.action.spi.BeforeTransactionCompletionProcess;
import org.hibernate.engine.spi.ActionQueue;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.event.service.spi.EventListenerRegistry;
import org.hibernate.event.spi.EventSource;
import org.hibernate.event.spi.EventType;
import org.hibernate.event.spi.FlushEventListener;
import org.hibernate.search.backend.TransactionContext;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;
Expand All @@ -32,7 +36,7 @@
*/
public class EventSourceTransactionContext implements TransactionContext, Serializable {

private static final Log log = LoggerFactory.make();
private static final Logger LOGGER = Logger.getLogger( EventSourceTransactionContext.class.getName() );

private final EventSource eventSource;

Expand All @@ -43,6 +47,9 @@ public class EventSourceTransactionContext implements TransactionContext, Serial
private boolean realTxInProgress = false;
private boolean realTxInProgressInitialized = false;

//this transient is required to break recursive serialization
private transient HibernateUpdateSource flushListener;

public EventSourceTransactionContext(EventSource eventSource) {
this.eventSource = eventSource;
}
Expand Down Expand Up @@ -90,19 +97,20 @@ public void registerSynchronization(Synchronization synchronization) {
actionQueue.registerProcess( new DelegateToSynchronizationOnAfterTx( synchronization ) );
}
//FIXME: should we support flush events?
//else {
// //registerSynchronization is only called if isRealTransactionInProgress or if
// // a flushListener was found; still we might need to find the listener again
// // as it might have been cleared by serialization (is transient).
// flushListener = getIndexWorkFlushEventListener();
// if ( flushListener != null ) {
// flushListener.addSynchronization( eventSource, synchronization );
// }
// else {
// //shouldn't happen if the code about serialization is fine:
// throw new SearchException( "AssertionFailure: flushListener not registered any more." );
// }
//}
else {
//registerSynchronization is only called if isRealTransactionInProgress or if
// a flushListener was found; still we might need to find the listener again
// as it might have been cleared by serialization (is transient).
flushListener = getIndexWorkFlushEventListener();
if ( flushListener != null ) {
flushListener.addSynchronization( eventSource, synchronization );
}
else {
//shouldn't happen if the code about serialization is fine:
//
// throw new SearchException( "AssertionFailure: flushListener not registered any more." );
}
}
}

private boolean isLocalTransaction() {
Expand All @@ -120,21 +128,21 @@ private <T extends Service> T getService(Class<T> serviceClass) {
return eventSource.getFactory().getServiceRegistry().getService( serviceClass );
}

//private FullTextIndexEventListener getIndexWorkFlushEventListener() {
// if ( this.flushListener != null ) {
// //for the "transient" case: might have been nullified.
// return flushListener;
// }
// final Iterable<FlushEventListener> listeners = getService( EventListenerRegistry.class )
// .getEventListenerGroup( EventType.FLUSH ).listeners();
// for ( FlushEventListener listener : listeners ) {
// if ( FullTextIndexEventListener.class.isAssignableFrom( listener.getClass() ) ) {
// return (FullTextIndexEventListener) listener;
// }
// }
// log.debug( "FullTextIndexEventListener was not registered as FlushEventListener" );
// return null;
//}
private HibernateUpdateSource getIndexWorkFlushEventListener() {
if ( this.flushListener != null ) {
//for the "transient" case: might have been nullified.
return flushListener;
}
final Iterable<FlushEventListener> listeners = getService( EventListenerRegistry.class )
.getEventListenerGroup( EventType.FLUSH ).listeners();
for ( FlushEventListener listener : listeners ) {
if ( HibernateUpdateSource.class.isAssignableFrom( listener.getClass() ) ) {
return (HibernateUpdateSource) listener;
}
}
LOGGER.fine( "FullTextIndexEventListener was not registered as FlushEventListener" );
return null;
}

//The code is not really fitting the method name;
//(unless you consider a flush as a mini-transaction)
Expand All @@ -143,8 +151,8 @@ private <T extends Service> T getService(Class<T> serviceClass) {
public boolean isTransactionInProgress() {
// either it is a real transaction, or if we are capable to manage this in the IndexWorkFlushEventListener
return
//getIndexWorkFlushEventListener() != null ||
isRealTransactionInProgress();
getIndexWorkFlushEventListener() != null ||
isRealTransactionInProgress();
}

private boolean isRealTransactionInProgress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.hibernate.search.genericjpa.db.events.hibernate.impl;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import java.io.IOException;
import java.io.ObjectInputStream;
Expand All @@ -15,11 +16,15 @@
import java.util.Map;
import java.util.logging.Logger;

import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.collection.spi.PersistentCollection;
import org.hibernate.engine.spi.EntityEntry;
import org.hibernate.event.spi.AbstractCollectionEvent;
import org.hibernate.event.spi.AbstractEvent;
import org.hibernate.event.spi.EventSource;
import org.hibernate.event.spi.FlushEvent;
import org.hibernate.event.spi.FlushEventListener;
import org.hibernate.event.spi.PostCollectionRecreateEvent;
import org.hibernate.event.spi.PostCollectionRecreateEventListener;
import org.hibernate.event.spi.PostCollectionRemoveEvent;
Expand Down Expand Up @@ -56,11 +61,21 @@
public class HibernateUpdateSource implements SynchronizedUpdateSource, PostDeleteEventListener,
PostInsertEventListener, PostUpdateEventListener,
PostCollectionRecreateEventListener, PostCollectionRemoveEventListener,
PostCollectionUpdateEventListener,
PostCollectionUpdateEventListener, FlushEventListener,
Serializable {

private static final Logger LOGGER = Logger.getLogger( HibernateUpdateSource.class.getName() );

//only used by the FullTextIndexEventListener instance playing in the FlushEventListener role.
// transient because it's not serializable (and state doesn't need to live longer than a flush).
// final because its initialization should be published to other threads.
// ! update the readObject() method in case of name changes !
// make sure the Synchronization doesn't contain references to Session, otherwise we'll leak memory.
private final transient Map<Session, Synchronization> flushSynch = Maps.createIdentityWeakKeyConcurrentMap(
64,
32
);

private boolean disabled = false;
private boolean skipDirtyChecks = true;
private ExtendedSearchIntegrator extendedIntegrator;
Expand Down Expand Up @@ -168,7 +183,8 @@ public void initialize(ExtendedSearchIntegrator extendedIntegrator) {
}
}

protected void processWork(Object entity,
protected void processWork(
Object entity,
Serializable id,
WorkType workType,
AbstractEvent event,
Expand Down Expand Up @@ -275,4 +291,33 @@ public boolean requiresPostCommitHanding(EntityPersister persister) {
public void close() {

}

@Override
public void onFlush(FlushEvent event) throws HibernateException {
Session session = event.getSession();
Synchronization synchronization = flushSynch.get( session );
if ( synchronization != null ) {
//first cleanup
flushSynch.remove( session );
LOGGER.fine( "flush event causing index update out of transaction" );
synchronization.beforeCompletion();
synchronization.afterCompletion( Status.STATUS_COMMITTED );
}
}

/**
* Adds a synchronization to be performed in the onFlush method;
* should only be used as workaround for the case a flush is happening
* out of transaction.
* Warning: if the synchronization contains a hard reference
* to the Session proper cleanup is not guaranteed and memory leaks
* will happen.
*
* @param eventSource should be the Session doing the flush
* @param synchronization the synchronisation instance
*/
public void addSynchronization(EventSource eventSource, Synchronization synchronization) {
this.flushSynch.put( eventSource, synchronization );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -440,26 +440,31 @@ public void updateEvent(List<UpdateEventInfo> arg0) {
);

updateSource.start();
Sleep.sleep(
100_000, () -> {
tx.begin();
try {
return em.createNativeQuery(
"SELECT * FROM " + this.triggerSource.getDelimitedIdentifierToken() + "PlaceSorcererUpdatesHsearch" + this.triggerSource
.getDelimitedIdentifierToken()
)
.getResultList()
.size() == 0;
}
finally {
tx.commit();
}
},
100, ""
);
try {
Sleep.sleep(
100_000, () -> {
tx.begin();
try {
return em.createNativeQuery(
"SELECT * FROM " + this.triggerSource.getDelimitedIdentifierToken() + "PlaceSorcererUpdatesHsearch" + this.triggerSource
.getDelimitedIdentifierToken()
)
.getResultList()
.size() == 0;
}
finally {
tx.commit();
}
},
100, ""
);

if ( exceptionString != null ) {
fail( exceptionString );
if ( exceptionString != null ) {
fail( exceptionString );
}
}
finally {
updateSource.stop();
}
}
finally {
Expand Down
Loading

0 comments on commit 82273b5

Please sign in to comment.