· Producer (yayınlayan) uygulama örneği
· Consumer (tüketen) uygulama örneği
· Temel RabbitMQ API yöntemleri ve kütüphaneleri
· Mesaj sırası, teslim garantileri ve “ack” mekanizması
Producer (yayınlayan) uygulama örneği
· Producer, RabbitMQ Broker’a mesaj gönderen (publish eden) taraftır.
· Producer uygulaması, mesajları oluşturur ve Exchange’e gönderirken genellikle bir Routing Key belirtir.
· Gönderilen mesaj, ilgili Exchange türü ve Binding Key kurallarına göre doğru kuyruğa yönlendirilir.
Producer’ın Temel Adımları:
1.Bağlantı oluşturma: RabbitMQ broker’ına bağlanmak için bir bağlantı (connection) açılır.
2.Kanal (channel) açma: Bağlantı üzerinden bir “channel” oluşturulur; bu, mesajların gönderilmesi ve alınması için kullanılan mantıksal yoldur.
3.Mesaj gönderme (publish): channel.basicPublish(exchangeName, routingKey, ...)
gibi bir yöntemle mesaj gönderilir.
Bağlantıyı kapatma: İlgili işlemler bittiğinde kanal ve bağlantı kapatılır (ya da açık bırakılıp tekrar kullanılabilir).
Gereksinimler:
- RabbitMQ.Client NuGet paketi eklenmeli:
RabbitMQ ile iletişim kurmak için gerekli olan bu paketi projeye ekleyin:
dotnet add package RabbitMQ.Client
C# Producer Örneği:
using RabbitMQ.Client;
using System;
using System.Text;
// RabbitMQ bağlantı ayarları
var factory = new ConnectionFactory()
{
HostName = "localhost", // RabbitMQ sunucu adresi
UserName = "guest", // Varsayılan kullanıcı
Password = "guest" // Varsayılan şifre
};
// RabbitMQ bağlantısını oluştur
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Queue tanımla
string queueName = "example-queue";
channel.QueueDeclare(
queue: queueName,
durable: true, // Mesajlar RabbitMQ yeniden başlatıldığında korunur
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("Mesaj gönderilecek. Çıkış için Ctrl+C basabilirsiniz.");
while (true)
{
Console.Write("Göndermek istediğiniz mesaj: ");
var message = Console.ReadLine();
// Mesajı byte dizisine dönüştür
var body = Encoding.UTF8.GetBytes(message);
// Mesajı RabbitMQ'ya gönder
channel.BasicPublish(
exchange: "", // Default exchange
routingKey: queueName, // Hedef queue adı
basicProperties: null, // Varsayılan özellikler
body: body);
Console.WriteLine($"Mesaj gönderildi: {message}");
}
1. RabbitMQ Bağlantı Ayarları
- Bir
ConnectionFactory
nesnesi oluşturularak RabbitMQ sunucusuna bağlanmak için gerekli ayarlar tanımlanır:HostName
: RabbitMQ sunucusunun adresi. Burada "localhost" olarak belirtilmiştir.UserName
vePassword
: RabbitMQ'nun varsayılan kullanıcı adı ve şifresi olanguest
kullanılır.
2. RabbitMQ Bağlantısı ve Kanal Oluşturma
factory.CreateConnection()
metodu ile RabbitMQ sunucusuna bir bağlantı açılır.connection.CreateModel()
ile bu bağlantı üzerinden bir kanal (channel) oluşturulur:- Kanallar, RabbitMQ ile iletişim kurmak için kullanılır.
3. Queue Tanımlama
channel.QueueDeclare
ile bir kuyruk (queue) oluşturulur:queue
: Kuyruğun adı, burada "example-queue" olarak belirtilmiştir.durable
:true
olduğunda, kuyruk RabbitMQ yeniden başlatıldığında korunur.exclusive
:false
olduğunda kuyruk birden fazla bağlantı tarafından paylaşılabilir.autoDelete
:false
olduğunda, kuyruk otomatik olarak silinmez.arguments
: Kuyruğa özel ek parametreler için kullanılır, buradanull
verilmiştir.
4. Mesaj Gönderme Döngüsü
- Bir sonsuz döngü (
while (true)
) ile kullanıcıdan mesaj alınıp RabbitMQ'ya gönderilir:Console.ReadLine()
: Kullanıcıdan mesajı alır.Encoding.UTF8.GetBytes(message)
: Mesajı RabbitMQ'ya uygun bir byte dizisine dönüştürür.channel.BasicPublish
: Mesajı RabbitMQ'ya gönderir:exchange
: Boş bırakılarak varsayılan exchange kullanılır.routingKey
: Mesajın yönlendirilmesini sağlayan kuyruk adı (örnekte "example-queue").body
: Kuyruğa gönderilecek mesajın içeriği (byte formatında).
5. Konsol Çıktıları
- Kullanıcıdan mesaj girişi istenir:
Console.Write("Göndermek istediğiniz mesaj: ")
. - Mesaj başarıyla gönderildiğinde bir bilgi mesajı gösterilir:
Console.WriteLine($"Mesaj gönderildi: {message}");
.
6. Kaynak Yönetimi (using var)
using var
kullanılarak:connection
vechannel
nesneleri, kullanımları tamamlandığında otomatik olarak serbest bırakılır.
- Bu, bağlantıların ve kaynakların temiz bir şekilde yönetilmesini sağlar.
Consumer (Tüketen) Uygulama
- Consumer, RabbitMQ’da bir kuyruğa abone (subscribe) olarak mesajları çeken ve işleyen taraftır.
- Consumer çoğunlukla kuyruk adını (queue name) bilir ve bir callback (geri çağırım) mekanizması ya da döngü (loop) içinde mesajları okur.
- Mesaj işleme tamamlanınca, çoğunlukla bir ack (acknowledgement) gönderilir; bu, mesajın başarılı şekilde işlendiğini broker’a bildirir.
Consumer’ın Temel Adımları:
- Bağlantı ve kanal oluşturma
- Kuyruğa abone olma:
channel.basicConsume(queueName, ...)
ile kuyruktaki mesajları dinleyecek bir consumer tanımlanır. - Mesaj işleme: Kuyruktan gelen her mesaj için belirlenmiş bir callback fonksiyon çalışır.
- ACK göndermek: Mesaj başarıyla işlendikten sonra
channel.basicAck(deliveryTag, false)
gibi bir yöntemle onay gönderilir. - Hata/geri gönderme (Reject/Nack): Eğer mesaj işlenemiyorsa,
basicNack
veyabasicReject
ile mesajı iade etmek veya silebilmek de mümkündür.
C# Consumer Örneği:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
// RabbitMQ bağlantısı için gerekli bilgiler
var factory = new ConnectionFactory()
{
HostName = "localhost", // RabbitMQ server adresi
UserName = "guest", // Kullanıcı adı
Password = "guest" // Şifre
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Dinlenecek kuyruk adı
string queueName = "example-queue";
// Kuyruğun var olduğunu garanti etmek için (eğer yoksa oluşturulur)
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine("[Consumer] Kuyruk dinleniyor: " + queueName);
// Mesajların tüketimi için bir event tanımlıyoruz
var consumer = new EventingBasicConsumer(channel);
// Mesaj geldiğinde çalışacak callback fonksiyonu
consumer.Received += (model, ea) =>
{
// Mesajın içeriğini alıyoruz
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[Consumer] Gelen mesaj: {message}");
try
{
// Burada mesaj işlenir
ProcessMessage(message);
// İşlem başarılıysa ACK gönderilir
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine("[Consumer] Mesaj başarıyla işlendi.");
}
catch (Exception ex)
{
Console.WriteLine($"[Consumer] Mesaj işlenirken hata: {ex.Message}");
// Mesaj işlenemezse NACK ile reddedilebilir
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine("[Consumer] Mesaj tekrar kuyruğa gönderildi.");
}
};
// Consumer kuyruğa bağlanıyor
channel.BasicConsume(queue: queueName,
autoAck: false, // ACK işlemini manuel yapıyoruz
consumer: consumer);
Console.WriteLine("[Consumer] Çıkmak için bir tuşa basın.");
Console.ReadLine();
void ProcessMessage(string message)
{
// Mesaj işleme mantığı burada uygulanır
Console.WriteLine($"[ProcessMessage] Mesaj işleniyor: {message}");
// Örnek: Mesajın boş olup olmadığını kontrol et
if (string.IsNullOrWhiteSpace(message))
{
throw new Exception("Mesaj boş olamaz!");
}
// İşlem başarılıysa herhangi bir şey yapabiliriz
}
1. Kuyruk Tanımlama (QueueDeclare
)
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
queue: queueName
:- Kuyruğun adı. Burada
queueName
değişkeni kullanılmış ve değeri"example-queue"
olarak ayarlanmış.
- Kuyruğun adı. Burada
durable: true
:- Kuyruğun kalıcı olup olmadığını belirtir.
true
: Kuyruk, RabbitMQ yeniden başlatılsa bile korunur.false
: Kuyruk, RabbitMQ yeniden başlatıldığında silinir.
exclusive: false
:- Kuyruğun yalnızca bu bağlantı tarafından mı kullanılabileceğini belirler.
true
: Kuyruk yalnızca tanımlayan bağlantı tarafından kullanılabilir ve bağlantı kesildiğinde silinir.false
: Kuyruk diğer bağlantılar tarafından da kullanılabilir.
autoDelete: false
:- Kuyruğun otomatik olarak silinip silinmeyeceğini belirtir.
true
: Kuyruğa abone olan son tüketici bağlantısını kestiğinde kuyruk otomatik olarak silinir.false
: Kuyruk otomatik olarak silinmez.
arguments: null
:- Kuyruğa özel ek ayarları içerir. Burada
null
verilmiş, yani ek bir ayar yok.
- Kuyruğa özel ek ayarları içerir. Burada
2. Mesaj Tüketici Tanımlama (BasicConsume
)
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
queue: queueName
:Tüketicinin dinleyeceği kuyruk adı. Burada"example-queue"
.autoAck: false
:Otomatik onaylama davranışını belirtir.true
: Mesajlar, alındığı anda otomatik olarak onaylanır. (Hata yönetimi yoktur.)false
: Onaylama işlemi manuel yapılır. (Bu,BasicAck
veyaBasicNack
kullanılarak yapılır.)consumer: consumer
:Mesajları işlemek için kullanılan tüketici (buradaEventingBasicConsumer
).
3. Mesaj İşleme Callback (consumer.Received
)
consumer.Received += (model, ea) => {
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
};
consumer.Received
:Mesaj kuyruktan geldiğinde çalışacak olaydır.ea.Body.ToArray()
:Gelen mesajın içeriğini bayt dizisi (byte[]
) olarak alır.Encoding.UTF8.GetString(body)
:Mesaj içeriğini UTF-8 formatında string'e dönüştürür.
4. Mesaj İşleme Başarısı (BasicAck
)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
deliveryTag: ea.DeliveryTag
:Onaylanan mesajın teslimat etiketini belirtir. RabbitMQ bu etiket üzerinden hangi mesajın onaylandığını takip eder.multiple: false
:Birden fazla mesajı aynı anda onaylayıp onaylamadığını belirtir.true
: Belirtilen etikete kadar olan tüm mesajlar topluca onaylanır.false
: Yalnızca belirtilen mesaj onaylanır.
5. Mesaj İşleme Başarısızlığı (BasicNack
)
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
deliveryTag: ea.DeliveryTag
:- Reddedilen mesajın teslimat etiketini belirtir.
multiple: false
:- Birden fazla mesajın aynı anda reddedilip reddedilmeyeceğini belirtir.
true
: Belirtilen etikete kadar olan tüm mesajlar topluca reddedilir.false
: Yalnızca belirtilen mesaj reddedilir.
requeue: true
:- Reddedilen mesajın tekrar kuyruğa gönderilip gönderilmeyeceğini belirtir.
true
: Mesaj kuyruktan tekrar işlenmek üzere geri gönderilir.false
: Mesaj kuyruktan tamamen silinir.
6. Kuyruğa Mesaj Gönderme (Varsayımsal Kod Örneği)
Kodda gönderme işlemi yok, ancak şu şekilde olurdu:
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: Encoding.UTF8.GetBytes("Hello World!"));
exchange: \"\"
:- Kullanılacak exchange adı. Boş string, RabbitMQ'nun varsayılan exchange'ini kullanır.
routingKey: queueName
:- Mesajın yönlendirileceği kuyruk adı.
basicProperties: null
:- Mesaj için ek özellikler. Burada
null
, yani ek özellik yok.
- Mesaj için ek özellikler. Burada
body: Encoding.UTF8.GetBytes(\"Hello World!\")
:- Mesajın içeriği bayt dizisi olarak gönderilir.
Temel RabbitMQ API Yöntemleri ve Kütüphaneleri
- connectionFactory.newConnection() veya amqp.connect()
- Broker’a bağlanmak için kullanılır.
- connection.createChannel()
- Mesaj göndermek ve almak için bir “channel” oluşturur.
- queueDeclare / assertQueue
- Bir kuyruğu oluşturur veya zaten mevcutsa özelliklerini doğrular.
- basicPublish / sendToQueue
- Mesajları yayınlamak (publish etmek) için kullanılır.
- basicConsume
- Bir kuyruktan mesaj tüketmek için consumer tanımlamasını gerçekleştirir.
- basicAck / channel.ack
- Mesajın başarıyla işlendiğini onaylamak için kullanılır.
- basicNack / channel.nack
- Mesajın işlenemediğini veya yeniden kuyruğa konması (requeue) gerektiğini belirtir.
- basicReject
- Tek bir mesaj için işlenememe durumunu bildirir.
Diller arası farklı metod isimleri olsa da, kavramsal olarak aynı adımlar izlenir.
Mesaj Sırası, Teslim Garantileri ve “Ack” Mekanizması
Mesaj Sırası (Ordering)
- RabbitMQ, aynı kuyruk içindeki mesajları, ilk giren ilk çıkar (FIFO) mantığına göre dağıtmaya çalışır.
- Ancak, çoklu tüketiciler kullanılıyorsa ve farklı hızlarda mesaj işleniyorsa, global bir sıra yerine, her tüketici kendi sıradaki mesajını işleyebilir.
- Prefetch ayarı ile tüketici başına belirli sayıda mesajın “önden” yüklenmesi sağlanabilir. Bu, performans ile sıralama kontrolü arasında bir denge kurar.
Teslim Garantileri
- Persistent Mesajlar:
deliveryMode=2
veya kütüphaneye görepersistent=true
şeklinde işaretlenirse, mesaj disk üzerinde saklanarak olası broker yeniden başlatmalarında kaybolmaz. - Durable Kuyruklar: Kuyruğun
durable=true
olarak tanımlanması, broker yeniden başlatılsa bile kuyruğun varlığını sürdürmesini sağlar. - Cluster ve Mirroring: Yüksek erişilebilirlik (HA) için RabbitMQ cluster’ında kuyruğu aynalama (mirroring) veya quorum queues yapılandırmalarından faydalanılabilir.
“Ack” (Acknowledge) Mekanizması
- Manual Ack: Tüketici mesajı başarıyla işlediğinde
basicAck
veyachannel.ack
çağırarak broker’a “Bu mesaj işleme alındı, artık silebilirsin.” der. - Otomatik Ack:
autoAck = true
(ya da benzer parametreler) ile mesaj kuyruktan anında kaldırılır. Bu, mesaj kaybına neden olabilir çünkü tüketici mesajı henüz tam işlememiş olsa bile broker mesajı “işlendi” kabul eder. - Nack / Reject: Mesaj işlenemediğinde
basicNack
(toplu veya tekil) veyabasicReject
ile broker’a “Bu mesajı yeniden kuyruğa koy” veya “tamamen sil” şeklinde talimat verilebilir.requeue=true
ile mesaj aynı kuyruğa geri dönerek başka bir tüketiciye veya yeniden aynı tüketiciye gidebilir.requeue=false
ile mesaj tamamen kaybedilir veya Dead Letter Queue (DLQ) varsa oraya yönlendirilir.
Bir sonraki aşamada, Performans ve Ölçeklenebilirlik (kuyruk konfigürasyonları, prefetch ayarları, clustering) veya High Availability (HA) gibi konulara geçerek mesajlaşma mimarinizi kurumsal ölçekte daha sağlam hale getirebilirsiniz.
İyi çalışmalar!