Dynamics CRM: Awaiting for async job completion

September 27th, 2013

After starting an async job from code, you often need to wait until the job completes. A very simple method for doing that can be found in the SDK samples provided by Microsoft. The WaitForAsyncJobCompletion method is located in SDK\SampleCode\CS\DataManagement\DataImport\BulkImportHelper.cs and is used to wait for the async jobs that take part in a bulk data import:

public static void WaitForAsyncJobCompletion(OrganizationServiceProxy serviceProxy, 
                                                            Guid asyncJobId)
{
	ColumnSet cs = new ColumnSet("statecode", "statuscode");
	AsyncOperation asyncjob = 
             (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);

	int retryCount = 100;

	while (asyncjob.StateCode.Value != AsyncOperationState.Completed && retryCount > 0)
	{
		asyncjob = (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);
		System.Threading.Thread.Sleep(2000);
		retryCount--;
		Console.WriteLine("Async operation state is " + asyncjob.StateCode.Value.ToString());
	}

	Console.WriteLine("Async job is " + asyncjob.StateCode.Value.ToString() + 
		" with status " + ((asyncoperation_statuscode)asyncjob.StatusCode.Value).ToString());
}

This method is acceptable but, in my opinion, it has some disadvantages. First of all, it’s better to make retryCount and the sleep interval adjustable, so that they can be tuned for each particular job. For example, if we know that an operation takes a very long time, we could extend the sleep interval and reduce the maximum number of retries to minimize the overhead of checking the job status. We could even make the waiting adaptive and extend the sleep interval gradually: the more idle retries we have, the longer the sleep interval gets. Secondly, WaitForAsyncJobCompletion doesn’t thoroughly analyze the current state (and status) of the job. The job could be in the Suspended or Paused state; in both cases, I think, it’s better not to count such a retry because the job isn’t running at that moment. In addition, I would like to get the intermediate state on each iteration of the loop, as that makes it possible to trace how the job state changes over time.
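To make the idea of a gradually growing sleep interval concrete, here is a minimal sketch that only produces the sequence of intervals. The class and parameter names (BackoffSketch, startMs, stepMs, capMs) are made up for illustration and are not part of the SDK or of the code in this post.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffSketch
{
    // Yields `count` sleep intervals in milliseconds: the first one is startMs,
    // each following one grows by stepMs, and none exceeds capMs.
    public static IEnumerable<int> Delays(int startMs, int stepMs, int capMs, int count)
    {
        int current = Math.Min(startMs, capMs);
        for (int i = 0; i < count; i++)
        {
            yield return current;
            current = Math.Min(current + stepMs, capMs);
        }
    }
}
```

For example, BackoffSketch.Delays(2000, 1000, 5000, 6) yields 2000, 3000, 4000, 5000, 5000, 5000: the interval grows while the job stays idle and then levels off at the cap.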

Based on these weaknesses and wishes, I’ve implemented my own WaitForAsyncJobCompletion method, listed below:

#define WRITE_TRACE_INFO

using System;
using System.Threading;
using Microsoft.Xrm.Sdk.Client;
using Microsoft.Xrm.Sdk.Query;
...
public static AsyncJobState WaitForAsyncJobCompletion(OrganizationServiceProxy proxy, 
                   AsyncJobAwaiting asyncJobAwaiting)
{
	var state = new AsyncJobState();

	#if WRITE_TRACE_INFO
	WriteInfo(string.Format("Waiting for async job (Id: {0}) completion ...", 
                   asyncJobAwaiting.AsyncJobId));
	#endif

	var columnSet = new ColumnSet("statecode", "statuscode");

	state.AsyncJobId        = asyncJobAwaiting.AsyncJobId;
	state.CurrentRetryCount = asyncJobAwaiting.Params.RetryCount;
	state.CurrentTimeout    = asyncJobAwaiting.Params.StartTimeout;

	do
	{
		Thread.Sleep(state.CurrentTimeout);

		var asyncJob = 
      (AsyncOperation)proxy.Retrieve("asyncoperation", asyncJobAwaiting.AsyncJobId, columnSet);

		state.CurrentState  = asyncJob.StateCode;
		state.CurrentStatus = asyncJob.StatusCode != null ? 
          (asyncoperation_statuscode)asyncJob.StatusCode.Value : (asyncoperation_statuscode?)null;
		
		#if WRITE_TRACE_INFO
		if (state.IsSuspended)
			WriteInfo(string.Format("Async job (Id: {0}) is suspended", asyncJobAwaiting.AsyncJobId));
		if (state.IsPaused)
			WriteInfo(string.Format("Async job (Id: {0}) is paused", asyncJobAwaiting.AsyncJobId));
		if (asyncJob.StateCode != null)
			WriteInfo(string.Format("Async job (Id: {0}) state is {1} {2}", asyncJobAwaiting.AsyncJobId, asyncJob.StateCode.Value, (asyncJob.StatusCode != null ? "(Status: " + (asyncoperation_statuscode)asyncJob.StatusCode.Value + ")" : string.Empty)));
		#endif

		asyncJobAwaiting.FireOnProgress(state);

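		// Grow the sleep interval by TimeoutStep, but never past EndTimeout
		if (asyncJobAwaiting.Params.TimeoutStep != null && 
            asyncJobAwaiting.Params.EndTimeout != null && 
            state.CurrentTimeout < asyncJobAwaiting.Params.EndTimeout)
			state.CurrentTimeout = Math.Min(
				state.CurrentTimeout + asyncJobAwaiting.Params.TimeoutStep.Value,
				asyncJobAwaiting.Params.EndTimeout.Value);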

		if (asyncJobAwaiting.Params.WaitForever)
			continue;
		if (state.IsSuspended && asyncJobAwaiting.Params.DoNotCountIfSuspended)
			continue;
		if (state.IsPaused && asyncJobAwaiting.Params.DoNotCountIfPaused)
			continue;

		state.CurrentRetryCount--;

	} while (!state.IsCompleted && state.CurrentRetryCount > 0);

	return state;
}

public static void WriteInfo(string message)
{
	Console.WriteLine(message);
}

Note that the first thing I do inside the do-while loop is sleep for the current timeout value, because I think it’s pointless to check an async job for completion right after it has been created.
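The control flow of the loop can be tried out without a CRM connection. The following is a simplified sketch, not the method above: JobState, PollingSketch and the injected poll and sleep delegates are hypothetical stand-ins for the asyncoperation Retrieve call and Thread.Sleep. It shows the two decisions discussed so far: sleep before the first check, and do not spend a retry while the job is suspended.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified set of states (not the SDK's AsyncOperationState).
public enum JobState { Ready, Suspended, InProgress, Completed }

public static class PollingSketch
{
    // Polls until the job reports Completed or the retry budget is spent.
    // `sleep` is injected so the loop can be exercised without real waiting.
    // Returns true if the job completed, false if the retries ran out.
    public static bool WaitForCompletion(Func<JobState> poll, Action<int> sleep,
                                         int retryCount, int delayMs)
    {
        int retriesLeft = retryCount;
        JobState state;
        do
        {
            sleep(delayMs);      // sleep first: a freshly created job is rarely done yet
            state = poll();
            if (state == JobState.Suspended)
                continue;        // the job is not running, so do not spend a retry
            retriesLeft--;
        } while (state != JobState.Completed && retriesLeft > 0);

        return state == JobState.Completed;
    }
}
```

With a budget of two retries and a job that reports Suspended, Suspended, InProgress, Completed, this loop polls four times and returns true, whereas a loop that counted the suspended polls would give up after the second one. The price is that a job which stays suspended forever keeps such a loop running forever.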

As you can see, this implementation is still quite simple; however, it depends on a few classes that represent the waiting parameters and the current state of the target job. The diagram below depicts these helper classes and their dependencies. If you wonder why I use inheritance and produce so many classes for simple async job waiting, my excuse is that the base classes are also used for keeping track of other operations (e.g. Bulk Delete operations).

[Figure: Async Job Awaiting class diagram]

The source code of the classes, along with some explanations, is shown below.

/// <summary>
/// Represents the awaiting parameters
/// </summary>
public class AsyncOperationAwaitingParams
{
	private const int DefaultTimeout    = 2000;
	private const int DefaultRetryCount = 100;

	private int  _startTimeout          = DefaultTimeout;
	private int  _retryCount            = DefaultRetryCount;
	private bool _doNotCountIfSuspended = true;
	private bool _doNotCountIfPaused    = true;

	/// <summary>
	/// If set, specifies the value to be added to the sleep timeout on each iteration
	/// </summary>
	public int? TimeoutStep { get; set; }
	/// <summary>
	/// If set, specifies the top limit of the sleep timeout
	/// </summary>
	public int? EndTimeout  { get; set; }

	/// <summary>
	/// Indicates whether to wait for the operation to complete indefinitely, ignoring RetryCount
	/// </summary>
	public bool WaitForever { get; set; }

	/// <summary>
	/// Specifies the initial and maximal number of iterations
	/// </summary>
	public int RetryCount
	{
		get { return _retryCount; }
		set { _retryCount = value; }
	}
	/// <summary>
	/// Specifies the initial sleep timeout
	/// </summary>
	public int StartTimeout
	{
		get { return _startTimeout; }
		set { _startTimeout = value; }
	}
	/// <summary>
	/// If true, iterations during which the operation is suspended are not counted against RetryCount
	/// </summary>
	public bool DoNotCountIfSuspended
	{
		get { return _doNotCountIfSuspended; }
		set { _doNotCountIfSuspended = value; }
	}
	/// <summary>
	/// If true, iterations during which the operation is paused are not counted against RetryCount
	/// </summary>
	public bool DoNotCountIfPaused
	{
		get { return _doNotCountIfPaused; }
		set { _doNotCountIfPaused = value; }
	}
}

I added the WaitForever option just in case. However, I recommend avoiding it: it’s better to experiment with RetryCount and the sleep timeouts (StartTimeout, TimeoutStep, EndTimeout) and find the most suitable values for each specific type of operation.
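When picking values, it helps to know how long a given combination can keep the caller waiting in the worst case. The helper below is a rough sketch under my own assumptions: WaitBudgetSketch and MaxTotalWaitMs are hypothetical names, every iteration is assumed to be counted (no suspended or paused polls), the interval is assumed to stop growing at the cap, and the time spent in the Retrieve calls is ignored.

```csharp
using System;

public static class WaitBudgetSketch
{
    // Upper bound, in milliseconds, on the total time spent sleeping:
    // the sum of the sleep intervals over retryCount counted iterations.
    public static long MaxTotalWaitMs(int startMs, int? stepMs, int? capMs, int retryCount)
    {
        long total = 0;
        int current = startMs;
        for (int i = 0; i < retryCount; i++)
        {
            total += current;
            // The interval grows only when both a step and a cap are configured.
            if (stepMs.HasValue && capMs.HasValue && current < capMs.Value)
                current = Math.Min(current + stepMs.Value, capMs.Value);
        }
        return total;
    }
}
```

With the defaults (2000 ms, no step, 100 retries) the bound is 200,000 ms, a little over three minutes. With StartTimeout = 2000, TimeoutStep = 1000, EndTimeout = 10000 and the same 100 retries it is 964,000 ms, roughly sixteen minutes, for the same number of status checks.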

/// <summary>
/// Holds the waiting parameters and the id of the operation to be tracked. 
/// Provides the OnProgress event, which is fired on each iteration of the waiting loop.
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TParams">Represents a type of the awaiting parameters</typeparam>
public abstract class AsyncOperationAwaiting<TState, TParams>
{
	protected readonly Guid    _asyncOperationId;
	protected readonly TParams _asyncOperationAwaitingParams;

	/// <summary>
	/// Represents the awaiting parameters
	/// </summary>
	public TParams Params
	{
		get { return _asyncOperationAwaitingParams; }
	}

	/// <summary>
	/// Event to fire and get operation state on each iteration
	/// </summary>
	public event EventHandler<TState> OnProgress;

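	public void FireOnProgress(TState state)
	{
		// Copy to a local so a concurrent unsubscribe cannot null the event between check and call
		var handler = OnProgress;
		if (handler != null)
			handler(this, state);
	}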

	protected AsyncOperationAwaiting(Guid asyncOperationId, TParams asyncOperationAwaitingParams)
	{
		_asyncOperationId             = asyncOperationId;
		_asyncOperationAwaitingParams = asyncOperationAwaitingParams;
	}
}

The descendants of AsyncOperationAwaiting are supposed to supply parameters for more specific operations such as async jobs, Bulk Deletes and so on.

/// <summary>
/// Represents parameters for tracking of an async job
/// </summary>
public class AsyncJobAwaiting : 
               AsyncOperationAwaiting<AsyncJobState, AsyncOperationAwaitingParams>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId
	{
		get { return _asyncOperationId; }
	}
	
	public AsyncJobAwaiting(Guid asyncJobId, AsyncOperationAwaitingParams asyncJobAwaitingParams) : 
        base(asyncJobId, asyncJobAwaitingParams)
	{
	}
}

AsyncJobAwaiting supplies the waiting parameters for async jobs. An instance of the AsyncJobAwaiting class is passed to the WaitForAsyncJobCompletion method.

/// <summary>
/// Represents current state of an operation
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TStatus">Represents a type of the operation status</typeparam>
[Serializable]
public abstract class AsyncOperationState<TState, TStatus>
{
	/// <summary>
	/// Indicates how many retries remain
	/// </summary>
	public int  CurrentRetryCount    { get; set; }
	/// <summary>
	/// Indicates sleep timeout on the last iteration
	/// </summary>
	public int  CurrentTimeout       { get; set; }

	/// <summary>
	/// Represents current operation state
	/// </summary>
	public TState  CurrentState      { get; set; }
	/// <summary>
	/// Represents current operation status
	/// </summary>
	public TStatus CurrentStatus     { get; set; }

	/// <summary>
	/// Indicates if the operation is suspended
	/// </summary>
	public abstract bool IsSuspended { get; }
	/// <summary>
	/// Indicates if the operation is paused
	/// </summary>
	public abstract bool IsPaused    { get; }
	/// <summary>
	/// Indicates if the operation is completed
	/// </summary>
	public abstract bool IsCompleted { get; }
	/// <summary>
	/// Indicates if the number of retries has run out while the operation is still not completed
	/// </summary>
	public abstract bool IsTimedOut  { get; }
	/// <summary>
	/// Indicates if the operation has failed
	/// </summary>
	public abstract bool IsFailed    { get; }
}

The descendants of AsyncOperationState are supposed to represent the states of more specific operations such as async jobs, Bulk Deletes and so on. The class is marked as Serializable, which makes it possible to persist the state somewhere, for example in logs or a database.

/// <summary>
/// Represents current state of an async job
/// </summary>
[Serializable]
public class AsyncJobState : 
     AsyncOperationState<AsyncOperationState?, asyncoperation_statuscode?>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId { get; set; }

	/// <summary>
	/// Indicates if the async job is suspended
	/// </summary>
	public override bool IsSuspended
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Suspended; }
	}
	/// <summary>
	/// Indicates if the async job is paused
	/// </summary>
	public override bool IsPaused
	{
		get { return CurrentState != null && CurrentState.Value == AsyncOperationState.Locked && 
             CurrentStatus != null && CurrentStatus.Value == asyncoperation_statuscode.Pausing; }
	}
	/// <summary>
	/// Indicates if the async job is completed
	/// </summary>
	public override bool IsCompleted
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Completed; }
	}
	/// <summary>
	/// Indicates if the number of retries has run out while the async job is still not completed
	/// </summary>
	public override bool IsTimedOut
	{
		get { return !IsCompleted && CurrentRetryCount <= 0; }
	}
	/// <summary>
	/// Indicates if the async job has failed
	/// </summary>
	public override bool IsFailed
	{
		get { return CurrentStatus != null && 
                     CurrentStatus.Value == asyncoperation_statuscode.Failed; }
	}
}

This class represents the current state of the async job being tracked. The WaitForAsyncJobCompletion method returns the final state and reports the current state on each iteration through the OnProgress event of the passed AsyncJobAwaiting instance. The asyncoperation_statuscode enumeration is defined in OptionSets.cs, which can be found at sdk\samplecode\cs\helpercode.
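Putting the pieces together, a call might look like the fragment below. It is a usage sketch rather than a standalone program: it assumes the classes from this post and the CRM SDK assemblies are referenced, that the method sits in the same class as WaitForAsyncJobCompletion, and that the proxy and the job id come from elsewhere. The method name WaitAndReport and the parameter values are purely illustrative.

```csharp
using System;
using Microsoft.Xrm.Sdk.Client;

// Hypothetical helper; assumed to live next to WaitForAsyncJobCompletion.
public static void WaitAndReport(OrganizationServiceProxy proxy, Guid asyncJobId)
{
    var awaitingParams = new AsyncOperationAwaitingParams
    {
        StartTimeout = 5000,    // first sleep, in milliseconds (illustrative value)
        TimeoutStep  = 1000,    // grow the sleep by 1 s per iteration
        EndTimeout   = 30000,   // stop growing at 30 s
        RetryCount   = 50
    };
    var awaiting = new AsyncJobAwaiting(asyncJobId, awaitingParams);

    // Called on every iteration, before the retry counter is decremented.
    awaiting.OnProgress += (sender, state) =>
        Console.WriteLine("Retry budget: {0}, last sleep: {1} ms",
                          state.CurrentRetryCount, state.CurrentTimeout);

    AsyncJobState result = WaitForAsyncJobCompletion(proxy, awaiting);

    if (result.IsTimedOut)
        Console.WriteLine("Gave up waiting: the job has not completed yet");
    else if (result.IsFailed)
        Console.WriteLine("The job completed with the Failed status");
    else
        Console.WriteLine("The job completed with status " + result.CurrentStatus);
}
```

Checking IsTimedOut before IsFailed keeps the two outcomes apart: the first means the caller stopped waiting, the second means the job itself finished unsuccessfully.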

All the source code can be downloaded here – ConnectToCrm3.zip
