C#/.NET Concurrent Collections (Eşzamanlı Koleksiyonlar)
Merhabalar.
Bu yazıda thread-safe koleksiyonlardan bahsedeceğim. Öncelikle Thread-Safe kavramını açıklayacak olursak; bir iş parçacığını birden fazla kaynağın kullanabilmesi anlamına gelmektedir.
Concurrent Collection yapıları normalde bir iş parçacığını birden fazla kaynağın kullanması durumunda oluşabilecek hataların önüne geçmemizi sağlayan yapılardır. Bu yapılar .NET 4.0 ile birlikte hayatımıza girmiştir.
Thread Safe + aynı anda ilerleme, ekleme, çıkartma, düzenleme yapabilme yeteneği = pahalı maliyet
Concurrent Collections’ lar 5 e ayrılır. Bunlar;
- ConcurrentDictionary
- ConcurrentStack
- ConcurrentQueue
- ConcurrentBag
- BlockingCollection
1. ConcurrentDictionary
Aşağıdaki senaryoda eğer bir Dictionary koleksiyonun üyelerinin dolaşılması sırasında, eleman çıkartma işlemi gerçekleştirildiğinde çalışma zamanında aşağıdaki ekran görüntüsünde yer alan InvalidOperationException istisnasını almanız gerekirdi. Ama Concurrent versiyonu kullanıldığı için Thread Safe kuralları çerçevesinde herhangi bir sorun ile karşılaşılmaz.
ConcurrentDictionary<int, string> numbers = new ConcurrentDictionary<int, string>();numbers.TryAdd(1, "Bir");
numbers.TryAdd(2, "İki");
numbers.TryAdd(3, "Üç");
numbers.TryAdd(4, "Dört");
numbers.TryAdd(5, "Beş");
numbers.TryAdd(6, "Altı");foreach (KeyValuePair<int,string> number in numbers)
{
string value;
bool result = numbers.TryRemove(number.Key, out value);
if(result)
Console.WriteLine("{0} çıkartıldı.",value);
}
2. ConcurrentStack
Benzer şekilde bir ConcurrentStack hepimizin bildiği Stack yapısının eşzamanlı bir versiyonudur. Aynı kalıptaki verilere çok iş parçacıklı LIFO erişimi gerçekleştirebilirsiniz.
static void Main(string[] args)
{
ConcurrentStack numberCollection = new ConcurrentStack();
for (int i = 10; i <= 20; i++)
{
numberCollection.Push(i);
}
Parallel.For(0, 10, i =>
{
int value = 0;
if (numberCollection.TryPop(out value))
{
Console.WriteLine("Value {0} is popped out!", value);
}
});
Console.ReadLine();
}
3. ConcurrentQueue
Benzer şekilde, ConcurrentQueue, Queue yapısının eşzamanlı uygulamasıdır ve Try Pattern’i kullanır. Aşağıdaki kod, çok iş parçacıklı bir işlemde ConcurrentQueue kullanır.
static void Main(string[] args)
{
ConcurrentQueue nameCollection = new
ConcurrentQueue();
nameCollection.Enqueue("Luke Skywalker");
nameCollection.Enqueue("Leia Organa");
nameCollection.Enqueue("Han Solo");
nameCollection.Enqueue("Chewbacca");
nameCollection.Enqueue("Obi-Wan Kenobi");
nameCollection.Enqueue("Lando Calrissian");
Task[] taskList = new Task[2];
taskList[0] = Task.Factory.StartNew(() =>
{
string name = String.Empty;
while (nameCollection.TryDequeue(out name))
{
Console.WriteLine("Name {0} is dequeued!", name);
}
});
taskList[1] = Task.Factory.StartNew(() =>
{
string name = String.Empty;
while (nameCollection.TryDequeue(out name))
{
Console.WriteLine("Name {0} is dequeued!", name);
}
});
Task.WaitAll();
Console.ReadLine();
Kuyruktan kaldırmak için TryPattern’i ve sıranın içinde başka bir şey bırakılıp bırakılmadığını görmek için dönüş değerini kullanıldığını da görüyorsunuz.
4. ConcurrentBag
Bu koleksiyon işlenecek ama sıralanmamış itemları saklamak için kullanılır. Örnek verecek olursak sipariş kalemlerinin gelip işleneceğini ama sırasının önemini umursamadığınız durumlarda kullanılır.
private static ConcurrentBag m_entityCollection = new ConcurrentBag();static void Main(string[] args)
{
m_entityCollection.Add("Ripple");
m_entityCollection.Add("Bitcoin");
m_entityCollection.Add("Litecoin");
m_entityCollection.Add("OmiseGO");
m_entityCollection.Add("Ethereum");
m_entityCollection.Add("Swissborg");
m_entityCollection.Add("Tron");
m_entityCollection.Add("Monero");
ThreadStart threadStart = new ThreadStart(PrintEntities);
Thread thread1 = new Thread(threadStart);
thread1.Start();
Thread thread2 = new Thread(threadStart);
thread2.Start();
Console.ReadLine();
}
static void PrintEntities()
{
string crypto;
while (m_entityCollection.TryPeek(out crypto))
{
if (m_entityCollection.TryTake(out crypto))
{
Console.WriteLine(entity);
}
}
}
5. BlockingCollection
Veri kaynağının boş olduğu durumlarda eleman talep edildiği vakit bir eleman ekleninceye kadar metot akışını bekleten ve başka bir thread tarafından ilgili kaynağa eleman eklenince bu elemanı alarak geriye döndüren bir koleksiyondur. Ayriyetten bunların yanında belirlediğimiz kotayı aşamaması için boyutu limitlenebilen bir koleksiyondur. BlockingCollection koleksiyonu farklı overloadlarında yukarıda ele aldığımız ConcurrentQueue, ConcurrentStack ve ConcurrentBag koleksiyonlarıyla beraber çalışmaktadır.
static BlockingCollection<int> blockingCollection = new BlockingCollection<int>();
static void Main(string[] args)
{
int sayac = 10;
Task.Run(() =>
{
while (sayac >= 1) blockingCollection.Add(sayac--);
blockingCollection.CompleteAdding();
});
Task.Run(() =>
{
try
{
while (true) Console.WriteLine(blockingCollection.Take());
}
catch (InvalidOperationException ex)
{
Console.WriteLine(ex.Message);
}
});
Console.Read();
}
Eğer ki, BlockingCollection koleksiyonuna eklenecek olan verinin sonuna gelindiğine dair bilgi vermek istiyorsak ekleme işleminden sonra CompleteAdding metodunu çağırmanız yeterlidir. Bu metot çağrıldıktan sonraki ilk Take talebinde InvalidOperationException hatası fırlatılacak ve böylece yeni bir veri girilmesine dair beklenti sona erecektir.
Thread Safe koleksiyonları performans açısından normal koleksiyonlara nazaran oldukça düşük seviyede çalışmakta ve maliyeti arttırmaktadırlar. Bu koleksiyonları asenkron operasyonlar dahilinde yukarıda değindiğimiz senaryolara uygun bir şekilde değerlendirmenizi ve yersiz bir şekilde kullanımından kaçınmanızı önemle vurguluyorum.