RabbitMQ #
RabbitMQ adalah message broker paling populer di dunia yang mengimplementasikan protokol AMQP (Advanced Message Queuing Protocol). Berbeda dari Kafka yang menyimpan semua pesan seperti log, RabbitMQ adalah broker berbasis routing — pesan dikirim ke exchange, di-routing berdasarkan aturan binding, lalu diletakkan di queue yang sesuai. Begitu consumer mengambil dan mengakui (acknowledge) pesan, pesan dihapus dari queue. Model ini sangat cocok untuk task queue, work distribution, dan RPC. Package dart_amqp menyediakan client AMQP 0-9-1 yang lengkap untuk Dart.
Konsep Inti RabbitMQ #
flowchart LR
P["Producer\n(Dart App)"] -->|publish| EX["Exchange\n(direct/topic/fanout)"]
EX -->|routing key| Q1["Queue: order.new"]
EX -->|routing key| Q2["Queue: order.payment"]
EX -->|fanout| Q3["Queue: notification"]
Q1 -->|deliver| C1["Consumer 1\n(order processor)"]
Q2 -->|deliver| C2["Consumer 2\n(payment service)"]
Q3 -->|deliver| C3["Consumer 3\n(email service)"]
Q3 -->|deliver| C4["Consumer 4\n(sms service)"]
| Konsep | Deskripsi |
|---|---|
| Exchange | Penerima pesan dari producer, merouting ke queue berdasarkan aturan |
| Queue | Buffer penyimpanan pesan yang menunggu dikonsumsi |
| Binding | Aturan yang menghubungkan exchange ke queue |
| Routing Key | Label pada pesan yang digunakan exchange untuk routing |
| Acknowledgment | Konfirmasi dari consumer bahwa pesan sudah diproses |
| Prefetch | Batas jumlah pesan yang di-deliver ke consumer sebelum ada ACK |
| DLX | Dead Letter Exchange — tempat pesan yang rejected atau expired |
Setup Package #
dart pub add dart_amqp
# pubspec.yaml
dependencies:
dart_amqp: ^0.2.5
Koneksi ke RabbitMQ #
import 'package:dart_amqp/dart_amqp.dart';
Future<void> main() async {
// Koneksi dasar
final client = Client(
settings: ConnectionSettings(
host: 'localhost',
port: 5672,
virtualHost: '/', // default virtual host
credentials: PlainAuthenticator('guest', 'guest'),
connectionTimeout: Duration(seconds: 10),
),
);
// Buka channel — operasi AMQP dilakukan melalui channel
final channel = await client.channel();
print('Terhubung ke RabbitMQ');
await client.close();
}
Koneksi dengan URL #
// Format URL: amqp://username:password@host:port/vhost
final client = Client(
settings: ConnectionSettings.fromUri(
Uri.parse('amqp://admin:[email protected]:5672/production'),
),
);
// Dengan SSL/TLS
final clientTLS = Client(
settings: ConnectionSettings(
host: 'rabbitmq.example.com',
port: 5671, // port AMQPS
credentials: PlainAuthenticator('admin', 'password'),
tlsContext: SecurityContext.defaultContext,
),
);
Exchange Types #
RabbitMQ mendukung beberapa tipe exchange dengan routing behavior berbeda:
import 'package:dart_amqp/dart_amqp.dart';
Future<void> deklarasiExchange(Channel channel) async {
// Direct exchange — routing berdasarkan exact match routing key
await channel.exchange(
'order.direct',
ExchangeType.DIRECT,
durable: true, // bertahan setelah restart RabbitMQ
);
// Topic exchange — routing dengan wildcard (* = satu kata, # = banyak kata)
await channel.exchange(
'app.events',
ExchangeType.TOPIC,
durable: true,
);
// Fanout exchange — broadcast ke semua queue yang terikat, abaikan routing key
await channel.exchange(
'broadcast',
ExchangeType.FANOUT,
durable: true,
);
// Headers exchange — routing berdasarkan header pesan, bukan routing key
await channel.exchange(
'reports.headers',
ExchangeType.HEADERS,
durable: true,
);
}
Publisher — Mengirim Pesan #
import 'package:dart_amqp/dart_amqp.dart';
import 'dart:convert';
class OrderPublisher {
late final Client _client;
late final Channel _channel;
Future<void> inisialisasi() async {
_client = Client(
settings: ConnectionSettings(
host: 'localhost',
credentials: PlainAuthenticator('guest', 'guest'),
),
);
_channel = await _client.channel();
// Deklarasi exchange
await _channel.exchange('order.events', ExchangeType.TOPIC, durable: true);
}
Future<void> kirimOrderDibuat({
required String idOrder,
required String idPengguna,
required double total,
}) async {
final pesan = {
'tipe': 'order.dibuat',
'idOrder': idOrder,
'idPengguna': idPengguna,
'total': total,
'timestamp': DateTime.now().toUtc().toIso8601String(),
};
_channel.basicPublish(
'order.events', // exchange name
'order.dibuat', // routing key
AmqpMessage.fromBytes(utf8.encode(jsonEncode(pesan)))
..properties = (MessageProperties()
..contentType = 'application/json'
..deliveryMode = DeliveryMode.PERSISTENT // simpan ke disk, tahan restart
..timestamp = DateTime.now()
..messageId = idOrder),
);
print('Event order.dibuat terkirim untuk $idOrder');
}
Future<void> kirimPembayaranDikonfirmasi({
required String idOrder,
required String metodePembayaran,
required double jumlah,
}) async {
final pesan = jsonEncode({
'tipe': 'pembayaran.dikonfirmasi',
'idOrder': idOrder,
'metodePembayaran': metodePembayaran,
'jumlah': jumlah,
});
// Topic routing: 'pembayaran.dikonfirmasi' → queue yang subscribe pattern ini
_channel.basicPublish(
'order.events',
'pembayaran.dikonfirmasi',
AmqpMessage.fromBytes(utf8.encode(pesan))
..properties = (MessageProperties()
..contentType = 'application/json'
..deliveryMode = DeliveryMode.PERSISTENT),
);
}
Future<void> tutup() => _client.close();
}
Consumer — Menerima Pesan #
import 'package:dart_amqp/dart_amqp.dart';
import 'dart:convert';
Future<void> jalankanConsumer() async {
final client = Client(
settings: ConnectionSettings(
host: 'localhost',
credentials: PlainAuthenticator('guest', 'guest'),
),
);
final channel = await client.channel();
// Deklarasi exchange (idempotent — aman meski sudah ada)
final exchange = await channel.exchange(
'order.events',
ExchangeType.TOPIC,
durable: true,
);
// Deklarasi queue untuk consumer ini
final queue = await channel.queue(
'order-processor', // nama queue (kosong = generate nama acak)
durable: true, // bertahan setelah restart
arguments: {
// Dead Letter Exchange — pesan rejected masuk ke sini
'x-dead-letter-exchange': 'order.events.dlx',
// TTL — pesan dihapus setelah 24 jam jika tidak dikonsumsi
'x-message-ttl': 86400000,
// Max queue length
'x-max-length': 10000,
},
);
// Binding: queue mendengarkan pattern routing key dari exchange
await queue.bind(exchange, 'order.*'); // semua event order
await queue.bind(exchange, 'pembayaran.*'); // semua event pembayaran
// Prefetch — batas pesan yang di-deliver sebelum ada ACK
// Mencegah satu consumer mendapat terlalu banyak pesan
await channel.basicQos(0, 10); // max 10 pesan tanpa ACK
// Subscribe dengan manual acknowledgment
final consumer = await queue.consume(noAck: false);
print('Consumer berjalan, menunggu pesan dari queue: ${queue.name}');
consumer.listen(
(message) async {
try {
final body = utf8.decode(message.payload!);
final data = jsonDecode(body) as Map<String, dynamic>;
print('Pesan diterima: ${message.routingKey}');
await prosesPesan(data);
// ACK — konfirmasi pesan berhasil diproses → dihapus dari queue
message.ack();
} catch (e) {
print('Error memproses pesan: $e');
// NACK — kembalikan ke queue untuk diproses ulang
// requeue: true → masuk kembali ke queue
// requeue: false → ke Dead Letter Exchange (jika dikonfigurasi)
message.nack(requeue: false); // kirim ke DLX setelah gagal
}
},
onError: (error) {
print('Error dari channel: $error');
},
);
// Jaga proses tetap hidup
await Future.delayed(Duration(hours: 24));
await client.close();
}
Future<void> prosesPesan(Map<String, dynamic> data) async {
switch (data['tipe'] as String) {
case 'order.dibuat':
print('Memproses order baru: ${data['idOrder']}');
// ... logika bisnis
case 'pembayaran.dikonfirmasi':
print('Memproses konfirmasi pembayaran: ${data['idOrder']}');
// ... logika bisnis
default:
print('Tipe pesan tidak dikenal: ${data['tipe']}');
}
}
Dead Letter Exchange (DLX) #
DLX menangkap pesan yang gagal diproses (rejected/expired) untuk investigasi atau retry:
import 'package:dart_amqp/dart_amqp.dart';
Future<void> setupDLX(Channel channel) async {
// 1. Buat DLX exchange
final dlxExchange = await channel.exchange(
'order.events.dlx',
ExchangeType.DIRECT,
durable: true,
);
// 2. Buat DLQ (Dead Letter Queue)
final dlQueue = await channel.queue(
'order-processor.dlq',
durable: true,
arguments: {
// Pesan di DLQ disimpan 30 hari untuk investigasi
'x-message-ttl': 2592000000,
},
);
// 3. Bind DLQ ke DLX
await dlQueue.bind(dlxExchange, 'order-processor');
// 4. Consumer DLQ untuk monitoring dan retry manual
final dlqConsumer = await dlQueue.consume(noAck: false);
dlqConsumer.listen((message) {
final headers = message.properties?.headers ?? {};
print('=== Pesan Gagal di DLQ ===');
print('Alasan: ${headers['x-death']?[0]?['reason']}');
print('Queue asal: ${headers['x-death']?[0]?['queue']}');
print('Jumlah kematian: ${headers['x-death']?[0]?['count']}');
print('Payload: ${utf8.decode(message.payload!)}');
// Setelah investigasi, ACK agar dihapus dari DLQ
message.ack();
});
}
Work Queue — Distribusi Tugas #
Work queue mendistribusikan tugas berat ke beberapa worker secara round-robin:
import 'package:dart_amqp/dart_amqp.dart';
import 'dart:convert';
import 'dart:math';
// Publisher — kirim tugas
Future<void> kirimTugas(Channel channel, Map<String, dynamic> tugas) async {
final queue = await channel.queue(
'tugas-berat',
durable: true,
arguments: {'x-dead-letter-exchange': 'dlx'},
);
channel.basicPublish(
'', // default exchange (direct ke queue)
queue.name, // routing key = nama queue untuk default exchange
AmqpMessage.fromBytes(utf8.encode(jsonEncode(tugas)))
..properties = (MessageProperties()
..deliveryMode = DeliveryMode.PERSISTENT),
);
}
// Worker — proses tugas
Future<void> jalankanWorker(String workerId) async {
final client = Client(
settings: ConnectionSettings(
host: 'localhost',
credentials: PlainAuthenticator('guest', 'guest'),
),
);
final channel = await client.channel();
final queue = await channel.queue('tugas-berat', durable: true);
// Prefetch = 1 → setiap worker hanya dapat 1 tugas dalam satu waktu
// Memastikan distribusi merata berdasarkan kecepatan worker
await channel.basicQos(0, 1);
final consumer = await queue.consume(noAck: false);
print('Worker $workerId siap');
consumer.listen((message) async {
final tugas = jsonDecode(utf8.decode(message.payload!)) as Map<String, dynamic>;
print('Worker $workerId mengerjakan: ${tugas['nama']}');
try {
// Simulasi tugas berat
final durasi = Duration(seconds: Random().nextInt(5) + 1);
await Future.delayed(durasi);
print('Worker $workerId selesai: ${tugas['nama']} (${durasi.inSeconds}s)');
message.ack();
} catch (e) {
print('Worker $workerId gagal: $e');
message.nack(requeue: false);
}
});
}
RPC Pattern — Request-Reply #
RabbitMQ mendukung pola RPC (Remote Procedure Call) menggunakan correlation ID dan reply-to queue:
import 'package:dart_amqp/dart_amqp.dart';
import 'dart:convert';
// RPC Client — pengirim request dan menunggu reply
Future<Map<String, dynamic>> panggilRPC(
Channel channel,
String serverQueue,
Map<String, dynamic> request,
) async {
// Buat queue sementara untuk menerima reply
final replyQueue = await channel.queue('', exclusive: true);
final correlationId = DateTime.now().microsecondsSinceEpoch.toString();
final completer = Completer<Map<String, dynamic>>();
// Listen untuk reply
final consumer = await replyQueue.consume(noAck: true);
consumer.listen((message) {
if (message.properties?.correlationId == correlationId) {
final data = jsonDecode(utf8.decode(message.payload!)) as Map<String, dynamic>;
completer.complete(data);
}
});
// Kirim request
channel.basicPublish(
'',
serverQueue,
AmqpMessage.fromBytes(utf8.encode(jsonEncode(request)))
..properties = (MessageProperties()
..replyTo = replyQueue.name
..correlationId = correlationId),
);
// Tunggu dengan timeout 30 detik
return await completer.future.timeout(
Duration(seconds: 30),
onTimeout: () => throw TimeoutException('RPC timeout'),
);
}
// RPC Server — menerima request dan mengirim reply
Future<void> jalankanRPCServer(Channel channel, String nama) async {
final queue = await channel.queue('rpc.$nama', durable: true);
await channel.basicQos(0, 1);
final consumer = await queue.consume(noAck: false);
print('RPC Server "$nama" berjalan');
consumer.listen((message) async {
try {
final request = jsonDecode(utf8.decode(message.payload!)) as Map<String, dynamic>;
// Proses request
final hasil = await prosesRequest(request);
// Kirim reply ke replyTo queue dengan correlationId yang sama
channel.basicPublish(
'',
message.properties!.replyTo!,
AmqpMessage.fromBytes(utf8.encode(jsonEncode(hasil)))
..properties = (MessageProperties()
..correlationId = message.properties!.correlationId),
);
message.ack();
} catch (e) {
// Kirim error response
channel.basicPublish(
'',
message.properties!.replyTo!,
AmqpMessage.fromBytes(utf8.encode(jsonEncode({'error': e.toString()})))
..properties = (MessageProperties()
..correlationId = message.properties!.correlationId),
);
message.nack(requeue: false);
}
});
}
Anti-Pattern RabbitMQ #
Tidak Menggunakan Acknowledgment #
// ANTI-PATTERN: auto-acknowledge — pesan hilang jika consumer crash saat proses
final consumer = await queue.consume(noAck: true); // ✗ pesan langsung dihapus saat deliver
consumer.listen((message) async {
await prosesBerat(message); // jika crash di sini, pesan hilang!
// Tidak perlu ack karena noAck: true — tapi pesan sudah terhapus
});
// BENAR: manual acknowledgment
final consumer = await queue.consume(noAck: false); // ✓
consumer.listen((message) async {
try {
await prosesBerat(message);
message.ack(); // konfirmasi setelah berhasil
} catch (e) {
message.nack(requeue: true); // kembalikan ke queue jika gagal
}
});
Prefetch Terlalu Besar #
// ANTI-PATTERN: prefetch sangat besar — satu consumer mendominasi
await channel.basicQos(0, 10000); // ✗ satu consumer dapat 10.000 pesan sekaligus
// Worker lain tidak dapat tugas meski consumer ini lambat
// BENAR: prefetch kecil (1-10) untuk distribusi yang adil
await channel.basicQos(0, 1); // ✓ satu tugas per worker sebelum ada ACK
// Worker cepat mendapat lebih banyak tugas, worker lambat dapat lebih sedikit
Tidak Mendeklarasikan Queue sebagai Durable #
// ANTI-PATTERN: queue tidak durable — hilang saat RabbitMQ restart
final queue = await channel.queue('penting'); // ✗ durable: false by default
// BENAR: durable: true dan pesan PERSISTENT untuk ketahanan penuh
final queue = await channel.queue(
'penting',
durable: true, // ✓ queue bertahan setelah restart
);
channel.basicPublish(
'',
queue.name,
AmqpMessage.fromBytes(data)
..properties = (MessageProperties()
..deliveryMode = DeliveryMode.PERSISTENT), // ✓ pesan tersimpan ke disk
);
Ringkasan #
- Exchange types menentukan cara routing:
direct(exact key),topic(wildcard*dan#),fanout(broadcast ke semua queue terikat),headers(berdasarkan header).- Deklarasi idempotent — selalu deklarasikan exchange dan queue di producer maupun consumer karena urutan start tidak bisa diprediksi.
- Manual acknowledgment (
noAck: false) adalah standar untuk task yang kritis —ack()setelah berhasil,nack(requeue: false)untuk mengirim ke DLX saat gagal.- Prefetch 1 (
basicQos(0, 1)) untuk work queue yang adil — worker cepat mendapat lebih banyak tugas dibanding worker lambat, bukan round-robin yang buta.- Durable queue + PERSISTENT message diperlukan bersamaan untuk pesan yang tahan restart — queue durable tapi pesan tidak persistent akan hilang saat restart.
- Dead Letter Exchange (DLX) untuk menangkap pesan gagal — konfigurasikan via
x-dead-letter-exchangeargument pada queue dan buat consumer DLQ untuk monitoring.- Topic exchange paling fleksibel —
order.*menangkaporder.dibuat,order.dikirimtapi tidakorder.pembayaran.sukses.order.#menangkap semua yang diawaliorder..- RPC dengan correlationId — gunakan queue reply sementara (
exclusive: true) dan correlationId unik untuk mencocokkan request dengan reply yang tepat.- Prefetch lebih kecil dari 10 untuk consumer yang melakukan operasi I/O berat — consumer tidak akan terbebani jika operasi lambat.
- Virtual host memisahkan lingkungan dalam satu RabbitMQ instance — gunakan
/production,/staging,/developmentsebagai isolasi lingkungan.