异步编程

概要

  • 优先使用异步调用,而不是同步调用或显式使用线程

  • 学习并遵循 GLib 模式来声明异步 API

  • 将异步函数的的回调函数按文件顺序排列,以便于跟踪控制流

  • 使用 GTaskGCancellable 的存在来指示操作是否正在进行

  • 如果并行运行操作,则跟踪尚未开始和尚未完成的操作数量——当两个计数都为零时,整个操作完成

  • 将操作的状态分离到 GTask 的“任务数据”结构中,以便更容易地重用操作,而无需更改全局状态处理

  • 考虑对象实例上的异步方法与该实例的最终化之间的交互方式

概念

GLib 支持异步编程,其中可以启动耗时操作,在“后台”运行,并在操作完成且结果可用时调用回调函数。这与同步耗时操作形成鲜明对比,同步耗时操作是一个函数调用,会阻塞程序控制流直到完成。

主上下文线程 教程中所讨论的,应优先使用异步操作而不是同步操作或显式使用线程。它们不会像同步操作那样阻塞主上下文;并且比线程更容易正确使用。它们通常也比生成线程并向其发送工作具有更低的性能开销。

API 模式

GLib 代码中的异步调用遵循标准模式。对于在 Foo 命名空间中的 File 类上的名为 load_data 的操作,将存在

void
foo_file_load_data_async (FooFile             *self,
                          // ...,
                          GCancellable        *cancellable,
                          GAsyncReadyCallback  callback,
                          gpointer             user_data);

gboolean
foo_file_load_data_finish (FooFile       *self,
                           GAsyncResult  *result,
                           // ...,
                           GError       **error);

传递给 foo_file_load_data_async() 参数是特定于该操作的参数——在本例中,可能是加载到缓冲区中的大小。 同样,对于 foo_file_load_data_finish(),它们是特定于操作的返回值——在本例中,可能是返回内容类型字符串的位置。

当调用 foo_file_load_data_async() 时,它会在后台调度加载操作(例如,作为 GMainContext 上的新文件描述符或作为工作线程),然后返回而不阻塞。

当操作完成时,回调函数将在与原始异步调用相同的 GMainContext 中执行。无论操作成功还是失败,回调函数都会被精确调用一次。

从回调函数中,用户的代码可以通过传递传递给回调函数的 GAsyncResult 实例来调用 foo_file_load_data_finish() 以检索返回值和错误详细信息。

操作生命周期

在编写异步操作时,通常将它们作为类的方法来编写。在这种情况下,重要的是定义类实例上的正在进行的操作与该实例的最终化之间的交互方式。有两种方法

强引用

正在进行的操作保留对类实例的引用,强制其在操作持续期间保持存活。该类应提供某种“关闭”或“取消”方法,其他类可以使用该方法来强制取消操作并允许该实例被最终化。

弱引用

正在进行的操作保留对类实例的引用,并且该类在它的 dispose 函数中取消操作(使用 g_cancellable_cancel())。

使用哪种方法取决于类的设计。包装特定操作的类(例如,MyFileTransfer 类)可能希望使用弱引用方法。管理多个网络连接和异步操作的类可以使用强引用方法。由于传入的网络连接,例如,它可能无法完全控制异步调用的调度,因此弱引用方法不合适——任何删除对该对象的引用代码都无法确定它是否意外地杀死了新的网络连接。

使用异步函数

通常,需要使用多个异步调用来完成一个操作。例如,打开一个文件进行读取,然后执行几次读取,然后关闭该文件。或者并行打开几个网络套接字,并在所有套接字都打开后继续其他工作。以下是一些这些情况的示例。

单个操作

单个异步调用需要两个函数:一个用于启动操作,一个用于完成操作。在 C 中,执行异步调用的难点在于正确地在这两个函数之间存储状态,以及处理这两个函数被调用之间状态的变化。例如,取消正在进行的异步调用是一种状态变化,如果未正确实现,在取消操作时所做的任何 UI 更新(例如)都将被操作的回调中的更新撤消。

下面的示例演示了将文件从文件系统的一个位置复制到另一个位置。这里演示的关键原则是

  • 通过使用 copy_finish_cb() 的前向声明,按顺序放置 copy_button_clicked_cb()(开始)和 copy_finish_cb()(完成)函数。这意味着控制流线性地向下移动文件,而不是到达 copy_button_clicked_cb() 的底部并在文件中的其他位置恢复到 copy_finish_cb()

  • 使用 GCancellable 允许在操作启动后取消操作。 cancel_button_clicked_cb() 中的代码非常简单:由于保证在操作完成时(即使由于取消而提前完成)会调用 copy_finish_cb() 回调函数,因此所有取消的 UI 和状态更新都可以处理在那里,而不是在 cancel_button_clicked_cb() 中。

  • 操作正在进行,当且仅当 MyObjectPrivate.copy_cancellable 不为 NULL 时,这使得跟踪正在运行的操作变得容易。请注意,这意味着一次只能通过 copy_button_clicked_cb() 启动一个文件复制操作。一个 GCancellable 不能轻易地用于像这样多个操作。

static void
copy_finish_cb (GObject      *source_object,
                GAsyncResult *result,
                gpointer      user_data);

static void
copy_button_clicked_cb (GtkButton *button
                        gpointer   user_data)
{
  MyObjectPrivate *priv;
  GFile *source = NULL, *destination = NULL;  /* owned */

  priv = my_object_get_instance_private (MY_OBJECT (user_data));

  /* Operation already in progress? */
  if (priv->copy_cancellable != NULL)
    {
      g_debug ("Copy already in progress.");
      return;
    }

  /* Build source and destination file paths. */
  source = g_file_new_for_path (/* some path generated from UI */);
  destination = g_file_new_for_path (/* some other path generated from UI */);

  /* Set up a cancellable. */
  priv->copy_cancellable = g_cancellable_new ();

  g_file_copy_async (source, destination, G_FILE_COPY_NONE, G_PRIORITY_DEFAULT,
                     priv->copy_cancellable, NULL, NULL,
                     copy_finish_cb, user_data);

  g_object_unref (destination);
  g_object_unref (source);

  /* Update UI to show copy is in progress. */
}

static void
copy_finish_cb (GObject      *source_object,
                GAsyncResult *result,
                gpointer      user_data)
{
  MyObjectPrivate *priv;
  GFile *source;  /* unowned */
  GError *error = NULL;

  source = G_FILE (source_object);
  priv = my_object_get_instance_private (MY_OBJECT (user_data));

  /* Handle completion of the operation. */
  g_file_copy_finish (source, result, &error);

  if (error != NULL &&
      !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
    {
      /* Should update the UI to signal failure.
       * Ignore failure due to cancellation. */
      g_warning ("Failed to copy file: %s", error->message);
    }

  g_clear_error (&error);

  /* Clear the cancellable to signify the operation has finished. */
  g_clear_object (&priv->copy_cancellable);

  /* Update UI to show copy as complete. */
}

static void
cancel_button_clicked_cb (GtkButton *button,
                          gpointer   user_data)
{
  MyObjectPrivate *priv;
  GFile *source = NULL, *destination = NULL;  /* owned */

  priv = my_object_get_instance_private (MY_OBJECT (user_data));

  /* Operation in progress? No-op if @copy_cancellable is %NULL. */
  g_cancellable_cancel (priv->copy_cancellable);
}

static void
my_object_dispose (GObject *obj)
{
  MyObjectPrivate *priv;

  priv = my_object_get_instance_private (MY_OBJECT (obj));

  /* Cancel any ongoing copy operation.
   *
   * This ensures that if #MyObject is disposed part-way through a copy, the
   * callback doesn’t get invoked with an invalid #MyObject pointer. */
  g_cancellable_cancel (priv->copy_cancellable);

  /* Do other dispose calls here. */

  /* Chain up. */
  G_OBJECT_CLASS (my_object_parent_class)->dispose (obj);
}

作为比较,这是使用 g_file_copy() 的同步版本实现的相同代码。请注意,语句的顺序几乎相同。UI 被阻止更新和接收用户输入,这也意味着不支持取消。这是不应该在实践中使用此代码的主要原因

static void
copy_button_clicked_cb (GtkButton *button
                        gpointer   user_data)
{
  MyObjectPrivate *priv;
  GFile *source = NULL, *destination = NULL;  /* owned */

  priv = my_object_get_instance_private (MY_OBJECT (user_data));

  /* Build source and destination file paths. */
  source = g_file_new_for_path (/* some path generated from UI */);
  destination = g_file_new_for_path (/* some other path generated from UI */);

  g_file_copy (source, destination, G_FILE_COPY_NONE,
               NULL  /* cancellable */, NULL, NULL,
               &error);

  g_object_unref (destination);
  g_object_unref (source);

  /* Handle completion of the operation. */
  if (error != NULL)
    {
      /* Should update the UI to signal failure.
       * Ignore failure due to cancellation. */
      g_warning ("Failed to copy file: %s", error->message);
    }

  g_clear_error (&error);

  /* Update UI to show copy as complete. */
}

串行操作

一种常见的情况是运行多个串行异步操作,其中每个操作都依赖于前一个操作完成。

在此示例中,应用程序从文件中读取套接字地址,打开到该地址的连接,读取消息,然后完成。

此示例中的关键点是

  • 每个回调函数都一致地编号,并且它们都按文件中的顺序排列,以便代码按顺序执行。

  • 与单个调用示例一样,单个 GCancellable 指示一系列操作正在进行。取消它将中止整个序列。

  • 与单个调用示例一样,如果拥有 MyObject 实例被释放,则会取消待处理的操作,以防止在无效的 MyObject 指针的情况下调用回调函数。

static void
connect_to_server_cb1 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);
static void
connect_to_server_cb2 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);
static void
connect_to_server_cb3 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);

static void
connect_to_server (MyObject *self)
{
  MyObjectPrivate *priv;
  GFile *address_file = NULL;  /* owned */

  priv = my_object_get_instance_private (self);

  if (priv->connect_cancellable != NULL)
    {
      /* Already connecting. */
      return;
    }

  /* Set up a cancellable. */
  priv->connect_cancellable = g_cancellable_new ();

  /* Read the socket address. */
  address_file = build_address_file ();
  g_file_load_contents_async (address_file, priv->connect_cancellable,
                              connect_to_server_cb1, self);
  g_object_unref (address_file);
}

static void
connect_to_server_cb1 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GFile *address_file;  /* unowned */
  gchar *address = NULL;  /* owned */
  gsize address_size = 0;
  GInetAddress *inet_address = NULL;  /* owned */
  GInetSocketAddress *inet_socket_address = NULL;  /* owned */
  guint16 port = 123;
  GSocketClient *socket_client = NULL;  /* owned */
  GError *error = NULL;

  address_file = G_FILE (source_object);
  self = MY_OBJECT (user_data);
  priv = my_object_get_instance_private (self);

  /* Finish loading the address. */
  g_file_load_contents_finish (address_file, result, &address,
                               &address_size, NULL, &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Parse the address. */
  inet_address = g_inet_address_new_from_string (address);

  if (inet_address == NULL)
    {
      /* Error. */
      g_set_error (&error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
                   "Invalid address ‘%s’.", address);
      goto done;
    }

  inet_socket_address = g_inet_socket_address_new (inet_address, port);

  /* Connect to the given address. */
  socket_client = g_socket_client_new ();

  g_socket_client_connect_async (socket_client,
                                 G_SOCKET_CONNECTABLE (inet_socket_address),
                                 priv->connect_cancellable,
                                 connect_to_server_cb2,
                                 self);

done:
  if (error != NULL)
    {
      /* Stop the operation. */
      if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
        {
          g_warning ("Failed to load server address: %s", error->message);
        }

      g_clear_object (&priv->connect_cancellable);
      g_error_free (error);
    }

  g_free (address);
  g_clear_object (&inet_address);
  g_clear_object (&inet_socket_address);
  g_clear_object (&socket_client);
}

static void
connect_to_server_cb2 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GSocketClient *socket_client;  /* unowned */
  GSocketConnection *connection = NULL;  /* owned */
  GInputStream *input_stream;  /* unowned */
  GError *error = NULL;

  socket_client = G_SOCKET_CLIENT (source_object);
  self = MY_OBJECT (user_data);
  priv = my_object_get_instance_private (self);

  /* Finish connecting to the socket. */
  connection = g_socket_client_connect_finish (socket_client, result,
                                               &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Store a reference to the connection so it is kept open while we read from
   * it: #GInputStream does not keep a reference to a #GIOStream which contains
   * it. */
  priv->connection = g_object_ref (connection);

  /* Read a message from the connection. This uses a single buffer stored in
   * #MyObject, meaning that only one connect_to_server() operation can run at
   * any time. The buffer could instead be allocated dynamically if this is a
   * problem. */
  input_stream = g_io_stream_get_input_stream (G_IO_STREAM (connection));

  g_input_stream_read_async (input_stream,
                             priv->message_buffer,
                             sizeof (priv->message_buffer),
                             G_PRIORITY_DEFAULT, priv->connect_cancellable,
                             connect_to_server_cb3, self);

done:
  if (error != NULL)
    {
      /* Stop the operation. */
      if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
        {
          g_warning ("Failed to connect to server: %s", error->message);
        }

      g_clear_object (&priv->connect_cancellable);
      g_clear_object (&priv->connection);
      g_error_free (error);
    }

  g_clear_object (&connection);
}

static void
connect_to_server_cb3 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GInputStream *input_stream;  /* unowned */
  gssize len = 0;
  GError *error = NULL;

  input_stream = G_INPUT_STREAM (source_object);
  self = MY_OBJECT (user_data);
  priv = my_object_get_instance_private (self);

  /* Finish reading from the socket. */
  len = g_input_stream_read_finish (input_stream, result, &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Handle the message. */
  g_assert_cmpint (len, >=, 0);
  g_assert_cmpuint ((gsize) len, <=, sizeof (priv->message_buffer));

  handle_received_message (self, priv->message_buffer, len, &error);

  if (error != NULL)
    {
      goto done;
    }

done:
  /* Unconditionally mark the operation as finished.
   *
   * The streams should automatically close as this
   * last reference is dropped. */
  g_clear_object (&priv->connect_cancellable);
  g_clear_object (&priv->connection);

  if (error != NULL)
    {
      /* Warn about the error. */
      if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
        {
          g_warning ("Failed to read from the server: %s", error->message);
        }

      g_error_free (error);
    }
}

static void
my_object_dispose (GObject *obj)
{
  MyObjectPrivate *priv;

  priv = my_object_get_instance_private (MY_OBJECT (obj));

  /* Cancel any ongoing connection operations.
   *
   * This ensures that if #MyObject is disposed part-way through the
   * connect_to_server() sequence of operations, the sequence gets cancelled and
   * doesn’t continue with an invalid #MyObject pointer. */
  g_cancellable_cancel (priv->connect_cancellable);

  /* Do other dispose calls here. */

  /* Chain up. */
  G_OBJECT_CLASS (my_object_parent_class)->dispose (obj);
}

并行操作

另一种常见的情况是并行运行多个异步操作,并在其所有组成部分都完成后才认为整个操作完成。

在此示例中,应用程序并行删除多个文件。

此示例中的关键点是

  • 待处理的异步操作数(已启动但尚未完成的操作)被跟踪为 n_deletions_pendingdelete_files_cb() 回调函数仅在达到零时才认为整个操作完成。

  • n_deletions_to_start 跟踪正在启动的删除操作,以防 g_file_delete_async() 能够使用快速路径并同步完成(不阻塞)。

  • 与单个调用示例一样,如果拥有 MyObject 实例被释放,则会取消所有待处理的删除操作,以防止在无效的 MyObject 指针的情况下调用回调函数。

static void
delete_files_cb (GObject      *source_object,
                 GAsyncResult *result,
                 gpointer      user_data);

static void
delete_files (MyObject *self,
              GPtrArray/*<owned GFile*>>*/ *files)
{
  MyObjectPrivate *priv;
  GFile *address_file = NULL;  /* owned */

  priv = my_object_get_instance_private (self);

  /* Set up a cancellable if no operation is ongoing already. */
  if (priv->delete_cancellable == NULL)
    {
      priv->delete_cancellable = g_cancellable_new ();
      priv->n_deletions_pending = 0;
      priv->n_deletions_total = 0;
    }

  /* Update internal state, and temporarily set @n_deletions_to_start. This is
   * used in delete_files_cb() to avoid indicating the overall operation has
   * completed while deletions are still being started. This can happen if
   * g_file_delete_async() completes synchronously, for example if there’s a
   * non-blocking fast path for the given file system. */
  priv->n_deletions_pending += files->len;
  priv->n_deletions_total += files->len;
  priv->n_deletions_to_start = files->len;

  /* Update the UI to indicate the files are being deleted. */
  update_ui_to_show_progress (self,
                              priv->n_deletions_pending,
                              priv->n_deletions_total);

  /* Start all the deletion operations in parallel. They share the same
   * #GCancellable. */
  for (i = 0; i < files->len; i++)
    {
      GFile *file = files->pdata[i];

      priv->n_deletions_to_start--;
      g_file_delete_async (file, G_PRIORITY_DEFAULT, priv->delete_cancellable,
                           delete_files_cb, self);
    }
}

static void
delete_files_cb (GObject      *source_object,
                 GAsyncResult *result,
                 gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GFile *file;  /* unowned */
  GError *error = NULL;

  file = G_FILE (source_object);
  self = MY_OBJECT (user_data);
  priv = my_object_get_instance_private (self);

  /* Finish deleting the file. */
  g_file_delete_finish (file, result, &error);

  if (error != NULL &&
      !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
    {
      g_warning ("Error deleting file: %s", error->message);
    }

  g_clear_error (&error);

  /* Update the internal state. */
  g_assert_cmpuint (priv->n_deletions_pending, >, 0);
  priv->n_deletions_pending--;

  /* Update the UI to show progress. */
  update_ui_to_show_progress (self,
                              priv->n_deletions_pending,
                              priv->n_deletions_total);

  /* If all deletions have completed, and no more are being started,
   * update the UI to show completion. */
  if (priv->n_deletions_pending == 0 && priv->n_deletions_to_start == 0)
    {
      update_ui_to_show_completion (self);

      /* Clear the operation state. */
      g_clear_object (&priv->delete_cancellable);
      priv->n_deletions_total = 0;
    }
}

static void
my_object_dispose (GObject *obj)
{
  MyObjectPrivate *priv;

  priv = my_object_get_instance_private (MY_OBJECT (obj));

  /* Cancel any ongoing deletion operations.
   *
   * This ensures that if #MyObject is disposed part-way through the
   * delete_files() set of operations, the set gets cancelled and
   * doesn’t continue with an invalid #MyObject pointer. */
  g_cancellable_cancel (priv->delete_cancellable);

  /* Do other dispose calls here. */

  /* Chain up. */
  G_OBJECT_CLASS (my_object_parent_class)->dispose (obj);
}

使用 GTask 包装

通常,当异步操作(或一组操作)变得更加复杂时,它需要关联状态。通常将其存储在自定义结构中——但是,定义一个新的结构来存储标准的回调函数、用户数据和可取消元组是繁琐的。 GTask 通过提供一种标准化的方法来包装所有三个,以及额外的自定义“任务数据”来简化此过程。

使用 GTask 可以替代使用 GCancellable 来指示操作是否正在进行。

此示例在功能上与串行操作示例相同,但已重构为使用 GTask 来包装操作序列。

此示例中的关键点是

  • 在串行示例中位于 MyObjectPrivate 中的状态现在位于 ConnectToServerData 闭包中,该闭包设置为表示整个操作的 GTask 的“任务数据”。这意味着在操作返回后会自动释放它。

  • 此外,这意味着对 MyObjectPrivate 状态的操纵仅限于操作序列的开始和结束,因此可以在不同的情况下重用任务——例如,现在更容易支持并行运行多个这样的任务。

  • 由于 GTask 持有对 MyObject 的引用,因此在操作序列进行中,对象无法被销毁,所以移除了 my_object_dispose() 代码。相反,存在一个 my_object_close() 方法,允许取消任何待处理的操作,以便在需要时可以销毁 MyObject

static void
connect_to_server_cb1 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);
static void
connect_to_server_cb2 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);
static void
connect_to_server_cb3 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data);

typedef struct {
  GSocketConnection *connection;  /* nullable; owned */
  guint8 message_buffer[128];
} ConnectToServerData;

static void
connect_to_server_data_free (ConnectToServerData *data)
{
  g_clear_object (&data->connection);
}

void
my_object_connect_to_server_async (MyObject            *self,
                                   GCancellable        *cancellable,
                                   GAsyncReadyCallback  callback,
                                   gpointer             user_data)
{
  MyObjectPrivate *priv;
  GTask *task = NULL;  /* owned */
  ConnectToServerData *data = NULL;  /* owned */
  GFile *address_file = NULL;  /* owned */

  g_return_if_fail (MY_IS_OBJECT (self));
  g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));

  priv = my_object_get_instance_private (self);

  if (priv->connect_task != NULL)
    {
      g_task_report_new_error (self, callback, user_data, NULL,
                               G_IO_ERROR, G_IO_ERROR_PENDING,
                               "Already connecting to the server.");
      return;
    }

  /* Set up a cancellable. */
  if (cancellable != NULL)
    {
      g_object_ref (cancellable);
    }
  else
    {
      cancellable = g_cancellable_new ();
    }

  /* Set up the task. */
  task = g_task_new (self, cancellable, callback, user_data);
  g_task_set_check_cancellable (task, FALSE);

  data = g_malloc0 (sizeof (ConnectToServerData));
  g_task_set_task_data (task, data,
                        (GDestroyNotify) connect_to_server_data_free);

  g_object_unref (cancellable);

  priv->connect_task = g_object_ref (task);

  /* Read the socket address. */
  address_file = build_address_file ();
  g_file_load_contents_async (address_file, g_task_get_cancellable (task),
                              connect_to_server_cb1, g_object_ref (task));
  g_object_unref (address_file);

  g_clear_object (&task);
}

static void
connect_to_server_cb1 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GTask *task = NULL;  /* owned */
  GFile *address_file;  /* unowned */
  gchar *address = NULL;  /* owned */
  gsize address_size = 0;
  GInetAddress *inet_address = NULL;  /* owned */
  GInetSocketAddress *inet_socket_address = NULL;  /* owned */
  guint16 port = 123;
  GSocketClient *socket_client = NULL;  /* owned */
  GError *error = NULL;

  address_file = G_FILE (source_object);
  task = G_TASK (user_data);
  self = g_task_get_source_object (task);
  priv = my_object_get_instance_private (self);

  /* Finish loading the address. */
  g_file_load_contents_finish (address_file, result, &address,
                               &address_size, NULL, &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Parse the address. */
  inet_address = g_inet_address_new_from_string (address);

  if (inet_address == NULL)
    {
      /* Error. */
      g_set_error (&error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
                   "Invalid address ‘%s’.", address);
      goto done;
    }

  inet_socket_address = g_inet_socket_address_new (inet_address, port);

  /* Connect to the given address. */
  socket_client = g_socket_client_new ();

  g_socket_client_connect_async (socket_client,
                                 G_SOCKET_CONNECTABLE (inet_socket_address),
                                 g_task_get_cancellable (task),
                                 connect_to_server_cb2,
                                 g_object_ref (task));

done:
  if (error != NULL)
    {
      /* Stop the operation and propagate the error. */
      g_clear_object (&priv->connect_task);
      g_task_return_error (task, error);
    }

  g_free (address);
  g_clear_object (&inet_address);
  g_clear_object (&inet_socket_address);
  g_clear_object (&socket_client);
  g_clear_object (&task);
}

static void
connect_to_server_cb2 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GTask *task = NULL;  /* owned */
  ConnectToServerData *data;  /* unowned */
  GSocketClient *socket_client;  /* unowned */
  GSocketConnection *connection = NULL;  /* owned */
  GInputStream *input_stream;  /* unowned */
  GError *error = NULL;

  socket_client = G_SOCKET_CLIENT (source_object);
  task = G_TASK (user_data);
  data = g_task_get_task_data (task);
  self = g_task_get_source_object (task);
  priv = my_object_get_instance_private (self);

  /* Finish connecting to the socket. */
  connection = g_socket_client_connect_finish (socket_client, result,
                                               &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Store a reference to the connection so it is kept open while we read from
   * it: #GInputStream does not keep a reference to a #GIOStream which contains
   * it. */
  data->connection = g_object_ref (connection);

  /* Read a message from the connection. As the buffer is allocated as part of
   * the per-task @data, multiple tasks can run concurrently. */
  input_stream = g_io_stream_get_input_stream (G_IO_STREAM (connection));

  g_input_stream_read_async (input_stream,
                             data->message_buffer,
                             sizeof (data->message_buffer),
                             G_PRIORITY_DEFAULT, g_task_get_cancellable (task),
                             connect_to_server_cb3, g_object_ref (task));

done:
  if (error != NULL)
    {
      /* Stop the operation and propagate the error. */
      g_clear_object (&priv->connect_task);
      g_task_return_error (task, error);
    }

  g_clear_object (&connection);
  g_clear_object (&task);
}

static void
connect_to_server_cb3 (GObject      *source_object,
                       GAsyncResult *result,
                       gpointer      user_data)
{
  MyObject *self;
  MyObjectPrivate *priv;
  GTask *task = NULL;  /* owned */
  ConnectToServerData *data;  /* unowned */
  GInputStream *input_stream;  /* unowned */
  gssize len = 0;
  GError *error = NULL;

  input_stream = G_INPUT_STREAM (source_object);
  task = G_TASK (user_data);
  data = g_task_get_task_data (task);
  self = g_task_get_source_object (task);
  priv = my_object_get_instance_private (self);

  /* Finish reading from the socket. */
  len = g_input_stream_read_finish (input_stream, result, &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Handle the message. */
  g_assert_cmpint (len, >=, 0);
  g_assert_cmpuint ((gsize) len, <=, sizeof (data->message_buffer));

  handle_received_message (self, data->message_buffer, len, &error);

  if (error != NULL)
    {
      goto done;
    }

  /* Success! */
  g_task_return_boolean (task, TRUE);

done:
  /* Unconditionally mark the operation as finished.
   *
   * The streams should automatically close as this
   * last reference is dropped. */
  g_clear_object (&priv->connect_task);

  if (error != NULL)
    {
      /* Stop the operation and propagate the error. */
      g_task_return_error (task, error);
    }

  g_clear_object (&task);
}

void
my_object_connect_to_server_finish (MyObject      *self,
                                    GAsyncResult  *result,
                                    GError       **error)
{
  g_return_if_fail (MY_IS_OBJECT (self));
  g_return_if_fail (g_task_is_valid (result, self));
  g_return_if_fail (error == NULL || *error == NULL);

  g_task_propagate_boolean (G_TASK (result), error);
}

void
my_object_close (MyObject *self)
{
  MyObjectPrivate *priv;

  g_return_if_fail (MY_IS_OBJECT (self));

  priv = my_object_get_instance_private (self);

  if (priv->connect_task != NULL)
    {
      GCancellable *cancellable = g_task_get_cancellable (priv->connect_task);
      g_cancellable_cancel (cancellable);
    }
}