Amazon SQS

Amazon SQS #

Amazon Simple Queue Service (SQS) adalah layanan antrian pesan terkelola dari AWS — tidak perlu mengelola infrastruktur broker seperti RabbitMQ atau Kafka. SQS menyediakan dua jenis antrian: Standard (throughput tinggi, at-least-once delivery, urutan tidak dijamin) dan FIFO (urutan dijamin, exactly-once processing, throughput lebih rendah). Dari Dart, SQS diakses via REST API dengan autentikasi AWS Signature Version 4 — package aws_client menyederhanakan ini dengan menyediakan client yang sudah menangani signing secara otomatis.

Setup Package #

dart pub add aws_client
# pubspec.yaml
dependencies:
  aws_client: ^2.0.0

Standard vs FIFO Queue #

flowchart LR
    subgraph Standard Queue
        P1["Producer"] -->|send| SQ["Standard Queue\n• At-least-once delivery\n• Urutan tidak dijamin\n• Throughput unlimited\n• Harga lebih murah"]
        SQ -->|receive| C1["Consumer"]
        SQ -->|receive| C2["Consumer"]
    end

    subgraph FIFO Queue
        P2["Producer"] -->|send| FQ["FIFO Queue\n• Exactly-once delivery\n• Urutan dijamin per group\n• Max 3.000 TPS\n• Harga lebih mahal"]
        FQ -->|receive| C3["Consumer"]
    end
Aspek Standard Queue FIFO Queue
Ordering Best-effort (tidak dijamin) Strict (FIFO per group)
Delivery At-least-once (bisa duplikat) Exactly-once
Throughput Unlimited Max 3.000 TPS (dengan batching)
Nama antrian Bebas Harus diakhiri .fifo
Cocok untuk Email notifikasi, log, fanout Transaksi keuangan, order processing

Konfigurasi AWS #

import 'package:aws_client/sqs_2012_11_05.dart';
import 'package:aws_client/src/credentials/credentials.dart';

// Cara 1: Credential eksplisit (untuk development)
SQS buatKlienSQS() {
  return SQS(
    region: 'ap-southeast-1',  // region AWS, contoh: Asia Pacific (Singapore)
    credentials: AwsClientCredentials(
      accessKey: 'AKIAIOSFODNN7EXAMPLE',
      secretKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    ),
  );
}

// Cara 2: Environment variables (direkomendasikan untuk produksi)
// Set: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
// aws_client otomatis membaca dari environment
SQS buatKlienSQSDariEnv() {
  return SQS(region: 'ap-southeast-1');
  // credentials dibaca dari: env var → ~/.aws/credentials → EC2 instance profile
}

// Cara 3: Untuk AWS Lambda / ECS — gunakan IAM Role (tanpa credentials eksplisit)
// Assign IAM role dengan permission SQS ke Lambda/ECS task
// aws_client akan menggunakan instance metadata service secara otomatis

Manajemen Queue #

import 'package:aws_client/sqs_2012_11_05.dart';

Future<void> kelolaQueue(SQS sqs) async {
  // Buat Standard Queue
  final standardQueue = await sqs.createQueue(
    queueName: 'order-events',
    attributes: {
      QueueAttributeName.visibilityTimeout: '30',   // 30 detik
      QueueAttributeName.messageRetentionPeriod: '86400',  // 1 hari
      QueueAttributeName.receiveMessageWaitTimeSeconds: '20',  // long polling
      QueueAttributeName.redrivePolicy: jsonEncode({
        'maxReceiveCount': '3',          // coba 3 kali sebelum ke DLQ
        'deadLetterTargetArn': 'arn:aws:sqs:ap-southeast-1:123456:order-events-dlq',
      }),
    },
  );
  print('Queue URL: ${standardQueue.queueUrl}');

  // Buat FIFO Queue — nama harus diakhiri .fifo
  final fifoQueue = await sqs.createQueue(
    queueName: 'payment-events.fifo',
    attributes: {
      QueueAttributeName.fifoQueue: 'true',
      QueueAttributeName.contentBasedDeduplication: 'true',  // dedup otomatis via hash
      QueueAttributeName.visibilityTimeout: '60',
    },
  );

  // List semua queue
  final queues = await sqs.listQueues();
  for (final url in queues.queueUrls ?? []) {
    print('Queue: $url');
  }

  // Dapatkan URL queue berdasarkan nama
  final urlResult = await sqs.getQueueUrl(queueName: 'order-events');
  final queueUrl = urlResult.queueUrl!;

  // Info attributes queue
  final attrs = await sqs.getQueueAttributes(
    queueUrl: queueUrl,
    attributeNames: [
      QueueAttributeName.approximateNumberOfMessages,
      QueueAttributeName.approximateNumberOfMessagesNotVisible,
    ],
  );
  print('Pesan tersedia: ${attrs.attributes?[QueueAttributeName.approximateNumberOfMessages]}');
}

Mengirim Pesan #

import 'package:aws_client/sqs_2012_11_05.dart';
import 'dart:convert';

class SQSProducer {
  final SQS _sqs;
  final String _queueUrl;

  SQSProducer({required SQS sqs, required String queueUrl})
      : _sqs = sqs,
        _queueUrl = queueUrl;

  // Kirim satu pesan ke Standard Queue
  Future<String> kirim(Map<String, dynamic> data) async {
    final result = await _sqs.sendMessage(
      queueUrl: _queueUrl,
      messageBody: jsonEncode(data),
      messageAttributes: {
        'tipeEvent': MessageAttributeValue(
          dataType: 'String',
          stringValue: data['tipe'] as String?,
        ),
        'versi': MessageAttributeValue(
          dataType: 'String',
          stringValue: '1.0',
        ),
      },
      delaySeconds: 0,  // delay pengiriman (0-900 detik)
    );

    print('Pesan terkirim: ${result.messageId}');
    return result.messageId!;
  }

  // Kirim ke FIFO Queue — butuh MessageGroupId dan MessageDeduplicationId
  Future<String> kirimFIFO({
    required Map<String, dynamic> data,
    required String groupId,        // urutan dijamin per groupId yang sama
    String? deduplicationId,        // ID unik untuk mencegah duplikat (null = auto dari content)
  }) async {
    final result = await _sqs.sendMessage(
      queueUrl: _queueUrl,
      messageBody: jsonEncode(data),
      messageGroupId: groupId,
      messageDeduplicationId: deduplicationId ??
          _hashContent(jsonEncode(data)),  // jika contentBasedDeduplication aktif, bisa null
    );

    return result.messageId!;
  }

  // Batch send — kirim hingga 10 pesan sekaligus (lebih hemat biaya)
  Future<void> kirimBatch(List<Map<String, dynamic>> pesanList) async {
    // SQS batch max 10 pesan
    for (int i = 0; i < pesanList.length; i += 10) {
      final batch = pesanList.sublist(i, (i + 10).clamp(0, pesanList.length));

      final entries = batch.asMap().entries.map((e) {
        return SendMessageBatchRequestEntry(
          id: 'msg-${e.key}',          // ID unik dalam batch
          messageBody: jsonEncode(e.value),
        );
      }).toList();

      final result = await _sqs.sendMessageBatch(
        queueUrl: _queueUrl,
        entries: entries,
      );

      if (result.failed != null && result.failed!.isNotEmpty) {
        for (final gagal in result.failed!) {
          print('Gagal kirim ${gagal.id}: ${gagal.message}');
        }
      }

      print('Batch terkirim: ${result.successful?.length ?? 0}/${batch.length}');
    }
  }

  String _hashContent(String content) {
    // Implementasi hash sederhana (gunakan crypto package untuk produksi)
    return content.hashCode.toRadixString(16);
  }
}

Menerima dan Memproses Pesan #

import 'package:aws_client/sqs_2012_11_05.dart';
import 'dart:convert';

class SQSConsumer {
  final SQS _sqs;
  final String _queueUrl;
  bool _jalan = true;

  SQSConsumer({required SQS sqs, required String queueUrl})
      : _sqs = sqs,
        _queueUrl = queueUrl;

  Future<void> mulai(Future<void> Function(Map<String, dynamic>) handler) async {
    print('Consumer SQS dimulai');

    while (_jalan) {
      try {
        // Long polling — tunggu hingga 20 detik jika queue kosong
        // Lebih efisien dari short polling yang terus-terusan kirim request
        final result = await _sqs.receiveMessage(
          queueUrl: _queueUrl,
          maxNumberOfMessages: 10,        // ambil max 10 pesan sekaligus
          waitTimeSeconds: 20,            // long polling — hemat biaya dan CPU
          visibilityTimeout: 60,          // sembunyikan dari consumer lain selama 60 detik
          messageAttributeNames: ['All'], // ambil semua message attributes
        );

        final pesan = result.messages ?? [];
        if (pesan.isEmpty) continue;

        // Proses semua pesan yang diterima
        await Future.wait(
          pesan.map((msg) => _prosesSatuPesan(msg, handler)),
        );

      } catch (e) {
        print('Error saat poll: $e');
        await Future.delayed(Duration(seconds: 5)); // backoff saat error
      }
    }
  }

  Future<void> _prosesSatuPesan(
    Message msg,
    Future<void> Function(Map<String, dynamic>) handler,
  ) async {
    try {
      final data = jsonDecode(msg.body!) as Map<String, dynamic>;

      print('Memproses pesan: ${msg.messageId}');
      print('Receive count: ${msg.attributes?['ApproximateReceiveCount']}');

      await handler(data);

      // Hapus pesan setelah berhasil diproses — konfirmasi seperti ACK
      await _sqs.deleteMessage(
        queueUrl: _queueUrl,
        receiptHandle: msg.receiptHandle!,  // handle unik dari receive
      );

      print('Pesan ${msg.messageId} berhasil diproses dan dihapus');

    } catch (e) {
      print('Gagal memproses pesan ${msg.messageId}: $e');
      // Jangan hapus — visibility timeout akan expired dan pesan kembali visible
      // Setelah maxReceiveCount kali gagal, pesan otomatis ke DLQ
    }
  }

  void hentikan() => _jalan = false;
}

// Penggunaan
Future<void> main() async {
  final sqs = SQS(region: 'ap-southeast-1');
  final urlResult = await sqs.getQueueUrl(queueName: 'order-events');

  final consumer = SQSConsumer(
    sqs: sqs,
    queueUrl: urlResult.queueUrl!,
  );

  await consumer.mulai((data) async {
    print('Memproses: ${data['tipe']}');
    await prosesBisnisLogic(data);
  });
}

Visibility Timeout — Cara SQS Bekerja #

Memahami visibility timeout sangat penting agar tidak kehilangan pesan atau memprosesnya dua kali:

// Skenario:
// 1. Consumer A menerima pesan → pesan "invisible" selama visibilityTimeout (30 detik)
// 2. Consumer A crash sebelum menghapus pesan
// 3. Setelah 30 detik, pesan kembali "visible"
// 4. Consumer B menerima pesan yang sama → at-least-once delivery!

// Untuk pesan yang butuh waktu proses lebih lama:
// Perpanjang visibility timeout sebelum expired
Future<void> prosesLama(SQS sqs, String queueUrl, Message msg) async {
  // Mulai timer untuk perpanjang setiap 25 detik (sebelum 30 detik habis)
  final timer = Timer.periodic(Duration(seconds: 25), (_) async {
    await sqs.changeMessageVisibility(
      queueUrl: queueUrl,
      receiptHandle: msg.receiptHandle!,
      visibilityTimeout: 30,  // perpanjang 30 detik lagi
    );
    print('Visibility timeout diperpanjang untuk ${msg.messageId}');
  });

  try {
    await operasiYangLama();  // proses yang butuh waktu lebih dari 30 detik
    await sqs.deleteMessage(queueUrl: queueUrl, receiptHandle: msg.receiptHandle!);
  } finally {
    timer.cancel();
  }
}

Dead Letter Queue (DLQ) #

import 'package:aws_client/sqs_2012_11_05.dart';

Future<void> setupDLQ(SQS sqs) async {
  // 1. Buat DLQ terlebih dahulu
  final dlqResult = await sqs.createQueue(
    queueName: 'order-events-dlq',
    attributes: {
      QueueAttributeName.messageRetentionPeriod: '1209600',  // simpan 14 hari
    },
  );
  final dlqUrl = dlqResult.queueUrl!;

  // Dapatkan ARN DLQ
  final dlqAttrs = await sqs.getQueueAttributes(
    queueUrl: dlqUrl,
    attributeNames: [QueueAttributeName.queueArn],
  );
  final dlqArn = dlqAttrs.attributes![QueueAttributeName.queueArn]!;

  // 2. Buat queue utama dengan redrive policy ke DLQ
  await sqs.createQueue(
    queueName: 'order-events',
    attributes: {
      QueueAttributeName.redrivePolicy: jsonEncode({
        'maxReceiveCount': '3',        // coba 3 kali, lalu ke DLQ
        'deadLetterTargetArn': dlqArn,
      }),
    },
  );

  // 3. Monitor DLQ — alert jika ada pesan di sini
  Future<void> monitorDLQ() async {
    while (true) {
      final attrs = await sqs.getQueueAttributes(
        queueUrl: dlqUrl,
        attributeNames: [QueueAttributeName.approximateNumberOfMessages],
      );
      final jumlah = int.tryParse(
        attrs.attributes?[QueueAttributeName.approximateNumberOfMessages] ?? '0',
      ) ?? 0;

      if (jumlah > 0) {
        print('⚠️ Ada $jumlah pesan gagal di DLQ!');
        // Kirim alert ke tim via Slack/PagerDuty
      }

      await Future.delayed(Duration(minutes: 5));
    }
  }
}

Integrasi SQS + SNS — Fan-Out Pattern #

SQS sering digunakan bersama SNS untuk pola fan-out — satu event dikirim ke banyak queue:

// SNS Topic → beberapa SQS Queue
// Satu pesan SNS terdistribusi ke semua subscriber secara paralel

// Setup via AWS Console atau CloudFormation:
// SNS Topic "order-events"
//   → Subscribe SQS "order-processor"      (proses order)
//   → Subscribe SQS "inventory-updater"    (update stok)
//   → Subscribe SQS "notification-sender"  (kirim notifikasi)
//   → Subscribe SQS "analytics-collector"  (rekam untuk analytics)

// Setiap SQS queue memproses secara independen
// Jika satu service down, service lain tidak terpengaruh

// Pesan dari SNS ke SQS datang dalam format envelope:
Future<void> prosesPesanDariSNS(Map<String, dynamic> sqsMessage) async {
  // SNS membungkus pesan dalam envelope JSON
  if (sqsMessage.containsKey('Type') && sqsMessage['Type'] == 'Notification') {
    // Ini pesan dari SNS
    final pesanAsli = jsonDecode(sqsMessage['Message'] as String) as Map<String, dynamic>;
    final topicArn = sqsMessage['TopicArn'] as String;
    print('Event dari SNS topic: $topicArn');
    await prosesEventAsli(pesanAsli);
  } else {
    // Pesan langsung ke SQS
    await prosesEventAsli(sqsMessage);
  }
}

Batch Delete — Hapus Banyak Pesan Sekaligus #

// Setelah proses batch, hapus semua sekaligus (hemat biaya API)
Future<void> prosesDanHapusBatch(
  SQS sqs,
  String queueUrl,
  List<Message> pesan,
  Future<void> Function(Map<String, dynamic>) handler,
) async {
  final berhasil = <String>[];  // receiptHandle yang berhasil

  await Future.wait(
    pesan.map((msg) async {
      try {
        final data = jsonDecode(msg.body!) as Map<String, dynamic>;
        await handler(data);
        berhasil.add(msg.receiptHandle!);
      } catch (e) {
        print('Gagal memproses ${msg.messageId}: $e');
        // Tidak ditambahkan ke berhasil → tidak dihapus → kembali ke queue
      }
    }),
  );

  if (berhasil.isEmpty) return;

  // Batch delete semua yang berhasil — max 10 per request
  final entries = berhasil.asMap().entries.map((e) =>
    DeleteMessageBatchRequestEntry(
      id: 'del-${e.key}',
      receiptHandle: e.value,
    )
  ).toList();

  await sqs.deleteMessageBatch(
    queueUrl: queueUrl,
    entries: entries,
  );

  print('Batch delete: ${berhasil.length}/${pesan.length} pesan dihapus');
}

Anti-Pattern SQS #

Short Polling Berulang #

// ANTI-PATTERN: short polling — boros biaya dan CPU
while (true) {
  final result = await sqs.receiveMessage(
    queueUrl: queueUrl,
    maxNumberOfMessages: 1,
    waitTimeSeconds: 0,  // ✗ short polling — langsung return meski queue kosong
  );
  // Jika queue kosong, tetap mengirim request dan dikenai biaya!
  await Future.delayed(Duration(seconds: 1)); // loop sangat cepat
}

// BENAR: long polling — efisien dan hemat biaya
while (true) {
  final result = await sqs.receiveMessage(
    queueUrl: queueUrl,
    maxNumberOfMessages: 10,
    waitTimeSeconds: 20,  // ✓ tunggu max 20 detik — hemat biaya API call
  );
}

Tidak Menghapus Pesan Setelah Proses #

// ANTI-PATTERN: lupa menghapus pesan setelah berhasil diproses
final result = await sqs.receiveMessage(queueUrl: queueUrl);
for (final msg in result.messages ?? []) {
  await prosesPesan(msg);
  // ✗ tidak memanggil deleteMessage → pesan kembali visible setelah timeout
  // Pesan akan diproses berulang kali!
}

// BENAR: selalu hapus setelah berhasil
for (final msg in result.messages ?? []) {
  await prosesPesan(msg);
  await sqs.deleteMessage(
    queueUrl: queueUrl,
    receiptHandle: msg.receiptHandle!,
  );  // ✓
}

Ringkasan #

  • Standard vs FIFO — gunakan Standard untuk throughput tinggi tanpa kebutuhan ordering ketat; FIFO untuk transaksi yang butuh urutan dan exactly-once (nama harus diakhiri .fifo).
  • Long polling (waitTimeSeconds: 20) wajib digunakan — short polling memboroskan biaya dan sumber daya. SQS mengenai biaya per request, bukan per pesan.
  • Batch operations (sendMessageBatch, deleteMessageBatch) menghemat hingga 90% biaya API — kirim dan hapus hingga 10 pesan per request.
  • Visibility timeout menyembunyikan pesan dari consumer lain saat diproses — atur lebih panjang dari durasi proses maksimal, atau perpanjang secara dinamis dengan changeMessageVisibility.
  • Dead Letter Queue (DLQ) dikonfigurasi via redrivePolicy.maxReceiveCount — setelah N kali gagal, pesan otomatis pindah ke DLQ. Monitor DLQ untuk mendeteksi masalah.
  • Selalu hapus pesan dengan deleteMessage setelah berhasil diproses — jika tidak, pesan akan diproses ulang setelah visibility timeout expired.
  • IAM Role lebih aman dari credential eksplisit untuk Lambda, ECS, dan EC2 — tidak ada secret yang perlu disimpan, rotasi otomatis oleh AWS.
  • SNS + SQS fan-out untuk distribusi event ke banyak service — satu publish SNS, banyak SQS queue menerima secara independen.
  • FIFO message group — pesan dengan messageGroupId yang sama diproses berurutan (FIFO), tapi group yang berbeda bisa diproses paralel.
  • contentBasedDeduplication pada FIFO queue menghilangkan kebutuhan messageDeduplicationId eksplisit — SQS otomatis dedup berdasarkan hash konten selama 5 menit.

← Sebelumnya: RabbitMQ   Berikutnya: Google Pub/Sub →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact