Google Pub/Sub

Google Pub/Sub #

Google Cloud Pub/Sub adalah layanan messaging terdistribusi yang dikelola sepenuhnya oleh Google Cloud — dirancang untuk skala global dengan latensi sangat rendah dan throughput hampir tak terbatas. Pub/Sub mendukung dua model pengiriman: pull (subscriber aktif menarik pesan) dan push (Google mendorong pesan ke endpoint HTTP subscriber). Ini menjadikannya sangat fleksibel untuk berbagai arsitektur — dari microservices hingga streaming data analytics yang terintegrasi langsung dengan BigQuery, Dataflow, dan layanan Google Cloud lainnya. Package googleapis menyediakan client Dart resmi untuk semua layanan Google Cloud termasuk Pub/Sub.

Konsep Inti Google Pub/Sub #

flowchart LR
    P["Publisher\n(Dart App)"] -->|publish message| T["Topic\nprojects/my-project/topics/order-events"]
    T -->|fan-out| S1["Subscription A\n(pull)\n→ order-processor"]
    T -->|fan-out| S2["Subscription B\n(pull)\n→ inventory-service"]
    T -->|push| S3["Subscription C\n(push)\n→ https://webhook.example.com"]
    S1 -->|pull| C1["Consumer\n(Dart App)"]
    S2 -->|pull| C2["Consumer\n(Dart App)"]
    S3 -->|HTTP POST| W["Webhook\nEndpoint"]
Konsep Deskripsi
Topic Saluran pengiriman pesan — publisher mengirim ke topic
Subscription Pelanggan topic — setiap subscription mendapat salinan pesan
Message Data (bytes) dengan optional attributes key-value
Acknowledge Konfirmasi bahwa pesan sudah diproses
Ack Deadline Batas waktu untuk acknowledge sebelum pesan dikirim ulang
Dead Letter Topic Topic untuk pesan yang gagal diproses berkali-kali
Ordering Key Kunci untuk menjamin urutan pesan per key
Filter Kondisi untuk menyaring pesan berdasarkan attributes

Setup Package #

dart pub add googleapis googleapis_auth http
# pubspec.yaml
dependencies:
  googleapis: ^12.0.0
  googleapis_auth: ^1.5.0
  http: ^1.2.0

Autentikasi Google Cloud #

Google Cloud menggunakan OAuth2 dengan Service Account untuk autentikasi server-to-server:

import 'package:googleapis_auth/auth_io.dart';
import 'package:googleapis/pubsub/v1.dart';
import 'dart:io';
import 'dart:convert';

// Cara 1: Service Account JSON Key File
Future<PubsubApi> buatKlienDariFile(String pathKeyFile) async {
  final keyJson = jsonDecode(await File(pathKeyFile).readAsString())
      as Map<String, dynamic>;

  final credentials = ServiceAccountCredentials.fromJson(keyJson);
  final scopes = [PubsubApi.cloudPlatformScope];

  final httpClient = await clientViaServiceAccount(credentials, scopes);
  return PubsubApi(httpClient);
}

// Cara 2: Application Default Credentials (ADC)
// Bekerja secara otomatis di GCE, Cloud Run, GKE, Cloud Functions
// Lokal: jalankan `gcloud auth application-default login`
Future<PubsubApi> buatKlienADC() async {
  final scopes = [PubsubApi.cloudPlatformScope];
  final httpClient = await clientViaApplicationDefaultCredentials(
    scopes: scopes,
  );
  return PubsubApi(httpClient);
}

// Cara 3: Emulator untuk development lokal
// Jalankan: gcloud beta emulators pubsub start
Future<PubsubApi> buatKlienEmulator() async {
  // Set environment variable: PUBSUB_EMULATOR_HOST=localhost:8085
  final emulatorHost = Platform.environment['PUBSUB_EMULATOR_HOST'];
  if (emulatorHost != null) {
    // Emulator tidak butuh autentikasi
    print('Menggunakan Pub/Sub emulator: $emulatorHost');
  }
  return buatKlienADC(); // ADC akan otomatis gunakan emulator jika env var di-set
}

Manajemen Topic dan Subscription #

import 'package:googleapis/pubsub/v1.dart';

const projectId = 'my-gcp-project';

String topicPath(String nama) => 'projects/$projectId/topics/$nama';
String subPath(String nama) => 'projects/$projectId/subscriptions/$nama';

Future<void> setupInfrastruktur(PubsubApi pubsub) async {
  // Buat topic
  try {
    await pubsub.projects.topics.create(
      Topic(
        name: topicPath('order-events'),
        messageRetentionDuration: '604800s',  // simpan 7 hari (untuk replay)
        labels: {'environment': 'production', 'team': 'platform'},
      ),
      topicPath('order-events'),
    );
    print('Topic berhasil dibuat');
  } on DetailedApiRequestError catch (e) {
    if (e.status == 409) {
      print('Topic sudah ada, lewati');
    } else {
      rethrow;
    }
  }

  // Buat Dead Letter Topic terlebih dahulu
  try {
    await pubsub.projects.topics.create(
      Topic(name: topicPath('order-events-dlq')),
      topicPath('order-events-dlq'),
    );
  } on DetailedApiRequestError catch (e) {
    if (e.status != 409) rethrow;
  }

  // Buat Pull Subscription
  await pubsub.projects.subscriptions.create(
    Subscription(
      name: subPath('order-processor'),
      topic: topicPath('order-events'),
      ackDeadlineSeconds: 60,          // 60 detik untuk acknowledge
      enableMessageOrdering: false,    // true jika butuh ordering per key
      deadLetterPolicy: DeadLetterPolicy(
        deadLetterTopic: topicPath('order-events-dlq'),
        maxDeliveryAttempts: 5,        // coba 5 kali sebelum ke DLQ
      ),
      retryPolicy: RetryPolicy(
        minimumBackoff: '10s',
        maximumBackoff: '300s',  // exponential backoff hingga 5 menit
      ),
      messageRetentionDuration: '604800s',
      expirationPolicy: ExpirationPolicy(ttl: '2592000s'),  // 30 hari
    ),
    subPath('order-processor'),
  );

  // Buat Push Subscription — Pub/Sub kirim ke HTTP endpoint
  await pubsub.projects.subscriptions.create(
    Subscription(
      name: subPath('webhook-notifier'),
      topic: topicPath('order-events'),
      ackDeadlineSeconds: 30,
      pushConfig: PushConfig(
        pushEndpoint: 'https://myapp.example.com/pubsub/webhook',
        oidcToken: OidcToken(
          serviceAccountEmail: '[email protected]',
        ),
      ),
    ),
    subPath('webhook-notifier'),
  );

  // List semua topic
  final topics = await pubsub.projects.topics.list('projects/$projectId');
  print('Topics: ${topics.topics?.map((t) => t.name).toList()}');
}

Publisher — Mengirim Pesan #

import 'package:googleapis/pubsub/v1.dart';
import 'dart:convert';

class GCPPublisher {
  final PubsubApi _pubsub;
  final String _topicPath;

  GCPPublisher({required PubsubApi pubsub, required String topicName})
      : _pubsub = pubsub,
        _topicPath = 'projects/$projectId/topics/$topicName';

  // Kirim satu pesan
  Future<String> kirim({
    required Map<String, dynamic> data,
    Map<String, String>? attributes,
    String? orderingKey,  // untuk pesan terurut per key
  }) async {
    final pesanBytes = base64Encode(utf8.encode(jsonEncode(data)));

    final response = await _pubsub.projects.topics.publish(
      PublishRequest(
        messages: [
          PubsubMessage(
            data: pesanBytes,           // data harus dalam format base64
            attributes: {
              'tipeEvent': data['tipe'] as String? ?? 'unknown',
              'versi': '1.0',
              'timestamp': DateTime.now().toUtc().toIso8601String(),
              ...?attributes,
            },
            orderingKey: orderingKey,   // null = tidak ada ordering guarantee
          ),
        ],
      ),
      _topicPath,
    );

    final messageId = response.messageIds?.first ?? '';
    print('Pesan terkirim: $messageId');
    return messageId;
  }

  // Batch publish — hingga 1.000 pesan atau 10MB per request
  Future<List<String>> kirimBatch(List<Map<String, dynamic>> pesanList) async {
    final messages = pesanList.map((data) => PubsubMessage(
      data: base64Encode(utf8.encode(jsonEncode(data))),
      attributes: {
        'tipeEvent': data['tipe'] as String? ?? 'unknown',
      },
    )).toList();

    final response = await _pubsub.projects.topics.publish(
      PublishRequest(messages: messages),
      _topicPath,
    );

    return response.messageIds ?? [];
  }
}

// Penggunaan
Future<void> main() async {
  final pubsub = await buatKlienADC();
  final publisher = GCPPublisher(pubsub: pubsub, topicName: 'order-events');

  await publisher.kirim(
    data: {
      'tipe': 'order_dibuat',
      'idOrder': 'ORD-001',
      'idPengguna': 'USR-123',
      'total': 150_000,
    },
    attributes: {'sumber': 'checkout-service'},
    orderingKey: 'USR-123',  // semua event user ini dikirim terurut
  );
}

Pull Subscriber — Menerima Pesan #

import 'package:googleapis/pubsub/v1.dart';
import 'dart:convert';

class GCPPullSubscriber {
  final PubsubApi _pubsub;
  final String _subscriptionPath;
  bool _jalan = true;

  GCPPullSubscriber({
    required PubsubApi pubsub,
    required String subscriptionName,
  })  : _pubsub = pubsub,
        _subscriptionPath = 'projects/$projectId/subscriptions/$subscriptionName';

  Future<void> mulai(
    Future<void> Function(Map<String, dynamic> data, Map<String, String> attributes) handler,
  ) async {
    print('Subscriber mulai pada: $_subscriptionPath');

    while (_jalan) {
      try {
        // Pull pesan — max 100 per request
        final response = await _pubsub.projects.subscriptions.pull(
          PullRequest(
            maxMessages: 100,
            returnImmediately: false,  // tunggu hingga ada pesan (blocking)
          ),
          _subscriptionPath,
        );

        final messages = response.receivedMessages ?? [];
        if (messages.isEmpty) continue;

        print('Menerima ${messages.length} pesan');

        final ackIds = <String>[];
        final nackIds = <String>[];

        await Future.wait(
          messages.map((received) async {
            final msg = received.message!;
            try {
              // Decode data dari base64
              final rawData = utf8.decode(base64Decode(msg.data!));
              final data = jsonDecode(rawData) as Map<String, dynamic>;
              final attributes = msg.attributes ?? {};

              print('Memproses: ${msg.messageId} (tipe: ${attributes['tipeEvent']})');
              await handler(data, Map<String, String>.from(attributes));

              ackIds.add(received.ackId!);  // tandai berhasil
            } catch (e) {
              print('Gagal memproses ${msg.messageId}: $e');
              nackIds.add(received.ackId!);  // tandai gagal untuk retry
            }
          }),
        );

        // Acknowledge semua yang berhasil
        if (ackIds.isNotEmpty) {
          await _pubsub.projects.subscriptions.acknowledge(
            AcknowledgeRequest(ackIds: ackIds),
            _subscriptionPath,
          );
        }

        // Modify ack deadline untuk yang gagal — perpendek agar lebih cepat retry
        if (nackIds.isNotEmpty) {
          await _pubsub.projects.subscriptions.modifyAckDeadline(
            ModifyAckDeadlineRequest(
              ackIds: nackIds,
              ackDeadlineSeconds: 0,  // 0 = segera tersedia kembali untuk retry
            ),
            _subscriptionPath,
          );
        }

      } catch (e) {
        print('Error saat pull: $e');
        await Future.delayed(Duration(seconds: 5));
      }
    }
  }

  void hentikan() => _jalan = false;
}

// Penggunaan
Future<void> main() async {
  final pubsub = await buatKlienADC();
  final subscriber = GCPPullSubscriber(
    pubsub: pubsub,
    subscriptionName: 'order-processor',
  );

  await subscriber.mulai((data, attributes) async {
    print('Event: ${attributes['tipeEvent']}');
    print('Data: $data');
    await prosesEventOrder(data);
  });
}

Push Subscription — Terima via Webhook #

Untuk push subscription, Pub/Sub mengirim HTTP POST ke endpoint yang kamu definisikan. Kamu perlu membuat HTTP server yang menerima push:

import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'dart:convert';

// Handler webhook yang menerima push dari Pub/Sub
Handler buatWebhookHandler() {
  return (Request request) async {
    if (request.method != 'POST') {
      return Response.methodNotAllowed();
    }

    try {
      final body = await request.readAsString();
      final envelope = jsonDecode(body) as Map<String, dynamic>;

      // Pub/Sub push envelope memiliki struktur:
      // {
      //   "message": {
      //     "data": "<base64-encoded-data>",
      //     "attributes": {...},
      //     "messageId": "...",
      //     "publishTime": "..."
      //   },
      //   "subscription": "projects/.../subscriptions/..."
      // }
      final message = envelope['message'] as Map<String, dynamic>;
      final rawData = utf8.decode(base64Decode(message['data'] as String));
      final data = jsonDecode(rawData) as Map<String, dynamic>;
      final attributes = (message['attributes'] as Map<String, dynamic>?)
          ?.map((k, v) => MapEntry(k, v as String)) ??
          {};

      print('Push diterima: ${message['messageId']}');
      print('Subscription: ${envelope['subscription']}');

      await prosesEventOrder(data);

      // Kembalikan 2xx agar Pub/Sub tahu pesan berhasil (auto-acknowledge)
      return Response.ok('OK');

    } catch (e) {
      print('Error memproses push: $e');
      // Kembalikan non-2xx agar Pub/Sub kirim ulang (retry)
      return Response.internalServerError(body: e.toString());
    }
  };
}

// Jalankan server webhook
Future<void> main() async {
  final handler = Pipeline()
      .addMiddleware(logRequests())
      .addHandler(buatWebhookHandler());

  final server = await shelf_io.serve(handler, '0.0.0.0', 8080);
  print('Webhook server: http://${server.address.host}:${server.port}');
}

Filter Subscription #

Pub/Sub mendukung filter berbasis attributes — subscriber hanya menerima pesan yang memenuhi kondisi:

// Subscription dengan filter — hanya terima pesan tertentu
await pubsub.projects.subscriptions.create(
  Subscription(
    name: subPath('payment-processor'),
    topic: topicPath('order-events'),
    ackDeadlineSeconds: 60,
    // Filter menggunakan Pub/Sub filter language
    filter: 'attributes.tipeEvent = "pembayaran_dikonfirmasi"',
  ),
  subPath('payment-processor'),
);

// Filter lebih kompleks
await pubsub.projects.subscriptions.create(
  Subscription(
    name: subPath('premium-order-processor'),
    topic: topicPath('order-events'),
    filter: 'attributes.tipeEvent = "order_dibuat" AND attributes.levelPengguna = "premium"',
  ),
  subPath('premium-order-processor'),
);

// Filter untuk exclude tipe tertentu
await pubsub.projects.subscriptions.create(
  Subscription(
    name: subPath('non-payment-processor'),
    topic: topicPath('order-events'),
    filter: 'NOT attributes.tipeEvent = "pembayaran_dikonfirmasi"',
  ),
  subPath('non-payment-processor'),
);

Ordering Key — Jaminan Urutan #

// Untuk subscription dengan ordering, enable di subscription
await pubsub.projects.subscriptions.create(
  Subscription(
    name: subPath('ordered-processor'),
    topic: topicPath('order-events'),
    enableMessageOrdering: true,  // wajib untuk menerima ordering key
  ),
  subPath('ordered-processor'),
);

// Publisher juga harus mengirim dengan ordering key
// Semua pesan dengan ordering key yang sama dijamin terurut
await publisher.kirim(
  data: {'tipe': 'order_dibuat', 'idOrder': 'ORD-100'},
  orderingKey: 'customer-C001',  // semua event customer ini terurut
);

await publisher.kirim(
  data: {'tipe': 'pembayaran_dikonfirmasi', 'idOrder': 'ORD-100'},
  orderingKey: 'customer-C001',  // diterima setelah order_dibuat untuk customer ini
);

Perbandingan Empat Message Broker #

Aspek Kafka RabbitMQ AWS SQS Google Pub/Sub
Model Log terdistribusi AMQP routing Managed queue Managed pub/sub
Retain pesan ✓ (hari/bulan) ✗ setelah ACK ✓ max 14 hari ✓ max 7 hari
Replay ✓ offset ✓ seek to time
Ordering Per partition Per queue FIFO queue Per ordering key
Push delivery ✓ HTTP push
Filter ✗ (client-side) routing key ✓ attribute filter
Setup Kompleks Sedang Minimal Minimal
Cloud native Confluent Cloud CloudAMQP AWS GCP
Throughput Sangat tinggi Tinggi Tinggi Sangat tinggi

Ringkasan #

  • Application Default Credentials (ADC) adalah cara terbaik autentikasi — bekerja otomatis di GCE, Cloud Run, GKE, dan Cloud Functions. Lokal gunakan gcloud auth application-default login.
  • Data harus dalam format base64 saat publish dan di-decode saat receive — base64Encode(utf8.encode(jsonEncode(data))) untuk publish, utf8.decode(base64Decode(msg.data!)) untuk receive.
  • Dua model delivery: pull (subscriber aktif menarik, cocok untuk batch/background processing) dan push (Pub/Sub mengirim ke HTTP endpoint, cocok untuk serverless/webhook).
  • Filter subscription — setiap subscription bisa memiliki filter attributes sehingga hanya menerima subset pesan dari topic yang sama. Lebih efisien dari filter di sisi consumer.
  • Dead Letter Topic dikonfigurasi di level subscription (maxDeliveryAttempts) — setelah N kali gagal, pesan otomatis diteruskan ke topic DLQ.
  • Ordering key menjamin pesan dengan key yang sama dikirim dan diterima secara berurutan — aktifkan enableMessageOrdering di subscription dan kirim dengan orderingKey yang sama.
  • Acknowledge menggunakan ackId yang diterima saat pull — bukan message ID. Untuk “nack”, set ackDeadlineSeconds: 0 via modifyAckDeadline agar pesan segera bisa di-pull kembali.
  • Batch publish hingga 1.000 pesan atau 10MB per request — sangat efisien untuk throughput tinggi.
  • Pub/Sub emulator tersedia untuk development lokal — set PUBSUB_EMULATOR_HOST=localhost:8085 dan gunakan ADC yang secara otomatis redirect ke emulator.
  • Push webhook harus mengembalikan HTTP 2xx untuk acknowledge — response selain 2xx menyebabkan Pub/Sub retry dengan exponential backoff.

← Sebelumnya: Amazon SQS   Berikutnya: Redis →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact