Isolate

Isolate #

dart:isolate adalah library bawaan Dart yang menyediakan primitif tingkat rendah untuk concurrency berbasis Isolate. Setiap Isolate berjalan di thread tersendiri dengan heap memori yang terpisah — tidak ada shared state antar Isolate, semua komunikasi melalui message passing via SendPort dan ReceivePort. Artikel ini membahas API dart:isolate secara mendalam: cara membuat Isolate, berkomunikasi dua arah, menangani error lintas Isolate, dan pola yang digunakan dalam produksi nyata.

Artikel ini membahas dart:isolate dari perspektif Standard Library — API tingkat rendah. Untuk gambaran umum tentang kapan menggunakan Isolate vs async/await, lihat artikel Multi Threading di section Lanjutan.

Model Memori Isolate #

flowchart LR
    subgraph "Main Isolate"
        MI["Event Loop\nHeap A\nDart Objects"]
        RPA["ReceivePort A"]
    end

    subgraph "Worker Isolate"
        WI["Event Loop\nHeap B\n(terpisah)"]
        RPB["ReceivePort B"]
    end

    MI -->|"SendPort.send(msg)\n(message di-copy)"| RPB
    WI -->|"SendPort.send(result)\n(message di-copy)"| RPA

    note["❌ Tidak ada shared memory\n✅ Semua komunikasi via message"]

ReceivePort dan SendPort #

ReceivePort adalah endpoint penerima pesan. Setiap ReceivePort punya sendPort yang bisa diberikan ke pihak lain untuk mengirim pesan:

import 'dart:isolate';

Future<void> main() async {
  // Buat ReceivePort — endpoint untuk menerima pesan
  final receivePort = ReceivePort();

  // sendPort adalah "alamat" yang bisa dikirim ke Isolate lain
  final SendPort sendPort = receivePort.sendPort;

  // Kirim pesan ke diri sendiri (untuk demonstrasi)
  sendPort.send('Halo!');
  sendPort.send(42);
  sendPort.send({'kunci': 'nilai'});
  sendPort.send(null);

  // Baca pesan — ReceivePort adalah Stream
  int hitungan = 0;
  await for (final pesan in receivePort) {
    print('Diterima: $pesan');
    hitungan++;
    if (hitungan >= 4) break; // stop setelah 4 pesan
  }

  receivePort.close(); // wajib tutup untuk menghentikan event loop
}

Tipe Data yang Bisa Dikirim #

import 'dart:isolate';
import 'dart:typed_data';

final port = ReceivePort();
final send = port.sendPort;

// ✓ BISA dikirim antar Isolate:
send.send(null);                          // null
send.send(true);                          // bool
send.send(42);                            // int
send.send(3.14);                          // double
send.send('teks');                        // String
send.send([1, 2, 3]);                     // List (di-copy)
send.send({'a': 1});                      // Map (di-copy)
send.send({1, 2, 3});                     // Set (di-copy)
send.send(Uint8List.fromList([1, 2, 3])); // TypedData (di-copy)
send.send(port.sendPort);                 // SendPort ✓
send.send(                                // TransferableTypedData (zero-copy)
  TransferableTypedData.fromList([Uint8List(100)])
);

// ✗ TIDAK BISA dikirim:
// send.send(File('test.txt'));    // objek native resource
// send.send(Socket(...));         // native socket
// send.send(() => print('hi'));   // closure/function (kecuali top-level)
// send.send(Stream.empty());      // Stream

Isolate.run — Cara Termudah (Dart 2.19+) #

import 'dart:isolate';

// Isolate.run — buat, jalankan, kembalikan hasil, hapus otomatis
Future<void> main() async {
  // Fungsi yang dijalankan di Isolate baru
  // Harus top-level atau static (tidak bisa closure yang capture state)
  final hasil = await Isolate.run(() {
    // Kode ini berjalan di Isolate terpisah
    int total = 0;
    for (int i = 0; i < 10000000; i++) {
      total += i;
    }
    return total;
  });

  print('Total: $hasil'); // 49999995000000
}

// Dengan argumen — kirim data via closure
Future<List<int>> sortkanDiBackground(List<int> data) async {
  return await Isolate.run(() {
    // data di-copy ke Isolate ini
    final salinan = List<int>.from(data);
    salinan.sort();
    return salinan;
  });
}

// Error di Isolate.run — propagate ke caller
Future<void> contohError() async {
  try {
    await Isolate.run(() {
      throw Exception('Error di Isolate!');
    });
  } catch (e) {
    print('Error tertangkap di main: $e');
    // Exception dari Isolate.run di-wrap sebagai RemoteError
  }
}

Isolate.spawn — Kontrol Penuh #

Isolate.spawn membuat Isolate persisten yang bisa menerima banyak pesan:

import 'dart:isolate';

// Entry point HARUS top-level atau static function
void workerEntryPoint(SendPort kirimKeMain) {
  final terimaPerintah = ReceivePort();

  // Kirim SendPort kita ke main agar main bisa kirim perintah
  kirimKeMain.send(terimaPerintah.sendPort);

  // Proses perintah yang masuk
  terimaPerintah.listen((pesan) {
    if (pesan == null) {
      // Signal untuk tutup
      terimaPerintah.close();
      return;
    }

    final int angka = pesan as int;
    final hasil = _hitungPrime(angka); // komputasi berat
    kirimKeMain.send(hasil);
  });
}

bool _hitungPrime(int n) {
  if (n < 2) return false;
  for (int i = 2; i <= n ~/ 2; i++) {
    if (n % i == 0) return false;
  }
  return true;
}

Future<void> main() async {
  final terimaMain = ReceivePort();

  // Spawn worker Isolate
  final isolate = await Isolate.spawn(
    workerEntryPoint,
    terimaMain.sendPort,
  );

  // Terima SendPort dari worker
  final SendPort kirimKeWorker = await terimaMain.first;

  // Kirim beberapa pekerjaan
  final hasilStream = terimaMain.skip(1); // skip SendPort yang sudah diambil
  final iterator = hasilStream.iterator;

  for (final n in [97, 100, 101, 200, 997]) {
    kirimKeWorker.send(n);
    await iterator.moveNext();
    final isPrime = iterator.current as bool;
    print('$n adalah bilangan prima: $isPrime');
  }

  // Kirim signal untuk tutup
  kirimKeWorker.send(null);
  terimaMain.close();

  // Matikan Isolate jika masih hidup
  isolate.kill(priority: Isolate.beforeNextEvent);
}

Komunikasi Dua Arah — Pola Robust #

Untuk komunikasi dua arah yang andal, gunakan pola dengan correlation ID:

import 'dart:isolate';
import 'dart:async';

// Pesan request dari main ke worker
class Request {
  final int id;
  final dynamic data;
  final SendPort replyTo;

  const Request({required this.id, required this.data, required this.replyTo});
}

// Pesan response dari worker ke main
class Response {
  final int id;
  final dynamic hasil;
  final String? error;

  const Response({required this.id, this.hasil, this.error});
}

// Worker yang andal dengan ID tracking
void workerRobust(SendPort kirimKeMain) {
  final terima = ReceivePort();
  kirimKeMain.send(terima.sendPort);

  terima.listen((pesan) {
    if (pesan is Request) {
      try {
        final hasil = _prosesData(pesan.data);
        pesan.replyTo.send(Response(id: pesan.id, hasil: hasil));
      } catch (e) {
        pesan.replyTo.send(Response(id: pesan.id, error: e.toString()));
      }
    }
  });
}

dynamic _prosesData(dynamic data) {
  // Simulasi komputasi
  return 'Hasil dari: $data';
}

// Client yang thread-safe
class IsolateClient {
  final SendPort _kirimKeWorker;
  final ReceivePort _terima;
  final Map<int, Completer<dynamic>> _pending = {};
  int _idCounter = 0;

  IsolateClient._(this._kirimKeWorker, this._terima) {
    _terima.listen((pesan) {
      if (pesan is Response) {
        final completer = _pending.remove(pesan.id);
        if (pesan.error != null) {
          completer?.completeError(Exception(pesan.error));
        } else {
          completer?.complete(pesan.hasil);
        }
      }
    });
  }

  static Future<IsolateClient> buat() async {
    final terima = ReceivePort();
    await Isolate.spawn(workerRobust, terima.sendPort);
    final SendPort kirimKeWorker = await terima.first;
    return IsolateClient._(kirimKeWorker, terima);
  }

  Future<dynamic> kirim(dynamic data, {Duration timeout = const Duration(seconds: 30)}) {
    final id = _idCounter++;
    final completer = Completer<dynamic>();
    _pending[id] = completer;

    _kirimKeWorker.send(Request(
      id: id,
      data: data,
      replyTo: _terima.sendPort,
    ));

    return completer.future.timeout(
      timeout,
      onTimeout: () {
        _pending.remove(id);
        throw TimeoutException('Request $id timeout setelah ${timeout.inSeconds}s');
      },
    );
  }

  void tutup() {
    _terima.close();
    for (final c in _pending.values) {
      c.completeError(StateError('Client ditutup'));
    }
    _pending.clear();
  }
}

// Penggunaan
Future<void> main() async {
  final client = await IsolateClient.buat();

  // Kirim banyak request secara paralel
  final futures = [
    client.kirim('data A'),
    client.kirim('data B'),
    client.kirim('data C'),
  ];

  final hasil = await Future.wait(futures);
  print(hasil); // ['Hasil dari: data A', ...]

  client.tutup();
}

Error Handling Antar Isolate #

import 'dart:isolate';

Future<void> main() async {
  final terimaError = ReceivePort();
  final terimaExit = ReceivePort();
  final terimaHasil = ReceivePort();

  final isolate = await Isolate.spawn(
    workerDenganError,
    terimaHasil.sendPort,
    // Error listener — tangkap error yang tidak tertangkap di Isolate
    onError: terimaError.sendPort,
    // Exit listener — notifikasi saat Isolate selesai
    onExit: terimaExit.sendPort,
    // Error TIDAK mematikan Isolate (default: true = mematikan)
    errorsAreFatal: false,
  );

  // Listen error
  terimaError.listen((error) {
    // error adalah List: [errorMessage, stackTrace]
    final List<dynamic> errorList = error as List<dynamic>;
    print('Error di Isolate: ${errorList[0]}');
    print('Stack trace: ${errorList[1]}');
  });

  // Listen exit
  terimaExit.first.then((_) {
    print('Isolate selesai');
    terimaError.close();
    terimaExit.close();
    terimaHasil.close();
  });

  // Listen hasil
  terimaHasil.listen((pesan) {
    print('Hasil: $pesan');
  });

  // Kontrol dari luar
  // Tambahkan error listener ke Isolate yang sudah berjalan
  isolate.addErrorListener(terimaError.sendPort);
  isolate.removeErrorListener(terimaError.sendPort);

  // Pause dan resume
  final capability = isolate.pause();
  await Future.delayed(Duration(seconds: 1));
  isolate.resume(capability);

  // Kirim kill signal
  // isolate.kill(priority: Isolate.immediate);  // segera
  // isolate.kill(priority: Isolate.beforeNextEvent); // setelah event saat ini
}

void workerDenganError(SendPort kirimHasil) {
  kirimHasil.send('Mulai bekerja');

  try {
    kirimHasil.send('Langkah 1 selesai');
    throw Exception('Error simulasi');
    kirimHasil.send('Ini tidak pernah dicapai');
  } catch (e) {
    kirimHasil.send('Error ditangkap lokal: $e');
  }

  // Error yang tidak tertangkap akan dikirim ke onError port
  throw StateError('Error tidak tertangkap!');
}

RawReceivePort — Performa Tinggi #

RawReceivePort adalah versi lebih rendah level dari ReceivePort tanpa overhead Stream:

import 'dart:isolate';

Future<void> main() async {
  // RawReceivePort — tidak ada overhead Stream, langsung callback
  final rawPort = RawReceivePort((pesan) {
    print('Diterima (raw): $pesan');
  });

  rawPort.sendPort.send('Halo');
  rawPort.sendPort.send(42);

  await Future.delayed(Duration(milliseconds: 100));

  // Ubah handler
  rawPort.handler = (pesan) {
    print('Handler baru: $pesan');
  };

  rawPort.sendPort.send('Pesan baru');

  await Future.delayed(Duration(milliseconds: 100));
  rawPort.close(); // wajib tutup
}

Isolate Pool — Reuse untuk Performa #

Membuat Isolate baru setiap request punya overhead yang signifikan. Untuk task yang sering, buat pool:

import 'dart:isolate';
import 'dart:async';
import 'dart:collection';

typedef IsolateTask<T> = Future<T> Function();

class IsolatePool {
  final int ukuran;
  final _workers = <_WorkerIsolate>[];
  final _antrian = Queue<_PendingTask>();
  bool _ditutup = false;

  IsolatePool(this.ukuran);

  Future<void> inisialisasi() async {
    for (int i = 0; i < ukuran; i++) {
      final worker = await _WorkerIsolate.buat();
      _workers.add(worker);
    }
  }

  Future<T> jalankan<T>(dynamic Function() tugas) async {
    if (_ditutup) throw StateError('Pool sudah ditutup');

    final completer = Completer<T>();
    final pending = _PendingTask(tugas, completer as Completer<dynamic>);

    // Cari worker yang kosong
    final workerKosong = _workers.firstWhere(
      (w) => !w.sibuk,
      orElse: () => throw StateError('Tidak ada worker yang tersedia'),
    );

    if (!workerKosong.sibuk) {
      await workerKosong.jalankan(pending);
    } else {
      _antrian.addLast(pending);
    }

    return completer.future as Future<T>;
  }

  Future<void> tutup() async {
    _ditutup = true;
    for (final worker in _workers) {
      await worker.tutup();
    }
  }
}

class _PendingTask {
  final dynamic Function() tugas;
  final Completer<dynamic> completer;
  _PendingTask(this.tugas, this.completer);
}

class _WorkerIsolate {
  bool sibuk = false;
  late final SendPort _kirimKeWorker;
  late final ReceivePort _terima;
  late final Isolate _isolate;

  static Future<_WorkerIsolate> buat() async {
    final worker = _WorkerIsolate();
    worker._terima = ReceivePort();
    worker._isolate = await Isolate.spawn(
      _workerLoop,
      worker._terima.sendPort,
    );
    worker._kirimKeWorker = await worker._terima.first;
    return worker;
  }

  Future<void> jalankan(_PendingTask task) async {
    sibuk = true;
    _kirimKeWorker.send(task.tugas);
    final hasil = await _terima.first;
    task.completer.complete(hasil);
    sibuk = false;
  }

  Future<void> tutup() async {
    _kirimKeWorker.send(null); // signal tutup
    _terima.close();
    _isolate.kill();
  }
}

void _workerLoop(SendPort kirimKeMain) {
  final terima = ReceivePort();
  kirimKeMain.send(terima.sendPort);

  terima.listen((tugas) {
    if (tugas == null) {
      terima.close();
      return;
    }
    final hasil = (tugas as dynamic Function())();
    kirimKeMain.send(hasil);
  });
}

// Penggunaan pool
Future<void> main() async {
  final pool = IsolatePool(4); // 4 worker Isolate
  await pool.inisialisasi();

  // Kirim banyak tugas secara paralel
  final futures = List.generate(
    10,
    (i) => pool.jalankan<int>(() {
      // Simulasi komputasi berat
      int sum = 0;
      for (int j = 0; j < 1000000; j++) sum += j;
      return sum + i;
    }),
  );

  final hasil = await Future.wait(futures);
  print('Hasil: ${hasil.length} tugas selesai');

  await pool.tutup();
}

Kapabilitas (Capabilities) #

Capability adalah token unik yang digunakan untuk kontrol akses Isolate — pause dan resume:

import 'dart:isolate';

Future<void> main() async {
  final isolate = await Isolate.spawn(
    (SendPort _) { /* ... */ },
    ReceivePort().sendPort,
  );

  // Pause mengembalikan Capability sebagai "kunci"
  final pauseCapability = isolate.pause();
  print('Isolate di-pause');

  await Future.delayed(Duration(seconds: 2));

  // Resume hanya bisa dilakukan oleh yang punya Capability
  isolate.resume(pauseCapability);
  print('Isolate di-resume');

  // Capability juga bisa dibuat secara independen
  final cap1 = Capability();
  final cap2 = Capability();
  print(cap1 == cap2); // false — setiap Capability unik
  print(cap1 == cap1); // true — sama persis
}

Perbandingan: Isolate.run vs Isolate.spawn vs compute #

AspekIsolate.runIsolate.spawncompute (Flutter)
Tersedia diDart 2.19+Semua versiFlutter saja
LifecycleOtomatis (satu tugas)ManualOtomatis
KomunikasiSatu nilai kembaliMulti-messageSatu nilai kembali
Error handlingVia try/catchManualVia try/catch
BoilerplateMinimalBanyakMinimal
Cocok untukSatu komputasiPersistent workerFlutter (seperti Isolate.run)

Anti-Pattern dart:isolate #

Tidak Menutup ReceivePort #

// ANTI-PATTERN: ReceivePort tidak ditutup — event loop tidak bisa berhenti!
Future<void> buruk() async {
  final rp = ReceivePort();
  await Isolate.spawn(worker, rp.sendPort);
  final hasil = await rp.first;
  // ✗ rp tidak ditutup — program tidak akan pernah selesai!
  print(hasil);
}

// BENAR: selalu tutup ReceivePort setelah selesai
Future<void> baik() async {
  final rp = ReceivePort();
  try {
    await Isolate.spawn(worker, rp.sendPort);
    final hasil = await rp.first;
    print(hasil);
  } finally {
    rp.close(); // ✓ selalu ditutup
  }
}

Mengirim Objek yang Tidak Bisa Di-copy #

// ANTI-PATTERN: kirim objek non-serializable
class ProdukDenganCallback {
  final String nama;
  final void Function() onUpdate; // callback/closure

  ProdukDenganCallback(this.nama, this.onUpdate);
}

sendPort.send(ProdukDenganCallback('Laptop', () {}));
// ✗ Illegal argument in isolate message: closure

// BENAR: kirim hanya data yang bisa di-serialize
sendPort.send({'nama': 'Laptop', 'harga': 15000000}); // ✓ Map bisa dikirim

Ringkasan #

  • Isolate tidak berbagi memori — semua komunikasi melalui pesan yang di-copy. Tidak ada race condition, tidak ada mutex, tapi tidak ada shared state.
  • Isolate.run untuk satu komputasi berat — buat, jalankan, kembalikan hasil, hapus otomatis. API paling sederhana untuk Dart 2.19+.
  • Isolate.spawn untuk worker persisten — Isolate yang menerima banyak pesan selama hidupnya. Perlu SendPort/ReceivePort untuk komunikasi dua arah.
  • Entry point harus top-level atau static — closure yang menangkap variabel dari luar tidak bisa dikirim sebagai entry point Isolate.
  • Selalu tutup ReceivePort dengan close() — port yang terbuka mencegah event loop berhenti dan program tidak akan pernah selesai.
  • onError port untuk menangkap exception yang tidak tertangkap di Isolate — tanpa ini, error di Isolate hanya mematikan Isolate tersebut tanpa pemberitahuan.
  • errorsAreFatal: false agar error di Isolate tidak langsung membunuh Isolate — berguna untuk worker yang harus tetap hidup meski ada error sesekali.
  • Gunakan Completer + Map untuk tracking request-response yang async — correlation ID memastikan response dipasangkan ke request yang benar.
  • RawReceivePort untuk throughput tinggi — tanpa overhead Stream Dart, handler langsung dipanggil saat pesan tiba.
  • Isolate pool untuk task yang sering — spawn sekali, reuse berkali-kali. Overhead spawn Isolate baru (~5ms) sangat terasa untuk ribuan task kecil.

← Sebelumnya: dart:typed_data   Berikutnya: dart:developer →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact