Kafka

Kafka #

Apache Kafka adalah platform streaming terdistribusi yang dirancang untuk throughput tinggi, latensi rendah, dan ketahanan data — digunakan oleh LinkedIn, Uber, Netflix, dan ribuan perusahaan lainnya untuk memindahkan triliunan pesan per hari. Berbeda dari message broker tradisional seperti RabbitMQ, Kafka menyimpan semua pesan dalam log yang persisten dan terurut — consumer bisa membaca ulang pesan dari posisi manapun. Package kafka_dart menyediakan client Dart yang lengkap untuk berinteraksi dengan Kafka cluster. Artikel ini membahas konsep inti, producer, consumer, error handling, dan pola yang paling berguna dalam arsitektur event-driven.

Konsep Inti Kafka #

flowchart LR
    P1["Producer\n(Dart App)"] -->|publish| T["Topic: order-events\nPartition 0: [msg0, msg1, msg2]\nPartition 1: [msg3, msg4]\nPartition 2: [msg5, msg6]"]
    T -->|subscribe| CG1["Consumer Group A\nConsumer 1 → P0\nConsumer 2 → P1, P2"]
    T -->|subscribe| CG2["Consumer Group B\nConsumer 3 → P0, P1, P2"]
Konsep Deskripsi
Topic Kategori/nama saluran pesan — seperti tabel di database
Partition Pembagian horizontal topic — unit paralelisme Kafka
Offset Posisi pesan dalam partition — monoton meningkat dari 0
Producer Pengirim pesan ke topic
Consumer Penerima pesan dari topic
Consumer Group Beberapa consumer yang berbagi beban membaca topic
Broker Server Kafka yang menyimpan dan melayani pesan
Retention Berapa lama pesan disimpan (default 7 hari) — bisa dibaca ulang

Setup Package #

dart pub add kafka_dart
# pubspec.yaml
dependencies:
  kafka_dart: ^0.0.5
Untuk produksi, pastikan Kafka cluster sudah berjalan dan bisa diakses dari mesin Dart. Untuk development lokal, gunakan Docker: docker run -p 9092:9092 apache/kafka. Atau gunakan Confluent Platform / Redpanda sebagai alternatif yang lebih mudah di-setup.

Producer — Mengirim Pesan #

import 'package:kafka_dart/kafka_dart.dart';
import 'dart:convert';

Future<void> main() async {
  // Buat producer
  final producer = KafkaProducer(
    brokers: ['localhost:9092'],
    config: ProducerConfig(
      // Keandalan pengiriman
      acks: Acks.all,           // tunggu konfirmasi dari semua replica
      retries: 3,               // coba ulang 3 kali jika gagal
      retryBackoffMs: 100,      // jeda antara retry

      // Performa
      batchSize: 16384,         // kumpulkan hingga 16KB sebelum kirim
      lingerMs: 5,              // tunggu max 5ms untuk batch lebih besar
      compressionType: CompressionType.snappy, // kompresi untuk efisiensi

      // Idempotency — pastikan pesan tidak terduplikasi
      enableIdempotence: true,
    ),
  );

  // Kirim pesan sederhana
  await producer.send(
    ProducerRecord(
      topic: 'order-events',
      value: utf8.encode(jsonEncode({
        'tipe': 'order_dibuat',
        'idOrder': 'ORD-001',
        'idPengguna': 'USR-123',
        'total': 150_000,
        'timestamp': DateTime.now().toUtc().toIso8601String(),
      })),
    ),
  );

  print('Pesan terkirim');
  await producer.close();
}

Producer dengan Key untuk Partitioning #

Key menentukan ke partition mana pesan dikirim — pesan dengan key yang sama selalu masuk ke partition yang sama, menjamin urutan per entity:

import 'package:kafka_dart/kafka_dart.dart';
import 'dart:convert';

class OrderEventProducer {
  late final KafkaProducer _producer;

  Future<void> inisialisasi(List<String> brokers) async {
    _producer = KafkaProducer(
      brokers: brokers,
      config: ProducerConfig(
        acks: Acks.all,
        enableIdempotence: true,
        retries: 5,
      ),
    );
  }

  // Key = idOrder → semua event untuk order yang sama ke partition yang sama
  // Ini menjamin urutan event per order
  Future<void> kirimEvent({
    required String idOrder,
    required String tipeEvent,
    required Map<String, dynamic> data,
  }) async {
    final pesan = {
      'tipe': tipeEvent,
      'idOrder': idOrder,
      'data': data,
      'timestamp': DateTime.now().toUtc().toIso8601String(),
      'versi': '1.0',
    };

    final record = ProducerRecord(
      topic: 'order-events',
      key: utf8.encode(idOrder),    // key = idOrder untuk partisi yang konsisten
      value: utf8.encode(jsonEncode(pesan)),
      headers: {
        'tipe-event': utf8.encode(tipeEvent),
        'versi': utf8.encode('1.0'),
      },
    );

    final metadata = await _producer.send(record);
    print('Event "$tipeEvent" untuk order $idOrder → '
        'partition ${metadata.partition}, offset ${metadata.offset}');
  }

  Future<void> tutup() => _producer.close();
}

// Penggunaan
Future<void> main() async {
  final producer = OrderEventProducer();
  await producer.inisialisasi(['localhost:9092']);

  // Semua event ORDER-001 masuk ke partition yang sama → urutan terjaga
  await producer.kirimEvent(
    idOrder: 'ORDER-001',
    tipeEvent: 'order_dibuat',
    data: {'total': 150_000, 'itemCount': 3},
  );

  await producer.kirimEvent(
    idOrder: 'ORDER-001',
    tipeEvent: 'pembayaran_dikonfirmasi',
    data: {'metodePembayaran': 'transfer', 'jumlah': 150_000},
  );

  await producer.kirimEvent(
    idOrder: 'ORDER-001',
    tipeEvent: 'order_dikirim',
    data: {'kurir': 'JNE', 'noResi': 'JNE123456789'},
  );

  await producer.tutup();
}

Consumer — Menerima Pesan #

import 'package:kafka_dart/kafka_dart.dart';
import 'dart:convert';

Future<void> main() async {
  final consumer = KafkaConsumer(
    brokers: ['localhost:9092'],
    groupId: 'order-processor-group',   // consumer group ID
    config: ConsumerConfig(
      autoOffsetReset: AutoOffsetReset.earliest,  // mulai dari awal jika belum ada offset
      enableAutoCommit: false,    // PENTING: commit manual untuk kontrol penuh
      sessionTimeoutMs: 30000,    // timeout jika consumer tidak mengirim heartbeat
      maxPollRecords: 100,        // maksimal pesan per poll
    ),
  );

  // Subscribe ke topic
  consumer.subscribe(['order-events']);

  print('Consumer berjalan, menunggu pesan...');

  // Loop konsumsi pesan
  try {
    while (true) {
      // Poll dengan timeout 1 detik
      final records = await consumer.poll(Duration(seconds: 1));

      if (records.isEmpty) continue;

      for (final record in records) {
        try {
          await prosesRecord(record);
          // Commit offset SETELAH berhasil proses — at-least-once delivery
          await consumer.commitSync();
        } catch (e) {
          print('Gagal memproses pesan offset ${record.offset}: $e');
          // Jangan commit — pesan akan diproses ulang setelah restart
        }
      }
    }
  } finally {
    await consumer.close();
  }
}

Future<void> prosesRecord(ConsumerRecord record) async {
  final key = record.key != null ? utf8.decode(record.key!) : null;
  final value = jsonDecode(utf8.decode(record.value)) as Map<String, dynamic>;

  print('Partition: ${record.partition}, Offset: ${record.offset}');
  print('Key: $key');
  print('Tipe event: ${value['tipe']}');
  print('Data: ${value['data']}');

  // Proses berdasarkan tipe event
  switch (value['tipe'] as String) {
    case 'order_dibuat':
      await prosesOrderDibuat(value);
    case 'pembayaran_dikonfirmasi':
      await prosesPembayaran(value);
    case 'order_dikirim':
      await prosesOrderDikirim(value);
    default:
      print('Tipe event tidak dikenal: ${value['tipe']}');
  }
}

Consumer Group dan Paralelisme #

Consumer group memungkinkan beberapa instance consumer berbagi beban membaca topic — setiap partition hanya dibaca oleh satu consumer dalam group:

import 'package:kafka_dart/kafka_dart.dart';

// Jalankan beberapa worker secara paralel dengan group yang sama
Future<void> jalankanWorker(
  String workerId,
  List<String> brokers,
  String groupId,
  List<String> topics,
) async {
  final consumer = KafkaConsumer(
    brokers: brokers,
    groupId: groupId,   // group ID sama → Kafka akan bagi partition antar worker
    config: ConsumerConfig(
      enableAutoCommit: false,
      maxPollRecords: 50,
    ),
  );

  consumer.subscribe(topics);
  print('Worker $workerId dimulai');

  // Handler untuk rebalancing — saat consumer bergabung/keluar group
  consumer.onPartitionsAssigned = (partitions) {
    print('Worker $workerId mendapat partition: $partitions');
  };

  consumer.onPartitionsRevoked = (partitions) {
    print('Worker $workerId kehilangan partition: $partitions');
    // Commit offset sebelum partition diambil alih worker lain
    consumer.commitSync();
  };

  try {
    while (true) {
      final records = await consumer.poll(Duration(seconds: 1));
      for (final record in records) {
        await prosesRecord(record);
      }
      if (records.isNotEmpty) {
        await consumer.commitSync();
      }
    }
  } finally {
    await consumer.close();
    print('Worker $workerId selesai');
  }
}

Commit Strategy — At-Least-Once vs Exactly-Once #

Pilihan strategi commit sangat menentukan jaminan pengiriman pesan:

// AT-LEAST-ONCE (paling umum) — commit SETELAH proses
// Pesan bisa diproses lebih dari sekali jika crash setelah proses tapi sebelum commit
for (final record in records) {
  await prosesRecord(record);     // proses dulu
  await consumer.commitSync();    // lalu commit — at-least-once
}

// AT-MOST-ONCE — commit SEBELUM proses (tidak direkomendasikan untuk data kritis)
// Pesan bisa hilang jika crash setelah commit tapi sebelum proses
for (final record in records) {
  await consumer.commitSync();    // commit dulu
  await prosesRecord(record);     // lalu proses — at-most-once
}

// EXACTLY-ONCE — butuh transaksi atau idempotent consumer
// Simpan offset bersama hasil proses dalam transaksi atom
// Contoh: simpan offset ke database bersamaan dengan hasil proses
Future<void> prosesExactlyOnce(
  ConsumerRecord record,
  Pool pgPool,
) async {
  await pgPool.runTx((tx) async {
    // Cek apakah offset ini sudah pernah diproses
    final sudahDiproses = await tx.execute(
      r'SELECT 1 FROM processed_offsets WHERE topic = $1 AND partition = $2 AND offset = $3',
      parameters: [record.topic, record.partition, record.offset],
    );

    if (sudahDiproses.isNotEmpty) {
      print('Offset ${record.offset} sudah diproses, lewati');
      return;
    }

    // Proses dalam transaksi yang sama
    await prosesDalamTransaksi(tx, record);

    // Simpan offset sebagai bukti sudah diproses
    await tx.execute(
      r'INSERT INTO processed_offsets (topic, partition, offset, diproses_pada) VALUES ($1, $2, $3, NOW())',
      parameters: [record.topic, record.partition, record.offset],
    );
  });
}

Error Handling dan Dead Letter Queue #

import 'package:kafka_dart/kafka_dart.dart';

class ResilientConsumer {
  final KafkaConsumer _consumer;
  final KafkaProducer _dlqProducer;  // Dead Letter Queue producer
  final int _maxRetry;

  ResilientConsumer({
    required KafkaConsumer consumer,
    required KafkaProducer dlqProducer,
    int maxRetry = 3,
  })  : _consumer = consumer,
        _dlqProducer = dlqProducer,
        _maxRetry = maxRetry;

  Future<void> jalankan(List<String> topics) async {
    _consumer.subscribe(topics);

    while (true) {
      final records = await _consumer.poll(Duration(seconds: 1));

      for (final record in records) {
        bool berhasil = false;

        // Coba hingga maxRetry kali
        for (int percobaan = 1; percobaan <= _maxRetry; percobaan++) {
          try {
            await prosesRecord(record);
            berhasil = true;
            break;
          } catch (e) {
            print('Percobaan $percobaan/$_maxRetry gagal: $e');
            if (percobaan < _maxRetry) {
              // Exponential backoff
              await Future.delayed(Duration(milliseconds: 100 * (1 << percobaan)));
            }
          }
        }

        if (!berhasil) {
          // Kirim ke Dead Letter Queue untuk investigasi manual
          await _kirimKeDLQ(record);
        }
      }

      if (records.isNotEmpty) {
        await _consumer.commitSync();
      }
    }
  }

  Future<void> _kirimKeDLQ(ConsumerRecord record) async {
    print('Mengirim pesan gagal ke DLQ: offset ${record.offset}');

    await _dlqProducer.send(ProducerRecord(
      topic: '${record.topic}.dlq',  // konvensi nama DLQ topic
      key: record.key,
      value: record.value,
      headers: {
        // Metadata tambahan untuk debugging
        'original-topic': utf8.encode(record.topic),
        'original-partition': utf8.encode(record.partition.toString()),
        'original-offset': utf8.encode(record.offset.toString()),
        'failed-at': utf8.encode(DateTime.now().toIso8601String()),
      },
    ));
  }
}

Admin — Manajemen Topic #

import 'package:kafka_dart/kafka_dart.dart';

Future<void> kelolaTopik(List<String> brokers) async {
  final admin = KafkaAdmin(brokers: brokers);

  // Buat topic
  await admin.createTopics([
    TopicConfig(
      name: 'order-events',
      numPartitions: 6,        // 6 partition untuk paralelisme
      replicationFactor: 3,   // 3 replica untuk keandalan
      configs: {
        'retention.ms': '604800000',  // simpan 7 hari
        'cleanup.policy': 'delete',    // hapus pesan lama
        'compression.type': 'snappy',
      },
    ),
    TopicConfig(
      name: 'order-events.dlq',
      numPartitions: 1,
      replicationFactor: 3,
      configs: {
        'retention.ms': '2592000000',  // simpan 30 hari untuk DLQ
      },
    ),
  ]);

  // Daftar topic
  final topics = await admin.listTopics();
  print('Topics: $topics');

  // Describe topic — info partition dan replica
  final detail = await admin.describeTopics(['order-events']);
  for (final topic in detail) {
    print('Topic: ${topic.name}');
    for (final partition in topic.partitions) {
      print('  Partition ${partition.id}: leader=${partition.leader}, '
          'replicas=${partition.replicas}');
    }
  }

  // Hapus topic
  await admin.deleteTopics(['topic-lama']);

  await admin.close();
}

Pola Event-Driven dengan Kafka #

Event Sourcing — Simpan Semua Peristiwa #

// Semua perubahan state disimpan sebagai sequence event
// State saat ini = hasil replay semua event

class OrderEventStore {
  final KafkaProducer _producer;

  OrderEventStore(this._producer);

  Future<void> simpanEvent(OrderEvent event) async {
    await _producer.send(ProducerRecord(
      topic: 'order-events',
      key: utf8.encode(event.idOrder),
      value: utf8.encode(jsonEncode(event.toJson())),
    ));
  }
}

// Consumer yang rebuild state dari event stream
class OrderProjection {
  final Map<String, OrderState> _state = {};

  Future<void> prosesEvent(ConsumerRecord record) async {
    final event = OrderEvent.dariJson(
      jsonDecode(utf8.decode(record.value)) as Map<String, dynamic>,
    );

    final stateSebelumnya = _state[event.idOrder] ?? OrderState.awal();
    _state[event.idOrder] = stateSebelumnya.terapkan(event);
  }

  OrderState? stateOrder(String idOrder) => _state[idOrder];
}

CQRS — Command Query Responsibility Segregation #

// Command: tulis ke database, kirim event
Future<void> buatOrder(BuatOrderCommand cmd, Pool db, OrderEventStore eventStore) async {
  // Simpan ke database (write model)
  await db.execute(
    r'INSERT INTO orders (id, id_pengguna, total, status) VALUES ($1, $2, $3, $4)',
    parameters: [cmd.idOrder, cmd.idPengguna, cmd.total, 'pending'],
  );

  // Kirim event (untuk read model dan notifikasi)
  await eventStore.simpanEvent(OrderDibuatEvent(
    idOrder: cmd.idOrder,
    idPengguna: cmd.idPengguna,
    total: cmd.total,
  ));
}

// Query: baca dari read model yang di-build dari event
// Consumer lain bisa update Elasticsearch, Redis, atau tabel denormalisasi

Perbandingan Kafka vs RabbitMQ #

Aspek Kafka RabbitMQ
Model Log terdistribusi Message broker tradisional
Retensi Pesan disimpan (default 7 hari) Pesan dihapus setelah dikonsumsi
Throughput Sangat tinggi (jutaan/detik) Tinggi (ratusan ribu/detik)
Ordering Per partition Per queue
Consumer Pull-based Push-based
Replay ✓ Bisa baca ulang dari offset ✗ Tidak bisa
Use case Event streaming, log, analytics Task queue, RPC, routing kompleks
Setup Lebih kompleks Lebih sederhana

Ringkasan #

  • Key untuk ordering — pesan dengan key yang sama selalu masuk ke partition yang sama, menjamin urutan event per entity (misal: semua event satu order ke partition yang sama).
  • Consumer group membagi partition antar consumer — satu partition hanya dibaca oleh satu consumer dalam group. Tambah consumer = lebih paralel, tapi dibatasi jumlah partition.
  • Commit manual (enableAutoCommit: false) lebih aman dari auto-commit — commit offset hanya setelah pesan berhasil diproses untuk at-least-once delivery.
  • Dead Letter Queue (DLQ) untuk pesan yang gagal diproses setelah beberapa retry — simpan di topic .dlq dengan metadata debug untuk investigasi manual.
  • onPartitionsRevoked — commit offset sebelum partition diambil alih saat rebalancing, agar pesan tidak diproses ulang dari awal oleh consumer baru.
  • Retention policy — Kafka menyimpan pesan meski sudah dikonsumsi (default 7 hari). Consumer baru bisa membaca dari awal, consumer crash bisa lanjut dari offset terakhir.
  • Exactly-once — simpan offset bersama hasil proses dalam satu transaksi database. Saat restart, cek apakah offset sudah pernah diproses sebelum memproses ulang.
  • Jumlah partition = batas paralelisme — satu consumer per partition per group. Rencanakan jumlah partition berdasarkan kebutuhan throughput masa depan.
  • Producer idempotence (enableIdempotence: true) memastikan pesan tidak terduplikasi meski ada retry — dikombinasikan dengan acks: all untuk keandalan maksimal.
  • Admin API untuk manajemen topic secara programatik — berguna untuk infrastruktur-as-code dan testing.

← Sebelumnya: Elasticsearch   Berikutnya: RabbitMQ →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact