Amazon SQS #
Amazon Simple Queue Service (SQS) adalah layanan antrian pesan terkelola dari AWS — tidak perlu mengelola infrastruktur broker seperti RabbitMQ atau Kafka. SQS menyediakan dua jenis antrian: Standard (throughput tinggi, at-least-once delivery, urutan tidak dijamin) dan FIFO (urutan dijamin, exactly-once processing, throughput lebih rendah). Dari Dart, SQS diakses via REST API dengan autentikasi AWS Signature Version 4 — package aws_client menyederhanakan ini dengan menyediakan client yang sudah menangani signing secara otomatis.
Setup Package #
dart pub add aws_client
# pubspec.yaml
dependencies:
aws_client: ^2.0.0
Standard vs FIFO Queue #
flowchart LR
subgraph Standard Queue
P1["Producer"] -->|send| SQ["Standard Queue\n• At-least-once delivery\n• Urutan tidak dijamin\n• Throughput unlimited\n• Harga lebih murah"]
SQ -->|receive| C1["Consumer"]
SQ -->|receive| C2["Consumer"]
end
subgraph FIFO Queue
P2["Producer"] -->|send| FQ["FIFO Queue\n• Exactly-once delivery\n• Urutan dijamin per group\n• Max 3.000 TPS\n• Harga lebih mahal"]
FQ -->|receive| C3["Consumer"]
end
| Aspek | Standard Queue | FIFO Queue |
|---|---|---|
| Ordering | Best-effort (tidak dijamin) | Strict (FIFO per group) |
| Delivery | At-least-once (bisa duplikat) | Exactly-once |
| Throughput | Unlimited | Max 3.000 TPS (dengan batching) |
| Nama antrian | Bebas | Harus diakhiri .fifo |
| Cocok untuk | Email notifikasi, log, fanout | Transaksi keuangan, order processing |
Konfigurasi AWS #
import 'package:aws_client/sqs_2012_11_05.dart';
import 'package:aws_client/src/credentials/credentials.dart';
// Cara 1: Credential eksplisit (untuk development)
SQS buatKlienSQS() {
return SQS(
region: 'ap-southeast-1', // region AWS, contoh: Asia Pacific (Singapore)
credentials: AwsClientCredentials(
accessKey: 'AKIAIOSFODNN7EXAMPLE',
secretKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
),
);
}
// Cara 2: Environment variables (direkomendasikan untuk produksi)
// Set: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
// aws_client otomatis membaca dari environment
SQS buatKlienSQSDariEnv() {
return SQS(region: 'ap-southeast-1');
// credentials dibaca dari: env var → ~/.aws/credentials → EC2 instance profile
}
// Cara 3: Untuk AWS Lambda / ECS — gunakan IAM Role (tanpa credentials eksplisit)
// Assign IAM role dengan permission SQS ke Lambda/ECS task
// aws_client akan menggunakan instance metadata service secara otomatis
Manajemen Queue #
import 'package:aws_client/sqs_2012_11_05.dart';
Future<void> kelolaQueue(SQS sqs) async {
// Buat Standard Queue
final standardQueue = await sqs.createQueue(
queueName: 'order-events',
attributes: {
QueueAttributeName.visibilityTimeout: '30', // 30 detik
QueueAttributeName.messageRetentionPeriod: '86400', // 1 hari
QueueAttributeName.receiveMessageWaitTimeSeconds: '20', // long polling
QueueAttributeName.redrivePolicy: jsonEncode({
'maxReceiveCount': '3', // coba 3 kali sebelum ke DLQ
'deadLetterTargetArn': 'arn:aws:sqs:ap-southeast-1:123456:order-events-dlq',
}),
},
);
print('Queue URL: ${standardQueue.queueUrl}');
// Buat FIFO Queue — nama harus diakhiri .fifo
final fifoQueue = await sqs.createQueue(
queueName: 'payment-events.fifo',
attributes: {
QueueAttributeName.fifoQueue: 'true',
QueueAttributeName.contentBasedDeduplication: 'true', // dedup otomatis via hash
QueueAttributeName.visibilityTimeout: '60',
},
);
// List semua queue
final queues = await sqs.listQueues();
for (final url in queues.queueUrls ?? []) {
print('Queue: $url');
}
// Dapatkan URL queue berdasarkan nama
final urlResult = await sqs.getQueueUrl(queueName: 'order-events');
final queueUrl = urlResult.queueUrl!;
// Info attributes queue
final attrs = await sqs.getQueueAttributes(
queueUrl: queueUrl,
attributeNames: [
QueueAttributeName.approximateNumberOfMessages,
QueueAttributeName.approximateNumberOfMessagesNotVisible,
],
);
print('Pesan tersedia: ${attrs.attributes?[QueueAttributeName.approximateNumberOfMessages]}');
}
Mengirim Pesan #
import 'package:aws_client/sqs_2012_11_05.dart';
import 'dart:convert';
class SQSProducer {
final SQS _sqs;
final String _queueUrl;
SQSProducer({required SQS sqs, required String queueUrl})
: _sqs = sqs,
_queueUrl = queueUrl;
// Kirim satu pesan ke Standard Queue
Future<String> kirim(Map<String, dynamic> data) async {
final result = await _sqs.sendMessage(
queueUrl: _queueUrl,
messageBody: jsonEncode(data),
messageAttributes: {
'tipeEvent': MessageAttributeValue(
dataType: 'String',
stringValue: data['tipe'] as String?,
),
'versi': MessageAttributeValue(
dataType: 'String',
stringValue: '1.0',
),
},
delaySeconds: 0, // delay pengiriman (0-900 detik)
);
print('Pesan terkirim: ${result.messageId}');
return result.messageId!;
}
// Kirim ke FIFO Queue — butuh MessageGroupId dan MessageDeduplicationId
Future<String> kirimFIFO({
required Map<String, dynamic> data,
required String groupId, // urutan dijamin per groupId yang sama
String? deduplicationId, // ID unik untuk mencegah duplikat (null = auto dari content)
}) async {
final result = await _sqs.sendMessage(
queueUrl: _queueUrl,
messageBody: jsonEncode(data),
messageGroupId: groupId,
messageDeduplicationId: deduplicationId ??
_hashContent(jsonEncode(data)), // jika contentBasedDeduplication aktif, bisa null
);
return result.messageId!;
}
// Batch send — kirim hingga 10 pesan sekaligus (lebih hemat biaya)
Future<void> kirimBatch(List<Map<String, dynamic>> pesanList) async {
// SQS batch max 10 pesan
for (int i = 0; i < pesanList.length; i += 10) {
final batch = pesanList.sublist(i, (i + 10).clamp(0, pesanList.length));
final entries = batch.asMap().entries.map((e) {
return SendMessageBatchRequestEntry(
id: 'msg-${e.key}', // ID unik dalam batch
messageBody: jsonEncode(e.value),
);
}).toList();
final result = await _sqs.sendMessageBatch(
queueUrl: _queueUrl,
entries: entries,
);
if (result.failed != null && result.failed!.isNotEmpty) {
for (final gagal in result.failed!) {
print('Gagal kirim ${gagal.id}: ${gagal.message}');
}
}
print('Batch terkirim: ${result.successful?.length ?? 0}/${batch.length}');
}
}
String _hashContent(String content) {
// Implementasi hash sederhana (gunakan crypto package untuk produksi)
return content.hashCode.toRadixString(16);
}
}
Menerima dan Memproses Pesan #
import 'package:aws_client/sqs_2012_11_05.dart';
import 'dart:convert';
class SQSConsumer {
final SQS _sqs;
final String _queueUrl;
bool _jalan = true;
SQSConsumer({required SQS sqs, required String queueUrl})
: _sqs = sqs,
_queueUrl = queueUrl;
Future<void> mulai(Future<void> Function(Map<String, dynamic>) handler) async {
print('Consumer SQS dimulai');
while (_jalan) {
try {
// Long polling — tunggu hingga 20 detik jika queue kosong
// Lebih efisien dari short polling yang terus-terusan kirim request
final result = await _sqs.receiveMessage(
queueUrl: _queueUrl,
maxNumberOfMessages: 10, // ambil max 10 pesan sekaligus
waitTimeSeconds: 20, // long polling — hemat biaya dan CPU
visibilityTimeout: 60, // sembunyikan dari consumer lain selama 60 detik
messageAttributeNames: ['All'], // ambil semua message attributes
);
final pesan = result.messages ?? [];
if (pesan.isEmpty) continue;
// Proses semua pesan yang diterima
await Future.wait(
pesan.map((msg) => _prosesSatuPesan(msg, handler)),
);
} catch (e) {
print('Error saat poll: $e');
await Future.delayed(Duration(seconds: 5)); // backoff saat error
}
}
}
Future<void> _prosesSatuPesan(
Message msg,
Future<void> Function(Map<String, dynamic>) handler,
) async {
try {
final data = jsonDecode(msg.body!) as Map<String, dynamic>;
print('Memproses pesan: ${msg.messageId}');
print('Receive count: ${msg.attributes?['ApproximateReceiveCount']}');
await handler(data);
// Hapus pesan setelah berhasil diproses — konfirmasi seperti ACK
await _sqs.deleteMessage(
queueUrl: _queueUrl,
receiptHandle: msg.receiptHandle!, // handle unik dari receive
);
print('Pesan ${msg.messageId} berhasil diproses dan dihapus');
} catch (e) {
print('Gagal memproses pesan ${msg.messageId}: $e');
// Jangan hapus — visibility timeout akan expired dan pesan kembali visible
// Setelah maxReceiveCount kali gagal, pesan otomatis ke DLQ
}
}
void hentikan() => _jalan = false;
}
// Penggunaan
Future<void> main() async {
final sqs = SQS(region: 'ap-southeast-1');
final urlResult = await sqs.getQueueUrl(queueName: 'order-events');
final consumer = SQSConsumer(
sqs: sqs,
queueUrl: urlResult.queueUrl!,
);
await consumer.mulai((data) async {
print('Memproses: ${data['tipe']}');
await prosesBisnisLogic(data);
});
}
Visibility Timeout — Cara SQS Bekerja #
Memahami visibility timeout sangat penting agar tidak kehilangan pesan atau memprosesnya dua kali:
// Skenario:
// 1. Consumer A menerima pesan → pesan "invisible" selama visibilityTimeout (30 detik)
// 2. Consumer A crash sebelum menghapus pesan
// 3. Setelah 30 detik, pesan kembali "visible"
// 4. Consumer B menerima pesan yang sama → at-least-once delivery!
// Untuk pesan yang butuh waktu proses lebih lama:
// Perpanjang visibility timeout sebelum expired
Future<void> prosesLama(SQS sqs, String queueUrl, Message msg) async {
// Mulai timer untuk perpanjang setiap 25 detik (sebelum 30 detik habis)
final timer = Timer.periodic(Duration(seconds: 25), (_) async {
await sqs.changeMessageVisibility(
queueUrl: queueUrl,
receiptHandle: msg.receiptHandle!,
visibilityTimeout: 30, // perpanjang 30 detik lagi
);
print('Visibility timeout diperpanjang untuk ${msg.messageId}');
});
try {
await operasiYangLama(); // proses yang butuh waktu lebih dari 30 detik
await sqs.deleteMessage(queueUrl: queueUrl, receiptHandle: msg.receiptHandle!);
} finally {
timer.cancel();
}
}
Dead Letter Queue (DLQ) #
import 'package:aws_client/sqs_2012_11_05.dart';
Future<void> setupDLQ(SQS sqs) async {
// 1. Buat DLQ terlebih dahulu
final dlqResult = await sqs.createQueue(
queueName: 'order-events-dlq',
attributes: {
QueueAttributeName.messageRetentionPeriod: '1209600', // simpan 14 hari
},
);
final dlqUrl = dlqResult.queueUrl!;
// Dapatkan ARN DLQ
final dlqAttrs = await sqs.getQueueAttributes(
queueUrl: dlqUrl,
attributeNames: [QueueAttributeName.queueArn],
);
final dlqArn = dlqAttrs.attributes![QueueAttributeName.queueArn]!;
// 2. Buat queue utama dengan redrive policy ke DLQ
await sqs.createQueue(
queueName: 'order-events',
attributes: {
QueueAttributeName.redrivePolicy: jsonEncode({
'maxReceiveCount': '3', // coba 3 kali, lalu ke DLQ
'deadLetterTargetArn': dlqArn,
}),
},
);
// 3. Monitor DLQ — alert jika ada pesan di sini
Future<void> monitorDLQ() async {
while (true) {
final attrs = await sqs.getQueueAttributes(
queueUrl: dlqUrl,
attributeNames: [QueueAttributeName.approximateNumberOfMessages],
);
final jumlah = int.tryParse(
attrs.attributes?[QueueAttributeName.approximateNumberOfMessages] ?? '0',
) ?? 0;
if (jumlah > 0) {
print('⚠️ Ada $jumlah pesan gagal di DLQ!');
// Kirim alert ke tim via Slack/PagerDuty
}
await Future.delayed(Duration(minutes: 5));
}
}
}
Integrasi SQS + SNS — Fan-Out Pattern #
SQS sering digunakan bersama SNS untuk pola fan-out — satu event dikirim ke banyak queue:
// SNS Topic → beberapa SQS Queue
// Satu pesan SNS terdistribusi ke semua subscriber secara paralel
// Setup via AWS Console atau CloudFormation:
// SNS Topic "order-events"
// → Subscribe SQS "order-processor" (proses order)
// → Subscribe SQS "inventory-updater" (update stok)
// → Subscribe SQS "notification-sender" (kirim notifikasi)
// → Subscribe SQS "analytics-collector" (rekam untuk analytics)
// Setiap SQS queue memproses secara independen
// Jika satu service down, service lain tidak terpengaruh
// Pesan dari SNS ke SQS datang dalam format envelope:
Future<void> prosesPesanDariSNS(Map<String, dynamic> sqsMessage) async {
// SNS membungkus pesan dalam envelope JSON
if (sqsMessage.containsKey('Type') && sqsMessage['Type'] == 'Notification') {
// Ini pesan dari SNS
final pesanAsli = jsonDecode(sqsMessage['Message'] as String) as Map<String, dynamic>;
final topicArn = sqsMessage['TopicArn'] as String;
print('Event dari SNS topic: $topicArn');
await prosesEventAsli(pesanAsli);
} else {
// Pesan langsung ke SQS
await prosesEventAsli(sqsMessage);
}
}
Batch Delete — Hapus Banyak Pesan Sekaligus #
// Setelah proses batch, hapus semua sekaligus (hemat biaya API)
Future<void> prosesDanHapusBatch(
SQS sqs,
String queueUrl,
List<Message> pesan,
Future<void> Function(Map<String, dynamic>) handler,
) async {
final berhasil = <String>[]; // receiptHandle yang berhasil
await Future.wait(
pesan.map((msg) async {
try {
final data = jsonDecode(msg.body!) as Map<String, dynamic>;
await handler(data);
berhasil.add(msg.receiptHandle!);
} catch (e) {
print('Gagal memproses ${msg.messageId}: $e');
// Tidak ditambahkan ke berhasil → tidak dihapus → kembali ke queue
}
}),
);
if (berhasil.isEmpty) return;
// Batch delete semua yang berhasil — max 10 per request
final entries = berhasil.asMap().entries.map((e) =>
DeleteMessageBatchRequestEntry(
id: 'del-${e.key}',
receiptHandle: e.value,
)
).toList();
await sqs.deleteMessageBatch(
queueUrl: queueUrl,
entries: entries,
);
print('Batch delete: ${berhasil.length}/${pesan.length} pesan dihapus');
}
Anti-Pattern SQS #
Short Polling Berulang #
// ANTI-PATTERN: short polling — boros biaya dan CPU
while (true) {
final result = await sqs.receiveMessage(
queueUrl: queueUrl,
maxNumberOfMessages: 1,
waitTimeSeconds: 0, // ✗ short polling — langsung return meski queue kosong
);
// Jika queue kosong, tetap mengirim request dan dikenai biaya!
await Future.delayed(Duration(seconds: 1)); // loop sangat cepat
}
// BENAR: long polling — efisien dan hemat biaya
while (true) {
final result = await sqs.receiveMessage(
queueUrl: queueUrl,
maxNumberOfMessages: 10,
waitTimeSeconds: 20, // ✓ tunggu max 20 detik — hemat biaya API call
);
}
Tidak Menghapus Pesan Setelah Proses #
// ANTI-PATTERN: lupa menghapus pesan setelah berhasil diproses
final result = await sqs.receiveMessage(queueUrl: queueUrl);
for (final msg in result.messages ?? []) {
await prosesPesan(msg);
// ✗ tidak memanggil deleteMessage → pesan kembali visible setelah timeout
// Pesan akan diproses berulang kali!
}
// BENAR: selalu hapus setelah berhasil
for (final msg in result.messages ?? []) {
await prosesPesan(msg);
await sqs.deleteMessage(
queueUrl: queueUrl,
receiptHandle: msg.receiptHandle!,
); // ✓
}
Ringkasan #
- Standard vs FIFO — gunakan Standard untuk throughput tinggi tanpa kebutuhan ordering ketat; FIFO untuk transaksi yang butuh urutan dan exactly-once (nama harus diakhiri
.fifo).- Long polling (
waitTimeSeconds: 20) wajib digunakan — short polling memboroskan biaya dan sumber daya. SQS mengenai biaya per request, bukan per pesan.- Batch operations (
sendMessageBatch,deleteMessageBatch) menghemat hingga 90% biaya API — kirim dan hapus hingga 10 pesan per request.- Visibility timeout menyembunyikan pesan dari consumer lain saat diproses — atur lebih panjang dari durasi proses maksimal, atau perpanjang secara dinamis dengan
changeMessageVisibility.- Dead Letter Queue (DLQ) dikonfigurasi via
redrivePolicy.maxReceiveCount— setelah N kali gagal, pesan otomatis pindah ke DLQ. Monitor DLQ untuk mendeteksi masalah.- Selalu hapus pesan dengan
deleteMessagesetelah berhasil diproses — jika tidak, pesan akan diproses ulang setelah visibility timeout expired.- IAM Role lebih aman dari credential eksplisit untuk Lambda, ECS, dan EC2 — tidak ada secret yang perlu disimpan, rotasi otomatis oleh AWS.
- SNS + SQS fan-out untuk distribusi event ke banyak service — satu publish SNS, banyak SQS queue menerima secara independen.
- FIFO message group — pesan dengan
messageGroupIdyang sama diproses berurutan (FIFO), tapi group yang berbeda bisa diproses paralel.contentBasedDeduplicationpada FIFO queue menghilangkan kebutuhanmessageDeduplicationIdeksplisit — SQS otomatis dedup berdasarkan hash konten selama 5 menit.