Концептуально то, что я пытаюсь сделать, очень просто. У меня есть поток Readable
в узле, и я передаю его встроенному аддону C++, где я хочу подключить его к IInputStream
.
Нативная библиотека, которую я использую, работает подобно многим потоковым интерфейсам C++ (или Java), которые я видел. Библиотека предоставляет интерфейс IInputStream
(технически абстрактный класс), который я наследую от виртуальных функций и переопределяю их. Выглядит так:
class JsReadable2InputStream : public IInputStream {
public:
// Constructor takes a js v8 object, makes a stream out of it
JsReadable2InputStream(const v8::Local<v8::Object>& streamObj);
~JsReadable2InputStream();
/**
* Blocking read. Blocks until the requested amount of data has been read. However,
* if the stream reaches its end before the requested amount of bytes has been read
* it returns the number of bytes read thus far.
*
* @param begin memory into which read data is copied
* @param byteCount the requested number of bytes
* @return the number of bytes actually read. Is less than bytesCount iff
* end of stream has been reached.
*/
virtual int read(char* begin, const int byteCount) override;
virtual int available() const override;
virtual bool isActive() const override;
virtual void close() override;
private:
Nan::Persistent<v8::Object> _stream;
bool _active;
JsEventLoopSync _evtLoop;
};
Из этих функций важной здесь является read
. Собственная библиотека будет вызывать эту функцию, когда ей нужно больше данных, и функция должна блокироваться, пока она не сможет вернуть запрошенные данные (или поток не закончится). Вот моя реализация read
:
int JsReadable2InputStream::read(char* begin, const int byteCount) {
if (!this->_active) { return 0; }
int read = -1;
while (read < 0 && this->_active) {
this->_evtLoop.invoke(
(voidLambda)[this,&read,begin,byteCount](){
v8::Local<v8::Object> stream = Nan::New(this->_stream);
const v8::Local<v8::Function> readFn = Nan::To<v8::Function>(Nan::Get(stream, JS_STR("read")).ToLocalChecked()).ToLocalChecked();
v8::Local<v8::Value> argv[] = { Nan::New<v8::Number>(byteCount) };
v8::Local<v8::Value> result = Nan::Call(readFn, stream, 1, argv).ToLocalChecked();
if (result->IsNull()) {
// Somewhat hacky/brittle way to check if stream has ended, but it's the only option
v8::Local<v8::Object> readableState = Nan::To<v8::Object>(Nan::Get(stream, JS_STR("_readableState")).ToLocalChecked()).ToLocalChecked();
if (Nan::To<bool>(Nan::Get(readableState, JS_STR("ended")).ToLocalChecked()).ToChecked()) {
// End of stream, all data has been read
this->_active = false;
read = 0;
return;
}
// Not enough data available, but stream is still open.
// Set a flag for the c++ thread to go to sleep
// This is the case that it gets stuck in
read = -1;
return;
}
v8::Local<v8::Object> bufferObj = Nan::To<v8::Object>(result).ToLocalChecked();
int len = Nan::To<int32_t>(Nan::Get(bufferObj, JS_STR("length")).ToLocalChecked()).ToChecked();
char* buffer = node::Buffer::Data(bufferObj);
if (len < byteCount) {
this->_active = false;
}
// copy the data out of the buffer
if (len > 0) {
std::memcpy(begin, buffer, len);
}
read = len;
}
);
if (read < 0) {
// Give js a chance to read more data
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return read;
}
Идея в том, что код C++ хранит ссылку на объект потока узлов. Когда собственный код хочет прочитать, он должен синхронизироваться с циклом событий узла, а затем попытаться вызвать read
в потоке узла. Если поток узла возвращает null
, это указывает на то, что данные не готовы, поэтому собственный поток засыпает, давая потоку цикла событий узла возможность запустить и заполнить свои буферы.
Это решение работает с отлично для одного потока или даже для 2 или 3 потоков, работающих параллельно. Затем по какой-то причине, когда я попадаю в магическое количество параллельных потоков 4+, это полностью блокируется. Ни один из потоков вообще не может успешно прочитать какие-либо байты. Вышеупомянутый цикл while
выполняется бесконечно, при этом вызов в поток узла каждый раз возвращает null
.
Он ведет себя так, как будто узел голодает, и потоки никогда не получают возможности заполниться данными. Однако я попытался отрегулировать продолжительность сна (на гораздо большие значения и рандомизированные значения), и это не дало никакого эффекта. Также ясно, что цикл обработки событий продолжает выполняться, поскольку моя лямбда-функция продолжает выполняться там (я положил внутрь несколько printf
, чтобы подтвердить это).
На всякий случай, если это может быть актуально (я не думаю, что это так), я также включаю свою реализацию JsEventLoopSync
. Это использует libuv для планирования выполнения лямбда-выражения в цикле событий узла. Он спроектирован таким образом, что одновременно можно запланировать только один вызов, а другие вызовы должны ждать завершения первого.
#include <nan.h>
#include <functional>
// simplified type declarations for the lambda functions
using voidLambda = std::function<void ()>;
// Synchronize with the node v8 event loop. Invokes a lambda function on the event loop, where access to js objects is safe.
// Blocks execution of the invoking thread until execution of the lambda completes.
class JsEventLoopSync {
public:
JsEventLoopSync() : _destroyed(false) {
// register on the default (same as node) event loop, so that we can execute callbacks in that context
// This takes a function pointer, which only works with a static function
this->_handles = new async_handles_t();
this->_handles->inst = this;
uv_async_init(uv_default_loop(), &this->_handles->async, JsEventLoopSync::_processUvCb);
// mechanism for passing this instance through to the native uv callback
this->_handles->async.data = this->_handles;
// mutex has to be initialized
uv_mutex_init(&this->_handles->mutex);
uv_cond_init(&this->_handles->cond);
}
~JsEventLoopSync() {
uv_mutex_lock(&this->_handles->mutex);
// prevent access to deleted instance by callback
this->_destroyed = true;
uv_mutex_unlock(&this->_handles->mutex);
// NOTE: Important, this->_handles must be a dynamically allocated pointer because uv_close() is
// async, and still has a reference to it. If it were statically allocated as a class member, this
// destructor would free the memory before uv_close was done with it (leading to asserts in libuv)
uv_close(reinterpret_cast<uv_handle_t*>(&this->_handles->async), JsEventLoopSync::_asyncClose);
}
// called from the native code to invoke the function
void invoke(const voidLambda& fn) {
if (v8::Isolate::GetCurrent() != NULL) {
// Already on the event loop, process now
return fn();
}
// Need to sync with the event loop
uv_mutex_lock(&this->_handles->mutex);
if (this->_destroyed) { return; }
this->_fn = fn;
// this will invoke processUvCb, on the node event loop
uv_async_send(&this->_handles->async);
// wait for it to complete processing
uv_cond_wait(&this->_handles->cond, &this->_handles->mutex);
uv_mutex_unlock(&this->_handles->mutex);
}
private:
// pulls data out of uv's void* to call the instance method
static void _processUvCb(uv_async_t* handle) {
if (handle->data == NULL) { return; }
auto handles = static_cast<async_handles_t*>(handle->data);
handles->inst->_process();
}
inline static void _asyncClose(uv_handle_t* handle) {
auto handles = static_cast<async_handles_t*>(handle->data);
handle->data = NULL;
uv_mutex_destroy(&handles->mutex);
uv_cond_destroy(&handles->cond);
delete handles;
}
// Creates the js arguments (populated by invoking the lambda), then invokes the js function
// Invokes resultLambda on the result
// Must be run on the node event loop!
void _process() {
if (v8::Isolate::GetCurrent() == NULL) {
// This is unexpected!
throw std::logic_error("Unable to sync with node event loop for callback!");
}
uv_mutex_lock(&this->_handles->mutex);
if (this->_destroyed) { return; }
Nan::HandleScope scope; // looks unused, but this is very important
// invoke the lambda
this->_fn();
// signal that we're done
uv_cond_signal(&this->_handles->cond);
uv_mutex_unlock(&this->_handles->mutex);
}
typedef struct async_handles {
uv_mutex_t mutex;
uv_cond_t cond;
uv_async_t async;
JsEventLoopSync* inst;
} async_handles_t;
async_handles_t* _handles;
voidLambda _fn;
bool _destroyed;
};
Итак, что мне не хватает? Есть ли лучший способ подождать, пока поток узла получит возможность запуститься? Есть ли совершенно другой шаблон дизайна, который бы работал лучше? Есть ли у узла верхний предел количества потоков, которые он может обрабатывать за один раз?
Как оказалось, проблемы, которые я видел, на самом деле были ограничениями на стороне клиента. Браузеры (и, по-видимому, также узел) имеют ограничение на количество открытых TCP-соединений с одним и тем же источником. Я решил эту проблему, создав несколько узловых процессов для тестирования.
Если кто-то пытается сделать что-то подобное, код, которым я поделился, вполне жизнеспособен. Если у меня когда-нибудь появится свободное время, я могу превратить его в библиотеку.