CLR multithreading, Part 1

A process is an inert container that has its own virtual address space.
Libraries of code are mapped into that address space.
A thread is a path of execution through code within a single process.
Managed threads execute within an AppDomain.
Each thread has its own call stack and its own copy of the CPU registers.

Advantages:
1. Better CPU utilization when a process runs on a multi-core CPU or a multiprocessor machine.
2. Better support for asynchronous I/O: the process can perform CPU-bound operations while waiting for I/O-bound operations to complete.
3. Better UI responsiveness.

Disadvantages:
Context switching overhead

Added program complexity:
1. More lines of code
2. Hard to maintain (readability)
3. Hard to debug
4. Hard to test

CLR thread class name
System.Threading.Thread

Thread lifetime:
Execution continues until the thread returns from its entry point:
1. As a result of a standard method return.
2. As a result of an unhandled exception:
2.1. An exception raised by code running on the thread.
2.2. An exception injected by another thread calling the Abort() or Interrupt() methods.

Coordinated thread shutdown
1. A user-defined mechanism signals the thread to stop.
2. The requesting thread waits for the thread to finish by:
2.1. Polling IsAlive
2.2. Calling Thread.Join
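
The two steps above can be sketched as follows. This is a minimal illustration, not a prescribed pattern: the class name ShutdownDemo, the volatile flag and the timings are assumptions made for the example.

```csharp
using System;
using System.Threading;

static class ShutdownDemo
{
    // User-defined stop signal; volatile so the worker sees the requester's write.
    private static volatile bool _stopRequested;

    private static void Worker()
    {
        while (!_stopRequested)
        {
            Thread.Sleep(10); // simulate one unit of work
        }
        // Returning from the entry point ends the thread.
    }

    // Returns true when the worker exits within the timeout.
    public static bool RunAndStop()
    {
        _stopRequested = false;
        Thread worker = new Thread(Worker);
        worker.Start();

        Thread.Sleep(50);                // let the worker run for a moment
        _stopRequested = true;           // step 1: signal the thread
        bool exited = worker.Join(5000); // step 2: wait (up to 5 s) for it to return

        return exited && !worker.IsAlive;
    }

    static void Main()
    {
        Console.WriteLine("Worker stopped cleanly: " + RunAndStop());
    }
}
```

Join is usually preferable to polling IsAlive because the waiting thread blocks instead of spinning.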

Thread Pool
The CLR provides one thread pool per process.
1. Creating a thread has a cost: time is spent allocating the thread stack and other memory. The thread pool cuts this overhead by sharing and recycling threads.
2. The CLR's pool manager adds threads to the pool or removes them from it on demand.
3. Pool threads are background threads: their IsBackground property is set to true.

When not to use a thread pool
1. When we need a foreground thread
2. When we need to set the priority of the thread
3. When we run a long-running task
4. When we need to identify the thread (for example, by name)
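
For those cases a dedicated System.Threading.Thread can be created instead. The sketch below is only an illustration; the class name DedicatedThreadDemo and the thread name "ReportWriter" are hypothetical.

```csharp
using System;
using System.Threading;

static class DedicatedThreadDemo
{
    public static string Run()
    {
        string observed = null;

        Thread worker = new Thread(() =>
        {
            Thread current = Thread.CurrentThread;
            observed = current.Name
                + ", background=" + current.IsBackground
                + ", pool=" + current.IsThreadPoolThread;
        });

        worker.Name = "ReportWriter";                 // hypothetical name, makes the thread identifiable
        worker.IsBackground = false;                  // foreground: keeps the process alive while it runs
        worker.Priority = ThreadPriority.BelowNormal; // should not be changed on pool threads

        worker.Start();
        worker.Join();
        return observed;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // ReportWriter, background=False, pool=False
    }
}
```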

Delegates and Async I/O

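1. A delegate's BeginInvoke method queues the target method to the ThreadPool.

private static int AddMethod(int x)
{
    return unchecked(x + 1);
}

private delegate int OperationMethod(int x);

// Delegate.BeginInvoke works on .NET Framework; .NET Core and later throw PlatformNotSupportedException.
OperationMethod test = AddMethod;
IAsyncResult asyncResult = test.BeginInvoke(0, null, null);
int result = test.EndInvoke(asyncResult); // blocks until AddMethod has finished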

2. Asynchronous I/O
2.1. When we call asynchronous I/O methods such as BeginRead or BeginWrite, the request is handed to an I/O completion port and the call does not block the calling thread.
2.2. Only when the I/O operation completes is the callback queued to the ThreadPool.
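
The Begin/End pattern described in point 2 can be sketched with FileStream as shown below. The class name AsyncReadDemo, the temporary file and its contents are assumptions made for the example, and error handling inside the callback is omitted for brevity.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

static class AsyncReadDemo
{
    public static string ReadWithApm()
    {
        string path = Path.GetTempFileName();
        File.WriteAllText(path, "hello");
        try
        {
            byte[] buffer = new byte[16];
            int bytesRead = 0;

            using (ManualResetEventSlim done = new ManualResetEventSlim(false))
            // The last argument (useAsync: true) asks for asynchronous file I/O.
            using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read,
                                                  FileShare.Read, 4096, true))
            {
                // BeginRead returns immediately; the callback runs after the read completes.
                fs.BeginRead(buffer, 0, buffer.Length, ar =>
                {
                    bytesRead = fs.EndRead(ar);
                    done.Set();
                }, null);

                // The calling thread is free to do other work here.
                done.Wait();
            }

            return Encoding.UTF8.GetString(buffer, 0, bytesRead);
        }
        finally
        {
            File.Delete(path);
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadWithApm()); // hello
    }
}
```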

Cache Lines and False Sharing

  1. What is the cache line?
  2. What does false sharing mean?
  3. How to avoid false sharing?

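Each location in the cache has a tag that contains the index of the datum
in main memory that has been cached. In a CPU's data cache these entries are
called cache lines or cache blocks.

False sharing occurs when threads running on different cores repeatedly write
to different variables that happen to sit in the same cache line. Every write
invalidates that line in the other core's cache, so the cores contend with each
other even though they share no data. To avoid it, keep such variables at least
one cache line apart, as Sample 2 does. The cache block size can be queried
through WMI: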

// Requires a reference to System.Management (using System.Management;).
public static string GetCacheBlockSize()
{
    ManagementObjectSearcher searcher =
        new ManagementObjectSearcher(@"SELECT * FROM Win32_CacheMemory");

    foreach (ManagementObject wmi in searcher.Get())
    {
        try
        {
            return "Cache Block Size: " + wmi.GetPropertyValue("BlockSize").ToString();
        }
        catch
        {
            // Ignore entries that do not expose BlockSize and try the next one.
        }
    }

    return "Cache Block Size: Unknown";
}

 

 

Sample 1:

class Data
{
    // Adjacent fields: both very likely end up in the same cache line.
    public int _field1;
    public int _field2;
}

private const int _numIterations = 100000000;
private static long _threadCounter = 2;
private static long _startTimestamp = 0;

public static void Main()
{
    Data data = new Data();

    _startTimestamp = Stopwatch.GetTimestamp();

    ThreadPool.QueueUserWorkItem(o => PerformTest(data, 0));
    ThreadPool.QueueUserWorkItem(o => PerformTest(data, 1));

    Console.ReadKey();
}

private static void PerformTest(Data data, byte numThread)
{
    for (int i = 0; i < _numIterations; i++)
    {
        if (numThread == 0)
            data._field1++;
        else
            data._field2++;
    }

    // The last thread to finish reports the elapsed time.
    // Stopwatch.GetTimestamp() returns ticks, not milliseconds.
    if (Interlocked.Decrement(ref _threadCounter) == 0)
        Console.WriteLine("It takes about {0} ticks", Stopwatch.GetTimestamp() - _startTimestamp);
}

 

Output
1: 3470119 ticks (a Stopwatch.GetTimestamp() delta, not milliseconds)

 

 

 

Sample 2:

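[StructLayout(LayoutKind.Explicit)]
class Data
{
    [FieldOffset(0)]
    public int _field1;

    // 64 bytes away from _field1, so the two fields fall into different
    // cache lines (assuming a 64-byte cache line).
    [FieldOffset(64)]
    public int _field2;
}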


Output
2: 1649438 ticks (a Stopwatch.GetTimestamp() delta, not milliseconds)
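
For convenience, both layouts can be timed in one program. The sketch below is an assumption-laden variant of the two samples: it uses dedicated threads and Stopwatch.ElapsedMilliseconds, a smaller iteration count, and a 64-byte cache line; the names FalseSharingDemo, Adjacent and Padded are made up for the example. Timings depend on the machine, so no expected figures are given.

```csharp
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

static class FalseSharingDemo
{
    private const int Iterations = 10000000;

    // Adjacent fields: very likely in the same cache line.
    private class Adjacent
    {
        public int Field1;
        public int Field2;
    }

    // Fields 64 bytes apart (assumed cache line size).
    [StructLayout(LayoutKind.Explicit)]
    private class Padded
    {
        [FieldOffset(0)] public int Field1;
        [FieldOffset(64)] public int Field2;
    }

    // Runs both delegates on their own threads and returns the elapsed milliseconds.
    private static long Time(ThreadStart first, ThreadStart second)
    {
        Thread t1 = new Thread(first);
        Thread t2 = new Thread(second);
        Stopwatch watch = Stopwatch.StartNew();
        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();
        return watch.ElapsedMilliseconds;
    }

    // Returns true when every counter reached the expected value.
    public static bool Run()
    {
        Adjacent a = new Adjacent();
        Padded p = new Padded();

        long adjacentMs = Time(
            () => { for (int i = 0; i < Iterations; i++) a.Field1++; },
            () => { for (int i = 0; i < Iterations; i++) a.Field2++; });

        long paddedMs = Time(
            () => { for (int i = 0; i < Iterations; i++) p.Field1++; },
            () => { for (int i = 0; i < Iterations; i++) p.Field2++; });

        Console.WriteLine("Adjacent fields: {0} ms, padded fields: {1} ms", adjacentMs, paddedMs);

        return a.Field1 == Iterations && a.Field2 == Iterations
            && p.Field1 == Iterations && p.Field2 == Iterations;
    }

    static void Main()
    {
        Run();
    }
}
```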

Performing a Periodic Compute-Bound Operation

To perform a periodic compute-bound operation we can
use the System.Threading.Timer class.

public Timer(
    TimerCallback callback,
    Object state,
    int dueTime,
    int period
)

Parameters:

callback – A TimerCallback delegate representing a method to be executed.

state – An object containing information to be used by the callback method, or a null reference.

dueTime – The amount of time to delay before callback is invoked, in milliseconds. Specify Timeout.Infinite to prevent the timer from starting. Specify zero (0) to start the timer immediately.

period – The time interval between invocations of callback, in milliseconds. Specify Timeout.Infinite to disable periodic signaling.

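If a callback runs longer than the period, the timer fires again while the
previous callback is still executing, so several thread pool threads may run the
callback at the same time. To avoid this, create the timer with period set to
Timeout.Infinite and re-arm it with Change at the end of each callback: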

static void Main(string[] args)
{
    System.Threading.Timer s_global = null;

    // One-shot timer: period is Timeout.Infinite, so the callback cannot overlap itself.
    using (s_global = new System.Threading.Timer(state =>
    {
        Console.WriteLine(state);

        // Re-arm the timer to fire once more, 200 ms from now.
        s_global.Change(200, System.Threading.Timeout.Infinite);
    }, "state", 0, System.Threading.Timeout.Infinite))
    {
        Console.ReadKey();
    }
}

Parallel Language Integrated Query

  1. How to convert a sequential query into a parallel query?
  2. How to convert a parallel query back into a sequential query?
  3. How to have the query's results processed in parallel?
  4. What are the methods to control how the query is processed?

To convert a sequential query into a parallel query we can use one of these
methods:

Name - Description
AsParallel<TSource>(IEnumerable<TSource>) - Enables parallelization of a query.
AsParallel(IEnumerable) - Enables parallelization of a query.
AsParallel<TSource>(Partitioner<TSource>) - Enables parallelization of a query, as sourced by a custom partitioner that is responsible for splitting the input sequence into partitions.

Sample 1:

static void SearchForObsoleteMethodsAsParallel(Assembly asm)
{
    (from type in asm.GetExportedTypes().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered)
     from member in type.GetMembers()
     let obsoleteAttribute = typeof(ObsoleteAttribute)
     where member.IsDefined(obsoleteAttribute, true)
     select new { Name = member.Name, Type = member.MemberType.ToString() })
    .Distinct()
    .ForAll(item => Console.WriteLine(item.Type + ":" + item.Name));
}

To convert a parallel query back into a sequential query we can use

public static IEnumerable<TSource> AsSequential<TSource>(
    this ParallelQuery<TSource> source
)

To have the query's results processed in parallel we can use

public static void ForAll<TSource>(
    this ParallelQuery<TSource> source,
    Action<TSource> action
)

To control how the query is processed we can use methods such as:


public static ParallelQuery<TSource> WithCancellation<TSource>(

    this ParallelQuery<TSource> source,

    CancellationToken cancellationToken

)


public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(

    this ParallelQuery<TSource> source,

    int degreeOfParallelism

)


public static ParallelQuery<TSource> WithExecutionMode<TSource>(

    this ParallelQuery<TSource> source,

    ParallelExecutionMode executionMode

)


public enum ParallelExecutionMode

Default - This is the default setting. PLINQ will examine the query's structure and will only parallelize the query if that will likely result in speedup. If the query structure indicates that speedup is not likely to be obtained, then PLINQ will execute the query as an ordinary LINQ to Objects query.

ForceParallelism - Parallelize the entire query, even if that means using high-overhead algorithms. Use this flag in cases where you know that parallel execution of the query will result in speedup, but PLINQ in the Default mode would execute it as sequential.


public static ParallelQuery<TSource> WithMergeOptions<TSource>(

    this ParallelQuery<TSource> source,

    ParallelMergeOptions mergeOptions

)


public enum ParallelMergeOptions


Default - Use the default merge type, which is AutoBuffered.

NotBuffered - Use a merge without output buffers. As soon as result elements have been computed, make that element available to the consumer of the query.

AutoBuffered - Use a merge with output buffers of a size chosen by the system. Results will accumulate into an output buffer before they are available to the consumer of the query.
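
The methods listed in this section can be combined as in the sketch below. It is only an illustration: the class name PlinqDemo, the degree of parallelism of 2 and the input data are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

static class PlinqDemo
{
    // Filters and projects in parallel, then sorts sequentially.
    public static int[] SquaresOfEvens(int[] source, CancellationToken token)
    {
        IEnumerable<int> squares = source
            .AsParallel()                                               // sequential -> parallel
            .WithCancellation(token)
            .WithDegreeOfParallelism(2)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Where(n => n % 2 == 0)
            .Select(n => n * n)
            .AsSequential()                                             // parallel -> sequential
            .OrderBy(n => n);                                           // ordinary LINQ to Objects

        return squares.ToArray();
    }

    // ForAll processes the results in parallel, so shared state must be updated atomically.
    public static long SumWithForAll(int[] source)
    {
        long total = 0;
        source.AsParallel().ForAll(n => Interlocked.Add(ref total, n));
        return total;
    }

    static void Main()
    {
        int[] numbers = Enumerable.Range(1, 10).ToArray();
        Console.WriteLine(string.Join(",", SquaresOfEvens(numbers, CancellationToken.None))); // 4,16,36,64,100
        Console.WriteLine(SumWithForAll(numbers)); // 55
    }
}
```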

Parallel’s Static For, ForEach, and Invoke Methods

  1. When is it better to use the Parallel.For, Parallel.ForEach and Parallel.Invoke methods?
  2. How do the Parallel methods work?
  3. What are the overloads of the Parallel.For and Parallel.ForEach methods?

We should not use the Parallel methods when operations must be processed
in sequential order. To gain a performance increase, we should use them when we
have many work items that can be processed by multiple threads (because they
share no data) or long-running compute-bound operations.

Sample 1

int n = 20;

int[] array = new int[20] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 };

Parallel.For(0, n, i => DoWork(i));

Parallel.ForEach(array, i => DoWork(i));

The calling thread also participates in processing work items. If it
finishes its share before the thread pool threads finish theirs, the
calling thread suspends itself until all the work is done.

Sample 2

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions opts = new ParallelOptions
{
    CancellationToken = cts.Token,
    MaxDegreeOfParallelism = -1, // -1 places no limit on the number of concurrent operations
    TaskScheduler = TaskScheduler.Default
};

Parallel.Invoke(opts, new Action[]
{
    () => { Thread.Sleep(400); Console.WriteLine("call 1"); },
    () => { Thread.Sleep(300); Console.WriteLine("call 2"); },
    () => { Thread.Sleep(200); Console.WriteLine("call 3"); },
    () => { Thread.Sleep(100); Console.WriteLine("call 4"); }
});

ParallelOptions properties:

Name - Description
CancellationToken - Gets or sets the CancellationToken associated with this ParallelOptions instance.
MaxDegreeOfParallelism - Gets or sets the maximum degree of parallelism enabled by this ParallelOptions instance.
TaskScheduler - Gets or sets the TaskScheduler associated with this ParallelOptions instance. Setting this property to null indicates that the current scheduler should be used.

There are overloads of Parallel.For and Parallel.ForEach that give each
thread its own local state and so allow finer control over execution.
They take three delegates:

localInit

Type: System.Func<TLocal>

The function delegate that returns the initial state of the local data for
each thread. The state returned by localInit is passed to the first body
invocation on that thread.

body

Type: System.Func<Int32, ParallelLoopState, TLocal, TLocal>

The delegate that is invoked once per iteration. It receives the local state
returned by the previous invocation on the same thread and returns the updated
state. The final value is passed to the localFinally delegate.

localFinally

Type: System.Action<TLocal>

The delegate that performs a final action on the
local state of each thread.
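
The interplay of the three delegates can be sketched with a simple sum of squares. The class name LocalStateDemo and the input range are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LocalStateDemo
{
    // Sums i * i for i in [0, n) using per-thread partial sums.
    public static long SumOfSquares(int n)
    {
        long total = 0;

        Parallel.For<long>(0, n,
            // localInit: every participating thread starts with a partial sum of zero.
            () => 0L,
            // body: adds to the partial sum without any locking.
            (i, loopState, partial) => partial + (long)i * i,
            // localFinally: merges each partial sum into the shared total atomically.
            partial => Interlocked.Add(ref total, partial));

        return total;
    }

    static void Main()
    {
        Console.WriteLine(SumOfSquares(10)); // 285
    }
}
```

Synchronization is needed only once per thread, in localFinally, instead of once per iteration.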

Sample 3

private static void ParralelForWithInitBodyFinal()
{
    long totalLength = 0;

    try
    {
        ParallelLoopResult result = Parallel.ForEach<string, long>(
            Directory.GetFiles(@"C:\rush\"),
            // localInit: returns the initial local total.
            () => { Console.WriteLine("Local task init"); return 0; },
            // body: runs once per file and returns the updated local total.
            (path2file, loopState, index, runningTotal) =>
            {
                Console.WriteLine("Is exceptional: {0}", loopState.IsExceptional);

                long fileLength = 0;
                using (FileStream fs = new FileStream(path2file, FileMode.Open))
                {
                    fileLength = fs.Length;
                }

                return runningTotal += fileLength;
            },
            // localFinally: merges the local total into the shared total.
            runningTotal =>
            {
                Console.WriteLine("Local task finally");
                Interlocked.Add(ref totalLength, runningTotal);
            });

        Console.WriteLine(result.IsCompleted);
    }
    catch (AggregateException ex)
    {
        foreach (Exception innerException in ex.InnerExceptions)
        {
            Console.WriteLine(innerException);
        }
    }

    Console.WriteLine(totalLength);
}

We can manage execution of the Parallel.For and Parallel.ForEach methods using ParallelLoopState members.

Methods

Name - Description
Break - Communicates that the Parallel loop should cease execution, at the system's earliest convenience, of iterations beyond the current iteration.
Stop - Communicates that the Parallel loop should cease execution at the system's earliest convenience.

Properties

Name - Description
IsExceptional - Gets whether any iteration of the loop has thrown an exception that went unhandled by that iteration.
IsStopped - Gets whether any iteration of the loop has called Stop.
LowestBreakIteration - Gets the lowest iteration of the loop from which Break was called.
ShouldExitCurrentIteration - Gets whether the current iteration of the loop should exit based on requests made by this or other iterations.

The ParallelLoopResult has the following properties:

Name - Description
IsCompleted - Gets whether the loop ran to completion, such that all iterations of the loop were executed and the loop didn't receive a request to end prematurely.
LowestBreakIteration - Gets the index of the lowest iteration from which Break was called.
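
Break and the two ParallelLoopResult properties can be seen together in the following sketch. The class name LoopStateDemo and the sample data are assumptions made for the example.

```csharp
using System;
using System.Threading.Tasks;

static class LoopStateDemo
{
    // Calls Break from every iteration that sees a negative value.
    public static ParallelLoopResult FindFirstNegative(int[] data)
    {
        return Parallel.For(0, data.Length, (i, loopState) =>
        {
            if (data[i] < 0)
            {
                // Iterations below i are still guaranteed to run;
                // iterations above i may be skipped.
                loopState.Break();
            }
        });
    }

    static void Main()
    {
        ParallelLoopResult result = FindFirstNegative(new[] { 3, 1, -4, 1, -5 });
        Console.WriteLine(result.IsCompleted);          // False
        Console.WriteLine(result.LowestBreakIteration); // 2
    }
}
```

Because every iteration below a break point is still executed, index 2 always gets the chance to call Break, so LowestBreakIteration reports the first negative element even if index 4 breaks first.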

Task Schedulers

  1. What is a TaskScheduler?

A TaskScheduler represents an object that handles the low-level work of queuing tasks onto threads. The Framework provides two concrete implementations: the default scheduler, which works in tandem with the ThreadPool, and the synchronization context scheduler, which is used by WPF and WinForms applications. We can specify explicitly which scheduler a task should use.

public partial class Form1 : Form
{
    private readonly TaskScheduler _taskScheduler = null;
    private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

    public Form1()
    {
        InitializeComponent();
        Text = "some text";
        Width = 500;
        Height = 200;
        Visible = true;

        // Capture the UI thread's synchronization context so continuations can update the form.
        _taskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
    }

    private void InitializeComponent() { }

    private int DoSomeCalc(int n, CancellationToken token)
    {
        int result = 0;
        for (int i = 0; i < n; i++)
        {
            // Observe cancellation so that a click can actually stop the work.
            token.ThrowIfCancellationRequested();
            Thread.Sleep(100);
            result++;
        }
        return result;
    }

    protected override void OnMouseClick(MouseEventArgs e)
    {
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Cancel();
            _cancellationTokenSource = null;
        }
        else
        {
            _cancellationTokenSource = new CancellationTokenSource();
            CancellationToken token = _cancellationTokenSource.Token;

            Task<int> task = new Task<int>(() => DoSomeCalc(10, token), token);
            task.Start();

            // The continuations run on the UI thread through _taskScheduler.
            // Result is read only on success; reading it on a canceled or faulted task throws.
            task.ContinueWith(t => { Text = "Thread completed, the result is: " + t.Result.ToString(); }, CancellationToken.None, TaskContinuationOptions.OnlyOnRanToCompletion, _taskScheduler);
            task.ContinueWith(t => { Text = "Thread was canceled"; }, CancellationToken.None, TaskContinuationOptions.OnlyOnCanceled, _taskScheduler);
            task.ContinueWith(t => { Text = "Thread faulted: " + t.Exception.InnerException.Message; }, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, _taskScheduler);
        }

        base.OnMouseClick(e);
    }
}

It’s possible to write our own TaskScheduler.
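
A custom scheduler derives from TaskScheduler and overrides three members. The sketch below is one possible, deliberately naive design (a new thread per task, no inlining); the names ThreadPerTaskScheduler and SchedulerDemo are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Runs every task on its own dedicated background thread.
sealed class ThreadPerTaskScheduler : TaskScheduler
{
    // Used by debuggers; this scheduler keeps no queue, so there is nothing to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }

    protected override void QueueTask(Task task)
    {
        Thread thread = new Thread(() => TryExecuteTask(task));
        thread.IsBackground = true;
        thread.Start();
    }

    // Never run a task on the waiting thread; each task always gets its own thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

static class SchedulerDemo
{
    // Returns whether the task body observed a thread pool thread.
    public static bool RanOnPoolThread()
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            TaskCreationOptions.None,
            new ThreadPerTaskScheduler());

        return task.Result;
    }

    static void Main()
    {
        Console.WriteLine(RanOnPoolThread()); // False
    }
}
```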

Task Factories

  1. What is the purpose of a TaskFactory?

We need a TaskFactory when we have to repeatedly create tasks with the same parameters. Using a TaskFactory we can create three kinds of tasks:

  1. Ordinary tasks (via StartNew()).
  2. Continuations with multiple antecedents (via ContinueWhenAll() or ContinueWhenAny()).
  3. Tasks that wrap methods that follow the asynchronous programming model (via FromAsync()).

Sample 1:

Task parent = new Task(() =>

{

    CancellationTokenSource cts = new CancellationTokenSource();

    TaskFactory<int> taskFactory = new TaskFactory<int>(cts.Token,

        TaskCreationOptions.AttachedToParent,

        TaskContinuationOptions.ExecuteSynchronously,

        TaskScheduler.Default);

    Task<int>[] childTasks = new[]

    {

        taskFactory.StartNew(

            () => GetResult(100)

        , cts.Token),

        taskFactory.StartNew( () =>

            GetResult(200)

            , cts.Token

        ),

        taskFactory.StartNew( () =>

            GetResult(int.MaxValue)

            , cts.Token)                   

    };

    foreach (Task<int> childTask in childTasks)

    {

        childTask

            .ContinueWith(t => cts.Cancel(), TaskContinuationOptions.OnlyOnFaulted);

    }

    taskFactory.ContinueWhenAll(

        childTasks

        , selectedTasks => selectedTasks

            .Where(t => !t.IsCanceled && !t.IsFaulted)

            .Max(t => t.Result)

        , CancellationToken.None)

        .ContinueWith(t => Console.WriteLine(t.Result), TaskContinuationOptions.ExecuteSynchronously);

});

parent.ContinueWith(task =>

{

    StringBuilder sb = new StringBuilder(task.Exception.InnerExceptions.Count);

    foreach (Exception exception in task.Exception.InnerExceptions)

    {

        sb.Append(exception.ToString());

    }

    Console.WriteLine(sb.ToString());

}, TaskContinuationOptions.OnlyOnFaulted);

parent.Start();
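
Sample 1 covers StartNew() and ContinueWhenAll(). The third use of a TaskFactory, wrapping a Begin/End method pair with FromAsync(), might look like the sketch below. The class name FromAsyncDemo and the in-memory stream are assumptions made to keep the example self-contained.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

static class FromAsyncDemo
{
    // Wraps Stream.BeginRead/EndRead in a Task<int> and returns the number of bytes read.
    public static int ReadLength()
    {
        byte[] buffer = new byte[16];

        using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes("hello")))
        {
            Task<int> readTask = Task<int>.Factory.FromAsync(
                stream.BeginRead,  // begin method of the APM pair
                stream.EndRead,    // end method of the APM pair
                buffer, 0, buffer.Length,
                null);             // state object, not needed here

            return readTask.Result;
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadLength()); // 5
    }
}
```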

Inside a Task

  1. What are the properties of a task?
  2. What does the Dispose() method do?
  3. What is a task's ID?
  4. What are the statuses of a task?

The Task<TResult> class has the following properties:

  Name - Description
  AsyncState - Gets the state object supplied when the Task was created, or null if none was supplied. (Inherited from Task.)
  CreationOptions - Gets the TaskCreationOptions used to create this task. (Inherited from Task.)
  Exception - Gets the AggregateException that caused the Task to end prematurely. If the Task completed successfully or has not yet thrown any exceptions, this will return null. (Inherited from Task.)
  Factory - Provides access to factory methods for creating Task<TResult> instances.
  Id - Gets a unique ID for this Task instance. (Inherited from Task.)
  IsCanceled - Gets whether this Task instance has completed execution due to being canceled. (Inherited from Task.)
  IsCompleted - Gets whether this Task has completed. (Inherited from Task.)
  IsFaulted - Gets whether the Task completed due to an unhandled exception. (Inherited from Task.)
  Result - Gets the result value of this Task<TResult>.
  Status - Gets the TaskStatus of this Task. (Inherited from Task.)

 

As the code shows, Task.Dispose() throws if the task has not completed; otherwise it sets the internal ManualResetEventSlim object (if one was created) and calls its Dispose() method.

protected virtual void Dispose(bool disposing)

{

    if (disposing)

    {

        if (!this.IsCompleted)

        {

            throw new InvalidOperationException(
                Environment.GetResourceString("Task_Dispose_NotCompleted"));

        }

        ManualResetEventSlim completionEvent = this.m_completionEvent;

        if (completionEvent != null)

        {

            if (!completionEvent.IsSet)

            {

                completionEvent.Set();

            }

            completionEvent.Dispose();

            this.m_completionEvent = null;

        }

    }

    this.m_stateFlags |= 0x40000;

}

The Id of a task is assigned lazily, the first time the property is read.

public int Id

{

    get

    {

        if (this.m_taskId == 0)

        {

            int num = 0;

            do

            {

                num = Interlocked.Increment(ref s_taskIdCounter);

            }

            while (num == 0);

            Interlocked.CompareExchange(ref this.m_taskId, num, 0);

        }

        return this.m_taskId;

    }

}

If we query the static CurrentId property from code that is not running inside a task, it returns null.

public static int? CurrentId

{

    get

    {

        Task internalCurrent = InternalCurrent;

        if (internalCurrent != null)

        {

            return new int?(internalCurrent.Id);

        }

        return null;

    }

}

We can obtain the current state of a task by reading its Status property.

Member name - Description
Created - The task has been initialized but has not yet been scheduled.
WaitingForActivation - The task is waiting to be activated and scheduled internally by the .NET Framework infrastructure.
WaitingToRun - The task has been scheduled for execution but has not yet begun executing.
Running - The task is running but has not yet completed.
WaitingForChildrenToComplete - The task has finished executing and is implicitly waiting for attached child tasks to complete.
RanToCompletion - The task completed execution successfully.
Canceled - The task acknowledged cancellation by throwing an OperationCanceledException with its own CancellationToken while the token was in signaled state, or the task's CancellationToken was already signaled before the task started executing.
Faulted - The task completed due to an unhandled exception.
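
Some of these states are easy to observe directly. The sketch below records four of them; the class name TaskStatusDemo is made up for the example, and the transient states (WaitingToRun, Running) are left out because they cannot be captured reliably.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class TaskStatusDemo
{
    // Returns the statuses observed for a new, a successful, a faulted and a canceled task.
    public static TaskStatus[] Observe()
    {
        Task ok = new Task(() => { });
        TaskStatus beforeStart = ok.Status;   // Created: not yet scheduled
        ok.Start();
        ok.Wait();
        TaskStatus afterRun = ok.Status;      // RanToCompletion

        Action fail = () => { throw new InvalidOperationException("boom"); };
        Task faulted = Task.Factory.StartNew(fail);
        try
        {
            faulted.Wait();
        }
        catch (AggregateException)
        {
            // Wait rethrows the task's exception; the status is what matters here.
        }

        // The token is already signaled, so the task is canceled without ever running.
        CancellationTokenSource cts = new CancellationTokenSource();
        cts.Cancel();
        Task canceled = Task.Factory.StartNew(() => { }, cts.Token);

        return new[] { beforeStart, afterRun, faulted.Status, canceled.Status };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Observe()));
        // Created, RanToCompletion, Faulted, Canceled
    }
}
```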

A Task May Start Child Tasks

1. How to create child tasks attached to their parent task?

By default, a top-level task that creates other tasks has no relationship to the tasks it creates. If the parent task should not be considered completed until its child tasks have finished running, we can create the child tasks attached to their parent with TaskCreationOptions.AttachedToParent.

static void AttachedToParentTest()
{
    Task<int[]> mainTask = new Task<int[]>(() =>
        {
            int[] result = new int[3];

            // Each child is attached, so mainTask does not complete until all three finish.
            new Task(() => result[0] = GetResult(100), TaskCreationOptions.AttachedToParent).Start();
            new Task(() => result[1] = GetResult(200), TaskCreationOptions.AttachedToParent).Start();
            new Task(() => result[2] = GetResult(300), TaskCreationOptions.AttachedToParent).Start();

            return result;
        });

    Trace.WriteLine("Main thread Id:" + Thread.CurrentThread.ManagedThreadId.ToString());

    mainTask.Start();

    Task continueWithTask = mainTask.ContinueWith(tasks => Array.ForEach(tasks.Result,
        result =>
        {
            Trace.WriteLine(result);
            Trace.WriteLine("ContinueWith thread Id:" + Thread.CurrentThread.ManagedThreadId.ToString());
        }));
}
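
The effect of AttachedToParent can be checked with a smaller, self-contained sketch. The class name ChildTaskDemo and the 200 ms delay are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ChildTaskDemo
{
    // Returns whether the attached child had finished by the time the parent's Wait returned.
    public static bool ParentWaitsForAttachedChild()
    {
        bool childDone = false;

        // Task.Factory.StartNew creates the parent without DenyChildAttach,
        // so children are allowed to attach to it.
        Task parent = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(200); // the child outlives the parent's own delegate
                childDone = true;
            }, TaskCreationOptions.AttachedToParent);
        });

        // The parent stays in WaitingForChildrenToComplete until the child finishes.
        parent.Wait();
        return childDone;
    }

    static void Main()
    {
        Console.WriteLine(ParentWaitsForAttachedChild()); // True
    }
}
```

Without the AttachedToParent flag the parent would complete as soon as its own delegate returned, and the flag could still be false at that point.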