Dynamics CRM: Awaiting for Bulk Delete completion

October 29th, 2013

In my previous post I described a method for waiting until an async job completes. I have a similar one for waiting until a Bulk Delete operation completes. When dealing with Bulk Delete, keep the following in mind:

  • when the operation is initiated from code, the GUID returned in the BulkDeleteResponse is not the id of the Bulk Delete operation but the id of an async job;
  • the target Bulk Delete operation is created a bit later and is then associated with that async job.

The code below illustrates submitting a Bulk Delete request:

...
OrganizationServiceProxy proxy   = ...;
QueryExpression[]        queries = ...;

var bulkDeleteRequest = new BulkDeleteRequest
{
	JobName               = "Bulk Delete " + DateTime.Now,
	QuerySet              = queries,
	StartDateTime         = DateTime.Now.AddDays(-2), /* -2 is to avoid the problem with 
                                                          local and utc times */
	ToRecipients          = new Guid[] {},
	CCRecipients          = new Guid[] {},
	SendEmailNotification = false,
	RecurrencePattern     = String.Empty
};        

var bulkDeleteResponse = (BulkDeleteResponse)proxy.Execute(bulkDeleteRequest);

// id of the job which will be associated with the Bulk Delete operation
Guid jobId = bulkDeleteResponse.JobId;

So, given the id of the async job that will shortly be associated with the Bulk Delete operation, the waiting process takes two steps:

  • wait until the Bulk Delete operation has been created;
  • wait until the Bulk Delete operation has completed.

In both steps, RetrieveMultiple is called repeatedly against the bulkdeleteoperation records. First, we look for a record whose asyncoperationid attribute equals the async job id received earlier; once such a record is found, the Bulk Delete operation has been created. Then we keep polling that record until the operation completes.

The steps above are implemented in the WaitForBulkDeleteCompletion method shown below. Like WaitForAsyncJobCompletion, it lets you adjust the number of retries and the sleep interval. It also analyses the operation’s current state (and status) and reports progress on every iteration of the waiting loop.

public static BulkDeleteState WaitForBulkDeleteCompletion(OrganizationServiceProxy proxy, 
														BulkDeleteAwaiting asyncJobAwaiting)
{
	var state = new BulkDeleteState();

	#if WRITE_TRACE_INFO
	// A regular (non-verbatim) string keeps the line break and tabs out of the trace message.
	WriteInfo(string.Format("Waiting for completion of the bulk delete associated with the Async Job {0}...", 
			asyncJobAwaiting.AsyncJobId));
	#endif

	// Query for bulk delete operation and check for status.
	var bulkQuery = new QueryByAttribute(BulkDeleteOperation.EntityLogicalName)
		{
			ColumnSet = new ColumnSet(true)
		};
	bulkQuery.Attributes.Add("asyncoperationid");
	bulkQuery.Values.Add(asyncJobAwaiting.AsyncJobId);

	state.AsyncJobId        = asyncJobAwaiting.AsyncJobId;
	state.CurrentRetryCount = asyncJobAwaiting.Params.RetryCount;
	state.CurrentTimeout    = asyncJobAwaiting.Params.StartTimeout;

	do
	{
		Thread.Sleep(state.CurrentTimeout);

		EntityCollection entityCollection = proxy.RetrieveMultiple(bulkQuery);

		#if WRITE_TRACE_INFO
		WriteInfo(string.Format("Found {0} entities ...", entityCollection.Entities.Count));
		#endif

		if (entityCollection.Entities.Count > 0)
		{
			// Grab the one bulk operation that has been created.
			var createdBulkDeleteOperation = (BulkDeleteOperation)entityCollection.Entities[0];

			state.BulkDeleteOperationId = createdBulkDeleteOperation.BulkDeleteOperationId;
			state.CurrentState          = createdBulkDeleteOperation.StateCode;
			state.CurrentStatus         = createdBulkDeleteOperation.StatusCode != null ? 
					(bulkdeleteoperation_statuscode)createdBulkDeleteOperation.StatusCode.Value : 
					(bulkdeleteoperation_statuscode?) null;
			state.SuccessCount          = createdBulkDeleteOperation.SuccessCount;
			state.FailureCount          = createdBulkDeleteOperation.FailureCount;
		}
		
		#if WRITE_TRACE_INFO
		if (state.IsCreated)
		{
			if (state.IsSuspended)
				WriteInfo(string.Format("Bulk Delete (Id: {0}) is suspended", state.BulkDeleteOperationId));
			if (state.IsPaused)
				WriteInfo(string.Format("Bulk Delete (Id: {0}) is paused", state.BulkDeleteOperationId));
			if (state.CurrentState != null)
				WriteInfo(string.Format("Bulk Delete (Id: {0}) state is {1} {2}", state.BulkDeleteOperationId, state.CurrentState, (state.CurrentStatus != null ? "(Status: " + state.CurrentStatus.Value + ")" : string.Empty)));

			WriteInfo(string.Format("{0} records were successfully deleted", state.SuccessCount ?? 0));
			WriteInfo(string.Format("{0} records failed to be deleted", state.FailureCount ?? 0));
		}
		else
			WriteInfo(string.Format("Bulk Delete is still not created. Associated Async Job Id: {0}", asyncJobAwaiting.AsyncJobId));
		#endif

		asyncJobAwaiting.FireOnProgress(state);

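		// Grow the sleep interval by TimeoutStep, but never beyond EndTimeout.
		if (asyncJobAwaiting.Params.TimeoutStep != null && 
		    asyncJobAwaiting.Params.EndTimeout != null && 
		    state.CurrentTimeout < asyncJobAwaiting.Params.EndTimeout)
				state.CurrentTimeout = Math.Min(
					state.CurrentTimeout + asyncJobAwaiting.Params.TimeoutStep.Value,
					asyncJobAwaiting.Params.EndTimeout.Value);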

		if (asyncJobAwaiting.Params.WaitForever)
			continue;

		if (state.IsSuspended && asyncJobAwaiting.Params.DoNotCountIfSuspended)
			continue;
		if (state.IsPaused && asyncJobAwaiting.Params.DoNotCountIfPaused)
			continue;
		if(!state.IsCreated && asyncJobAwaiting.Params.DoNotCountIfNotCreated)
			continue;

		state.CurrentRetryCount--;

	} while (!state.IsCompleted && state.CurrentRetryCount > 0);

	return state;
}

Classes involved in the method are derived from the same base classes as those used for WaitForAsyncJobCompletion. The diagram below demonstrates dependencies between the base classes and the ones specific for Bulk Delete operations.

[Diagram: Bulk Delete Waiting Class Diagram]

The source code of the base classes AsyncOperationState, AsyncOperationAwaitingParams and AsyncOperationAwaiting is shown in the post "Dynamics CRM: Awaiting for async job completion". The remaining classes are listed below:

/// <summary>
/// Represents the awaiting parameters for Bulk Delete operations
/// </summary>
public class BulkDeleteAwaitingParams : AsyncOperationAwaitingParams
{
	private bool _doNotCountIfNotCreated = true;

	/// <summary>
	/// Indicates that an iteration is not counted as a retry while the target 
	/// Bulk Delete operation hasn't been created yet
	/// </summary>
	public bool DoNotCountIfNotCreated 
	{
		get { return _doNotCountIfNotCreated; }
		set { _doNotCountIfNotCreated = value; }
	}
}

Compared with AsyncOperationAwaitingParams, this class adds one property that specifies whether iterations are counted as retries while the Bulk Delete operation hasn’t been created yet.
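
To see what the flag changes in practice, here is a minimal, self-contained sketch that replays the retry counting against a scripted scenario instead of a live CRM organization. RetryBudgetDemo, Completes and its parameters are hypothetical names made up for this illustration; only the counting rule mirrors the description above.

```csharp
using System;

public static class RetryBudgetDemo
{
    // Replays the polling loop against a scripted scenario (hypothetical helper).
    // The operation record appears on poll number createdAtPoll and completes on
    // poll number completedAtPoll (both 1-based, completedAtPoll >= createdAtPoll).
    // Returns true if completion is observed before the retries run out.
    public static bool Completes(int retryCount, bool doNotCountIfNotCreated,
                                 int createdAtPoll, int completedAtPoll)
    {
        int  retriesLeft = retryCount;
        int  poll        = 0;
        bool completed   = false;

        do
        {
            poll++;
            bool created = poll >= createdAtPoll;
            completed    = poll >= completedAtPoll;

            // An iteration made before the record exists does not consume a retry.
            if (!created && doNotCountIfNotCreated)
                continue;

            retriesLeft--;

        } while (!completed && retriesLeft > 0);

        return completed;
    }

    public static void Main()
    {
        // Three retries; the record shows up on the 4th poll and completes on the 6th.
        Console.WriteLine(Completes(3, true,  4, 6)); // True
        Console.WriteLine(Completes(3, false, 4, 6)); // False
    }
}
```

With the flag left at its default of true, the polls made before the operation record appears cost nothing, so the same retry budget survives a slow start. The flip side, as far as I can tell from the loop above, is that an operation which is never created would never use up the retries, so setting the flag to false may be worth considering when that is a risk.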

/// <summary>
/// Represents parameters for tracking of a Bulk Delete operation
/// </summary>
public class BulkDeleteAwaiting : AsyncOperationAwaiting<BulkDeleteState, 
													BulkDeleteAwaitingParams>
{
	/// <summary>
	/// Id of the async job that will be associated with the target Bulk Delete operation
	/// </summary>
	public Guid AsyncJobId
	{
		get { return _asyncOperationId; }
	}

	public BulkDeleteAwaiting(Guid asyncJobId, BulkDeleteAwaitingParams asyncJobAwaitingParams):
						base(asyncJobId, asyncJobAwaitingParams)
	{
	}
}

Note that the AsyncJobId here represents an auxiliary async job that is supposed to be associated with the target Bulk Delete operation.

/// <summary>
/// Represents current state of a Bulk Delete operation
/// </summary>
[Serializable]
public class BulkDeleteState : AsyncOperationState<BulkDeleteOperationState?, 
												bulkdeleteoperation_statuscode?>
{
	/// <summary>
	/// Id of the async job associated with the target Bulk Delete operation
	/// </summary>
	public Guid  AsyncJobId            { get; set; }

	/// <summary>
	/// Id of the target Bulk Delete operation. Null when the operation isn't created.
	/// </summary>
	public Guid? BulkDeleteOperationId { get; set; }

	/// <summary>
	/// Current number of successfully deleted records
	/// </summary>
	public int?  SuccessCount          { get; set; }

	/// <summary>
	/// Current number of problem records
	/// </summary>
	public int?  FailureCount          { get; set; }

	/// <summary>
	/// Indicates if the Bulk Delete is suspended
	/// </summary>
	public override bool IsSuspended
	{
		get { return CurrentState != null && 
					CurrentState.Value == BulkDeleteOperationState.Suspended; }
	}
	/// <summary>
	/// Indicates if the Bulk Delete is paused
	/// </summary>
	public override bool IsPaused
	{
		get { return CurrentState != null && 
					CurrentState.Value == BulkDeleteOperationState.Locked && 
					CurrentStatus != null && 
					CurrentStatus.Value == bulkdeleteoperation_statuscode.Pausing; }
	}
	/// <summary>
	/// Indicates if the Bulk Delete is completed
	/// </summary>
	public override bool IsCompleted
	{
		get { return CurrentState != null && 
					CurrentState.Value == BulkDeleteOperationState.Completed; }
	}
	/// <summary>
	/// Indicates if the number of retries has run out while the Bulk Delete is 
	/// still not completed
	/// </summary>
	public override bool IsTimedOut
	{
		get { return !IsCompleted && CurrentRetryCount <= 0; }
	}
	/// <summary>
	/// Indicates if the Bulk Delete has failed
	/// </summary>
	public override bool IsFailed
	{
		get { return CurrentStatus != null && 
					CurrentStatus.Value == bulkdeleteoperation_statuscode.Failed; }
	}
	/// <summary>
	/// Indicates if the Bulk Delete is created
	/// </summary>
	public bool IsCreated
	{
		get { return BulkDeleteOperationId != null && 
					BulkDeleteOperationId.Value != Guid.Empty; }
	}
}

The class represents the current state of the Bulk Delete operation being awaited. On each iteration, WaitForBulkDeleteCompletion supplies the current state through the OnProgress event of the passed BulkDeleteAwaiting instance; at the end, the method returns the final state. The bulkdeleteoperation_statuscode enum is defined in OptionSets.cs at sdk\samplecode\cs\helpercode.

All the source code can be downloaded here – ConnectToCrm4.zip

Dynamics CRM: Awaiting for async job completion

September 27th, 2013

After initiating an async job from code, you often need to wait until the job completes. A very simple method that does this can be found in the SDK samples provided by Microsoft. The WaitForAsyncJobCompletion method is located in SDK\SampleCode\CS\DataManagement\DataImport\BulkImportHelper.cs and is used to wait for the async jobs that take part in a bulk data import:

public static void WaitForAsyncJobCompletion(OrganizationServiceProxy serviceProxy, 
                                                            Guid asyncJobId)
{
	ColumnSet cs = new ColumnSet("statecode", "statuscode");
	AsyncOperation asyncjob = 
             (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);

	int retryCount = 100;

	while (asyncjob.StateCode.Value != AsyncOperationState.Completed && retryCount > 0)
	{
		asyncjob = (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);
		System.Threading.Thread.Sleep(2000);
		retryCount--;
		Console.WriteLine("Async operation state is " + asyncjob.StateCode.Value.ToString());
	}

	Console.WriteLine("Async job is " + asyncjob.StateCode.Value.ToString() + 
		" with status " + ((asyncoperation_statuscode)asyncjob.StatusCode.Value).ToString());
}

This method is acceptable but, in my view, has some disadvantages. First, retryCount and the sleep interval should be adjustable so that they can be tuned for each particular job. For example, if we know that an operation takes a very long time, we can extend the sleep interval and reduce the maximum number of retries to minimize the CPU time spent on checking the job status. We could even make the waiting adaptive and extend the sleep interval gradually: the more idle retries we have, the longer the sleep interval gets. Second, WaitForAsyncJobCompletion doesn’t thoroughly analyze the current state (and status) of the job. The job could be in the Suspended or Paused state; in both cases, I think, it’s better not to count the retry, as the job isn’t running at that moment. In addition, I would like to get the intermediate state on each iteration of the while-statement, as that allows tracing how the job state changes over time.
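
The adaptive idea can be pictured with a small, self-contained sketch. SleepSchedule and Build are hypothetical names used only for this illustration, and the sketch assumes the interval grows by a fixed step after every poll and is capped at an upper limit.

```csharp
using System;
using System.Collections.Generic;

public static class SleepSchedule
{
    // Returns the sleep interval (in milliseconds) used on each of the first
    // `iterations` polls. Assumption: the interval grows by `step` after every
    // poll and never exceeds `end`; with no step or no end it stays constant.
    public static List<int> Build(int start, int? step, int? end, int iterations)
    {
        var result  = new List<int>();
        int current = start;

        for (int i = 0; i < iterations; i++)
        {
            result.Add(current);

            if (step != null && end != null && current < end.Value)
                current = Math.Min(current + step.Value, end.Value);
        }

        return result;
    }

    public static void Main()
    {
        // Start at 2 s, add 1.5 s per idle poll, never exceed 6 s.
        Console.WriteLine(string.Join(", ", Build(2000, 1500, 6000, 6)));
        // prints "2000, 3500, 5000, 6000, 6000, 6000"
    }
}
```

A linear step keeps the behaviour easy to predict; a multiplicative back-off would be another option, but it is not what this post uses.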

Based on these weaknesses and wishes, I’ve implemented my own WaitForAsyncJobCompletion method, listed below:

#define WRITE_TRACE_INFO

using System;
using System.Threading;
using Microsoft.Xrm.Sdk.Client;
using Microsoft.Xrm.Sdk.Query;
...
public static AsyncJobState WaitForAsyncJobCompletion(OrganizationServiceProxy proxy, 
                   AsyncJobAwaiting asyncJobAwaiting)
{
	var state = new AsyncJobState();

	#if WRITE_TRACE_INFO
	WriteInfo(string.Format("Waiting for async job (Id: {0}) completion ...", 
                   asyncJobAwaiting.AsyncJobId));
	#endif

	var columnSet = new ColumnSet("statecode", "statuscode");

	state.AsyncJobId        = asyncJobAwaiting.AsyncJobId;
	state.CurrentRetryCount = asyncJobAwaiting.Params.RetryCount;
	state.CurrentTimeout    = asyncJobAwaiting.Params.StartTimeout;

	do
	{
		Thread.Sleep(state.CurrentTimeout);

		var asyncJob = 
      (AsyncOperation)proxy.Retrieve("asyncoperation", asyncJobAwaiting.AsyncJobId, columnSet);

		state.CurrentState  = asyncJob.StateCode;
		state.CurrentStatus = asyncJob.StatusCode != null ? 
          (asyncoperation_statuscode)asyncJob.StatusCode.Value : (asyncoperation_statuscode?)null;
		
		#if WRITE_TRACE_INFO
		if (state.IsSuspended)
			WriteInfo(string.Format("Async job (Id: {0}) is suspended", asyncJobAwaiting.AsyncJobId));
		if (state.IsPaused)
			WriteInfo(string.Format("Async job (Id: {0}) is paused", asyncJobAwaiting.AsyncJobId));
		if (asyncJob.StateCode != null)
			WriteInfo(string.Format("Async job (Id: {0}) state is {1} {2}", asyncJobAwaiting.AsyncJobId, asyncJob.StateCode.Value, (asyncJob.StatusCode != null ? "(Status: " + (asyncoperation_statuscode)asyncJob.StatusCode.Value + ")" : string.Empty)));
		#endif

		asyncJobAwaiting.FireOnProgress(state);

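		// Grow the sleep interval by TimeoutStep, but never beyond EndTimeout.
		if (asyncJobAwaiting.Params.TimeoutStep != null && 
            asyncJobAwaiting.Params.EndTimeout != null && 
            state.CurrentTimeout < asyncJobAwaiting.Params.EndTimeout)
			  state.CurrentTimeout = Math.Min(
				  state.CurrentTimeout + asyncJobAwaiting.Params.TimeoutStep.Value,
				  asyncJobAwaiting.Params.EndTimeout.Value);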

		if (asyncJobAwaiting.Params.WaitForever)
			continue;
		if (state.IsSuspended && asyncJobAwaiting.Params.DoNotCountIfSuspended)
			continue;
		if (state.IsPaused && asyncJobAwaiting.Params.DoNotCountIfPaused)
			continue;

		state.CurrentRetryCount--;

	} while (!state.IsCompleted && state.CurrentRetryCount > 0);

	return state;
}

public static void WriteInfo(string message)
{
	Console.WriteLine(message);
}

Note that the first thing I do in the do-while loop is sleep for the current timeout value, as I think it’s pointless to examine an async job for completion right after it has been created.

As you can see, this implementation is still quite simple; however, it depends on a few classes that represent the waiting parameters and the current state of the target job. The diagram below depicts these helper classes and their dependencies. If you wonder why I use inheritance and produced so many classes for a simple async job wait, the reason is that the base classes are also used for keeping track of other operations (e.g. Bulk Delete operations).

[Diagram: Async Job Awaiting Class Diagram]

The source code of the classes, along with some explanations, is shown below.

/// <summary>
/// Represents the awaiting parameters
/// </summary>
public class AsyncOperationAwaitingParams
{
	private const int DefaultTimeout    = 2000;
	private const int DefaultRetryCount = 100;

	private int  _startTimeout          = DefaultTimeout;
	private int  _retryCount            = DefaultRetryCount;
	private bool _doNotCountIfSuspended = true;
	private bool _doNotCountIfPaused    = true;

	/// <summary>
	/// If set, specifies the value to be added to the sleep timeout on each iteration
	/// </summary>
	public int? TimeoutStep { get; set; }
	/// <summary>
	/// If set, specifies the top limit of the sleep timeout
	/// </summary>
	public int? EndTimeout  { get; set; }

	/// <summary>
	/// Indicates whether to wait indefinitely for the operation to complete
	/// </summary>
	public bool WaitForever { get; set; }

	/// <summary>
	/// Specifies the maximum number of counted iterations (retries)
	/// </summary>
	public int RetryCount
	{
		get { return _retryCount; }
		set { _retryCount = value; }
	}
	/// <summary>
	/// Specifies the initial sleep timeout
	/// </summary>
	public int StartTimeout
	{
		get { return _startTimeout; }
		set { _startTimeout = value; }
	}
	/// <summary>
	/// Indicates that an iteration is not counted as a retry while the operation is suspended
	/// </summary>
	public bool DoNotCountIfSuspended
	{
		get { return _doNotCountIfSuspended; }
		set { _doNotCountIfSuspended = value; }
	}
	/// <summary>
	/// Indicates that an iteration is not counted as a retry while the operation is paused
	/// </summary>
	public bool DoNotCountIfPaused
	{
		get { return _doNotCountIfPaused; }
		set { _doNotCountIfPaused = value; }
	}
}

I added the WaitForever option just in case. However, I recommend avoiding it: it’s better to experiment with RetryCount and the sleep timeouts (StartTimeout, TimeoutStep, EndTimeout) and find the most suitable values for each specific type of operation.
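
When picking these values it helps to know the longest time the loop can spend sleeping. The following sketch adds up the sleep intervals for a given set of parameters. WaitBudget and MaxSleepMs are hypothetical helpers, and the calculation assumes that the interval is capped at EndTimeout and that no iterations are skipped because of a suspended or paused job; the time taken by the requests themselves is not included.

```csharp
using System;

public static class WaitBudget
{
    // Sums the sleep intervals (in milliseconds) over retryCount counted iterations.
    // Assumption: the interval grows by timeoutStep per iteration up to endTimeout.
    public static long MaxSleepMs(int startTimeout, int? timeoutStep,
                                  int? endTimeout, int retryCount)
    {
        long total   = 0;
        int  current = startTimeout;

        for (int i = 0; i < retryCount; i++)
        {
            total += current;

            if (timeoutStep != null && endTimeout != null && current < endTimeout.Value)
                current = Math.Min(current + timeoutStep.Value, endTimeout.Value);
        }

        return total;
    }

    public static void Main()
    {
        // Defaults from AsyncOperationAwaitingParams: 100 retries of 2 s each.
        Console.WriteLine(MaxSleepMs(2000, null, null, 100));  // 200000 (about 3.3 minutes)

        // A long-running job: 5 s growing by 5 s up to 60 s, 30 retries.
        Console.WriteLine(MaxSleepMs(5000, 5000, 60000, 30));  // 1470000 (24.5 minutes)
    }
}
```

In the second case 30 retries cover far more wall-clock time than the 100 default retries while issuing fewer requests, which is the trade-off these parameters are meant to expose.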

/// <summary>
/// Comprises the awaiting parameters and id of the operation to be tracked. 
/// Provides the OnProgress event supposed to be fired on each iteration of awaiting process.
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TParams">Represents a type of the awaiting parameters</typeparam>
public abstract class AsyncOperationAwaiting<TState, TParams>
{
	protected readonly Guid    _asyncOperationId;
	protected readonly TParams _asyncOperationAwaitingParams;

	/// <summary>
	/// Represents the awaiting parameters
	/// </summary>
	public TParams Params
	{
		get { return _asyncOperationAwaitingParams; }
	}

	/// <summary>
	/// Event to fire and get operation state on each iteration
	/// </summary>
	public event EventHandler<TState> OnProgress;

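	public void FireOnProgress(TState state)
	{
		// Copy the delegate first so that a concurrent unsubscribe cannot null it
		// between the check and the call.
		var handler = OnProgress;
		if (handler != null)
			handler(this, state);
	}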

	protected AsyncOperationAwaiting(Guid asyncOperationId, TParams asyncOperationAwaitingParams)
	{
		_asyncOperationId             = asyncOperationId;
		_asyncOperationAwaitingParams = asyncOperationAwaitingParams;
	}
}

The descendants of the AsyncOperationAwaiting are supposed to supply parameters for more specific operations like async jobs, Bulk Deletes and so on.

/// <summary>
/// Represents parameters for tracking of an async job
/// </summary>
public class AsyncJobAwaiting : 
               AsyncOperationAwaiting<AsyncJobState, AsyncOperationAwaitingParams>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId
	{
		get { return _asyncOperationId; }
	}
	
	public AsyncJobAwaiting(Guid asyncJobId, AsyncOperationAwaitingParams asyncJobAwaitingParams) : 
        base(asyncJobId, asyncJobAwaitingParams)
	{
	}
}

The AsyncJobAwaiting supplies the awaiting parameters for async jobs. An instance of the AsyncJobAwaiting class is passed to the WaitForAsyncJobCompletion method.
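
A caller would typically subscribe to OnProgress before handing the instance over. The sketch below shows that pattern with simplified stand-ins so that it runs without the CRM SDK: DemoState, DemoAwaiting and ProgressDemo.Run are hypothetical and only imitate the shape of the classes in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a state class such as AsyncJobState.
public class DemoState
{
    public int  CurrentRetryCount { get; set; }
    public bool IsCompleted       { get; set; }
}

// Hypothetical stand-in for AsyncJobAwaiting: exposes only the event.
// EventHandler<T> with a non-EventArgs type needs .NET 4.5 or later.
public class DemoAwaiting
{
    public event EventHandler<DemoState> OnProgress;

    public void FireOnProgress(DemoState state)
    {
        var handler = OnProgress;
        if (handler != null)
            handler(this, state);
    }
}

public static class ProgressDemo
{
    // Imitates a waiting loop: fires OnProgress once per iteration and
    // reports completion on the iteration given by completeOn (1-based).
    public static DemoState Run(DemoAwaiting awaiting, int retryCount, int completeOn)
    {
        var state     = new DemoState { CurrentRetryCount = retryCount };
        int iteration = 0;

        do
        {
            iteration++;
            state.IsCompleted = iteration >= completeOn;
            awaiting.FireOnProgress(state);
            state.CurrentRetryCount--;

        } while (!state.IsCompleted && state.CurrentRetryCount > 0);

        return state;
    }

    public static void Main()
    {
        var log      = new List<string>();
        var awaiting = new DemoAwaiting();

        // Collect one line per iteration; a real handler might write to a log file.
        awaiting.OnProgress += (sender, s) =>
            log.Add(string.Format("retries left: {0}, completed: {1}",
                                  s.CurrentRetryCount, s.IsCompleted));

        DemoState final = Run(awaiting, 5, 3);

        foreach (string line in log)
            Console.WriteLine(line);
        Console.WriteLine("Final: completed = {0}", final.IsCompleted);
    }
}
```

Because the same state object is passed on every iteration, a handler that needs a history should copy the values it cares about, as the lambda here does, rather than keep a reference to the state.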

/// <summary>
/// Represents current state of an operation
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TStatus">Represents a type of the operation status</typeparam>
[Serializable]
public abstract class AsyncOperationState<TState, TStatus>
{
	/// <summary>
	/// Indicates how many retries remain
	/// </summary>
	public int  CurrentRetryCount    { get; set; }
	/// <summary>
	/// Indicates sleep timeout on the last iteration
	/// </summary>
	public int  CurrentTimeout       { get; set; }

	/// <summary>
	/// Represents current operation state
	/// </summary>
	public TState  CurrentState      { get; set; }
	/// <summary>
	/// Represents current operation status
	/// </summary>
	public TStatus CurrentStatus     { get; set; }

	/// <summary>
	/// Indicates if the operation is suspended
	/// </summary>
	public abstract bool IsSuspended { get; }
	/// <summary>
	/// Indicates if the operation is paused
	/// </summary>
	public abstract bool IsPaused    { get; }
	/// <summary>
	/// Indicates if the operation is completed
	/// </summary>
	public abstract bool IsCompleted { get; }
	/// <summary>
	/// Indicates if the number of retries has run out while the operation is still not completed
	/// </summary>
	public abstract bool IsTimedOut  { get; }
	/// <summary>
	/// Indicates if the operation has failed
	/// </summary>
	public abstract bool IsFailed    { get; }
}

The descendants of AsyncOperationState represent the states of more specific operations such as async jobs, Bulk Deletes and so on. The class is marked as Serializable, which makes it possible to persist the state somewhere, such as logs or databases.

/// <summary>
/// Represents current state of an async job
/// </summary>
[Serializable]
public class AsyncJobState : 
     AsyncOperationState<AsyncOperationState?, asyncoperation_statuscode?>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId { get; set; }

	/// <summary>
	/// Indicates if the async job is suspended
	/// </summary>
	public override bool IsSuspended
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Suspended; }
	}
	/// <summary>
	/// Indicates if the async job is paused
	/// </summary>
	public override bool IsPaused
	{
		get { return CurrentState != null && CurrentState.Value == AsyncOperationState.Locked && 
             CurrentStatus != null && CurrentStatus.Value == asyncoperation_statuscode.Pausing; }
	}
	/// <summary>
	/// Indicates if the async job is completed
	/// </summary>
	public override bool IsCompleted
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Completed; }
	}
	/// <summary>
	/// Indicates if the number of retries has run out while the async job is still not completed
	/// </summary>
	public override bool IsTimedOut
	{
		get { return !IsCompleted && CurrentRetryCount <= 0; }
	}
	/// <summary>
	/// Indicates if the async job has failed
	/// </summary>
	public override bool IsFailed
	{
		get { return CurrentStatus != null && 
                     CurrentStatus.Value == asyncoperation_statuscode.Failed; }
	}
}

The class represents the current state of the async job being tracked. The WaitForAsyncJobCompletion method returns the final state and provides the current state on each iteration through the OnProgress event of the passed AsyncJobAwaiting instance. The asyncoperation_statuscode enum is defined in OptionSets.cs, which can be found at sdk\samplecode\cs\helpercode.

All the source code can be downloaded here – ConnectToCrm3.zip
