· Producer (yayınlayan) uygulama örneği
· Consumer (tüketen) uygulama örneği
· Temel RabbitMQ API yöntemleri ve kütüphaneleri
· Mesaj sırası, teslim garantileri ve “ack” mekanizması

Producer (yayınlayan) uygulama örneği

· Producer, RabbitMQ Broker’a mesaj gönderen (publish eden) taraftır.
· Producer uygulaması, mesajları oluşturur ve Exchange’e gönderirken genellikle bir Routing Key belirtir.
· Gönderilen mesaj, ilgili Exchange türü ve Binding Key kurallarına göre doğru kuyruğa yönlendirilir.

Producer’ın Temel Adımları:

1.Bağlantı oluşturma: RabbitMQ broker’ına bağlanmak için bir bağlantı (connection) açılır.

2.Kanal (channel) açma: Bağlantı üzerinden bir “channel” oluşturulur; bu, mesajların gönderilmesi ve alınması için kullanılan mantıksal yoldur.

3.Mesaj gönderme (publish):
channel.basicPublish(exchangeName, routingKey, ...) gibi bir yöntemle mesaj gönderilir.

Bağlantıyı kapatma: İlgili işlemler bittiğinde kanal ve bağlantı kapatılır (ya da açık bırakılıp tekrar kullanılabilir).

Gereksinimler:

  1. RabbitMQ.Client NuGet paketi eklenmeli:

RabbitMQ ile iletişim kurmak için gerekli olan bu paketi projeye ekleyin:

dotnet add package RabbitMQ.Client

C# Producer Örneği:

using RabbitMQ.Client;
using System;
using System.Text;

// RabbitMQ bağlantı ayarları
var factory = new ConnectionFactory()
{
    HostName = "localhost", // RabbitMQ sunucu adresi
    UserName = "guest",     // Varsayılan kullanıcı
    Password = "guest"      // Varsayılan şifre
};

// RabbitMQ bağlantısını oluştur
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Queue tanımla
string queueName = "example-queue";
channel.QueueDeclare(
    queue: queueName,
    durable: true,   // Mesajlar RabbitMQ yeniden başlatıldığında korunur
    exclusive: false,
    autoDelete: false,
    arguments: null);

Console.WriteLine("Mesaj gönderilecek. Çıkış için Ctrl+C basabilirsiniz.");

while (true)
{
    Console.Write("Göndermek istediğiniz mesaj: ");
    var message = Console.ReadLine();

    // Mesajı byte dizisine dönüştür
    var body = Encoding.UTF8.GetBytes(message);

    // Mesajı RabbitMQ'ya gönder
    channel.BasicPublish(
        exchange: "",           // Default exchange
        routingKey: queueName,  // Hedef queue adı
        basicProperties: null,  // Varsayılan özellikler
        body: body);

    Console.WriteLine($"Mesaj gönderildi: {message}");
}

1. RabbitMQ Bağlantı Ayarları

  • Bir ConnectionFactory nesnesi oluşturularak RabbitMQ sunucusuna bağlanmak için gerekli ayarlar tanımlanır:
    • HostName: RabbitMQ sunucusunun adresi. Burada "localhost" olarak belirtilmiştir.
    • UserName ve Password: RabbitMQ'nun varsayılan kullanıcı adı ve şifresi olan guest kullanılır.

2. RabbitMQ Bağlantısı ve Kanal Oluşturma

  • factory.CreateConnection() metodu ile RabbitMQ sunucusuna bir bağlantı açılır.
  • connection.CreateModel() ile bu bağlantı üzerinden bir kanal (channel) oluşturulur:
    • Kanallar, RabbitMQ ile iletişim kurmak için kullanılır.

3. Queue Tanımlama

  • channel.QueueDeclare ile bir kuyruk (queue) oluşturulur:
    • queue: Kuyruğun adı, burada "example-queue" olarak belirtilmiştir.
    • durable: true olduğunda, kuyruk RabbitMQ yeniden başlatıldığında korunur.
    • exclusive: false olduğunda kuyruk birden fazla bağlantı tarafından paylaşılabilir.
    • autoDelete: false olduğunda, kuyruk otomatik olarak silinmez.
    • arguments: Kuyruğa özel ek parametreler için kullanılır, burada null verilmiştir.

4. Mesaj Gönderme Döngüsü

  • Bir sonsuz döngü (while (true)) ile kullanıcıdan mesaj alınıp RabbitMQ'ya gönderilir:
    • Console.ReadLine(): Kullanıcıdan mesajı alır.
    • Encoding.UTF8.GetBytes(message): Mesajı RabbitMQ'ya uygun bir byte dizisine dönüştürür.
    • channel.BasicPublish: Mesajı RabbitMQ'ya gönderir:
      • exchange: Boş bırakılarak varsayılan exchange kullanılır.
      • routingKey: Mesajın yönlendirilmesini sağlayan kuyruk adı (örnekte "example-queue").
      • body: Kuyruğa gönderilecek mesajın içeriği (byte formatında).

5. Konsol Çıktıları

  • Kullanıcıdan mesaj girişi istenir: Console.Write("Göndermek istediğiniz mesaj: ").
  • Mesaj başarıyla gönderildiğinde bir bilgi mesajı gösterilir: Console.WriteLine($"Mesaj gönderildi: {message}");.

6. Kaynak Yönetimi (using var)

  • using var kullanılarak:
    • connection ve channel nesneleri, kullanımları tamamlandığında otomatik olarak serbest bırakılır.
  • Bu, bağlantıların ve kaynakların temiz bir şekilde yönetilmesini sağlar.

Consumer (Tüketen) Uygulama

  • Consumer, RabbitMQ’da bir kuyruğa abone (subscribe) olarak mesajları çeken ve işleyen taraftır.
  • Consumer çoğunlukla kuyruk adını (queue name) bilir ve bir callback (geri çağırım) mekanizması ya da döngü (loop) içinde mesajları okur.
  • Mesaj işleme tamamlanınca, çoğunlukla bir ack (acknowledgement) gönderilir; bu, mesajın başarılı şekilde işlendiğini broker’a bildirir.

Consumer’ın Temel Adımları:

  1. Bağlantı ve kanal oluşturma
  2. Kuyruğa abone olma: channel.basicConsume(queueName, ...) ile kuyruktaki mesajları dinleyecek bir consumer tanımlanır.
  3. Mesaj işleme: Kuyruktan gelen her mesaj için belirlenmiş bir callback fonksiyon çalışır.
  4. ACK göndermek: Mesaj başarıyla işlendikten sonra channel.basicAck(deliveryTag, false) gibi bir yöntemle onay gönderilir.
  5. Hata/geri gönderme (Reject/Nack): Eğer mesaj işlenemiyorsa, basicNack veya basicReject ile mesajı iade etmek veya silebilmek de mümkündür.

C# Consumer Örneği:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

// RabbitMQ bağlantısı için gerekli bilgiler
var factory = new ConnectionFactory()
{
    HostName = "localhost", // RabbitMQ server adresi
    UserName = "guest",     // Kullanıcı adı
    Password = "guest"      // Şifre
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Dinlenecek kuyruk adı
string queueName = "example-queue";

// Kuyruğun var olduğunu garanti etmek için (eğer yoksa oluşturulur)
channel.QueueDeclare(queue: queueName,
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

Console.WriteLine("[Consumer] Kuyruk dinleniyor: " + queueName);

// Mesajların tüketimi için bir event tanımlıyoruz
var consumer = new EventingBasicConsumer(channel);

// Mesaj geldiğinde çalışacak callback fonksiyonu
consumer.Received += (model, ea) =>
{
    // Mesajın içeriğini alıyoruz
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine($"[Consumer] Gelen mesaj: {message}");

    try
    {
        // Burada mesaj işlenir
        ProcessMessage(message);

        // İşlem başarılıysa ACK gönderilir
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        Console.WriteLine("[Consumer] Mesaj başarıyla işlendi.");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"[Consumer] Mesaj işlenirken hata: {ex.Message}");

        // Mesaj işlenemezse NACK ile reddedilebilir
        channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
        Console.WriteLine("[Consumer] Mesaj tekrar kuyruğa gönderildi.");
    }
};

// Consumer kuyruğa bağlanıyor
channel.BasicConsume(queue: queueName,
                     autoAck: false, // ACK işlemini manuel yapıyoruz
                     consumer: consumer);

Console.WriteLine("[Consumer] Çıkmak için bir tuşa basın.");
Console.ReadLine();

void ProcessMessage(string message)
{
    // Mesaj işleme mantığı burada uygulanır
    Console.WriteLine($"[ProcessMessage] Mesaj işleniyor: {message}");

    // Örnek: Mesajın boş olup olmadığını kontrol et
    if (string.IsNullOrWhiteSpace(message))
    {
        throw new Exception("Mesaj boş olamaz!");
    }

    // İşlem başarılıysa herhangi bir şey yapabiliriz
}

1. Kuyruk Tanımlama (QueueDeclare)


channel.QueueDeclare(queue: queueName,
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

  • queue: queueName:
    • Kuyruğun adı. Burada queueName değişkeni kullanılmış ve değeri "example-queue" olarak ayarlanmış.
  • durable: true:
    • Kuyruğun kalıcı olup olmadığını belirtir.
    • true: Kuyruk, RabbitMQ yeniden başlatılsa bile korunur.
    • false: Kuyruk, RabbitMQ yeniden başlatıldığında silinir.
  • exclusive: false:
    • Kuyruğun yalnızca bu bağlantı tarafından mı kullanılabileceğini belirler.
    • true: Kuyruk yalnızca tanımlayan bağlantı tarafından kullanılabilir ve bağlantı kesildiğinde silinir.
    • false: Kuyruk diğer bağlantılar tarafından da kullanılabilir.
  • autoDelete: false:
    • Kuyruğun otomatik olarak silinip silinmeyeceğini belirtir.
    • true: Kuyruğa abone olan son tüketici bağlantısını kestiğinde kuyruk otomatik olarak silinir.
    • false: Kuyruk otomatik olarak silinmez.
  • arguments: null:
    • Kuyruğa özel ek ayarları içerir. Burada null verilmiş, yani ek bir ayar yok.

2. Mesaj Tüketici Tanımlama (BasicConsume)


channel.BasicConsume(queue: queueName,
                     autoAck: false, 
                     consumer: consumer);
  • queue: queueName:Tüketicinin dinleyeceği kuyruk adı. Burada "example-queue".
  • autoAck: false:Otomatik onaylama davranışını belirtir.true: Mesajlar, alındığı anda otomatik olarak onaylanır. (Hata yönetimi yoktur.)false: Onaylama işlemi manuel yapılır. (Bu, BasicAck veya BasicNack kullanılarak yapılır.)
  • consumer: consumer:Mesajları işlemek için kullanılan tüketici (burada EventingBasicConsumer).

3. Mesaj İşleme Callback (consumer.Received)


consumer.Received += (model, ea) => {
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
};

  • consumer.Received:Mesaj kuyruktan geldiğinde çalışacak olaydır.
  • ea.Body.ToArray():Gelen mesajın içeriğini bayt dizisi (byte[]) olarak alır.
  • Encoding.UTF8.GetString(body):Mesaj içeriğini UTF-8 formatında string'e dönüştürür.

4. Mesaj İşleme Başarısı (BasicAck)


channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

  • deliveryTag: ea.DeliveryTag:Onaylanan mesajın teslimat etiketini belirtir. RabbitMQ bu etiket üzerinden hangi mesajın onaylandığını takip eder.
  • multiple: false:Birden fazla mesajı aynı anda onaylayıp onaylamadığını belirtir.true: Belirtilen etikete kadar olan tüm mesajlar topluca onaylanır.false: Yalnızca belirtilen mesaj onaylanır.

5. Mesaj İşleme Başarısızlığı (BasicNack)


channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);

  • deliveryTag: ea.DeliveryTag:
    • Reddedilen mesajın teslimat etiketini belirtir.
  • multiple: false:
    • Birden fazla mesajın aynı anda reddedilip reddedilmeyeceğini belirtir.
    • true: Belirtilen etikete kadar olan tüm mesajlar topluca reddedilir.
    • false: Yalnızca belirtilen mesaj reddedilir.
  • requeue: true:
    • Reddedilen mesajın tekrar kuyruğa gönderilip gönderilmeyeceğini belirtir.
    • true: Mesaj kuyruktan tekrar işlenmek üzere geri gönderilir.
    • false: Mesaj kuyruktan tamamen silinir.

6. Kuyruğa Mesaj Gönderme (Varsayımsal Kod Örneği)

Kodda gönderme işlemi yok, ancak şu şekilde olurdu:


channel.BasicPublish(exchange: "",
                     routingKey: queueName,
                     basicProperties: null,
                     body: Encoding.UTF8.GetBytes("Hello World!"));

  • exchange: \"\":
    • Kullanılacak exchange adı. Boş string, RabbitMQ'nun varsayılan exchange'ini kullanır.
  • routingKey: queueName:
    • Mesajın yönlendirileceği kuyruk adı.
  • basicProperties: null:
    • Mesaj için ek özellikler. Burada null, yani ek özellik yok.
  • body: Encoding.UTF8.GetBytes(\"Hello World!\"):
    • Mesajın içeriği bayt dizisi olarak gönderilir.


Temel RabbitMQ API Yöntemleri ve Kütüphaneleri

  1. connectionFactory.newConnection() veya amqp.connect()
    • Broker’a bağlanmak için kullanılır.
  2. connection.createChannel()
    • Mesaj göndermek ve almak için bir “channel” oluşturur.
  3. queueDeclare / assertQueue
    • Bir kuyruğu oluşturur veya zaten mevcutsa özelliklerini doğrular.
  4. basicPublish / sendToQueue
    • Mesajları yayınlamak (publish etmek) için kullanılır.
  5. basicConsume
    • Bir kuyruktan mesaj tüketmek için consumer tanımlamasını gerçekleştirir.
  6. basicAck / channel.ack
    • Mesajın başarıyla işlendiğini onaylamak için kullanılır.
  7. basicNack / channel.nack
    • Mesajın işlenemediğini veya yeniden kuyruğa konması (requeue) gerektiğini belirtir.
  8. basicReject
    • Tek bir mesaj için işlenememe durumunu bildirir.

Diller arası farklı metod isimleri olsa da, kavramsal olarak aynı adımlar izlenir.


Mesaj Sırası, Teslim Garantileri ve “Ack” Mekanizması

Mesaj Sırası (Ordering)

  • RabbitMQ, aynı kuyruk içindeki mesajları, ilk giren ilk çıkar (FIFO) mantığına göre dağıtmaya çalışır.
  • Ancak, çoklu tüketiciler kullanılıyorsa ve farklı hızlarda mesaj işleniyorsa, global bir sıra yerine, her tüketici kendi sıradaki mesajını işleyebilir.
  • Prefetch ayarı ile tüketici başına belirli sayıda mesajın “önden” yüklenmesi sağlanabilir. Bu, performans ile sıralama kontrolü arasında bir denge kurar.

Teslim Garantileri

  • Persistent Mesajlar: deliveryMode=2 veya kütüphaneye göre persistent=true şeklinde işaretlenirse, mesaj disk üzerinde saklanarak olası broker yeniden başlatmalarında kaybolmaz.
  • Durable Kuyruklar: Kuyruğun durable=true olarak tanımlanması, broker yeniden başlatılsa bile kuyruğun varlığını sürdürmesini sağlar.
  • Cluster ve Mirroring: Yüksek erişilebilirlik (HA) için RabbitMQ cluster’ında kuyruğu aynalama (mirroring) veya quorum queues yapılandırmalarından faydalanılabilir.

“Ack” (Acknowledge) Mekanizması

  • Manual Ack: Tüketici mesajı başarıyla işlediğinde basicAck veya channel.ack çağırarak broker’a “Bu mesaj işleme alındı, artık silebilirsin.” der.
  • Otomatik Ack: autoAck = true (ya da benzer parametreler) ile mesaj kuyruktan anında kaldırılır. Bu, mesaj kaybına neden olabilir çünkü tüketici mesajı henüz tam işlememiş olsa bile broker mesajı “işlendi” kabul eder.
  • Nack / Reject: Mesaj işlenemediğinde basicNack (toplu veya tekil) veya basicReject ile broker’a “Bu mesajı yeniden kuyruğa koy” veya “tamamen sil” şeklinde talimat verilebilir.
    • requeue=true ile mesaj aynı kuyruğa geri dönerek başka bir tüketiciye veya yeniden aynı tüketiciye gidebilir.
    • requeue=false ile mesaj tamamen kaybedilir veya Dead Letter Queue (DLQ) varsa oraya yönlendirilir.

Bir sonraki aşamada, Performans ve Ölçeklenebilirlik (kuyruk konfigürasyonları, prefetch ayarları, clustering) veya High Availability (HA) gibi konulara geçerek mesajlaşma mimarinizi kurumsal ölçekte daha sağlam hale getirebilirsiniz.

İyi çalışmalar!