
TPL – C# Task Parallel Library

26 Jan 2016
Posted by: RcBuilder / Category: Multi Threads / Parallelism / SOURCE CODE / TPL

advantages:
1. heavy operations do not freeze the UI, because they run on another thread and possibly even on a different core
a good example of this problem is a WinForms application that performs a heavy task on the UI thread – the UI becomes unresponsive

2. it is an abstraction built on top of threads (it uses the ThreadPool behind the scenes) and manages the thread pool for better performance

——-

- Terms -

// create a new task and start it
// note! StartNew takes an Action (or an Action<object> plus a state argument), not a generic Action<T>
var task = Task.Factory.StartNew([Action])

// create a new task without starting it – call the Start() method to start the task
var task = new Task([Action])
task.Start();

// wait for the task to complete
task.Wait();

// once the previous task is done, continue with another task – use prevTask.Result to get the return value of the previous task
task.ContinueWith([Action(prevTask)]);

// get a scheduler bound to the current synchronization context, so a task can be executed on it – when called from the main (UI) thread, this refers to the main thread
TaskScheduler.FromCurrentSynchronizationContext();

// task that returns T value
Task<T>

// holds the return value of the task – also, waits for the task to complete
task.Result

// async is a method modifier – it only informs the compiler that the method may contain await expressions
// await is the keyword used to wait for a task to complete without blocking the calling thread
async … await

// in addition, we can run independent code while the task is in progress – just place that code between starting the task and the await
var task = MyMethodAsync();
code here …
var result = await task;
code here …

// create an Action
Action action1 = () => { … }

// create an action with parameters
Action<T> action1 = (T) => { … }

// create a Func
Func<T, TResult> func = (T) => { return TResult; };

// get the current thread id
Thread.CurrentThread.ManagedThreadId

// create a range – we can use it to generate a range of tasks
Enumerable.Range([start], [count]).Select([Func]);
Enumerable.Range([start], [count]).ToList().ForEach([Action]); // foreach
Enumerable.Range([start], [count]).AsParallel().ForAll([Action]); // parallel
Enumerable.Range([start], [count]).AsParallel().WithDegreeOfParallelism([int]).ForAll([Action]); // parallel + set max tasks to perform in parallel
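
a minimal sketch of the WithDegreeOfParallelism line above, assuming a console app. the names DegreeDemo and PeakConcurrency are made up for this illustration. it records how many actions were running at the same moment, which should not exceed the limit that was passed in:

```csharp
using System;
using System.Linq;
using System.Threading;

static class DegreeDemo // hypothetical name, for illustration only
{
    // runs 'count' short actions through PLINQ and returns the highest
    // number of actions that were observed running at the same time
    public static int PeakConcurrency(int count, int maxDegree)
    {
        var gate = new object();
        int running = 0, peak = 0;

        Enumerable.Range(1, count)
            .AsParallel()
            .WithDegreeOfParallelism(maxDegree)
            .ForAll(_ =>
            {
                lock (gate) { running++; if (running > peak) peak = running; }
                Thread.Sleep(50); // simulate some work
                lock (gate) { running--; }
            });

        return peak;
    }

    static void Main()
    {
        // the exact value depends on the machine, but it is at most 2
        Console.WriteLine(PeakConcurrency(10, 2));
    }
}
```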

// list ForEach and ForAll
List.ForEach([Action<T>]);
List.AsParallel().ForAll([Action<T>]);

e.g:
var list = new List<string> { "A", "B", "C", "D", "E", "F" };
list.ForEach(x => { Console.WriteLine(x); });
list.AsParallel().ForAll(x => { Console.WriteLine(x); });

// run a batch of actions in parallel
Parallel.For([from], [to], [Action]);
Parallel.ForEach([source], [Action<T>]);

// BlockingCollection
BlockingCollection uses a ConcurrentQueue behind the scenes by default, which serves as a thread-safe queue, and adds an optional maximum queue size (bounded capacity)
when this maximum size is reached, producers calling Add are blocked until a consumer removes an item – this prevents unbounded memory growth
in the case where the producers keep producing and flood the queue while no consumers are consuming from it
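
a small sketch of the bounded capacity described above. the names BoundedQueueDemo and FillBeyondCapacity are hypothetical, and the capacity of 2 is chosen only to keep the demo short. it uses TryAdd instead of Add so that the full queue is reported as false rather than blocking the demo:

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedQueueDemo // hypothetical name, for illustration only
{
    // tries to add 3 items to a queue bounded to 2, then frees one slot and tries again
    public static bool[] FillBeyondCapacity()
    {
        var queue = new BlockingCollection<int>(2); // bounded capacity of 2 items

        bool first = queue.TryAdd(1);  // there is room
        bool second = queue.TryAdd(2); // there is room
        bool third = queue.TryAdd(3);  // queue is full: TryAdd gives up, Add would block here

        queue.Take();                  // a consumer removes one item
        bool fourth = queue.TryAdd(4); // there is room again

        return new[] { first, second, third, fourth };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", FillBeyondCapacity())); // True, True, False, True
    }
}
```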

// recommendation: add the 'Async' suffix to the names of all async methods
// prefer returning Task over void, so that callers can await the method and observe its exceptions
private async Task ExecuteAsync(){ … }

——-

- Task.Factory.StartNew -

example:
var task = Task.Factory.StartNew(() => {
    Thread.Sleep(2000);
    Console.WriteLine("TASK COMPLETED");
});

Console.WriteLine("END");

console:
END
TASK COMPLETED

result:
the main thread does NOT wait for the task to complete its work, so it continues immediately and prints 'END'
when the task finishes its background processing, it prints 'TASK COMPLETED'

——-

- Task Wait -

example:
var task = Task.Factory.StartNew(() => {
    Thread.Sleep(2000);
    Console.WriteLine("TASK COMPLETED");
});
task.Wait();
Console.WriteLine("END");

console:
TASK COMPLETED
END

result:
task.Wait() blocks the main thread until the task completes its work

——-

- new Task -

example:
var task = new Task(() => {
    Thread.Sleep(2000);
    Console.WriteLine("TASK COMPLETED");
});

Thread.Sleep(500);
Console.WriteLine("BEFORE STARTING THE TASK");
task.Start();
Console.WriteLine("AFTER STARTING THE TASK");

task.Wait();
Console.WriteLine("END");

console:
BEFORE STARTING THE TASK
AFTER STARTING THE TASK
TASK COMPLETED
END

result:
task.Start() schedules the task for execution – nothing runs until it is called

——-

- ContinueWith -

example:
var task = Task.Factory.StartNew(() => {
    Thread.Sleep(2000);
    Console.WriteLine("TASK 1 COMPLETED");
    return 1;
}).ContinueWith((prevTask) => {
    Thread.Sleep(2000);
    Console.WriteLine("TASK 2 COMPLETED WITH RESULT {0}", prevTask.Result);
});

task.Wait();

console:
TASK 1 COMPLETED
TASK 2 COMPLETED WITH RESULT 1

result:
the first task returns an int result, which the continuation task reads through prevTask.Result

——-

- Task Result -

example:
var task = new Task<string>(() =>
{
    Thread.Sleep(2000);
    Console.WriteLine("TASK COMPLETED");
    return "ABCD";
});

task.Start();
Console.WriteLine(task.Result);
Console.WriteLine("END");

console:
TASK COMPLETED
ABCD
END

result:
task.Result holds the return value of the task
note! reading the Result property waits for the task to complete its work, so we do NOT need a separate task.Wait() call

——-

- async and await -

steps:
1. create a method that returns a task
2. create an async method that executes the method from step 1 using the await syntax
note! it must be marked as async in its signature
3. call the async method from step 2

example:
static Task<string> CreateTask() {
    var task = Task.Factory.StartNew(() => {
        Thread.Sleep(1000);
        Console.WriteLine("CreateTask");
        return "ABCD";
    });

    return task;
}

static async Task ExecuteAsync() {
    var task = CreateTask();

    // code here …
    // note! if no code needs to run while waiting for the async task to complete – we can use:
    // var result = await CreateTask();

    var result = await task;
    Console.WriteLine(result);
}

static void Main(string[] args)
{
    ExecuteAsync();
    Console.WriteLine("continue working … ");
    Console.ReadKey();
}

console:
continue working …
CreateTask
ABCD

result:
when using async and await we need a method that returns a Task – the await keyword waits for the task to finish
and only then continues with the code that comes after it

- async and await important note -

the main thread keeps working and is not paused by the ExecuteAsync method!
the reason is that the compiler rewrites the await into a state machine that behaves roughly like a Task with a ContinueWith: the awaited expression is the task, and the code after the await
becomes the continuation.

e.g:
conceptually, the compiler translates the original code below into something like the translated code beneath it:

// original code

public Task<string> Fun1() {
    return Task.Factory.StartNew(() => {
        Console.WriteLine("ENTER Fun1");
        Thread.Sleep(2000);
        Console.WriteLine("EXIT Fun1");

        return "ABCD";
    });
}

public async void Fun1Async() {
    Console.WriteLine("ENTER Fun1Async");
    await Fun1();
    Console.WriteLine("EXIT Fun1Async");
}

public void Main()
{
    Fun1Async();
    Console.WriteLine("continue working … ");
    Console.ReadKey();
}

// translated code

static void Main(string[] args)
{
    Console.WriteLine("ENTER Fun1Async");
    Task.Factory.StartNew(() => {
        Console.WriteLine("ENTER Fun1");
        Thread.Sleep(2000);
        Console.WriteLine("EXIT Fun1");

        return "ABCD";
    }).ContinueWith((prevTask) => {
        Console.WriteLine("EXIT Fun1Async");
    });

    Console.WriteLine("continue working … ");
    Console.ReadKey();
}

——-

- tasks array -

example:
Action action = () => { … };

var tasks = new[]{
    Task.Factory.StartNew(action),
    Task.Factory.StartNew(action),
    Task.Factory.StartNew(action)
};

Task.WaitAll(tasks);

result:
those tasks are queued to the thread pool and can be executed in parallel
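
a short sketch of the same idea with tasks that return values, assuming a console app. the names TaskArrayDemo and RunAll are invented for this example. after Task.WaitAll returns, every Result is already available:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class TaskArrayDemo // hypothetical name, for illustration only
{
    // starts three tasks, waits for all of them and collects their return values
    public static int[] RunAll()
    {
        var tasks = new[]{
            Task.Factory.StartNew(() => { Thread.Sleep(300); return 1; }),
            Task.Factory.StartNew(() => { Thread.Sleep(200); return 2; }),
            Task.Factory.StartNew(() => { Thread.Sleep(100); return 3; })
        };

        Task.WaitAll(tasks); // blocks until all three tasks are done

        // results are read in array order, not in completion order
        return tasks.Select(t => t.Result).ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", RunAll())); // 1,2,3
    }
}
```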

——-

- Action<Action> -

example:
Action<Action> calculateActionTime = (action) => {
    var timer = new Stopwatch();
    timer.Start();
    action();
    timer.Stop();
    Console.WriteLine(timer.Elapsed.TotalSeconds);
};

Action myAction = () => {
    Thread.Sleep(2000);
};

calculateActionTime(myAction);

result:
creates an Action that receives another action, executes it, and measures its processing time

——-

- Action<T> -

example:
Action<int> actionInt = (param) => { }; // param is int
Action<string> actionString = (param) => { }; // param is string
Action<float> actionFloat = (param) => { }; // param is float
Action<Action> actionAction = (param) => { }; // param is Action

——-

- Enumerable.Range -

example:
Action myAction = () => {
    Thread.Sleep(2000);
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
};

var tasks = Enumerable.Range(1, 10).Select(x => Task.Factory.StartNew(myAction)).ToArray();
Task.WaitAll(tasks);

result:
creates 10 tasks from a range

tip:
use Enumerable.Range(...).AsParallel() instead of creating the tasks manually
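
a possible sketch of this tip, assuming a console app. the names RangeParallelDemo and ProcessRange are made up for the illustration. the items are processed with AsParallel().ForAll and collected in a thread-safe ConcurrentBag, with no explicit tasks or WaitAll:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

static class RangeParallelDemo // hypothetical name, for illustration only
{
    // processes the numbers 1..count in parallel and returns them sorted
    public static int[] ProcessRange(int count)
    {
        var processed = new ConcurrentBag<int>(); // safe to fill from several threads

        Enumerable.Range(1, count).AsParallel().ForAll(x =>
        {
            Thread.Sleep(100); // simulate some work
            processed.Add(x);
        });

        // ForAll returns only after every item was processed;
        // the bag has no guaranteed order, so sort before returning
        return processed.OrderBy(x => x).ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", ProcessRange(10))); // 1,2,3,4,5,6,7,8,9,10
    }
}
```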

——-

- Enumerable.Range ForEach vs ForAll -

example:
Action myAction = () => {
    Thread.Sleep(2000);
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
};

// 1st
Enumerable.Range(1, 10).ToList().ForEach(x => { myAction(); }); // 20 seconds

// 2nd
Enumerable.Range(1, 10).AsParallel().ForAll(x => { myAction(); }); // 6 seconds

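result:
the 1st loop executes 10 instances of myAction one after the other on a single thread (the whole process takes about 20 sec)
the 2nd loop performs the 10 instances of myAction in parallel (the whole process took about 6 sec here – the exact time depends on how many worker threads are used, which by default is tied to the number of cores)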
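
a rough way to measure the two loops yourself, as a sketch only. the names TimingDemo and Measure are hypothetical, and the sleep is shortened to 200 ms to keep the run short. no expected numbers are given for the parallel loop because they depend on the machine:

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

static class TimingDemo // hypothetical name, for illustration only
{
    // executes the given action and returns how long it took
    public static TimeSpan Measure(Action action)
    {
        var timer = Stopwatch.StartNew();
        action();
        timer.Stop();
        return timer.Elapsed;
    }

    static void Main()
    {
        Action work = () => Thread.Sleep(200); // simulate some work

        // one after the other on the calling thread: roughly 10 x 200 ms
        var sequential = Measure(() => Enumerable.Range(1, 10).ToList().ForEach(x => work()));

        // spread over several worker threads: usually shorter, depends on the cores
        var parallel = Measure(() => Enumerable.Range(1, 10).AsParallel().ForAll(x => work()));

        Console.WriteLine("ForEach: {0:F1} sec", sequential.TotalSeconds);
        Console.WriteLine("ForAll:  {0:F1} sec", parallel.TotalSeconds);
    }
}
```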
——-

- Parallel.For -

example:
Action myAction = () => {
    Thread.Sleep(2000);
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
};

Parallel.For(0, 10, _ => { myAction(); });
Console.WriteLine("END");

note!
Parallel.For blocks until all iterations have completed, therefore
'END' is written only after all of them finish their work
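
because Parallel.For returns only when every iteration is done, a value built up inside the loop is complete right after the call. a minimal sketch, with the invented names ParallelSumDemo and SumRange, that uses Interlocked.Add so that iterations running on different threads do not overwrite each other's updates:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelSumDemo // hypothetical name, for illustration only
{
    // sums the integers from 'from' (inclusive) to 'to' (exclusive) in parallel
    public static long SumRange(int from, int to)
    {
        long total = 0;

        Parallel.For(from, to, i =>
        {
            // atomic add: a plain 'total += i' could lose updates between threads
            Interlocked.Add(ref total, i);
        });

        // all iterations have finished at this point
        return total;
    }

    static void Main()
    {
        Console.WriteLine(SumRange(0, 10)); // 45
    }
}
```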

——-

- Parallel.ForEach -

example:
var lstNumbers = new List<int> { 1, 4, 5, 7, 8, 11, 89, 15, 9, 130, 54 };

Action<int> myAction = (number) => {
    Thread.Sleep(1000);
    Console.WriteLine(number);
};

Parallel.ForEach(lstNumbers, myAction);
Console.WriteLine("END");

note!
Parallel.ForEach blocks until all items have been processed, therefore
'END' is written only after all of them finish their work

——-

- producers/consumers example -

// the queue is bounded to 100 items: when it is full, Add blocks the producers – this prevents memory overflow
// when the consumers do not consume as fast as the producers produce
var queue = new BlockingCollection<int>(100);

Action<int> produceAction = (index) => {
    Enumerable.Range(1, 30).ToList().ForEach((item) => {
        Console.WriteLine("producer {0} produce item {1}", index, item);
        queue.Add(item);
        Thread.Sleep(150);
    });
};

Action<int> consumeAction = (index) => {
    foreach (var item in queue.GetConsumingEnumerable())
    {
        Console.WriteLine("consumer {0} consume item {1}", index, item);
        Thread.Sleep(150);
    }
};

var producers = Enumerable.Range(1, 3).Select(x => Task.Factory.StartNew(() => { produceAction(x); })).ToArray();
var consumers = Enumerable.Range(1, 2).Select(x => Task.Factory.StartNew(() => { consumeAction(x); })).ToArray();

Task.WaitAll(producers);
// signal that no more items will be added – without this, GetConsumingEnumerable never ends and the consumers wait forever
queue.CompleteAdding();
Task.WaitAll(consumers);

Console.WriteLine("END");

——-

- async and await -

example:
static Task<int> CreateTask() {
    return Task.Factory.StartNew(() => {
        Thread.Sleep(2000);
        Console.WriteLine("IN TASK ….");
        return Thread.CurrentThread.ManagedThreadId;
    });
}

static async Task ExecuteAsync() {
    var task = CreateTask();

    Enumerable.Range(1, 10).ToList().ForEach((i) => {
        Console.WriteLine("IN ASYNC METHOD … {0}", i);
        Thread.Sleep(500);
    });

    var result = await task;
    Console.WriteLine("result: {0}", result);
}

static void Main(string[] args)
{
    var task = ExecuteAsync();
    task.Wait();

    Console.WriteLine("END");
    Console.ReadKey();
}

console:
IN ASYNC METHOD … 1
IN ASYNC METHOD … 2
IN ASYNC METHOD … 3
IN ASYNC METHOD … 4
IN TASK ….
IN ASYNC METHOD … 5
IN ASYNC METHOD … 6
IN ASYNC METHOD … 7
IN ASYNC METHOD … 8
IN ASYNC METHOD … 9
IN ASYNC METHOD … 10
result: 6
END

————————————————————————–

- examples:

Task taskA = Task.Factory.StartNew(() => {
    Console.WriteLine("Starting Task A");
    Thread.Sleep(2000);
    Console.WriteLine("Ending Task A");
});

Task taskB = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Starting Task B");
    Thread.Sleep(5000);
    Console.WriteLine("Ending Task B");
});

Task<int> taskC = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Starting Task C");
    Thread.Sleep(1400);
    Console.WriteLine("Ending Task C");

    return 123; // return value – Task<int>
});

Task taskD = taskC.ContinueWith((prevTask) =>
{
    Console.WriteLine("Starting Task D");
    Console.WriteLine(prevTask.Result); // get the return value of the previous task
    Thread.Sleep(500);
    Console.WriteLine("Ending Task D");
});

// note! no need to wait for taskC due to the ContinueWith clause – taskD will start only when taskC is finished
Task.WaitAll(taskA, taskB, taskD);
Console.WriteLine("END");

result:

Starting Task A
Starting Task B
Starting Task C
Ending Task C
Starting Task D
123
Ending Task D
Ending Task A
Ending Task B
END

—-

// note! Random is not thread-safe – sharing one instance between parallel actions is acceptable only for a demo like this
var rnd = new Random();

Action<string> action = (item) => {
    Thread.Sleep(rnd.Next(4000));
    Console.WriteLine(item);
};

var lst = new List<string> { "A", "B", "C", "D", "E", "F" };
lst.AsParallel().ForAll(action);

/* Manually
var tasks = new List<Task>();
lst.ForEach(item => tasks.Add(Task.Factory.StartNew(() => { action(item); })));
Task.WaitAll(tasks.ToArray());
*/
