///
/// Copyright © 2003-2008 JetBrains s.r.o.
/// You may distribute under the terms of the GNU General Public License, as published by the Free Software Foundation, version 2 (see License.txt in the repository root folder).
///
using System;
using System.Drawing;
using System.Runtime.InteropServices;
using System.Threading;
using System.Collections;
using System.Windows.Forms;
using System35;
using JetBrains.Annotations;
using JetBrains.Interop.WinApi;
using JetBrains.Omea.Containers;
using JetBrains.Omea.OpenAPI;
using JetBrains.Omea.Base;
using JetBrains.Omea.Diagnostics;
using JetBrains.DataStructures;
namespace JetBrains.Omea.AsyncProcessing
{
public delegate void AsyncExceptionHandler( Exception e );
public class AsyncProcessorException : Exception
{
public AsyncProcessorException( Exception innerException )
: base( "Exception raised during async processing", innerException )
{
}
}
/**
* Omea asynchronous processor
*/
public class AsyncProcessor : IAsyncProcessor
{
public AsyncProcessor()
: this( null, true ) {}
public AsyncProcessor( bool startProcessorThread )
: this( null, startProcessorThread ) {}
public AsyncProcessor( AsyncExceptionHandler exceptionHandler, bool startProcessorThread )
{
_empty = true;
_processMessages = false;
_isReenterable = true;
_isThreadStarted = 0;
_idlePeriod = _defaultIdlePeriod;
_jobQueue = new PriorityQueue();
_uniqueJobs = new HashSet( 50 );
_idleJobQueue = new PriorityQueue();
_uniqueIdleJobs = new HashSet( 10 );
_timedJobs = new DateTimePriorityQueue();
_timedJobCounts = new HashMap( 30 );
_startedJobs = new HashMap( 30 );
_startedReenteringJobs = new HashSet();
_exceptionHandler = exceptionHandler;
_awakeningEvent = new ManualResetEvent( false );
_tracer = new Tracer( "AsyncProcessor" );
_currentJob = null;
_processorThread = new Thread( new ThreadStart( ProcessorThread ) );
_processorThread.IsBackground = true;
if( startProcessorThread )
{
StartThread();
}
}
public bool Finished
{
get { return _finished; }
}
public string CurrentJobName
{
get
{
AbstractNamedJob namedJob = _currentJob as AbstractNamedJob;
return ( namedJob == null ) ? string.Empty : namedJob.Name ;
}
}
public int OutstandingJobs
{
get { return _jobQueue.Count; }
}
public bool ProcessMessages
{
get { return _processMessages; }
set { _processMessages = value; }
}
public bool Reenterable
{
get { return _isReenterable; }
set { _isReenterable = value; }
}
#region thread controlling functions
public void StartThread()
{
if( Interlocked.Exchange( ref _isThreadStarted, 1 ) == 0 )
{
_processorThread.Start();
}
}
public virtual void EmployCurrentThread()
{
if( Interlocked.Exchange( ref _isThreadStarted, 1 ) == 0 )
{
_processorThread = Thread.CurrentThread;
ProcessorThread();
}
}
public ThreadPriority ThreadPriority
{
get { return _processorThread.Priority; }
set
{
if( !_finished && _processorThread.IsAlive )
{
try
{
_processorThread.Priority = value;
}
catch( ThreadStateException ) {}
}
}
}
public string ThreadName
{
get { return _processorThread.Name; }
set
{
if( _processorThread.Name == null )
{
_processorThread.Name = value;
}
}
}
public Thread Thread
{
get { return _processorThread; }
}
public virtual bool IsOwnerThread
{
get { return _processorThread == Thread.CurrentThread; }
}
private delegate TimeSpan GetTimeSpanDelegate();
public TimeSpan GetKernelTime()
{
if( !IsOwnerThread )
{
return (TimeSpan) RunUniqueJob( new GetTimeSpanDelegate( GetKernelTime ) );
}
else
{
WindowsAPI.FILETIME ct = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME et = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME kt = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME ut = new WindowsAPI.FILETIME();
WindowsAPI.GetThreadTimes( WindowsAPI.GetCurrentThread(), ref ct, ref et, ref kt, ref ut );
long ticks = kt.dwLowDateTime + ( ( (long) kt.dwHighDateTime ) << 32 );
return new TimeSpan( ticks );
}
}
public TimeSpan GetUserTime()
{
if( !IsOwnerThread )
{
return (TimeSpan) RunUniqueJob( new GetTimeSpanDelegate( GetUserTime ) );
}
else
{
WindowsAPI.FILETIME ct = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME et = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME kt = new WindowsAPI.FILETIME();
WindowsAPI.FILETIME ut = new WindowsAPI.FILETIME();
WindowsAPI.GetThreadTimes( WindowsAPI.GetCurrentThread(), ref ct, ref et, ref kt, ref ut );
long ticks = ut.dwLowDateTime + ( ( (long) ut.dwHighDateTime ) << 32 );
return new TimeSpan( ticks );
}
}
// in milliseconds
public int IdlePeriod
{
get { return _idlePeriod; }
set
{
if( _idlePeriod <= 0 )
{
throw new Exception( "Idle period cannot be less or equal to zero" );
}
_idlePeriod = value;
}
}
public void WaitUntilFinished()
{
if( _isThreadStarted != 0 )
{
if( !Application.MessageLoop )
{
_processorThread.Join();
}
else
{
while( _processorThread.IsAlive && !_processorThread.Join( 100 ) )
{
Application.DoEvents();
}
}
}
}
#endregion
#region asyncprocessor events
public event EventHandler ThreadStarted;
public event EventHandler ThreadFinished;
public event EventHandler FillingEmptyQueue;
public event EventHandler QueueGotEmpty;
public event EventHandler JobStarting
{
add
{
JobStartingDelegate += value;
}
remove
{
JobStartingDelegate -= value;
}
}
public event EventHandler JobFinished
{
add
{
JobFinishedDelegate += value;
}
remove
{
JobFinishedDelegate -= value;
}
}
protected EventHandler JobStartingDelegate;
protected EventHandler JobFinishedDelegate;
public delegate void JobDelegate( object sender, AbstractJob job );
public event JobDelegate JobQueued;
#endregion
#region IAsyncProcessor Members
public bool QueueJob( AbstractJob job )
{
return PushJob( job );
}
public bool QueueJob( Delegate method, params object[] args )
{
return PushJob( new DelegateJob( method, args ) );
}
public bool QueueJob( string name, Delegate method, params object[] args )
{
return PushJob( new DelegateJob( name, method, args ) );
}
public void QueueJob( [NotNull] string name, [NotNull] Action action )
{
PushJob(new ActionJob(name, action));
}
public bool QueueJob([NotNull] string name, [NotNull] object identity, [NotNull] Action action)
{
return PushJob(new ActionJob(name, action, identity));
}
public bool QueueJob( JobPriority priority, AbstractJob job )
{
return PushJob( job, priority );
}
public bool QueueJob( JobPriority priority, Delegate method, params object[] args )
{
return PushJob( new DelegateJob( method, args ), priority );
}
public bool QueueJob( JobPriority priority, string name, Delegate method, params object[] args )
{
return PushJob( new DelegateJob( name, method, args ), priority );
}
public void QueueJob( JobPriority priority, [NotNull] string name, [NotNull] Action action )
{
PushJob( new ActionJob( name, action ), priority );
}
public bool QueueJob( JobPriority priority, [NotNull] string name, [NotNull] object identity, [NotNull] Action action)
{
return PushJob( new ActionJob( name, action, identity ), priority );
}
private delegate void QueueTimedJobDelegate( DateTime when, AbstractJob job );
public void QueueJobAt( DateTime when, AbstractJob job )
{
if( Thread.CurrentThread == _processorThread && _isThreadStarted != 0 )
{
QueueTimedJob( when, job );
}
else
{
if( when <= DateTime.Now )
{
PushJob( job, JobPriority.Immediate );
}
else
{
QueueJob( JobPriority.Immediate, "Queueing timed jobs",
new QueueTimedJobDelegate( QueueTimedJob ), when, job );
}
}
}
public void QueueJobAt( DateTime when, Delegate method, params object[] args )
{
QueueJobAt( when, new DelegateJob( method, args ) );
}
public void QueueJobAt( DateTime when, string name, Delegate method, params object[] args )
{
QueueJobAt( when, new DelegateJob( name, method, args ) );
}
public void QueueJobAt( DateTime when, string name, Action action )
{
QueueJobAt( when, new ActionJob( name, action ) );
}
protected delegate bool QueueIdleJobDelegate( JobPriority priority, AbstractJob job );
public void QueueIdleJob( JobPriority priority, AbstractJob job )
{
/**
* idle processing can be performed only under WinNT or later
* under other platforms, just queue an job
*/
if( _platform != PlatformID.Win32NT )
{
PushJob( job, priority );
}
else
{
if( Thread.CurrentThread == _processorThread && _isThreadStarted != 0 )
{
QueueIdleJobImpl( priority, job );
}
else
{
QueueJob( JobPriority.Immediate, "Queueing idle jobs",
new QueueIdleJobDelegate( QueueIdleJobImpl ), priority, job );
}
}
}
public void QueueIdleJob( AbstractJob job )
{
QueueIdleJob( JobPriority.Normal, job );
}
public void RunJob( AbstractJob job )
{
RunJob( job, true );
}
public object RunJob( Delegate method, params object[] args )
{
DelegateJob job = new DelegateJob( method, args );
job = (DelegateJob) RunJob( job, true );
return ( job != null ) ? job.ReturnValue : null;
}
public object RunJob( string name, Delegate method, params object[] args )
{
DelegateJob job = new DelegateJob( name, method, args );
job = (DelegateJob) RunJob( job, true );
return ( job != null ) ? job.ReturnValue : null;
}
public bool RunJob([NotNull] string name, [NotNull] Action action )
{
var job = new ActionJob(name, action);
return RunJob( job, true ) != null;
}
public void RunUniqueJob( AbstractJob job )
{
RunJob( job, false );
}
public object RunUniqueJob( Delegate method, params object[] args )
{
DelegateJob job = new DelegateJob( method, args );
job = (DelegateJob) RunJob( job, false );
return ( job != null ) ? job.ReturnValue : null;
}
public object RunUniqueJob( string name, Delegate method, params object[] args )
{
DelegateJob job = new DelegateJob( name, method, args );
job = (DelegateJob) RunJob( job, false );
return ( job != null ) ? job.ReturnValue : null;
}
public void CancelJobs( JobFilter filter )
{
_jobsLock.Enter();
try
{
PriorityQueue filteredQueue = new PriorityQueue();
while( _jobQueue.Count > 0 )
{
PriorityQueue.QueueEntry E = _jobQueue.PopEntry();
AbstractJob job = (AbstractJob) E.Value;
if( job != null )
{
if( !filter( job ) )
{
filteredQueue.Push( E.Priority, E.Value );
}
else
{
_uniqueJobs.Remove( job );
ICancelable cjob = job as ICancelable;
if( cjob != null )
{
cjob.OnCancel();
}
}
}
}
_jobQueue = filteredQueue;
}
finally
{
_jobsLock.Exit();
}
}
public void CancelJobs( Delegate method )
{
DelegateJobFilter filter = new DelegateJobFilter( method );
CancelJobs( new JobFilter( filter.DoFilter ) );
}
public void CancelJobs( AbstractJob job )
{
EqualsJobFilter filter = new EqualsJobFilter( job );
CancelJobs( new JobFilter( filter.DoFilter ) );
}
public void CancelJobs()
{
_jobsLock.Enter();
try
{
while( _jobQueue.Count > 0 )
{
PriorityQueue.QueueEntry E = _jobQueue.PopEntry();
AbstractJob job = (AbstractJob) E.Value;
if( job != null )
{
_uniqueJobs.Remove( job );
ICancelable cjob = job as ICancelable;
if( cjob != null )
{
cjob.OnCancel();
}
}
}
}
finally
{
_jobsLock.Exit();
}
}
protected delegate void CancelTimedJobsDelegate( JobFilter filter );
public void CancelTimedJobs( JobFilter filter )
{
if( Thread.CurrentThread == _processorThread && _isThreadStarted != 0 )
{
CancelTimedJobsImpl( filter );
}
else
{
QueueJob( JobPriority.Immediate, "Cancelling timed jobs",
new CancelTimedJobsDelegate( CancelTimedJobsImpl ), filter );
}
}
public void CancelTimedJobs( Delegate method )
{
DelegateJobFilter filter = new DelegateJobFilter( method );
CancelTimedJobs( new JobFilter( filter.DoFilter ) );
}
public void CancelTimedJobs( AbstractJob job )
{
EqualsJobFilter filter = new EqualsJobFilter( job );
CancelTimedJobs( new JobFilter( filter.DoFilter ) );
}
public void QueueEndOfWork()
{
QueueJob( JobPriority.Lowest, new MethodInvoker( EndWork ) );
}
public AsyncExceptionHandler ExceptionHandler
{
get { return _exceptionHandler; }
set { _exceptionHandler = value; }
}
#endregion
#region IDisposable Members
public virtual void Dispose()
{
if( _isThreadStarted != 0 && _processorThread != Thread.CurrentThread )
{
ThreadPriority = ThreadPriority.Normal;
_finished = true;
_awakeningEvent.Set();
if( ProcessMessages )
{
User32Dll.PostThreadMessageW( unchecked((uint)_processorThread.GetHashCode()), (uint)WindowsMessages.WM_QUIT, IntPtr.Zero, IntPtr.Zero );
}
WaitUntilFinished();
}
/**
* force cancel jobs in order to stop all runnable jobs in the queue
*/
CancelJobs();
_awakeningEvent.Close();
}
#endregion
#region implementation details
protected bool PushJob( AbstractJob job )
{
return PushJob( job, JobPriority.Normal );
}
protected virtual bool PushJob( AbstractJob job, JobPriority priority )
{
if( _finished )
{
return false;
}
_jobsLock.Enter();
try
{
if( _uniqueJobs.Contains( job ) )
{
return false;
}
_jobQueue.Push( (int) priority, job );
_uniqueJobs.Add( job );
}
finally
{
_jobsLock.Exit();
}
_awakeningEvent.Set();
if( JobQueued != null )
{
JobQueued( this, job );
}
return true;
}
/**
* decorator for runnable jobs
*/
protected class RunnableJobDecorator: AbstractNamedJob, ICancelable
{
public RunnableJobDecorator( AsyncProcessor processor, AbstractJob internalJob )
{
_processor = processor;
_internalJob = internalJob;
_refCount = 0;
}
public int IncRef()
{
return Interlocked.Increment( ref _refCount );
}
public int DecRef()
{
return Interlocked.Decrement( ref _refCount );
}
protected override void Execute()
{
try
{
_processor.ExecuteJob( _internalJob );
while( !_processor.Finished && _internalJob.NextWaitHandle != null )
{
_processor.DoJobs();
}
}
catch( Exception exc )
{
if( !( exc is ThreadAbortException ) )
{
_exception = exc;
}
}
finally
{
OnCancel();
}
}
public override string Name
{
get
{
string result = "Running job";
AbstractNamedJob internalNamedJob = _internalJob as AbstractNamedJob;
if ( internalNamedJob != null )
{
result += ": " + internalNamedJob.Name;
}
return result;
}
}
public override int GetHashCode()
{
return _internalJob.GetHashCode();
}
public override bool Equals( object obj )
{
AbstractJob right = obj as AbstractJob;
return ( right != null ) && right.Equals( _internalJob );
}
#region ICancelable Members
public void OnCancel()
{
try
{
_jobFinished.Set();
}
catch( Exception exc )
{
_processor._tracer.TraceException( exc );
}
}
#endregion
public AbstractJob _internalJob;
public ManualResetEvent _jobFinished;
public Exception _exception;
protected AsyncProcessor _processor;
protected int _refCount;
}
/**
* returns the instance of job which was actually executed
*/
protected AbstractJob RunJob( AbstractJob job, bool throwExceptionIfEqualJobsDetected )
{
if( _finished )
{
return null;
}
if( _isThreadStarted == 0 || Thread.CurrentThread == _processorThread )
{
ExecuteJob( job );
return job;
}
/**
* If Run operation is invoked from a thread of an AsyncProcessor, then try to
* get AsyncProcessor from the pool
*/
AsyncProcessor procOfCurrentThread = GetProcessorFromPool( Thread.CurrentThread );
if( procOfCurrentThread != null )
{
if( !procOfCurrentThread.Reenterable || procOfCurrentThread.Finished )
{
procOfCurrentThread = null;
}
else
{
if( procOfCurrentThread._numberOfRunJobs == _maxWaitableHandles - 1 )
{
procOfCurrentThread = null;
}
else
{
procOfCurrentThread._numberOfRunJobs++;
}
}
}
RunnableJobDecorator runnable;
ManualResetEvent jobFinished;
_jobsLock.Enter();
try
{
/**
* necessary check to avoid race condition OM-7021
*/
if( _finished )
{
return null;
}
runnable = new RunnableJobDecorator( this, job );
RunnableJobDecorator startedRunnable = _uniqueJobs.GetKey( runnable ) as RunnableJobDecorator;
if( startedRunnable != null )
{
if( throwExceptionIfEqualJobsDetected )
{
throw new Exception( "Attempt to run equal jobs! job.ToString() = " + job.ToString() );
}
runnable = startedRunnable;
jobFinished = runnable._jobFinished;
}
else
{
_uniqueJobs.Add( runnable );
_jobQueue.Push( (int) JobPriority.Immediate, runnable );
jobFinished = new ManualResetEvent( false );
runnable._jobFinished = jobFinished;
_awakeningEvent.Set();
}
runnable.IncRef();
}
finally
{
_jobsLock.Exit();
}
try
{
/**
* if a job is run from an asyncprocessor's thread, then do jobs for it until job finished
*/
if( procOfCurrentThread != null )
{
WaitForSingleObjectJob waitJob = new WaitForSingleObjectJob( jobFinished );
procOfCurrentThread.QueueJob( JobPriority.Immediate, waitJob );
do
{
procOfCurrentThread.DoJobs();
/**
* if processor of current thread is finished we may not to
* wait until the WaitForSingleObjectJob finishes
*/
if( procOfCurrentThread.Finished )
{
jobFinished.WaitOne();
break;
}
}
while( waitJob.NextWaitHandle != null );
procOfCurrentThread._numberOfRunJobs--;
}
else
{
if( !Application.MessageLoop )
{
jobFinished.WaitOne();
}
else
{
IntPtr[] handles = new IntPtr[] { jobFinished.Handle };
MsgWaitForMultipleObjects( handles, 1, WindowsAPI.INFINITE );
}
}
}
finally
{
if( runnable.DecRef() == 0 )
{
jobFinished.Close();
}
}
if( runnable._exception != null )
{
throw new AsyncProcessorException( runnable._exception );
}
return runnable._internalJob;
}
/**
* maximum number of async operations capable to be processed simultaneously
* restriction goes from win32
*/
public const int _maxWaitableHandles = 64;
/**
* period to wait for switching into idle mode (in milliseconds )
*/
public const int _defaultIdlePeriod = 300000; // 5 minutes
protected virtual void ThreadFunction()
{
CallEventDelegatesSafe( ThreadStarted );
if( _restartThread )
{
_finished = false;
}
_restartThread = false;
/**
* Add current thread to processor pool
*/
AddToProcessorPool( Thread.CurrentThread, this );
while( !_finished )
{
_numberOfRunJobs = _reenteringDoJobs = 0;
try
{
DoJobs();
}
catch( ThreadAbortException e )
{
/**
* if ThreadAbortException caught then we try to start the new
* processor thread instead of this one, which is finished
*/
_tracer.TraceException( e );
Thread.ResetAbort();
ThreadPriority priority = ThreadPriority;
string threadName = ThreadName;
_processorThread = new Thread( new ThreadStart( ProcessorThread ) );
_processorThread.IsBackground = true;
ThreadPriority = priority;
ThreadName = threadName;
_isThreadStarted = 0;
_restartThread = _finished = true;
}
catch( Exception e )
{
_tracer.TraceException( e );
if( _exceptionHandler != null )
{
_exceptionHandler( e );
}
}
}
/**
* clean async processor pool from current thread
*/
CleanProcessorPool( Thread.CurrentThread );
CallEventDelegatesSafe( ThreadFinished );
if( _restartThread )
{
StartThread();
}
}
public virtual void DoJobs()
{
if( _finished )
{
return;
}
++_reenteringDoJobs;
try
{
AbstractJob job;
int i;
int timeout;
timeout = ProcessTimedJobs();
// i - ticks until idle mode
i = Timeout.Infinite;
if( _jobQueue.Count == 0 && _idleJobQueue.Count > 0 )
{
i = _idlePeriod - GetIdleDuration();
if( i < 0 )
{
i = 0;
}
}
if( i != Timeout.Infinite && ( timeout == Timeout.Infinite || timeout > i ) )
{
timeout = i;
}
i = WaitEvents( timeout );
if( _finished )
{
return;
}
if( i == WaitHandle.WaitTimeout )
{
/**
* in idle mode execute only one job if any
*/
if( _idleJobQueue.Count > 0 && GetIdleDuration() >= _idlePeriod )
{
job = (AbstractJob)_idleJobQueue.Pop();
_uniqueIdleJobs.Remove( job );
ExecuteJob( job );
}
return;
}
/**
* execute jobs if no timeout occured
*/
try
{
/**
* if we get a signal that one of the earlier started async operations should be continued
*/
if( i > 0 )
{
WaitHandle handle = _handles[ i ];
job = ( AbstractJob ) _startedJobs[ handle ];
_startedJobs.Remove( handle );
ExecuteJob( job );
}
/**
* else we get the awakening signal, so either queue is not empty or we are to finish thread
*/
else
{
if( _empty && FillingEmptyQueue != null )
{
FillingEmptyQueue( this, EventArgs.Empty );
}
job = null;
int count;
_jobsLock.Enter();
try
{
count = _jobQueue.Count;
if( count > 0 )
{
job = (AbstractJob) _jobQueue.Pop();
_uniqueJobs.Remove( job );
--count;
}
}
finally
{
_jobsLock.Exit();
}
while( job != null )
{
ExecuteJob( job );
if( count == 0 || _finished || !_jobsLock.TryEnter() )
{
break;
}
try
{
job = null;
count = _jobQueue.Count;
if( count > 0 )
{
job = (AbstractJob) _jobQueue.Pop();
_uniqueJobs.Remove( job );
--count;
}
}
finally
{
_jobsLock.Exit();
}
}
}
}
finally
{
if( !_finished && _jobQueue.Count == 0 )
{
_jobsLock.Enter();
try
{
if( _empty = _jobQueue.Count == 0 )
{
_awakeningEvent.Reset();
}
}
finally
{
_jobsLock.Exit();
}
_empty = _empty && _reenteringDoJobs == 1 && _startedJobs.Count == 0;
if( _empty && QueueGotEmpty != null )
{
QueueGotEmpty( this, EventArgs.Empty );
}
}
}
}
finally
{
--_reenteringDoJobs;
}
}
/**
* function where an asyncprocessor spends all its time when it's idle
* waiting events scheme depends on number of events and the
* ProcessMessages property value. If ProcessMessages is set to true then
* waiting function processes windows messages by means of Application.DoEvents()
*/
protected int WaitEvents( int timeout )
{
/**
* if we're to wait only for awakening event is set to signaled state
* then use WaitOne() because its performance is much better than WaitAny()
*/
int handlesCount = _startedJobs.Count + 1;
if( handlesCount == 1 && !_processMessages )
{
return ( _awakeningEvent.WaitOne( timeout, false ) ) ? 0 : WaitHandle.WaitTimeout;
}
if( handlesCount >= _maxWaitableHandles )
{
handlesCount = _maxWaitableHandles;
if( _processMessages )
{
--handlesCount;
}
}
_handles[ 0 ] = _awakeningEvent;
IEnumerator mapEnumerator = _startedJobs.GetEnumerator();
for( int i = 1; i < handlesCount; ++i )
{
mapEnumerator.MoveNext();
_handles[ i ] = (WaitHandle)( (HashMap.Entry) mapEnumerator.Current ).Key;
}
int result = ( _processMessages ) ?
MsgWaitForMultipleObjects( handlesCount, timeout ) :
WaitForMultipleObjects( handlesCount, timeout );
if( result < 0 )
{
RemoveInvalidHandles();
result = WaitEvents( timeout );
}
return result;
}
/**
* check for obsolete timed jobs if any, returns number of ticks till next job
* if returned value is zero then at least one obsolete job was executed
*/
protected int ProcessTimedJobs()
{
if( _timedJobs.Count == 0 )
{
return Timeout.Infinite;
}
DateTime nearest = _timedJobs.GetMinimumEntry().Priority;
DateTime now = DateTime.Now;
if( nearest > now )
{
return (int) (( nearest.Ticks - now.Ticks ) / 10000 );
}
do
{
AbstractJob job = (AbstractJob)_timedJobs.Pop();
HashMap.Entry E = _timedJobCounts.GetEntry( job );
if( E != null && ( (int) E.Value ) > 1 )
{
E.Value = (int) E.Value - 1;
}
else
{
if( E != null )
{
_timedJobCounts.Remove( job );
}
ExecuteJob( job );
}
if( _timedJobs.Count == 0 )
{
break;
}
nearest = _timedJobs.GetMinimumEntry().Priority;
} while( nearest <= DateTime.Now );
return 0;
}
/**
* execute all jobs here
*/
protected virtual void ExecuteJob( AbstractJob job )
{
AbstractJob lastExecutedJob = _currentJob;
_currentJob = job;
if( JobStartingDelegate != null )
{
JobStartingDelegate( this, EventArgs.Empty );
}
try
{
ReenteringJob reenteringJob = job as ReenteringJob;
if( reenteringJob != null )
{
ReenteringJob sameJob = (ReenteringJob) _startedReenteringJobs.GetKey( reenteringJob );
if( sameJob == null )
{
reenteringJob._processor = this;
reenteringJob.Interrupted = false;
_startedReenteringJobs.Add( reenteringJob );
}
else
{
sameJob.Interrupted = true;
PushJob( reenteringJob, JobPriority.Lowest );
return;
}
}
MethodInvoker method = job.NextMethod;
job.InvokeAfterWait( method, _nullHandle );
method();
WaitHandle handle = job.NextWaitHandle;
if( handle == _nullHandle )
{
/**
* setting null handle means that job is finished
*/
job.InvokeAfterWait( method, handle = null );
}
if( handle != null )
{
int timeout = job.Timeout;
if( timeout != Timeout.Infinite )
{
QueueTimedJob( DateTime.Now.AddMilliseconds( timeout ),
new DelegateJob( new ProcessTimeoutOfStartedJobDelegate(
ProcessTimeoutOfStartedJob ), new object[] { handle } ) );
}
_startedJobs.Add( handle, job );
}
else
{
if( reenteringJob != null )
{
_startedReenteringJobs.Remove( job );
}
}
}
catch( Exception exc )
{
exc = Utils.GetMostInnerException( exc );
if( !( exc is ThreadAbortException ) )
{
throw new AsyncProcessorException( exc );
}
}
finally
{
if( JobFinishedDelegate != null )
{
JobFinishedDelegate( this, EventArgs.Empty );
}
_currentJob = lastExecutedJob;
}
}
private delegate void ProcessTimeoutOfStartedJobDelegate( WaitHandle handle );
private void ProcessTimeoutOfStartedJob( WaitHandle handle )
{
HashMap.Entry entry = _startedJobs.GetEntry( handle );
if( entry != null )
{
( (AbstractJob) entry.Value ).FireTimeout();
_startedJobs.Remove( handle );
}
}
private void QueueTimedJob( DateTime when, AbstractJob job )
{
_timedJobs.Push( when, job );
HashMap.Entry E = _timedJobCounts.GetEntry( job );
if( E == null )
{
_timedJobCounts.Add( job, 1 );
}
else
{
E.Value = (int) E.Value + 1;
}
}
private void CancelTimedJobsImpl( JobFilter filter )
{
_timedJobCounts.Clear();
DateTimePriorityQueue filteredQueue = new DateTimePriorityQueue();
while( _timedJobs.Count > 0 )
{
DateTimePriorityQueue.QueueEntry E = _timedJobs.PopEntry();
AbstractJob job = (AbstractJob) E.Value;
if( filter( job ) )
{
ICancelable cjob = job as ICancelable;
if( cjob != null )
{
cjob.OnCancel();
}
}
else
{
DateTime when = E.Priority;
filteredQueue.Push( when, job );
HashMap.Entry Entry = _timedJobCounts.GetEntry( job );
if( Entry == null )
{
_timedJobCounts.Add( job, 1 );
}
else
{
Entry.Value = (int) Entry.Value + 1;
}
}
}
_timedJobs = filteredQueue;
}
private bool QueueIdleJobImpl( JobPriority priority, AbstractJob job )
{
if( _uniqueIdleJobs.Contains( job ) )
{
return false;
}
_idleJobQueue.Push( (int) priority, job );
_uniqueIdleJobs.Add( job );
return true;
}
/**
* gets idle duration at the moment
*/
private static int GetIdleDuration()
{
if( _platform == PlatformID.Win32NT )
{
LASTINPUTINFO lii = new LASTINPUTINFO();
lii.cbSize = 8;
WindowsAPI.GetLastInputInfo( ref lii );
return (int)( Kernel32Dll.GetTickCount() - lii.dwTime );
}
else
{
if( Cursor.Position != _lastCursorPos )
{
_lastCursorPos = Cursor.Position;
_firstNoUserActivityTick = Kernel32Dll.GetTickCount();
return 0;
}
return (int)( Kernel32Dll.GetTickCount() - _firstNoUserActivityTick );
}
}
/**
* Class-scope wait functions wait for specified number of handles
* stored in the _handles array
*/
private const uint ERROR_INVALID_HANDLE = 6;
///
/// /// Waits for multiple objects in the _handles array.
///
/// Number of of waitable handles.
/// Value of timeout.
/// WaitHandle.WaitTimeout -- timeout, less than zero -- one of the handles is invalid,
/// otherwise -- index of signaled handle.
protected int WaitForMultipleObjects( int handlesCount, int timeout )
{
for( int i = 0; i < handlesCount; ++i )
{
_winHandles[ i ] = _handles[ i ].Handle;
}
uint waitResult = InteropWinApi.WaitForMultipleObjects(
(uint) handlesCount, _winHandles, false, (uint) timeout );
if( waitResult == WindowsAPI.WAIT_TIMEOUT )
{
return WaitHandle.WaitTimeout;
}
waitResult -= WindowsAPI.WAIT_OBJECT_0;
if( waitResult < handlesCount )
{
return (int) waitResult;
}
int error = Marshal.GetLastWin32Error();
if( error != ERROR_INVALID_HANDLE )
{
throw new Exception( "WaitForMultipleObjects failed. Error code: " + error );
}
return -1;
}
protected int MsgWaitForMultipleObjects( int handlesCount, int timeout )
{
for( int i = 0; i < handlesCount; ++i )
{
_winHandles[ i ] = _handles[ i ].Handle;
}
return MsgWaitForMultipleObjects( _winHandles, handlesCount, (uint) timeout );
}
///
/// Waits for multiple objects processing windows messages if any.
///
/// Array of handles.
/// Number of handles.
/// Timeout to wait.
/// WaitHandle.WaitTimeout -- timeout, less than zero -- one of the handles is invalid,
/// otherwise -- index of signaled handle.
protected static int MsgWaitForMultipleObjects( IntPtr[] handles, int handlesCount, uint timeout )
{
for( ; ; )
{
uint startWait = Kernel32Dll.GetTickCount();
uint waitResult = InteropWinApi.MsgWaitForMultipleObjectsEx(
(uint) handlesCount, handles, timeout, InteropWinApi.QS_ALLINPUT, InteropWinApi.MWMO_INPUTAVAILABLE );
if( waitResult == WindowsAPI.WAIT_FAILED )
{
int error = Marshal.GetLastWin32Error();
if( error != ERROR_INVALID_HANDLE )
{
throw new Exception( "MsgWaitForMultipleObjects failed. Error code: " + error );
}
return -1;
}
if( waitResult == WindowsAPI.WAIT_TIMEOUT )
{
return WaitHandle.WaitTimeout;
}
waitResult -= WindowsAPI.WAIT_OBJECT_0;
if( waitResult < handlesCount )
{
return (int) waitResult;
}
Application.DoEvents();
uint spentTicks = Kernel32Dll.GetTickCount() - startWait;
timeout = ( spentTicks >= timeout ) ? 0 : timeout - spentTicks;
}
}
private void RemoveInvalidHandles()
{
IntPtr[] oneHandle = new IntPtr[ 1 ];
do
{
object handle2Remove = null;
foreach( HashMap.Entry e in _startedJobs )
{
oneHandle[ 0 ] = ( (WaitHandle) e.Key ).Handle;
if( MsgWaitForMultipleObjects( oneHandle, 1, 0 ) < 0 )
{
// the handle is invalid!
handle2Remove = e.Key;
break;
}
}
if( handle2Remove != null )
{
_startedJobs.Remove( handle2Remove );
continue;
}
} while( false );
}
private void CallEventDelegatesSafe( EventHandler handler )
{
if( handler != null )
{
try
{
handler( this, EventArgs.Empty );
}
catch( Exception e )
{
_tracer.TraceException( e );
if( _exceptionHandler != null )
{
_exceptionHandler( e );
}
}
}
}
/**
* Working with pool of AsyncProcesors
*/
protected static void AddToProcessorPool( Thread thread, AsyncProcessor processor )
{
lock( _processorPool )
{
if( _processorPool.Contains( thread ) )
{
throw new InvalidOperationException(
"An async processor is already registered for thread (id=" + thread.GetHashCode() + ")" );
}
_processorPool[ thread ] = processor;
}
}
protected static void CleanProcessorPool( Thread thread )
{
lock( _processorPool )
{
_processorPool.Remove( thread );
}
}
protected static AsyncProcessor GetProcessorFromPool( Thread thread )
{
lock( _processorPool )
{
return (AsyncProcessor) _processorPool[ thread ];
}
}
public static AsyncProcessor[] GetAllPooledProcessors()
{
ArrayList procs = new ArrayList();
lock( _processorPool )
{
foreach( HashMap.Entry e in _processorPool )
{
procs.Add( e.Value );
}
}
return (AsyncProcessor[]) procs.ToArray( typeof( AsyncProcessor ) );
}
private void EndWork()
{
_finished = true;
}
/**
* ProcessorThread is instance function used for thread creation in contructor
* To change behaviour of AsyncProcessor, override ThreadFunction()
*/
private void ProcessorThread()
{
ThreadFunction();
}
protected bool _finished;
protected bool _restartThread;
protected bool _empty;
protected bool _processMessages;
protected bool _isReenterable;
protected int _isThreadStarted;
protected int _idlePeriod;
protected int _numberOfRunJobs;
protected int _reenteringDoJobs;
protected PriorityQueue _jobQueue;
protected HashSet _uniqueJobs;
protected PriorityQueue _idleJobQueue;
protected HashSet _uniqueIdleJobs;
protected DateTimePriorityQueue _timedJobs;
protected HashMap _timedJobCounts;
protected HashMap _startedJobs;
protected HashSet _startedReenteringJobs;
protected AsyncExceptionHandler _exceptionHandler;
protected ManualResetEvent _awakeningEvent;
protected Thread _processorThread;
protected Tracer _tracer;
protected EventHandler _threadStarted;
protected EventHandler _threadFinished;
protected EventHandler _fillingEmptyQueue;
protected EventHandler _queueGotEmpty;
protected AbstractJob _currentJob;
private SpinWaitLock _jobsLock = new SpinWaitLock();
private WaitHandle[] _handles = new WaitHandle[ _maxWaitableHandles ];
private IntPtr[] _winHandles = new IntPtr[ _maxWaitableHandles ];
private static HashMap _processorPool = new HashMap();
private static PlatformID _platform = Environment.OSVersion.Platform;
private static uint _firstNoUserActivityTick = Kernel32Dll.GetTickCount();
private static Point _lastCursorPos = Cursor.Position;
internal static WaitHandle _nullHandle = new Mutex();
#endregion
///
/// AsyncProcessor Private Interop.
///
internal static class InteropWinApi
{
[DllImport("user32", CharSet = CharSet.Auto, SetLastError = true)]
public static extern uint MsgWaitForMultipleObjectsEx(uint nCount, IntPtr[] pHandles, uint dwMilliseconds, uint dwWakeMask, uint dwFlags);
[DllImport("kernel32", CharSet = CharSet.Auto, SetLastError = true)]
public static extern uint WaitForMultipleObjects(uint nCount, IntPtr[] pHandles, bool fWaitAll, uint dwMilliseconds);
private const uint QS_KEY = 0x1;
private const uint QS_MOUSEMOVE = 0x2;
private const uint QS_MOUSEBUTTON = 0x4;
private const uint QS_POSTMESSAGE = 0x8;
private const uint QS_TIMER = 0x10;
private const uint QS_PAINT = 0x20;
private const uint QS_SENDMESSAGE = 0x40;
private const uint QS_HOTKEY = 0x80;
private const uint QS_RAWINPUT = 0x400;
internal const uint QS_ALLINPUT = (((QS_MOUSEMOVE | QS_MOUSEBUTTON) | QS_KEY | QS_RAWINPUT) | QS_POSTMESSAGE | QS_TIMER | QS_PAINT | QS_HOTKEY | QS_SENDMESSAGE);
internal const uint MWMO_INPUTAVAILABLE = 4;
}
}
internal class EqualsJobFilter
{
private AbstractJob _job;
internal EqualsJobFilter( AbstractJob job )
{
_job = job;
}
public bool DoFilter( AbstractJob job )
{
return _job.Equals( job );
}
}
}