VisionFive2 Linux kernel

StarFive Tech Linux Kernel for VisionFive (JH7110) boards (mirror)

author:    Jens Axboe <axboe@kernel.dk>  2021-03-08 09:37:51 -0700
committer: Jens Axboe <axboe@kernel.dk>  2021-04-11 17:42:00 -0600
commit:    685fe7feedb96771683437107ba72131410e2350
parent:    66ae0d1e2d9fe6ec70e73fcfdcf4b390e271c1ac
Commit Summary:
io-wq: eliminate the need for a manager thread
Diffstat:
1 file changed, 102 insertions(+), 137 deletions(-)
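
Note on the change: the dedicated io-wq manager thread (iou-mgr-*) is removed. Workers are now created on demand by whichever path notices the shortage: io_wqe_wake_worker() forks one directly, while io_wqe_dec_running(), which runs when a worker is about to sleep and cannot fork from that context, defers the request to the task that owns the io_wq via task_work. Below is a condensed view of that deferral mechanism, extracted from the diff that follows; it uses the names from the patch, assumes the headers and types already present in fs/io-wq.c, and omits the IO_WQ_BIT_EXIT check and locking details for readability.

/*
 * Condensed from the patch below: deferred worker creation via task_work.
 * Instead of waking a manager thread, the request is packaged into a
 * callback_head and queued on wq->task, so the actual fork happens in the
 * context of the task that owns the io_wq.
 */
struct create_worker_data {
	struct callback_head work;	/* task_work node */
	struct io_wqe *wqe;		/* per-node queue that needs a worker */
	int index;			/* bound vs unbound accounting slot */
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;

	/* Runs later, in the context of wq->task. */
	cwd = container_of(cb, struct create_worker_data, work);
	create_io_worker(cwd->wqe->wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct io_wq *wq = wqe->wq;
	struct create_worker_data *cwd;

	/* Caller may hold wqe->lock, hence GFP_ATOMIC. */
	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		/* TWA_SIGNAL kicks wq->task out of sleep to run the callback. */
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;
		kfree(cwd);
	}
	/* Creation failed: undo the references the caller took. */
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

Because the callback executes in the owning task's context, the new worker is forked from the task the io_wq belongs to rather than from the sleeping worker that detected the shortage, and the periodic manager wakeups disappear entirely.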
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 315b54eb0548..8ba4ccaafbaf 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -68,6 +68,7 @@ struct io_worker {
 struct io_wqe_acct {
 	unsigned nr_workers;
 	unsigned max_workers;
+	int index;
 	atomic_t nr_running;
 };
 
@@ -108,19 +109,16 @@ struct io_wq {
 	free_work_fn *free_work;
 	io_wq_work_fn *do_work;
 
-	struct task_struct *manager;
-
 	struct io_wq_hash *hash;
 
 	refcount_t refs;
-	struct completion exited;
 
 	atomic_t worker_refs;
 	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
-	pid_t task_pid;
+	struct task_struct *task;
 };
 
 static enum cpuhp_state io_wq_online;
@@ -133,8 +131,7 @@ struct io_cb_cancel_data {
 	bool cancel_all;
 };
 
-static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
-				       struct io_cb_cancel_data *match);
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -163,6 +160,12 @@ static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
 	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
 }
 
+static void io_worker_ref_put(struct io_wq *wq)
+{
+	if (atomic_dec_and_test(&wq->worker_refs))
+		complete(&wq->worker_done);
+}
+
 static void io_worker_exit(struct io_worker *worker)
 {
 	struct io_wqe *wqe = worker->wqe;
@@ -190,8 +193,7 @@ static void io_worker_exit(struct io_worker *worker)
 	raw_spin_unlock_irq(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
-	if (atomic_dec_and_test(&wqe->wq->worker_refs))
-		complete(&wqe->wq->worker_done);
+	io_worker_ref_put(wqe->wq);
 	do_exit(0);
 }
 
@@ -206,7 +208,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
 
 /*
  * Check head of free list for an available worker. If one isn't available,
- * caller must wake up the wq manager to create one.
+ * caller must create one.
  */
 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 	__must_hold(RCU)
@@ -230,7 +232,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
 
 /*
  * We need a worker. If we find a free one, we're good. If not, and we're
- * below the max number of workers, wake up the manager to create one.
+ * below the max number of workers, create one.
  */
 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 {
@@ -246,8 +248,11 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	ret = io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
 
-	if (!ret && acct->nr_workers < acct->max_workers)
-		wake_up_process(wqe->wq->manager);
+	if (!ret && acct->nr_workers < acct->max_workers) {
+		atomic_inc(&acct->nr_running);
+		atomic_inc(&wqe->wq->worker_refs);
+		create_io_worker(wqe->wq, wqe, acct->index);
+	}
 }
 
 static void io_wqe_inc_running(struct io_worker *worker)
@@ -257,14 +262,61 @@ static void io_wqe_inc_running(struct io_worker *worker)
 	atomic_inc(&acct->nr_running);
 }
 
+struct create_worker_data {
+	struct callback_head work;
+	struct io_wqe *wqe;
+	int index;
+};
+
+static void create_worker_cb(struct callback_head *cb)
+{
+	struct create_worker_data *cwd;
+	struct io_wq *wq;
+
+	cwd = container_of(cb, struct create_worker_data, work);
+	wq = cwd->wqe->wq;
+	create_io_worker(wq, cwd->wqe, cwd->index);
+	kfree(cwd);
+}
+
+static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
+{
+	struct create_worker_data *cwd;
+	struct io_wq *wq = wqe->wq;
+
+	/* raced with exit, just ignore create call */
+	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
+		goto fail;
+
+	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
+	if (cwd) {
+		init_task_work(&cwd->work, create_worker_cb);
+		cwd->wqe = wqe;
+		cwd->index = acct->index;
+		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
+			return;
+
+		kfree(cwd);
+	}
+fail:
+	atomic_dec(&acct->nr_running);
+	io_worker_ref_put(wq);
+}
+
 static void io_wqe_dec_running(struct io_worker *worker)
 	__must_hold(wqe->lock)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
 
-	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
-		io_wqe_wake_worker(wqe, acct);
+	if (!(worker->flags & IO_WORKER_F_UP))
+		return;
+
+	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
+		atomic_inc(&acct->nr_running);
+		atomic_inc(&wqe->wq->worker_refs);
+		io_queue_worker_create(wqe, acct);
+	}
 }
 
 /*
@@ -483,9 +535,8 @@ static int io_wqe_worker(void *data)
 	char buf[TASK_COMM_LEN];
 
 	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
-	io_wqe_inc_running(worker);
 
-	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task_pid);
+	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
 	set_task_comm(current, buf);
 
 	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -549,8 +600,7 @@ void io_wq_worker_running(struct task_struct *tsk)
 
 /*
  * Called when worker is going to sleep. If there are no workers currently
- * running and we have work pending, wake up a free one or have the manager
- * set one up.
+ * running and we have work pending, wake up a free one or create a new one.
  */
 void io_wq_worker_sleeping(struct task_struct *tsk)
 {
@@ -570,7 +620,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 	raw_spin_unlock_irq(&worker->wqe->lock);
 }
 
-static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 {
 	struct io_wqe_acct *acct = &wqe->acct[index];
 	struct io_worker *worker;
@@ -580,7 +630,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
 	if (!worker)
-		return false;
+		goto fail;
 
 	refcount_set(&worker->ref, 1);
 	worker->nulls_node.pprev = NULL;
@@ -588,14 +638,13 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	spin_lock_init(&worker->lock);
 	init_completion(&worker->ref_done);
 
-	atomic_inc(&wq->worker_refs);
-
 	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
 	if (IS_ERR(tsk)) {
-		if (atomic_dec_and_test(&wq->worker_refs))
-			complete(&wq->worker_done);
 		kfree(worker);
-		return false;
+fail:
+		atomic_dec(&acct->nr_running);
+		io_worker_ref_put(wq);
+		return;
 	}
 
 	tsk->pf_io_worker = worker;
@@ -614,7 +663,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 	acct->nr_workers++;
 	raw_spin_unlock_irq(&wqe->lock);
 	wake_up_new_task(tsk);
-	return true;
 }
 
 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -662,93 +710,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
 	return false;
 }
 
-static void io_wq_check_workers(struct io_wq *wq)
-{
-	int node;
-
-	for_each_node(node) {
-		struct io_wqe *wqe = wq->wqes[node];
-		bool fork_worker[2] = { false, false };
-
-		if (!node_online(node))
-			continue;
-
-		raw_spin_lock_irq(&wqe->lock);
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
-			fork_worker[IO_WQ_ACCT_BOUND] = true;
-		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
-			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
-		raw_spin_unlock_irq(&wqe->lock);
-		if (fork_worker[IO_WQ_ACCT_BOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
-		if (fork_worker[IO_WQ_ACCT_UNBOUND])
-			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
-	}
-}
-
 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
 {
 	return true;
 }
 
-static void io_wq_cancel_pending(struct io_wq *wq)
-{
-	struct io_cb_cancel_data match = {
-		.fn		= io_wq_work_match_all,
-		.cancel_all	= true,
-	};
-	int node;
-
-	for_each_node(node)
-		io_wqe_cancel_pending_work(wq->wqes[node], &match);
-}
-
-/*
- * Manager thread. Tasked with creating new workers, if we need them.
- */
-static int io_wq_manager(void *data)
-{
-	struct io_wq *wq = data;
-	char buf[TASK_COMM_LEN];
-	int node;
-
-	snprintf(buf, sizeof(buf), "iou-mgr-%d", wq->task_pid);
-	set_task_comm(current, buf);
-
-	do {
-		set_current_state(TASK_INTERRUPTIBLE);
-		io_wq_check_workers(wq);
-		schedule_timeout(HZ);
-		if (signal_pending(current)) {
-			struct ksignal ksig;
-
-			if (!get_signal(&ksig))
-				continue;
-			set_bit(IO_WQ_BIT_EXIT, &wq->state);
-		}
-	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	io_wq_check_workers(wq);
-
-	rcu_read_lock();
-	for_each_node(node)
-		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
-	rcu_read_unlock();
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-	wait_for_completion(&wq->worker_done);
-
-	spin_lock_irq(&wq->hash->wait.lock);
-	for_each_node(node)
-		list_del_init(&wq->wqes[node]->wait.entry);
-	spin_unlock_irq(&wq->hash->wait.lock);
-
-	io_wq_cancel_pending(wq);
-	complete(&wq->exited);
-	do_exit(0);
-}
-
 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 {
 	struct io_wq *wq = wqe->wq;
@@ -780,39 +746,13 @@ append:
 	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
 }
 
-static int io_wq_fork_manager(struct io_wq *wq)
-{
-	struct task_struct *tsk;
-
-	if (wq->manager)
-		return 0;
-
-	WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));
-
-	init_completion(&wq->worker_done);
-	atomic_set(&wq->worker_refs, 1);
-	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
-	if (!IS_ERR(tsk)) {
-		wq->manager = get_task_struct(tsk);
-		wake_up_new_task(tsk);
-		return 0;
-	}
-
-	if (atomic_dec_and_test(&wq->worker_refs))
-		complete(&wq->worker_done);
-
-	return PTR_ERR(tsk);
-}
-
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
 	int work_flags;
 	unsigned long flags;
 
-	/* Can only happen if manager creation fails after exec */
-	if (io_wq_fork_manager(wqe->wq) ||
-	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
+	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
 		io_run_cancel(work, wqe);
 		return;
 	}
@@ -967,17 +907,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 			    int sync, void *key)
 {
 	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
-	int ret;
 
 	list_del_init(&wait->entry);
 
 	rcu_read_lock();
-	ret = io_wqe_activate_free_worker(wqe);
+	io_wqe_activate_free_worker(wqe);
 	rcu_read_unlock();
-
-	if (!ret)
-		wake_up_process(wqe->wq->manager);
-
 	return 1;
 }
 
@@ -1018,6 +953,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			goto err;
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
+		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
+		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
 		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
 		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
@@ -1032,13 +969,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		INIT_LIST_HEAD(&wqe->all_list);
 	}
 
-	wq->task_pid = current->pid;
-	init_completion(&wq->exited);
+	wq->task = get_task_struct(data->task);
 	refcount_set(&wq->refs, 1);
-
-	ret = io_wq_fork_manager(wq);
-	if (!ret)
-		return wq;
+	atomic_set(&wq->worker_refs, 1);
+	init_completion(&wq->worker_done);
+	return wq;
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
@@ -1051,14 +986,39 @@ err_wq:
 	return ERR_PTR(ret);
 }
 
-static void io_wq_destroy_manager(struct io_wq *wq)
+static void io_wq_exit_workers(struct io_wq *wq)
 {
-	if (wq->manager) {
-		wake_up_process(wq->manager);
-		wait_for_completion(&wq->exited);
-		put_task_struct(wq->manager);
-		wq->manager = NULL;
+	struct callback_head *cb;
+	int node;
+
+	set_bit(IO_WQ_BIT_EXIT, &wq->state);
+
+	if (!wq->task)
+		return;
+
+	while ((cb = task_work_cancel(wq->task, create_worker_cb)) != NULL) {
+		struct create_worker_data *cwd;
+
+		cwd = container_of(cb, struct create_worker_data, work);
+		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
+		io_worker_ref_put(wq);
+		kfree(cwd);
+	}
+
+	rcu_read_lock();
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+
+		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
+		spin_lock_irq(&wq->hash->wait.lock);
+		list_del_init(&wq->wqes[node]->wait.entry);
+		spin_unlock_irq(&wq->hash->wait.lock);
 	}
+	rcu_read_unlock();
+	io_worker_ref_put(wq);
+	wait_for_completion(&wq->worker_done);
+	put_task_struct(wq->task);
+	wq->task = NULL;
 }
 
 static void io_wq_destroy(struct io_wq *wq)
@@ -1067,8 +1027,7 @@ static void io_wq_destroy(struct io_wq *wq)
 
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 
-	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	io_wq_destroy_manager(wq);
+	io_wq_exit_workers(wq);
 
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
@@ -1092,8 +1051,7 @@ void io_wq_put(struct io_wq *wq)
 
 void io_wq_put_and_exit(struct io_wq *wq)
 {
-	set_bit(IO_WQ_BIT_EXIT, &wq->state);
-	io_wq_destroy_manager(wq);
+	io_wq_exit_workers(wq);
 	io_wq_put(wq);
 }