Meh I'll figure out submodules later

This commit is contained in:
mustard 2025-09-16 01:01:02 +02:00
parent 4ca9d44a90
commit 8cb281f436
352 changed files with 66107 additions and 0 deletions

View file

@ -0,0 +1,14 @@
local util = require "plenary.async.util"
return setmetatable({}, {
__index = function(t, k)
return function(...)
-- if we are in a fast event await the scheduler
if vim.in_fast_event() then
util.scheduler()
end
return vim.api[k](...)
end
end,
})

View file

@ -0,0 +1,122 @@
local co = coroutine
local vararg = require "plenary.vararg"
local errors = require "plenary.errors"
local traceback_error = errors.traceback_error
local f = require "plenary.functional"
local M = {}
local function is_callable(fn)
return type(fn) == "function" or (type(fn) == "table" and type(getmetatable(fn)["__call"]) == "function")
end
---because we can't store varargs
local function callback_or_next(step, thread, callback, ...)
local stat = f.first(...)
if not stat then
error(string.format("The coroutine failed with this message: %s", f.second(...)))
end
if co.status(thread) == "dead" then
if callback == nil then
return
end
callback(select(2, ...))
else
local returned_function = f.second(...)
local nargs = f.third(...)
assert(is_callable(returned_function), "type error :: expected func")
returned_function(vararg.rotate(nargs, step, select(4, ...)))
end
end
---Executes a future with a callback when it is done
---@param async_function Future: the future to execute
---@param callback function: the callback to call when done
local execute = function(async_function, callback, ...)
assert(is_callable(async_function), "type error :: expected func")
local thread = co.create(async_function)
local step
step = function(...)
callback_or_next(step, thread, callback, co.resume(thread, ...))
end
step(...)
end
local add_leaf_function
do
---A table to store all leaf async functions
_PlenaryLeafTable = setmetatable({}, {
__mode = "k",
})
add_leaf_function = function(async_func, argc)
assert(_PlenaryLeafTable[async_func] == nil, "Async function should not already be in the table")
_PlenaryLeafTable[async_func] = argc
end
function M.is_leaf_function(async_func)
return _PlenaryLeafTable[async_func] ~= nil
end
function M.get_leaf_function_argc(async_func)
return _PlenaryLeafTable[async_func]
end
end
---Creates an async function with a callback style function.
---@param func function: A callback style function to be converted. The last argument must be the callback.
---@param argc number: The number of arguments of func. Must be included.
---@return function: Returns an async function
M.wrap = function(func, argc)
if not is_callable(func) then
traceback_error("type error :: expected func, got " .. type(func))
end
if type(argc) ~= "number" then
traceback_error("type error :: expected number, got " .. type(argc))
end
local function leaf(...)
local nargs = select("#", ...)
if nargs == argc then
return func(...)
else
return co.yield(func, argc, ...)
end
end
add_leaf_function(leaf, argc)
return leaf
end
---Use this to either run a future concurrently and then do something else
---or use it to run a future with a callback in a non async context
---@param async_function function
---@param callback function
M.run = function(async_function, callback)
if M.is_leaf_function(async_function) then
async_function(callback)
else
execute(async_function, callback)
end
end
---Use this to create a function which executes in an async context but
---called from a non-async context. Inherently this cannot return anything
---since it is non-blocking
---@param func function
M.void = function(func)
return function(...)
execute(func, nil, ...)
end
end
return M

View file

@ -0,0 +1,229 @@
local a = require "plenary.async.async"
local Deque = require("plenary.async.structs").Deque
local tbl = require "plenary.tbl"
local M = {}
local Condvar = {}
Condvar.__index = Condvar
---@class Condvar
---@return Condvar
function Condvar.new()
return setmetatable({ handles = {} }, Condvar)
end
---`blocks` the thread until a notification is received
Condvar.wait = a.wrap(function(self, callback)
-- not calling the callback will block the coroutine
table.insert(self.handles, callback)
end, 2)
---notify everyone that is waiting on this Condvar
function Condvar:notify_all()
local len = #self.handles
for i, callback in ipairs(self.handles) do
if i > len then
-- this means that more handles were added while we were notifying
-- if we don't break we can get starvation notifying as soon as new handles are added
break
end
callback()
end
for _ = 1, len do
-- table.remove will ensure that indexes are correct and make "ipairs" safe,
-- which is not the case for "self.handles[i] = nil"
table.remove(self.handles)
end
end
---notify randomly one person that is waiting on this Condvar
function Condvar:notify_one()
if #self.handles == 0 then
return
end
local idx = math.random(#self.handles)
self.handles[idx]()
table.remove(self.handles, idx)
end
M.Condvar = Condvar
local Semaphore = {}
Semaphore.__index = Semaphore
---@class Semaphore
---@param initial_permits number: the number of permits that it can give out
---@return Semaphore
function Semaphore.new(initial_permits)
vim.validate {
initial_permits = {
initial_permits,
function(n)
return n > 0
end,
"number greater than 0",
},
}
return setmetatable({ permits = initial_permits, handles = {} }, Semaphore)
end
---async function, blocks until a permit can be acquired
---example:
---local semaphore = Semaphore.new(1024)
---local permit = semaphore:acquire()
---permit:forget()
---when a permit can be acquired returns it
---call permit:forget() to forget the permit
Semaphore.acquire = a.wrap(function(self, callback)
if self.permits > 0 then
self.permits = self.permits - 1
else
table.insert(self.handles, callback)
return
end
local permit = {}
permit.forget = function(self_permit)
self.permits = self.permits + 1
if self.permits > 0 and #self.handles > 0 then
self.permits = self.permits - 1
table.remove(self.handles)(self_permit)
end
end
callback(permit)
end, 2)
M.Semaphore = Semaphore
M.channel = {}
---Creates a oneshot channel
---returns a sender and receiver function
---the sender is not async while the receiver is
---@return function, function
M.channel.oneshot = function()
local val = nil
local saved_callback = nil
local sent = false
local received = false
local is_single = false
--- sender is not async
--- sends a value which can be nil
local sender = function(...)
assert(not sent, "Oneshot channel can only send once")
sent = true
if saved_callback ~= nil then
saved_callback(...)
return
end
-- optimise for when there is only one or zero argument, no need to pack
local nargs = select("#", ...)
if nargs == 1 or nargs == 0 then
val = ...
is_single = true
else
val = tbl.pack(...)
end
end
--- receiver is async
--- blocks until a value is received
local receiver = a.wrap(function(callback)
assert(not received, "Oneshot channel can only receive one value!")
if sent then
received = true
if is_single then
return callback(val)
else
return callback(tbl.unpack(val))
end
else
saved_callback = callback
end
end, 1)
return sender, receiver
end
---A counter channel.
---Basically a channel that you want to use only to notify and not to send any actual values.
---@return function: sender
---@return function: receiver
M.channel.counter = function()
local counter = 0
local condvar = Condvar.new()
local Sender = {}
function Sender:send()
counter = counter + 1
condvar:notify_all()
end
local Receiver = {}
Receiver.recv = function()
if counter == 0 then
condvar:wait()
end
counter = counter - 1
end
Receiver.last = function()
if counter == 0 then
condvar:wait()
end
counter = 0
end
return Sender, Receiver
end
---A multiple producer single consumer channel
---@return table
---@return table
M.channel.mpsc = function()
local deque = Deque.new()
local condvar = Condvar.new()
local Sender = {}
function Sender.send(...)
deque:pushleft { ... }
condvar:notify_all()
end
local Receiver = {}
Receiver.recv = function()
if deque:is_empty() then
condvar:wait()
end
return unpack(deque:popright())
end
Receiver.last = function()
if deque:is_empty() then
condvar:wait()
end
local val = deque:popleft()
deque:clear()
return unpack(val or {})
end
return Sender, Receiver
end
return M

View file

@ -0,0 +1,57 @@
---@brief [[
--- NOTE: This API is still under construction.
--- It may change in the future :)
---@brief ]]
local lookups = {
uv = "plenary.async.uv_async",
util = "plenary.async.util",
lsp = "plenary.async.lsp",
api = "plenary.async.api",
tests = "plenary.async.tests",
control = "plenary.async.control",
}
local exports = setmetatable(require "plenary.async.async", {
__index = function(t, k)
local require_path = lookups[k]
if not require_path then
return
end
local mod = require(require_path)
t[k] = mod
return mod
end,
})
exports.tests.add_globals = function()
a = exports
-- must prefix with a or stack overflow, plenary.test harness already added it
a.describe = exports.tests.describe
-- must prefix with a or stack overflow
a.it = exports.tests.it
a.pending = exports.tests.pending
a.before_each = exports.tests.before_each
a.after_each = exports.tests.after_each
end
exports.tests.add_to_env = function()
local env = getfenv(2)
env.a = exports
-- must prefix with a or stack overflow, plenary.test harness already added it
env.a.describe = exports.tests.describe
-- must prefix with a or stack overflow
env.a.it = exports.tests.it
env.a.pending = exports.tests.pending
env.a.before_each = exports.tests.before_each
env.a.after_each = exports.tests.after_each
setfenv(2, env)
end
return exports

View file

@ -0,0 +1,12 @@
local a = require "plenary.async.async"
local M = {}
---This will be deprecated because the callback can be called multiple times.
---This will give a coroutine error because the coroutine will be resumed multiple times.
---Please use buf_request_all instead.
M.buf_request = a.wrap(vim.lsp.buf_request, 4)
M.buf_request_all = a.wrap(vim.lsp.buf_request_all, 4)
return M

View file

@ -0,0 +1,116 @@
local M = {}
local Deque = {}
Deque.__index = Deque
---@class Deque
---A double ended queue
---
---@return Deque
function Deque.new()
-- the indexes are created with an offset so that the indices are consequtive
-- otherwise, when both pushleft and pushright are used, the indices will have a 1 length hole in the middle
return setmetatable({ first = 0, last = -1 }, Deque)
end
---push to the left of the deque
---@param value any
function Deque:pushleft(value)
local first = self.first - 1
self.first = first
self[first] = value
end
---push to the right of the deque
---@param value any
function Deque:pushright(value)
local last = self.last + 1
self.last = last
self[last] = value
end
---pop from the left of the deque
---@return any
function Deque:popleft()
local first = self.first
if first > self.last then
return nil
end
local value = self[first]
self[first] = nil -- to allow garbage collection
self.first = first + 1
return value
end
---pops from the right of the deque
---@return any
function Deque:popright()
local last = self.last
if self.first > last then
return nil
end
local value = self[last]
self[last] = nil -- to allow garbage collection
self.last = last - 1
return value
end
---checks if the deque is empty
---@return boolean
function Deque:is_empty()
return self:len() == 0
end
---returns the number of elements of the deque
---@return number
function Deque:len()
return self.last - self.first + 1
end
---returns and iterator of the indices and values starting from the left
---@return function
function Deque:ipairs_left()
local i = self.first
return function()
local res = self[i]
local idx = i
if res then
i = i + 1
return idx, res
end
end
end
---returns and iterator of the indices and values starting from the right
---@return function
function Deque:ipairs_right()
local i = self.last
return function()
local res = self[i]
local idx = i
if res then
i = i - 1 -- advance the iterator before we return
return idx, res
end
end
end
---removes all values from the deque
---@return nil
function Deque:clear()
for i, _ in self:ipairs_left() do
self[i] = nil
end
self.first = 0
self.last = -1
end
M.Deque = Deque
return M

View file

@ -0,0 +1,25 @@
local util = require "plenary.async.util"
local M = {}
M.describe = function(s, async_func)
describe(s, async_func)
end
M.it = function(s, async_func)
it(s, util.will_block(async_func, tonumber(vim.env.PLENARY_TEST_TIMEOUT)))
end
M.pending = function(async_func)
pending(async_func)
end
M.before_each = function(async_func)
before_each(util.will_block(async_func))
end
M.after_each = function(async_func)
after_each(util.will_block(async_func))
end
return M

View file

@ -0,0 +1,145 @@
local a = require "plenary.async.async"
local vararg = require "plenary.vararg"
-- local control = a.control
local control = require "plenary.async.control"
local channel = control.channel
local M = {}
local defer_swapped = function(timeout, callback)
vim.defer_fn(callback, timeout)
end
---Sleep for milliseconds
---@param ms number
M.sleep = a.wrap(defer_swapped, 2)
---This will COMPLETELY block neovim
---please just use a.run unless you have a very special usecase
---for example, in plenary test_harness you must use this
---@param async_function Future
---@param timeout number: Stop blocking if the timeout was surpassed. Default 2000.
M.block_on = function(async_function, timeout)
async_function = M.protected(async_function)
local stat
local ret = {}
a.run(async_function, function(stat_, ...)
stat = stat_
ret = { ... }
end)
vim.wait(timeout or 2000, function()
return stat ~= nil
end, 20, false)
if stat == false then
error(string.format("Blocking on future timed out or was interrupted.\n%s", unpack(ret)))
end
return unpack(ret)
end
---@see M.block_on
---@param async_function Future
---@param timeout number
M.will_block = function(async_function, timeout)
return function()
M.block_on(async_function, timeout)
end
end
M.join = function(async_fns)
local len = #async_fns
local results = {}
if len == 0 then
return results
end
local done = 0
local tx, rx = channel.oneshot()
for i, async_fn in ipairs(async_fns) do
assert(type(async_fn) == "function", "type error :: future must be function")
local cb = function(...)
results[i] = { ... }
done = done + 1
if done == len then
tx()
end
end
a.run(async_fn, cb)
end
rx()
return results
end
---Returns a result from the future that finishes at the first
---@param async_functions table: The futures that you want to select
---@return ...
M.run_first = a.wrap(function(async_functions, step)
local ran = false
for _, async_function in ipairs(async_functions) do
assert(type(async_function) == "function", "type error :: future must be function")
local callback = function(...)
if not ran then
ran = true
step(...)
end
end
async_function(callback)
end
end, 2)
---Returns a result from the functions that finishes at the first
---@param funcs table: The async functions that you want to select
---@return ...
M.race = function(funcs)
local async_functions = vim.tbl_map(function(func)
return function(callback)
a.run(func, callback)
end
end, funcs)
return M.run_first(async_functions)
end
M.run_all = function(async_fns, callback)
a.run(function()
M.join(async_fns)
end, callback)
end
function M.apcall(async_fn, ...)
local nargs = a.get_leaf_function_argc(async_fn)
if nargs then
local tx, rx = channel.oneshot()
local stat, ret = pcall(async_fn, vararg.rotate(nargs, tx, ...))
if not stat then
return stat, ret
else
return stat, rx()
end
else
return pcall(async_fn, ...)
end
end
function M.protected(async_fn)
return function()
return M.apcall(async_fn)
end
end
---An async function that when called will yield to the neovim scheduler to be able to call the api.
M.scheduler = a.wrap(vim.schedule, 1)
return M

View file

@ -0,0 +1,84 @@
local a = require "plenary.async.async"
local uv = vim.loop
local M = {}
local function add(name, argc, custom)
local success, ret = pcall(a.wrap, custom or uv[name], argc)
if not success then
error("Failed to add function with name " .. name)
end
M[name] = ret
end
add("close", 4) -- close a handle
-- filesystem operations
add("fs_open", 4)
add("fs_read", 4)
add("fs_close", 2)
add("fs_unlink", 2)
add("fs_write", 4)
add("fs_mkdir", 3)
add("fs_mkdtemp", 2)
-- 'fs_mkstemp',
add("fs_rmdir", 2)
add("fs_scandir", 2)
add("fs_stat", 2)
add("fs_fstat", 2)
add("fs_lstat", 2)
add("fs_rename", 3)
add("fs_fsync", 2)
add("fs_fdatasync", 2)
add("fs_ftruncate", 3)
add("fs_sendfile", 5)
add("fs_access", 3)
add("fs_chmod", 3)
add("fs_fchmod", 3)
add("fs_utime", 4)
add("fs_futime", 4)
-- 'fs_lutime',
add("fs_link", 3)
add("fs_symlink", 4)
add("fs_readlink", 2)
add("fs_realpath", 2)
add("fs_chown", 4)
add("fs_fchown", 4)
-- 'fs_lchown',
add("fs_copyfile", 4)
add("fs_opendir", 3, function(path, entries, callback)
return uv.fs_opendir(path, callback, entries)
end)
add("fs_readdir", 2)
add("fs_closedir", 2)
-- 'fs_statfs',
-- stream
add("shutdown", 2)
add("listen", 3)
-- add('read_start', 2) -- do not do this one, the callback is made multiple times
add("write", 3)
add("write2", 4)
add("shutdown", 2)
-- tcp
add("tcp_connect", 4)
-- 'tcp_close_reset',
-- pipe
add("pipe_connect", 3)
-- udp
add("udp_send", 5)
add("udp_recv_start", 2)
-- fs event (wip make into async await event)
-- fs poll event (wip make into async await event)
-- dns
add("getaddrinfo", 4)
add("getnameinfo", 2)
return M