Google Pub/Sub #
Google Cloud Pub/Sub is a fully managed, distributed messaging service on Google Cloud, designed for global scale with low latency and throughput that scales horizontally with load. Pub/Sub supports two delivery models: pull (the subscriber actively fetches messages) and push (Pub/Sub sends messages to the subscriber's HTTP endpoint). This makes it flexible across architectures, from microservices to streaming analytics that integrates directly with BigQuery, Dataflow, and other Google Cloud services. The googleapis package, published by Google, provides generated Dart clients for many Google APIs, including Pub/Sub.
Core Pub/Sub Concepts #
flowchart LR
P["Publisher\n(Dart App)"] -->|publish message| T["Topic\nprojects/my-project/topics/order-events"]
T -->|fan-out| S1["Subscription A\n(pull)\n→ order-processor"]
T -->|fan-out| S2["Subscription B\n(pull)\n→ inventory-service"]
T -->|push| S3["Subscription C\n(push)\n→ https://webhook.example.com"]
S1 -->|pull| C1["Consumer\n(Dart App)"]
S2 -->|pull| C2["Consumer\n(Dart App)"]
S3 -->|HTTP POST| W["Webhook\nEndpoint"]
| Concept | Description |
|---|---|
| Topic | Named channel that publishers send messages to |
| Subscription | Named attachment to a topic; each subscription receives its own copy of every message |
| Message | Data (bytes) with optional key-value attributes |
| Acknowledge | Confirmation that a message has been processed |
| Ack Deadline | Time limit for acknowledging a message before it is redelivered |
| Dead Letter Topic | Topic that receives messages that repeatedly fail processing |
| Ordering Key | Key that guarantees message order per key |
| Filter | Condition that selects messages based on their attributes |
Setup Package #
dart pub add googleapis googleapis_auth http
# pubspec.yaml
dependencies:
  googleapis: ^12.0.0
  googleapis_auth: ^1.5.0
  http: ^1.2.0
Google Cloud Authentication #
Google Cloud uses OAuth 2.0 with a service account for server-to-server authentication:
import 'package:googleapis_auth/auth_io.dart';
import 'package:googleapis/pubsub/v1.dart';
import 'package:http/http.dart' as http;
import 'dart:io';
import 'dart:convert';
// Option 1: service account JSON key file
Future<PubsubApi> buatKlienDariFile(String pathKeyFile) async {
final keyJson = jsonDecode(await File(pathKeyFile).readAsString())
as Map<String, dynamic>;
final credentials = ServiceAccountCredentials.fromJson(keyJson);
final scopes = [PubsubApi.cloudPlatformScope];
final httpClient = await clientViaServiceAccount(credentials, scopes);
return PubsubApi(httpClient);
}
// Option 2: Application Default Credentials (ADC)
// Works automatically on GCE, Cloud Run, GKE, and Cloud Functions
// Locally: run `gcloud auth application-default login`
Future<PubsubApi> buatKlienADC() async {
final scopes = [PubsubApi.cloudPlatformScope];
final httpClient = await clientViaApplicationDefaultCredentials(
scopes: scopes,
);
return PubsubApi(httpClient);
}
// Option 3: emulator for local development
// Start it with: gcloud beta emulators pubsub start
// Then set the environment variable: PUBSUB_EMULATOR_HOST=localhost:8085
Future<PubsubApi> buatKlienEmulator() async {
final emulatorHost = Platform.environment['PUBSUB_EMULATOR_HOST'];
if (emulatorHost == null) {
return buatKlienADC(); // no emulator configured: use the real service
}
// The Dart client does not read PUBSUB_EMULATOR_HOST by itself, so point
// rootUrl at the emulator. The emulator needs no authentication.
print('Using Pub/Sub emulator: $emulatorHost');
return PubsubApi(http.Client(), rootUrl: 'http://$emulatorHost/');
}
Managing Topics and Subscriptions #
import 'package:googleapis/pubsub/v1.dart';
const projectId = 'my-gcp-project';
String topicPath(String nama) => 'projects/$projectId/topics/$nama';
String subPath(String nama) => 'projects/$projectId/subscriptions/$nama';
Future<void> setupInfrastruktur(PubsubApi pubsub) async {
// Create the topic
try {
await pubsub.projects.topics.create(
Topic(
name: topicPath('order-events'),
messageRetentionDuration: '604800s', // retain for 7 days (enables replay)
labels: {'environment': 'production', 'team': 'platform'},
),
topicPath('order-events'),
);
print('Topic created');
} on DetailedApiRequestError catch (e) {
if (e.status == 409) {
print('Topic already exists, skipping');
} else {
rethrow;
}
}
// Create the dead letter topic first, before the subscription that references it
try {
await pubsub.projects.topics.create(
Topic(name: topicPath('order-events-dlq')),
topicPath('order-events-dlq'),
);
} on DetailedApiRequestError catch (e) {
if (e.status != 409) rethrow;
}
// Create a pull subscription
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('order-processor'),
topic: topicPath('order-events'),
ackDeadlineSeconds: 60, // 60 seconds to acknowledge
enableMessageOrdering: false, // set to true for per-key ordering
deadLetterPolicy: DeadLetterPolicy(
deadLetterTopic: topicPath('order-events-dlq'),
maxDeliveryAttempts: 5, // 5 delivery attempts before forwarding to the DLQ
),
retryPolicy: RetryPolicy(
minimumBackoff: '10s',
maximumBackoff: '300s', // exponential backoff capped at 5 minutes
),
messageRetentionDuration: '604800s',
expirationPolicy: ExpirationPolicy(ttl: '2592000s'), // 30 days
),
subPath('order-processor'),
);
// Create a push subscription: Pub/Sub delivers to an HTTP endpoint
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('webhook-notifier'),
topic: topicPath('order-events'),
ackDeadlineSeconds: 30,
pushConfig: PushConfig(
pushEndpoint: 'https://myapp.example.com/pubsub/webhook',
oidcToken: OidcToken(
serviceAccountEmail: '[email protected]',
),
),
),
subPath('webhook-notifier'),
);
// List all topics
final topics = await pubsub.projects.topics.list('projects/$projectId');
print('Topics: ${topics.topics?.map((t) => t.name).toList()}');
}
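The `RetryPolicy` above bounds the delay between redeliveries to the range from `minimumBackoff` to `maximumBackoff`. The sketch below only illustrates how a doubling schedule clamped to those bounds behaves. Pub/Sub's actual schedule is internal to the service, and `backoffForAttempt` is a hypothetical helper that is not part of `googleapis`.

```dart
import 'dart:math' as math;

/// Hypothetical illustration: the delay doubles after each failed attempt
/// and is clamped to [minimum, maximum]. This is NOT Pub/Sub's real algorithm.
Duration backoffForAttempt(
  int attempt, {
  Duration minimum = const Duration(seconds: 10),
  Duration maximum = const Duration(seconds: 300),
}) {
  if (attempt < 1) {
    throw ArgumentError.value(attempt, 'attempt', 'must be >= 1');
  }
  // Cap the exponent so the shift cannot overflow for large attempt counts.
  final exponent = math.min(attempt - 1, 20);
  final seconds = minimum.inSeconds * (1 << exponent);
  return Duration(seconds: math.min(seconds, maximum.inSeconds));
}

void main() {
  for (var attempt = 1; attempt <= 6; attempt++) {
    print('attempt $attempt -> ${backoffForAttempt(attempt).inSeconds}s');
  }
}
```

With the defaults taken from the subscription above, the delays grow 10s, 20s, 40s, 80s, 160s and then stay at the 300s cap.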
Publisher: Sending Messages #
import 'package:googleapis/pubsub/v1.dart';
import 'dart:convert';
class GCPPublisher {
final PubsubApi _pubsub;
final String _topicPath;
GCPPublisher({required PubsubApi pubsub, required String topicName})
: _pubsub = pubsub,
_topicPath = 'projects/$projectId/topics/$topicName';
// Send a single message
Future<String> kirim({
required Map<String, dynamic> data,
Map<String, String>? attributes,
String? orderingKey, // for per-key ordered delivery
}) async {
final pesanBytes = base64Encode(utf8.encode(jsonEncode(data)));
final response = await _pubsub.projects.topics.publish(
PublishRequest(
messages: [
PubsubMessage(
data: pesanBytes, // the REST API expects base64-encoded data
attributes: {
'tipeEvent': data['tipe'] as String? ?? 'unknown',
'versi': '1.0',
'timestamp': DateTime.now().toUtc().toIso8601String(),
...?attributes,
},
orderingKey: orderingKey, // null = no ordering guarantee
),
],
),
_topicPath,
);
final messageId = response.messageIds?.first ?? '';
print('Message published: $messageId');
return messageId;
}
// Batch publish: up to 1,000 messages or 10 MB per request
Future<List<String>> kirimBatch(List<Map<String, dynamic>> pesanList) async {
final messages = pesanList.map((data) => PubsubMessage(
data: base64Encode(utf8.encode(jsonEncode(data))),
attributes: {
'tipeEvent': data['tipe'] as String? ?? 'unknown',
},
)).toList();
final response = await _pubsub.projects.topics.publish(
PublishRequest(messages: messages),
_topicPath,
);
return response.messageIds ?? [];
}
}
// Usage
Future<void> main() async {
final pubsub = await buatKlienADC();
final publisher = GCPPublisher(pubsub: pubsub, topicName: 'order-events');
await publisher.kirim(
data: {
'tipe': 'order_dibuat',
'idOrder': 'ORD-001',
'idPengguna': 'USR-123',
'total': 150000, // plain literal; digit separators (150_000) need Dart 3.6+
},
attributes: {'sumber': 'checkout-service'},
orderingKey: 'USR-123', // all events for this user are delivered in order
);
}
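`kirimBatch` above sends whatever list it receives in a single request, so a caller with more than 1,000 messages has to split the list first. The following is a minimal sketch of that splitting step using only `dart:convert`. `chunkForPublish` is a hypothetical helper, and it counts only the base64 payload size, so treat the byte limit as an approximation: the real request also carries attributes and JSON overhead.

```dart
import 'dart:convert';

/// Hypothetical helper: encodes payloads and groups them into batches that
/// stay within a message-count limit and an approximate byte limit.
List<List<String>> chunkForPublish(
  List<Map<String, dynamic>> payloads, {
  int maxMessages = 1000,
  int maxBytes = 10 * 1000 * 1000,
}) {
  final batches = <List<String>>[];
  var current = <String>[];
  var currentBytes = 0;

  for (final payload in payloads) {
    final encoded = base64Encode(utf8.encode(jsonEncode(payload)));
    final size = encoded.length; // base64 is ASCII, so 1 char = 1 byte
    if (size > maxBytes) {
      throw ArgumentError('A single message exceeds $maxBytes bytes');
    }
    final wouldOverflow =
        current.length >= maxMessages || currentBytes + size > maxBytes;
    if (wouldOverflow && current.isNotEmpty) {
      // Close the current batch and start a new one.
      batches.add(current);
      current = <String>[];
      currentBytes = 0;
    }
    current.add(encoded);
    currentBytes += size;
  }
  if (current.isNotEmpty) batches.add(current);
  return batches;
}

void main() {
  final payloads = List.generate(2500, (i) => {'idOrder': 'ORD-$i'});
  final batches = chunkForPublish(payloads);
  print(batches.map((b) => b.length).toList()); // [1000, 1000, 500]
}
```

Each inner list holds already-encoded `data` strings, so a caller could wrap them in `PubsubMessage(data: ...)` and publish one request per batch.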
Pull Subscriber: Receiving Messages #
import 'package:googleapis/pubsub/v1.dart';
import 'dart:convert';
class GCPPullSubscriber {
final PubsubApi _pubsub;
final String _subscriptionPath;
bool _jalan = true;
GCPPullSubscriber({
required PubsubApi pubsub,
required String subscriptionName,
}) : _pubsub = pubsub,
_subscriptionPath = 'projects/$projectId/subscriptions/$subscriptionName';
Future<void> mulai(
Future<void> Function(Map<String, dynamic> data, Map<String, String> attributes) handler,
) async {
print('Subscriber started on: $_subscriptionPath');
while (_jalan) {
try {
// Pull up to 100 messages per request
final response = await _pubsub.projects.subscriptions.pull(
PullRequest(
maxMessages: 100,
// Deprecated field; false (the default) lets the server wait a
// bounded time for messages, and it may still return an empty response
returnImmediately: false,
),
_subscriptionPath,
);
final messages = response.receivedMessages ?? [];
if (messages.isEmpty) continue;
print('Received ${messages.length} messages');
final ackIds = <String>[];
final nackIds = <String>[];
await Future.wait(
messages.map((received) async {
final msg = received.message!;
try {
// Decode the base64 data
final rawData = utf8.decode(base64Decode(msg.data!));
final data = jsonDecode(rawData) as Map<String, dynamic>;
final attributes = msg.attributes ?? {};
print('Processing: ${msg.messageId} (type: ${attributes['tipeEvent']})');
await handler(data, Map<String, String>.from(attributes));
ackIds.add(received.ackId!); // mark as succeeded
} catch (e) {
print('Failed to process ${msg.messageId}: $e');
nackIds.add(received.ackId!); // mark as failed so it is retried
}
}),
);
// Acknowledge everything that succeeded
if (ackIds.isNotEmpty) {
await _pubsub.projects.subscriptions.acknowledge(
AcknowledgeRequest(ackIds: ackIds),
_subscriptionPath,
);
}
// Nack the failures: an ack deadline of 0 makes them eligible for redelivery
if (nackIds.isNotEmpty) {
await _pubsub.projects.subscriptions.modifyAckDeadline(
ModifyAckDeadlineRequest(
ackIds: nackIds,
ackDeadlineSeconds: 0, // 0 = available again, subject to the retry policy
),
_subscriptionPath,
);
}
} catch (e) {
print('Error during pull: $e');
await Future.delayed(const Duration(seconds: 5));
}
}
}
void hentikan() => _jalan = false;
}
// Usage
Future<void> main() async {
final pubsub = await buatKlienADC();
final subscriber = GCPPullSubscriber(
pubsub: pubsub,
subscriptionName: 'order-processor',
);
await subscriber.mulai((data, attributes) async {
print('Event: ${attributes['tipeEvent']}');
print('Data: $data');
await prosesEventOrder(data); // your own business logic, not defined in this article
});
}
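The subscriber above decodes with `msg.data!`, which throws for a message that carries only attributes and no data. A small decoding helper can make that case explicit. The sketch below assumes payloads are JSON objects, as in the publisher example, and `decodePayload` is a hypothetical name.

```dart
import 'dart:convert';

/// Hypothetical helper: decodes the base64 `data` field of a message into a
/// JSON map. Returns null when there is no data (attribute-only message) and
/// throws FormatException when the payload is not a JSON object.
Map<String, dynamic>? decodePayload(String? base64Data) {
  if (base64Data == null || base64Data.isEmpty) return null;
  final decoded = jsonDecode(utf8.decode(base64Decode(base64Data)));
  if (decoded is! Map<String, dynamic>) {
    throw const FormatException('Expected a JSON object payload');
  }
  return decoded;
}

void main() {
  final wire = base64Encode(utf8.encode(jsonEncode({'tipe': 'order_dibuat'})));
  print(decodePayload(wire)); // {tipe: order_dibuat}
  print(decodePayload(null)); // null
}
```

Inside the pull loop, a `null` result could be acknowledged and skipped, while a `FormatException` falls into the existing `catch` and leads to a nack.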
Push Subscription: Receiving via Webhook #
With a push subscription, Pub/Sub sends an HTTP POST to the endpoint you configure. You need an HTTP server that accepts these requests (the example uses the shelf package, which must be added to pubspec.yaml):
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'dart:convert';
// Webhook handler that receives pushes from Pub/Sub
Handler buatWebhookHandler() {
return (Request request) async {
if (request.method != 'POST') {
return Response(405, body: 'Method Not Allowed'); // shelf has no named 405 constructor
}
try {
final body = await request.readAsString();
final envelope = jsonDecode(body) as Map<String, dynamic>;
// A Pub/Sub push envelope has this structure:
// {
// "message": {
// "data": "<base64-encoded-data>",
// "attributes": {...},
// "messageId": "...",
// "publishTime": "..."
// },
// "subscription": "projects/.../subscriptions/..."
// }
final message = envelope['message'] as Map<String, dynamic>;
final rawData = utf8.decode(base64Decode(message['data'] as String));
final data = jsonDecode(rawData) as Map<String, dynamic>;
final attributes = (message['attributes'] as Map<String, dynamic>?)
?.map((k, v) => MapEntry(k, v as String)) ??
{};
print('Push received: ${message['messageId']}');
print('Subscription: ${envelope['subscription']}');
await prosesEventOrder(data);
// Return 2xx so Pub/Sub treats the message as acknowledged
return Response.ok('OK');
} catch (e) {
print('Error processing push: $e');
// Return non-2xx so Pub/Sub redelivers the message
return Response.internalServerError(body: e.toString());
}
};
}
// Run the webhook server
Future<void> main() async {
final handler = Pipeline()
.addMiddleware(logRequests())
.addHandler(buatWebhookHandler());
final server = await shelf_io.serve(handler, '0.0.0.0', 8080);
print('Webhook server: http://${server.address.host}:${server.port}');
}
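The handler above reads the envelope with inline casts, which fail with a generic type error when a field is missing. One possible alternative is to parse the envelope into a small typed object first. The sketch below follows the envelope structure shown in the handler's comment. `PushEnvelope` is a hypothetical class, not a type from `shelf` or `googleapis`.

```dart
import 'dart:convert';

/// Hypothetical typed view over the Pub/Sub push envelope.
class PushEnvelope {
  final String subscription;
  final String messageId;
  final Map<String, String> attributes;
  final String? data; // decoded UTF-8 text, null when the message has no data

  PushEnvelope({
    required this.subscription,
    required this.messageId,
    required this.attributes,
    this.data,
  });

  /// Parses the raw request body. Throws FormatException on a bad shape.
  factory PushEnvelope.fromJson(String body) {
    final envelope = jsonDecode(body);
    if (envelope is! Map<String, dynamic>) {
      throw const FormatException('Envelope must be a JSON object');
    }
    final message = envelope['message'];
    if (message is! Map<String, dynamic>) {
      throw const FormatException('Missing "message" object');
    }
    final rawAttributes = message['attributes'];
    final rawData = message['data'];
    return PushEnvelope(
      subscription: envelope['subscription'] as String? ?? '',
      messageId: message['messageId'] as String? ?? '',
      attributes: rawAttributes is Map
          ? rawAttributes.map((k, v) => MapEntry('$k', '$v'))
          : const {},
      data: rawData is String ? utf8.decode(base64Decode(rawData)) : null,
    );
  }
}

void main() {
  final body = jsonEncode({
    'message': {
      'data': base64Encode(utf8.encode('{"tipe":"order_dibuat"}')),
      'attributes': {'tipeEvent': 'order_dibuat'},
      'messageId': '123',
    },
    'subscription': 'projects/my-gcp-project/subscriptions/webhook-notifier',
  });
  final envelope = PushEnvelope.fromJson(body);
  print('${envelope.messageId}: ${envelope.data}');
}
```

A handler built on this could answer a malformed envelope with a 2xx status after logging it, since a non-2xx response makes Pub/Sub redeliver a message that will never parse.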
Subscription Filters #
Pub/Sub supports attribute-based filters, so a subscription only receives messages that match its condition. A filter is set when the subscription is created and cannot be changed afterwards:
// Subscription with a filter: receives only matching messages
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('payment-processor'),
topic: topicPath('order-events'),
ackDeadlineSeconds: 60,
// Uses the Pub/Sub filter syntax
filter: 'attributes.tipeEvent = "pembayaran_dikonfirmasi"',
),
subPath('payment-processor'),
);
// A more complex filter
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('premium-order-processor'),
topic: topicPath('order-events'),
filter: 'attributes.tipeEvent = "order_dibuat" AND attributes.levelPengguna = "premium"',
),
subPath('premium-order-processor'),
);
// Filter that excludes a specific type
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('non-payment-processor'),
topic: topicPath('order-events'),
filter: 'NOT attributes.tipeEvent = "pembayaran_dikonfirmasi"',
),
subPath('non-payment-processor'),
);
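The filter strings above are written by hand, which makes quoting mistakes easy when a value comes from configuration. The following is a minimal sketch of string builders for the three forms used in this section (`=`, `AND`, `NOT`). All function names are hypothetical. Escaping embedded quotes with a backslash and restricting keys to simple identifiers are assumptions, so check them against the Pub/Sub filter documentation before relying on them.

```dart
/// Hypothetical helpers that build Pub/Sub filter expressions as strings.

final _simpleKey = RegExp(r'^[A-Za-z_][A-Za-z0-9_]*$');

/// Wraps a value in double quotes, escaping backslashes and quotes
/// (backslash escaping is an assumption of this sketch).
String quoteValue(String value) {
  final escaped = value.replaceAll('\\', '\\\\').replaceAll('"', '\\"');
  return '"$escaped"';
}

/// Builds `attributes.<key> = "<value>"`. Keys that are not simple
/// identifiers are rejected rather than quoted.
String attrEquals(String key, String value) {
  if (!_simpleKey.hasMatch(key)) {
    throw ArgumentError.value(key, 'key', 'unsupported attribute key');
  }
  return 'attributes.$key = ${quoteValue(value)}';
}

/// Joins clauses with AND. Callers mixing AND and OR must add their own
/// parentheses.
String allOf(List<String> clauses) => clauses.join(' AND ');

/// Negates a single clause.
String negate(String clause) => 'NOT $clause';

void main() {
  print(allOf([
    attrEquals('tipeEvent', 'order_dibuat'),
    attrEquals('levelPengguna', 'premium'),
  ]));
  print(negate(attrEquals('tipeEvent', 'pembayaran_dikonfirmasi')));
}
```

The two printed strings match the `premium-order-processor` and `non-payment-processor` filters above, so the result of `allOf` or `negate` can be passed directly as the `filter` argument.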
Ordering Key: Ordering Guarantees #
// To receive messages in order, enable ordering on the subscription
await pubsub.projects.subscriptions.create(
Subscription(
name: subPath('ordered-processor'),
topic: topicPath('order-events'),
enableMessageOrdering: true, // required for ordered delivery
),
subPath('ordered-processor'),
);
// The publisher must also send with an ordering key
// Messages with the same ordering key, published in the same region, are delivered in order
await publisher.kirim(
data: {'tipe': 'order_dibuat', 'idOrder': 'ORD-100'},
orderingKey: 'customer-C001', // all events for this customer stay in order
);
await publisher.kirim(
data: {'tipe': 'pembayaran_dikonfirmasi', 'idOrder': 'ORD-100'},
orderingKey: 'customer-C001', // delivered after order_dibuat for this customer
);
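Ordered delivery only helps if the consumer also processes in order. The pull subscriber shown earlier runs every message of a batch through `Future.wait`, so two messages with the same ordering key can be handled concurrently. The sketch below shows one way to keep per-key order while still working on different keys in parallel. `Incoming` and `processInKeyOrder` are hypothetical names that stand in for the received messages and the processing loop.

```dart
/// Hypothetical minimal shape of a received message for this sketch.
class Incoming {
  final String ackId;
  final String orderingKey; // empty string = no ordering key
  final String body;
  Incoming(this.ackId, this.orderingKey, this.body);
}

/// Processes messages one after another within each ordering key and in
/// parallel across keys. Returns the ackIds whose handler completed.
Future<List<String>> processInKeyOrder(
  List<Incoming> batch,
  Future<void> Function(Incoming) handler,
) async {
  // Group by key; list order preserves arrival order within each key.
  final byKey = <String, List<Incoming>>{};
  for (final message in batch) {
    byKey.putIfAbsent(message.orderingKey, () => []).add(message);
  }

  final acked = <String>[];
  await Future.wait(byKey.values.map((group) async {
    for (final message in group) {
      try {
        await handler(message);
        acked.add(message.ackId);
      } catch (_) {
        // Stop this key: later messages must not overtake the failed one.
        break;
      }
    }
  }));
  return acked;
}

Future<void> main() async {
  final batch = [
    Incoming('ack-1', 'customer-C001', 'order_dibuat'),
    Incoming('ack-2', 'customer-C002', 'order_dibuat'),
    Incoming('ack-3', 'customer-C001', 'pembayaran_dikonfirmasi'),
  ];
  final acked = await processInKeyOrder(batch, (message) async {
    print('${message.orderingKey}: ${message.body}');
  });
  print('acked: $acked');
}
```

In this sketch, messages without an ordering key share the empty-string group and are therefore processed sequentially too. A real consumer might handle that group in parallel instead.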
Comparison of Four Message Brokers #
| Aspect | Kafka | RabbitMQ | AWS SQS | Google Pub/Sub |
|---|---|---|---|---|
| Model | Distributed log | AMQP routing | Managed queue | Managed pub/sub |
| Message retention | ✓ (days/months) | ✗ after ACK | ✓ max 14 days | ✓ 7 days by default, up to 31 days (topic retention) |
| Replay | ✓ offset | ✗ | ✗ | ✓ seek to time |
| Ordering | Per partition | Per queue | FIFO queue | Per ordering key |
| Push delivery | ✗ | ✗ | ✗ | ✓ HTTP push |
| Filter | ✗ (client-side) | Routing key | ✗ | ✓ attribute filter |
| Setup | Complex | Moderate | Minimal | Minimal |
| Managed offering | Confluent Cloud | CloudAMQP | AWS | GCP |
| Throughput | Very high | High | High | Very high |
Summary #
- Application Default Credentials (ADC) is the recommended way to authenticate: it works automatically on GCE, Cloud Run, GKE, and Cloud Functions. Locally, run `gcloud auth application-default login`.
- Data must be base64-encoded when publishing and decoded when receiving: `base64Encode(utf8.encode(jsonEncode(data)))` to publish, `utf8.decode(base64Decode(msg.data!))` to receive.
- Two delivery models: pull (the subscriber actively fetches, which suits batch and background processing) and push (Pub/Sub sends to an HTTP endpoint, which suits serverless and webhooks).
- Subscription filters: each subscription can have an attribute filter so that it receives only a subset of the messages on the same topic. This is more efficient than filtering in the consumer.
- The dead letter topic is configured at the subscription level (`maxDeliveryAttempts`): after N failed delivery attempts, the message is forwarded to the DLQ topic automatically.
- An ordering key guarantees that messages with the same key are delivered in order: enable `enableMessageOrdering` on the subscription and publish with the same `orderingKey`.
- Acknowledge uses the ackId received from pull, not the message ID. To "nack", set `ackDeadlineSeconds: 0` via `modifyAckDeadline` so the message becomes eligible for redelivery.
- Batch publish accepts up to 1,000 messages or 10 MB per request, which is efficient for high throughput.
- The Pub/Sub emulator is available for local development: set `PUBSUB_EMULATOR_HOST=localhost:8085` and construct `PubsubApi` with `rootUrl` pointing at that host, because the Dart client does not redirect to the emulator on its own.
- A push webhook must return HTTP 2xx to acknowledge. Any other response causes Pub/Sub to retry delivery.