Async

Async #

dart:async adalah library bawaan Dart yang menjadi fondasi seluruh model concurrency — tanpanya tidak ada Future, Stream, async/await, atau StreamController. Meski keyword async dan await terasa seperti fitur bahasa, di baliknya mereka bekerja sepenuhnya melalui dart:async. Memahami library ini secara mendalam — terutama Stream dan cara mengontrolnya — adalah perbedaan antara developer yang sekadar menggunakan async dan developer yang benar-benar memahami event loop Dart.

Gambaran dart:async #

flowchart LR
    DA["dart:async"] --> FUT["Future\nNilai tunggal di masa depan"]
    DA --> STR["Stream\nUrutan nilai di masa depan"]
    DA --> SC["StreamController\nBuat dan kontrol stream"]
    DA --> COMP["Completer\nBuat Future secara manual"]
    DA --> ZONE["Zone\nKonteks eksekusi async"]
    DA --> TIMER["Timer\nJadwalkan eksekusi"]

Future — Nilai Tunggal di Masa Depan #

Future merepresentasikan nilai yang mungkin belum tersedia — hasil operasi asinkron seperti HTTP request, baca file, atau query database:

import 'dart:async';

// Future yang sudah selesai dengan nilai
Future<int> futureLangsung = Future.value(42);
Future<String> futureError = Future.error(Exception('Gagal'));

// Future dengan delay
Future<String> futureDelay = Future.delayed(
  Duration(seconds: 2),
  () => 'Selesai setelah 2 detik',
);

// Membuat Future dari komputasi sync
Future<int> futureSync = Future(() => 1 + 1);

// async/await — cara paling idiomatis
Future<String> ambilData() async {
  final result = await Future.delayed(Duration(seconds: 1), () => 'data');
  return result.toUpperCase();
}

Future.wait — Paralel #

// Jalankan beberapa Future secara paralel — tunggu semua selesai
Future<void> contohParalel() async {
  final mulai = DateTime.now();

  // Sequential — total ~3 detik
  final a = await Future.delayed(Duration(seconds: 1), () => 'A');
  final b = await Future.delayed(Duration(seconds: 1), () => 'B');
  final c = await Future.delayed(Duration(seconds: 1), () => 'C');

  // Paralel — total ~1 detik
  final hasil = await Future.wait([
    Future.delayed(Duration(seconds: 1), () => 'A'),
    Future.delayed(Duration(seconds: 1), () => 'B'),
    Future.delayed(Duration(seconds: 1), () => 'C'),
  ]);
  print(hasil); // ['A', 'B', 'C']

  final durasi = DateTime.now().difference(mulai);
  print('Selesai dalam ${durasi.inSeconds} detik');
}

// Future.wait dengan error handling
Future<void> waitDenganError() async {
  try {
    final hasil = await Future.wait([
      Future.value(1),
      Future.error(Exception('Gagal')),
      Future.value(3),
    ]);
  } catch (e) {
    print('Salah satu gagal: $e');
    // Future.wait gagal segera saat salah satu Future error
  }

  // eagerError: false — tunggu semua selesai meski ada yang error
  final hasil = await Future.wait(
    [Future.value(1), Future.error('error'), Future.value(3)],
    eagerError: false,
  );
}

Method Future yang Penting #

Future<int> nilai = Future.value(42);

// then — transform nilai setelah selesai
nilai.then((n) => print('Nilai: $n'));

// then chaining — setiap then mengembalikan Future baru
Future<String> transformed = nilai
    .then((n) => n * 2)       // 84
    .then((n) => 'Hasil: $n'); // 'Hasil: 84'

// catchError — tangkap error
nilai
    .then((n) => throw Exception('Gagal'))
    .catchError((e) => print('Error: $e'));

// whenComplete — selalu dieksekusi, seperti finally
nilai.whenComplete(() => print('Selesai (selalu)'));

// timeout — lempar TimeoutException jika tidak selesai tepat waktu
final result = await nilai.timeout(
  Duration(seconds: 5),
  onTimeout: () => -1, // nilai fallback
);

// Future.any — nilai dari Future pertama yang selesai
final pertama = await Future.any([
  Future.delayed(Duration(seconds: 3), () => 'lambat'),
  Future.delayed(Duration(seconds: 1), () => 'cepat'),
]);
print(pertama); // 'cepat'

Stream — Urutan Nilai di Masa Depan #

Stream adalah sequence nilai asinkron — seperti Future tapi bisa mengeluarkan banyak nilai seiring waktu:

import 'dart:async';

// Stream sederhana dari iterable
Stream<int> angka = Stream.fromIterable([1, 2, 3, 4, 5]);

// Stream dengan delay antar nilai
Stream<int> hitungMundur = Stream.periodic(
  Duration(seconds: 1),
  (i) => 5 - i,
).take(5); // 5, 4, 3, 2, 1

// async* generator — cara paling fleksibel membuat stream
Stream<int> fibonacci() async* {
  int a = 0, b = 1;
  while (true) {
    yield a;
    final temp = a + b;
    a = b;
    b = temp;
  }
}

// Baca stream
await for (final n in fibonacci().take(10)) {
  print(n); // 0, 1, 1, 2, 3, 5, 8, 13, 21, 34
}

Tipe Stream: Single-Subscription vs Broadcast #

// Single-subscription stream (default)
// Hanya bisa didengarkan SATU listener pada satu waktu
final single = Stream.fromIterable([1, 2, 3]);
single.listen(print); // ✓
// single.listen(print); // ✗ StateError: Stream has already been listened to

// Broadcast stream — bisa didengarkan banyak listener sekaligus
final broadcast = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
broadcast.listen((n) => print('Listener 1: $n'));
broadcast.listen((n) => print('Listener 2: $n'));
// Keduanya menerima semua nilai

// Cek tipe stream
print(single.isBroadcast);    // false
print(broadcast.isBroadcast); // true

Mendengarkan Stream #

Stream<int> stream = Stream.periodic(Duration(milliseconds: 500), (i) => i).take(5);

// Cara 1: await for (paling idiomatis)
await for (final nilai in stream) {
  print(nilai);
}

// Cara 2: listen (lebih kontrol)
final subscription = stream.listen(
  (nilai) => print('Data: $nilai'),        // onData
  onError: (e) => print('Error: $e'),       // onError
  onDone: () => print('Stream selesai'),    // onDone
  cancelOnError: false,                     // lanjutkan meski ada error
);

// Kontrol subscription
await subscription.pause(Future.delayed(Duration(seconds: 1))); // pause 1 detik
subscription.resume();
await subscription.cancel(); // hentikan listening

Transformasi Stream #

Stream mendukung semua method fungsional yang mirip dengan Iterable, tapi bekerja secara asinkron:

Stream<int> angka = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// map — transformasi setiap nilai
Stream<String> label = angka.map((n) => 'item-$n');

// where — filter nilai
Stream<int> genap = angka.where((n) => n.isEven); // 2, 4, 6, 8, 10

// take dan skip
Stream<int> tiga = angka.take(3);      // 1, 2, 3
Stream<int> sisanya = angka.skip(7);   // 8, 9, 10

// takeWhile dan skipWhile
Stream<int> kurangDari5 = angka.takeWhile((n) => n < 5); // 1, 2, 3, 4
Stream<int> setelah5 = angka.skipWhile((n) => n <= 5);   // 6, 7, 8, 9, 10

// expand — satu nilai menjadi banyak nilai
Stream<int> diperluas = angka.expand((n) => [n, n * 10]);
// 1, 10, 2, 20, 3, 30, ...

// asyncMap — map dengan operasi async
Stream<String> dariDB = angka.asyncMap(
  (id) async => await ambilNamaDariDatabase(id),
);

// asyncExpand — expand dengan operasi async
Stream<String> diperluasAsync = angka.asyncExpand(
  (id) => ambilTagDariDatabase(id), // mengembalikan Stream
);

// distinct — hapus nilai duplikat berurutan
Stream<int> unik = Stream.fromIterable([1, 1, 2, 2, 3, 1, 1]).distinct();
// 1, 2, 3, 1

// handleError — tangkap error di dalam pipeline
Stream<int> aman = angka
    .map((n) { if (n == 5) throw Exception('Lima!'); return n; })
    .handleError(
      (e) => print('Error ditangkap: $e'),
      test: (e) => e is Exception,
    );

// fold — agregasi semua nilai menjadi satu
int jumlah = await angka.fold(0, (acc, n) => acc + n); // 55

// reduce — fold tanpa nilai awal
int maks = await angka.reduce((a, b) => a > b ? a : b); // 10

// toList — kumpulkan semua nilai ke List
List<int> semuaNilai = await angka.toList();

// first, last, single
int pertama = await angka.first;
int terakhir = await angka.last;
bool ada = await angka.any((n) => n > 5);     // true
bool semua = await angka.every((n) => n > 0);  // true
int jumlahEl = await angka.length;             // 10

StreamController — Buat dan Kontrol Stream #

StreamController memungkinkan kamu membuat stream sendiri dan menambahkan nilai ke dalamnya secara manual:

import 'dart:async';

// StreamController dasar
final controller = StreamController<int>();

// Stream yang bisa didengarkan
final stream = controller.stream;

// Tambahkan nilai
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);

// Tambahkan error
controller.sink.addError(Exception('Terjadi kesalahan'));

// Tutup stream (trigger onDone)
await controller.sink.close();

// Selalu cek apakah ada listener sebelum add
if (!controller.isClosed) {
  controller.add(nilai);
}

Broadcast StreamController #

// Untuk stream yang perlu banyak listener
final broadcastController = StreamController<String>.broadcast();

// Bisa subscribe berulang kali
broadcastController.stream.listen((s) => print('A: $s'));
broadcastController.stream.listen((s) => print('B: $s'));

broadcastController.add('pesan 1'); // diterima A dan B
broadcastController.add('pesan 2'); // diterima A dan B

// onListen dan onCancel — callback saat ada listener yang bergabung/keluar
final terkontrol = StreamController<int>.broadcast(
  onListen: () => print('Ada listener baru!'),
  onCancel: () => print('Listener terakhir pergi'),
);

Pola Repository dengan Stream #

// Pattern umum: StreamController sebagai state management sederhana
class ProdukRepository {
  final _controller = StreamController<List<Produk>>.broadcast();
  final List<Produk> _cache = [];

  Stream<List<Produk>> get produkStream => _controller.stream;

  Future<void> muat() async {
    final data = await _api.ambilProduk();
    _cache
      ..clear()
      ..addAll(data);
    _controller.add(List.unmodifiable(_cache));
  }

  Future<void> tambah(Produk produk) async {
    await _api.buatProduk(produk);
    _cache.add(produk);
    _controller.add(List.unmodifiable(_cache));
  }

  void dispose() {
    _controller.close(); // wajib! mencegah memory leak
  }
}

// Konsumsi stream di UI
final repo = ProdukRepository();
repo.produkStream.listen((produk) {
  print('Update: ${produk.length} produk');
});

await repo.muat();

StreamTransformer — Transformasi Kustom #

Untuk transformasi yang tidak bisa diekspresikan dengan method bawaan Stream:

import 'dart:async';

// Buat transformer kustom
StreamTransformer<int, String> intKeString = StreamTransformer.fromHandlers(
  handleData: (data, sink) {
    sink.add('Nilai: $data');
  },
  handleError: (error, stackTrace, sink) {
    sink.addError('Error: $error', stackTrace);
  },
  handleDone: (sink) {
    sink.add('Selesai');
    sink.close();
  },
);

// Gunakan transformer
final transformed = Stream.fromIterable([1, 2, 3]).transform(intKeString);
await for (final s in transformed) {
  print(s); // 'Nilai: 1', 'Nilai: 2', 'Nilai: 3', 'Selesai'
}

// Transformer debounce — hanya emit nilai terakhir setelah jeda
StreamTransformer<T, T> debounce<T>(Duration duration) {
  Timer? timer;
  return StreamTransformer.fromHandlers(
    handleData: (data, sink) {
      timer?.cancel();
      timer = Timer(duration, () => sink.add(data));
    },
    handleDone: (sink) {
      timer?.cancel();
      sink.close();
    },
  );
}

// Debounce pencarian — hanya cari setelah user berhenti mengetik 300ms
final searchStream = searchController.stream
    .transform(debounce(Duration(milliseconds: 300)));

Completer — Buat Future Secara Manual #

Completer memungkinkan membuat Future yang bisa diselesaikan dari luar:

import 'dart:async';

// Contoh sederhana
final completer = Completer<String>();

// Di suatu tempat: selesaikan future
Future.delayed(Duration(seconds: 2), () {
  completer.complete('Selesai!');
  // atau: completer.completeError(Exception('Gagal'));
});

// Di tempat lain: tunggu future
final hasil = await completer.future;
print(hasil); // 'Selesai!'

// Cek status
print(completer.isCompleted); // true setelah complete() dipanggil

Use Case Nyata — Callback ke Future #

// Mengubah API callback-based menjadi Future-based
Future<String> callbackKeFuture() {
  final completer = Completer<String>();

  // API lama yang menggunakan callback
  legacyAPI.fetch(
    onSuccess: (data) => completer.complete(data),
    onError: (error) => completer.completeError(error),
  );

  return completer.future;
}

// Penggunaan
final data = await callbackKeFuture(); // bisa dipakai dengan async/await!

// Lock asinkron menggunakan Completer
class AsyncLock {
  Completer<void>? _completer;

  Future<void> ambil() async {
    while (_completer != null) {
      await _completer!.future; // tunggu lock dilepas
    }
    _completer = Completer<void>();
  }

  void lepas() {
    _completer?.complete();
    _completer = null;
  }
}

Timer — Jadwalkan Eksekusi #

import 'dart:async';

// Timer sekali jalan
final timer = Timer(Duration(seconds: 5), () {
  print('Dieksekusi setelah 5 detik');
});

// Batalkan sebelum dieksekusi
timer.cancel();
print(timer.isActive); // false setelah cancel

// Timer periodik — dieksekusi berulang
int hitungan = 0;
final periodic = Timer.periodic(Duration(seconds: 1), (timer) {
  hitungan++;
  print('Tick: $hitungan');
  if (hitungan >= 5) timer.cancel(); // hentikan setelah 5 kali
});

// Timer.run — eksekusi di event queue berikutnya (setelah microtask)
Timer.run(() => print('Di event queue berikutnya'));

Error Handling Async #

// try/catch untuk Future
Future<void> denganError() async {
  try {
    await Future.error(Exception('Gagal'));
  } on Exception catch (e) {
    print('Ditangkap: $e');
  } finally {
    print('Selalu dieksekusi');
  }
}

// Error di Stream
final stream = Stream<int>.error(Exception('Stream error'));
stream.listen(
  (nilai) => print(nilai),
  onError: (e) => print('Stream error: $e'),
  cancelOnError: true, // hentikan stream saat error
);

// runZonedGuarded — tangkap error yang tidak tertangkap
runZonedGuarded(() {
  // Kode yang mungkin throw error tak tertangkap
  Timer(Duration.zero, () => throw Exception('Error di timer!'));
}, (error, stackTrace) {
  print('Error tak tertangkap: $error');
  // log ke Sentry, Crashlytics, dll.
});

Anti-Pattern dart:async #

Tidak Menutup StreamController #

// ANTI-PATTERN: StreamController tidak pernah ditutup — memory leak!
class Repository {
  final _controller = StreamController<List<Item>>();
  Stream<List<Item>> get stream => _controller.stream;
  // ✗ tidak ada dispose/close
}

// BENAR: selalu tutup controller saat tidak digunakan
class Repository {
  final _controller = StreamController<List<Item>>();
  Stream<List<Item>> get stream => _controller.stream;

  void dispose() {
    _controller.close(); // ✓ wajib dipanggil saat repository tidak digunakan
  }
}

Lupa await pada Future #

// ANTI-PATTERN: Future tidak di-await — error terbuang begitu saja
void simpanData(Data data) {
  repository.simpan(data); // ✗ jika ini gagal, error hilang tanpa jejak
}

// BENAR: await atau tangani Future secara eksplisit
Future<void> simpanData(Data data) async {
  await repository.simpan(data); // ✓ error akan propagate ke caller
}

// Atau jika memang ingin fire-and-forget, tangkap errornya
void simpanData(Data data) {
  repository.simpan(data).catchError((e) {
    log.error('Gagal simpan: $e'); // ✓ error tidak hilang
  });
}

Ringkasan #

  • Future untuk nilai tunggal asinkron, Stream untuk sequence nilai asinkron — pilih berdasarkan jumlah nilai yang dihasilkan operasi.
  • Future.wait() untuk menjalankan beberapa Future secara paralel — jauh lebih cepat dari sequential await. Gunakan eagerError: false jika ingin menunggu semua selesai meski ada yang error.
  • Single-subscription vs broadcast — stream default hanya bisa satu listener. Gunakan asBroadcastStream() atau StreamController.broadcast() untuk multi-listener.
  • await for adalah cara paling idiomatis membaca stream — lebih bersih dari listen() dan secara otomatis menutup subscription.
  • StreamController untuk membuat stream kustom — selalu tutup dengan controller.close() di dispose() untuk mencegah memory leak.
  • StreamTransformer untuk transformasi yang tidak tersedia sebagai method bawaan — debounce, throttle, buffer, window, dan lainnya.
  • Completer untuk mengkonversi callback-based API menjadi Future-based — satu-satunya cara idiomatis membuat Future yang diselesaikan dari luar.
  • Timer.periodic untuk eksekusi berulang — selalu simpan referensinya dan panggil cancel() saat tidak dibutuhkan untuk mencegah memory leak.
  • runZonedGuarded untuk menangkap error async yang tidak tertangkap — penting untuk logging di aplikasi produksi.
  • Selalu await atau tangkap Future — mengabaikan Future yang mungkin error adalah salah satu bug yang paling sulit dilacak karena error menghilang tanpa jejak.

← Sebelumnya: Math   Berikutnya: dart:convert →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact