Archive for September, 2013

Dynamics CRM: Awaiting for async job completion

September 27th, 2013

    After starting an async job from code, you often need to wait until the job completes. A very simple method that does this can be found in the SDK samples provided by Microsoft. The WaitForAsyncJobCompletion method is located in SDK\SampleCode\CS\DataManagement\DataImport\BulkImportHelper.cs and is used to wait for the completion of the async jobs that take part in a bulk data import:

public static void WaitForAsyncJobCompletion(OrganizationServiceProxy serviceProxy, 
                                                            Guid asyncJobId)
{
	ColumnSet cs = new ColumnSet("statecode", "statuscode");
	AsyncOperation asyncjob = 
             (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);

	int retryCount = 100;

	while (asyncjob.StateCode.Value != AsyncOperationState.Completed && retryCount > 0)
	{
		asyncjob = (AsyncOperation)serviceProxy.Retrieve("asyncoperation", asyncJobId, cs);
		System.Threading.Thread.Sleep(2000);
		retryCount--;
		Console.WriteLine("Async operation state is " + asyncjob.StateCode.Value.ToString());
	}

	Console.WriteLine("Async job is " + asyncjob.StateCode.Value.ToString() + 
		" with status " + ((asyncoperation_statuscode)asyncjob.StatusCode.Value).ToString());
}

This method is acceptable but, in my opinion, has some disadvantages. First of all, it’s better to make retryCount and the sleep interval adjustable so that they can be tuned for each particular job. For example, if we know that an operation takes a very long time, we can extend the sleep interval and reduce the maximum number of retries to minimize the CPU time spent on checking the job status. We could even make the waiting adaptive and extend the sleep interval gradually: the more idle retries we have, the longer the sleep interval gets. Secondly, WaitForAsyncJobCompletion doesn’t thoroughly analyze the current state (and status) of the job. The job could be in the Suspended or Paused state; in both cases, I think, it’s better not to count the retry, as the job isn’t running at that moment. In addition, I would like to get the intermediate state on each iteration of the while-statement, as that allows tracing how the job state changes over time.

Based on the weaknesses and wishes above, I’ve implemented my own WaitForAsyncJobCompletion method, listed below:

#define WRITE_TRACE_INFO

using System;
using System.Threading;
using Microsoft.Xrm.Sdk.Client;
using Microsoft.Xrm.Sdk.Query;
...
public static AsyncJobState WaitForAsyncJobCompletion(OrganizationServiceProxy proxy, 
                   AsyncJobAwaiting asyncJobAwaiting)
{
	var state = new AsyncJobState();

	#if WRITE_TRACE_INFO
	WriteInfo(string.Format("Waiting for async job (Id: {0}) completion ...", 
                   asyncJobAwaiting.AsyncJobId));
	#endif

	var columnSet = new ColumnSet("statecode", "statuscode");

	state.AsyncJobId        = asyncJobAwaiting.AsyncJobId;
	state.CurrentRetryCount = asyncJobAwaiting.Params.RetryCount;
	state.CurrentTimeout    = asyncJobAwaiting.Params.StartTimeout;

	do
	{
		Thread.Sleep(state.CurrentTimeout);

		var asyncJob = 
      (AsyncOperation)proxy.Retrieve("asyncoperation", asyncJobAwaiting.AsyncJobId, columnSet);

		state.CurrentState  = asyncJob.StateCode;
		state.CurrentStatus = asyncJob.StatusCode != null ? 
          (asyncoperation_statuscode)asyncJob.StatusCode.Value : (asyncoperation_statuscode?)null;
		
		#if WRITE_TRACE_INFO
		if (state.IsSuspended)
			WriteInfo(string.Format("Async job (Id: {0}) is suspended", asyncJobAwaiting.AsyncJobId));
		if (state.IsPaused)
			WriteInfo(string.Format("Async job (Id: {0}) is paused", asyncJobAwaiting.AsyncJobId));
		if (asyncJob.StateCode != null)
			WriteInfo(string.Format("Async job (Id: {0}) state is {1} {2}", asyncJobAwaiting.AsyncJobId, asyncJob.StateCode.Value, (asyncJob.StatusCode != null ? "(Status: " + (asyncoperation_statuscode)asyncJob.StatusCode.Value + ")" : string.Empty)));
		#endif

		asyncJobAwaiting.FireOnProgress(state);

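		// Extend the sleep timeout gradually, but never beyond EndTimeout
		if (asyncJobAwaiting.Params.TimeoutStep != null && 
            asyncJobAwaiting.Params.EndTimeout != null && 
            state.CurrentTimeout < asyncJobAwaiting.Params.EndTimeout)
			  state.CurrentTimeout = Math.Min(
                  state.CurrentTimeout + asyncJobAwaiting.Params.TimeoutStep.Value,
                  asyncJobAwaiting.Params.EndTimeout.Value);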

		if (asyncJobAwaiting.Params.WaitForever)
			continue;
		if (state.IsSuspended && asyncJobAwaiting.Params.DoNotCountIfSuspended)
			continue;
		if (state.IsPaused && asyncJobAwaiting.Params.DoNotCountIfPaused)
			continue;

		state.CurrentRetryCount--;

	} while (!state.IsCompleted && state.CurrentRetryCount > 0);

	return state;
}

public static void WriteInfo(string message)
{
	Console.WriteLine(message);
}

Note that the first thing I do in the do-while loop is sleep for the current timeout value, as I think it’s pointless to examine an async job for completion right after it has been created.

As you can see, this implementation is still quite simple; however, it depends on a few classes that represent the waiting parameters and the current state of the target job. The diagram below depicts these helper classes and their dependencies. If you wonder why I use inheritance and have produced so many classes for simple async job awaiting, my excuse is that the base classes are also used for keeping track of other operations (e.g. Bulk Delete operations).

[Figure: Async Job Awaiting class diagram]

The source code of the classes, along with some explanations, is shown below.

/// <summary>
/// Represents the awaiting parameters
/// </summary>
public class AsyncOperationAwaitingParams
{
	private const int DefaultTimeout    = 2000;
	private const int DefaultRetryCount = 100;

	private int  _startTimeout          = DefaultTimeout;
	private int  _retryCount            = DefaultRetryCount;
	private bool _doNotCountIfSuspended = true;
	private bool _doNotCountIfPaused    = true;

	/// <summary>
	/// If set, specifies the value to be added to the sleep timeout on each iteration
	/// </summary>
	public int? TimeoutStep { get; set; }
	/// <summary>
	/// If set, specifies the top limit of the sleep timeout
	/// </summary>
	public int? EndTimeout  { get; set; }

	/// <summary>
	/// Indicates whether to wait for the operation to complete indefinitely (retries are not counted)
	/// </summary>
	public bool WaitForever { get; set; }

	/// <summary>
	/// Specifies the initial and maximal number of iterations
	/// </summary>
	public int RetryCount
	{
		get { return _retryCount; }
		set { _retryCount = value; }
	}
	/// <summary>
	/// Specifies the initial sleep timeout
	/// </summary>
	public int StartTimeout
	{
		get { return _startTimeout; }
		set { _startTimeout = value; }
	}
	/// <summary>
	/// Indicates whether iterations are left uncounted while the operation is suspended
	/// </summary>
	public bool DoNotCountIfSuspended
	{
		get { return _doNotCountIfSuspended; }
		set { _doNotCountIfSuspended = value; }
	}
	/// <summary>
	/// Indicates whether iterations are left uncounted while the operation is paused
	/// </summary>
	public bool DoNotCountIfPaused
	{
		get { return _doNotCountIfPaused; }
		set { _doNotCountIfPaused = value; }
	}
}

I added the WaitForever option just in case. However, I recommend avoiding it: it’s better to experiment with RetryCount and the sleep timeouts (StartTimeout, TimeoutStep, EndTimeout) and find the most suitable values for each specific type of operation.
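When experimenting with these values, it can help to estimate the worst-case waiting time before running anything against CRM. Below is a minimal sketch of such an estimate. The AwaitingSchedule class and its method names are hypothetical and not part of the downloadable source; the sketch assumes every iteration is counted (the job is never suspended or paused) and caps the timeout at EndTimeout, in line with how that parameter is described.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the article's source code.
public static class AwaitingSchedule
{
    // Reproduces the sequence of sleep timeouts (in milliseconds) of the waiting loop:
    // sleep first, then grow the timeout by 'step' while it is below 'endTimeout'.
    // Like the do-while loop, it always performs at least one iteration.
    public static List<int> GetSleepTimeouts(int retryCount, int startTimeout,
                                             int? step, int? endTimeout)
    {
        var timeouts = new List<int>();
        int current = startTimeout;
        int remaining = retryCount;

        do
        {
            timeouts.Add(current);

            if (step != null && endTimeout != null && current < endTimeout.Value)
                current = Math.Min(current + step.Value, endTimeout.Value);

            remaining--;
        } while (remaining > 0);

        return timeouts;
    }

    // Worst-case total time spent sleeping, in milliseconds.
    public static long GetMaxTotalWait(int retryCount, int startTimeout,
                                       int? step, int? endTimeout)
    {
        return GetSleepTimeouts(retryCount, startTimeout, step, endTimeout)
            .Sum(t => (long)t);
    }
}
```

For example, with RetryCount = 5, StartTimeout = 2000, TimeoutStep = 1000 and EndTimeout = 4000 the timeouts are 2000, 3000, 4000, 4000 and 4000 ms, so the waiting gives up after at most 17 seconds of sleeping. The default values (100 retries of 2000 ms each, no step) add up to 200 seconds.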

/// <summary>
/// Comprises the awaiting parameters and id of the operation to be tracked. 
/// Provides the OnProgress event supposed to be fired on each iteration of awaiting process.
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TParams">Represents a type of the awaiting parameters</typeparam>
public abstract class AsyncOperationAwaiting<TState, TParams>
{
	protected readonly Guid    _asyncOperationId;
	protected readonly TParams _asyncOperationAwaitingParams;

	/// <summary>
	/// Represents the awaiting parameters
	/// </summary>
	public TParams Params
	{
		get { return _asyncOperationAwaitingParams; }
	}

	/// <summary>
	/// Event to fire and get operation state on each iteration
	/// </summary>
	public event EventHandler<TState> OnProgress;

	public void FireOnProgress(TState state)
	{
		if (OnProgress != null)
			OnProgress(this, state);
	}

	protected AsyncOperationAwaiting(Guid asyncOperationId, TParams asyncOperationAwaitingParams)
	{
		_asyncOperationId             = asyncOperationId;
		_asyncOperationAwaitingParams = asyncOperationAwaitingParams;
	}
}

The descendants of the AsyncOperationAwaiting are supposed to supply parameters for more specific operations like async jobs, Bulk Deletes and so on.

/// <summary>
/// Represents parameters for tracking of an async job
/// </summary>
public class AsyncJobAwaiting : 
               AsyncOperationAwaiting<AsyncJobState, AsyncOperationAwaitingParams>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId
	{
		get { return _asyncOperationId; }
	}
	
	public AsyncJobAwaiting(Guid asyncJobId, AsyncOperationAwaitingParams asyncJobAwaitingParams) : 
        base(asyncJobId, asyncJobAwaitingParams)
	{
	}
}

The AsyncJobAwaiting supplies the awaiting parameters for async jobs. An instance of the AsyncJobAwaiting class is passed to the WaitForAsyncJobCompletion method.

/// <summary>
/// Represents current state of an operation
/// </summary>
/// <typeparam name="TState">Represents a type of the operation state</typeparam>
/// <typeparam name="TStatus">Represents a type of the operation status</typeparam>
[Serializable]
public abstract class AsyncOperationState<TState, TStatus>
{
	/// <summary>
	/// Indicates how many retries remain
	/// </summary>
	public int  CurrentRetryCount    { get; set; }
	/// <summary>
	/// Indicates sleep timeout on the last iteration
	/// </summary>
	public int  CurrentTimeout       { get; set; }

	/// <summary>
	/// Represents current operation state
	/// </summary>
	public TState  CurrentState      { get; set; }
	/// <summary>
	/// Represents current operation status
	/// </summary>
	public TStatus CurrentStatus     { get; set; }

	/// <summary>
	/// Indicates if the operation is suspended
	/// </summary>
	public abstract bool IsSuspended { get; }
	/// <summary>
	/// Indicates if the operation is paused
	/// </summary>
	public abstract bool IsPaused    { get; }
	/// <summary>
	/// Indicates if the operation is completed
	/// </summary>
	public abstract bool IsCompleted { get; }
	/// <summary>
	/// Indicates if the retries have run out while the operation is still not completed
	/// </summary>
	public abstract bool IsTimedOut  { get; }
	/// <summary>
	/// Indicates if the operation has failed
	/// </summary>
	public abstract bool IsFailed    { get; }
}

The descendants of AsyncOperationState are supposed to be states for more specific operations like async jobs, Bulk Deletes and so on. The class is marked as Serializable, which lets us persist the state somewhere such as logs or a database.

/// <summary>
/// Represents current state of an async job
/// </summary>
[Serializable]
public class AsyncJobState : 
     AsyncOperationState<AsyncOperationState?, asyncoperation_statuscode?>
{
	/// <summary>
	/// Id of the async job
	/// </summary>
	public Guid AsyncJobId { get; set; }

	/// <summary>
	/// Indicates if the async job is suspended
	/// </summary>
	public override bool IsSuspended
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Suspended; }
	}
	/// <summary>
	/// Indicates if the async job is paused
	/// </summary>
	public override bool IsPaused
	{
		get { return CurrentState != null && CurrentState.Value == AsyncOperationState.Locked && 
             CurrentStatus != null && CurrentStatus.Value == asyncoperation_statuscode.Pausing; }
	}
	/// <summary>
	/// Indicates if the async job is completed
	/// </summary>
	public override bool IsCompleted
	{
		get { return CurrentState != null && 
                     CurrentState.Value == AsyncOperationState.Completed; }
	}
	/// <summary>
	/// Indicates if the retries have run out while the async job is still not completed
	/// </summary>
	public override bool IsTimedOut
	{
		get { return !IsCompleted && CurrentRetryCount <= 0; }
	}
	/// <summary>
	/// Indicates if the async job has failed
	/// </summary>
	public override bool IsFailed
	{
		get { return CurrentStatus != null && 
                     CurrentStatus.Value == asyncoperation_statuscode.Failed; }
	}
}

The class represents the current state of the async job being tracked. The WaitForAsyncJobCompletion method returns the final state and reports the current state on each iteration through the OnProgress event of the passed AsyncJobAwaiting instance. The asyncoperation_statuscode enumeration is defined in OptionSets.cs, which can be found at sdk\samplecode\cs\helpercode.

All the source code can be downloaded here – ConnectToCrm3.zip

Dynamics CRM: Import of several files to one CRM Entity at once = An item with the same key has already been added

September 10th, 2013

    When importing data into CRM programmatically (so-called Bulk Import), we create an Import instance, then create one or more ImportFile instances associated with the Import, and upload the files. Each ImportFile instance also has to be associated with an ImportMap. The subsequent Parse, Transform and ImportRecords operations are executed for all of the ImportFiles grouped under the Import. The size of a file that can be uploaded to CRM is limited. So, when I needed to import a huge number of records into a CRM Entity, I decided to split the file with the records into several smaller ones that satisfy the limitation. Once the smaller files had been created and associated with one Import and one ImportMap, I requested the data parsing. Unfortunately, the records weren’t imported, as the Import failed. I found the following exception thrown on the CRM server side:

Unhandled Exception: System.ArgumentException: 
An item with the same key has already been added.
  at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
  at Microsoft.Crm.Asynchronous.ImportHelperData.InitializeImportHelperData(Guid importId, 
            Boolean initializeDictionaries, IOrganizationService crmService, 
            Guid organizationId, Int32 languageCode)
  at Microsoft.Crm.Asynchronous.ImportHelperData..ctor(Guid importId, 
            Boolean initializeDictionaries, IOrganizationService crmService, 
            Guid organizationId, Int32 languageCode)
  at Microsoft.Crm.Asynchronous.ImportOperationParse.ExecuteImportOperation(
            Guid organizationId, Guid importId, Int32 operationType)
  at Microsoft.Crm.Asynchronous.ImportOperation.InternalExecute(AsyncEvent asyncEvent)

Armed with .NET Reflector, I found the exact place where the exception occurred. It’s the InitializeImportHelperData method of the ImportHelperData class defined in the Microsoft.Crm.Asynchronous.DataManagement assembly. A slightly simplified version of InitializeImportHelperData is listed below:

private void InitializeImportHelperData(Guid importId, bool initializeDictionaries, 
                     IOrganizationService crmService, Guid organizationId, int languageCode)
{
    this._crmService = crmService;
    this._importId = importId;
    ...
    RetrieveRequest request = new RetrieveRequest {
        ColumnSet = new ColumnSet(true),
        Target = new EntityReference("import", importId)
    };
    RetrieveResponse response = (RetrieveResponse) this._crmService.Execute(request);
    Entity entity = response.Entity;
    ...
    EntityCollection importFileCollection = this.RetrieveImportFiles(this._importId);
    this.ProcessImportFilesForImportEntityMappings(ref importFileCollection);
    this._importFileIdToImportFileObjectDictionary = new Dictionary<Guid, Entity>();
    if (initializeDictionaries)
    {
        this._sourceEntityToImportFileIdDictionary = new Dictionary<string, Guid>();
        this._importFileIdToImportFileHelperDataDictionary = 
                              new Dictionary<Guid, ImportFileHelperData>();
    }
    foreach (Entity entity2 in importFileCollection.Entities)
    {
       Guid id = entity2.Id;
       this._importFileIdToImportFileObjectDictionary.Add(id, entity2);
       if (initializeDictionaries)
       {
          if (entity2.Attributes.Contains("sourceentityname") && 
                                       entity2["sourceentityname"] != null)
 /***/     this._sourceEntityToImportFileIdDictionary.Add(
              DataManagementHelper.GetStringFromDynamicEntity(entity2, "sourceentityname"), id);
          this._importFileIdToImportFileHelperDataDictionary.Add(id, null);
       }
    }
}

The exception is thrown in the highlighted line (marked with /***/). This code retrieves the ImportFiles, iterates over them and fills the dictionary with ImportFile Ids, using the SourceEntityName as the key. Obviously, if we have at least two ImportFiles with the same SourceEntityName (my case), we get the exception.
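The failure is easy to reproduce outside of CRM. The following minimal sketch fills a Dictionary<string, Guid> keyed by source entity name in the same way the loop does; the DuplicateKeyDemo class and the sample names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone reproduction; no CRM assemblies are involved.
public static class DuplicateKeyDemo
{
    // Adds every (source entity name, ImportFile id) pair to a dictionary.
    // Returns the exception message if a name repeats, otherwise null.
    public static string TryFill(IEnumerable<KeyValuePair<string, Guid>> importFiles)
    {
        var sourceEntityToImportFileId = new Dictionary<string, Guid>();
        try
        {
            foreach (var file in importFiles)
                sourceEntityToImportFileId.Add(file.Key, file.Value); // throws on a repeated key
            return null;
        }
        catch (ArgumentException ex)
        {
            return ex.Message;
        }
    }
}
```

Passing two pairs that share a name, say "contact", returns the message of the ArgumentException raised by Dictionary.Add, while pairs with distinct names return null.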

I was looking for a workaround, and the first thought that came to mind was to set a different SourceEntityName for each ImportFile. However, the SourceEntityName of an ImportFile must match the one specified in the associated ImportMap. I had one ImportMap for all of the ImportFiles, as they supply records for one CRM Entity. So, going this way, I would have to provide as many almost identical ImportMaps as there are ImportFiles to upload. The ImportMaps would differ only in the SourceEntityName attribute. I discarded this idea as far from optimal.

The approach I chose in the end is simply to use an individual Import for every ImportFile. So, if you have a number of ImportFiles, and each ImportFile is associated with a distinct ImportMap and supplies records for a distinct CRM Entity, you can combine them into one package (group them under one Import instance). But if you are planning to import records from several ImportFiles into one and the same CRM Entity using one and the same ImportMap, use an individual package (a separate Import instance) for every such file.
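The grouping rule can be sketched as a small planning step that runs before any Import is created. The code below is only an illustration under assumptions: PendingImportFile and ImportPackagePlanner are hypothetical types, not CRM SDK classes, and names are compared case-insensitively as a conservative choice.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical description of a file to be imported; not a CRM SDK type.
public class PendingImportFile
{
    public string FileName { get; set; }
    public string SourceEntityName { get; set; }
}

public static class ImportPackagePlanner
{
    // Distributes files over packages so that no package contains two files
    // with the same SourceEntityName. Each inner list corresponds to one Import.
    public static List<List<PendingImportFile>> Plan(IEnumerable<PendingImportFile> files)
    {
        var packages = new List<List<PendingImportFile>>();
        var usedNames = new List<HashSet<string>>();

        foreach (var file in files)
        {
            // Find the first package that does not hold this source entity name yet.
            int index = usedNames.FindIndex(names => !names.Contains(file.SourceEntityName));
            if (index < 0)
            {
                // Every existing package already has such a file: start a new package.
                packages.Add(new List<PendingImportFile>());
                usedNames.Add(new HashSet<string>(StringComparer.OrdinalIgnoreCase));
                index = packages.Count - 1;
            }

            packages[index].Add(file);
            usedNames[index].Add(file.SourceEntityName);
        }

        return packages;
    }
}
```

Given three files for "contact" followed by one file for "account", the plan contains three packages: the first holds one contact file together with the account file, and the other two hold one contact file each.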

The fact that we are not able to import data from several files into a CRM Entity at once (i.e. using one Import package) seems a very strange limitation to me. I hope this inconvenience will be fixed in upcoming CRM versions.

Dynamics CRM: Get current user information

September 5th, 2013

    One of the most frequently used operations with OrganizationServiceProxy is getting information about the current user. For simplicity, such an operation can be implemented as an extension method for the OrganizationServiceProxy class. So, let’s put the following code somewhere in a project:

using Microsoft.Crm.Sdk.Messages;
using Microsoft.Xrm.Sdk.Client;

namespace dotNetFollower
{
    public static class OrganizationServiceProxyExtensions
    {
        public static WhoAmIResponse GetCurrentUser(this OrganizationServiceProxy proxy)
        {
            return (WhoAmIResponse)proxy.Execute(new WhoAmIRequest());
        }
    }
}

How to use the extension is shown below:

var serverConnection = new ServerConnectionEx();
serverConnection.DoInOrganizationServiceProxyContext(crmConfiguration, proxy =>
{
	Guid userId = proxy.GetCurrentUser().UserId;

	// do something else
});

The code above assumes that you use the ServerConnection helper class in your project. The ServerConnectionEx class, in turn, is derived from ServerConnection and is described in the article Dynamics CRM: How to connect to CRM through the code.

A more elegant way, in my opinion, is to add a lazy-loading property to a class derived from OrganizationServiceProxy. But in this case we have to make a lot of small changes within the ServerConnection class, as that class creates and operates the OrganizationServiceProxy instance. The derived class looks like the following:

using System;
using System.ServiceModel.Description;
using Microsoft.Crm.Sdk.Messages;
using Microsoft.Xrm.Sdk;
using Microsoft.Xrm.Sdk.Client;

namespace dotNetFollower
{
  public class OrganizationServiceProxyEx : OrganizationServiceProxy
  {
    private WhoAmIResponse _whoAmIResponse;

    public WhoAmIResponse CurrentUser
    {
      get { return _whoAmIResponse ?? 
              (_whoAmIResponse = (WhoAmIResponse) Execute(new WhoAmIRequest())); }
    }

    public OrganizationServiceProxyEx(IServiceConfiguration<IOrganizationService> serviceConfiguration, 
                             ClientCredentials clientCredentials)
        : base(serviceConfiguration, clientCredentials)
    {            
    }
    public OrganizationServiceProxyEx(IServiceConfiguration<IOrganizationService> serviceConfiguration,
                                    SecurityTokenResponse securityTokenResponse)
        : base(serviceConfiguration, securityTokenResponse)
    {            
    }
    public OrganizationServiceProxyEx(IServiceManagement<IOrganizationService> serviceManagement,
                                    ClientCredentials clientCredentials)
        : base(serviceManagement, clientCredentials)
    {            
    }
    public OrganizationServiceProxyEx(IServiceManagement<IOrganizationService> serviceManagement,
                                    SecurityTokenResponse securityTokenResponse)
        : base(serviceManagement, securityTokenResponse)
    {            
    }
    public OrganizationServiceProxyEx(Uri uri, Uri homeRealmUri, ClientCredentials clientCredentials,
                                    ClientCredentials deviceCredentials)
        : base(uri, homeRealmUri, clientCredentials, deviceCredentials)
    {            
    }
  }
}

The next step is to open CrmServiceHelpers.cs and replace all mentions of OrganizationServiceProxy with OrganizationServiceProxyEx. Then do the same for ServerConnectionEx in case you use it. Or you can just take the demonstration project, where all required changes are applied. Download it here – ConnectToCrm2.zip.

Below is how to use the CurrentUser property exposed by the custom proxy class:

var serverConnection = new ServerConnectionEx();
serverConnection.DoInOrganizationServiceProxyContext(SampleCrmConfiguration, proxy =>
{
	Guid userId = proxy.CurrentUser.UserId;
	
	// do something else
});