Skip to content

发现并行量上不来,drogon有隐含的设置的么 #2457

@liguheng

Description

@liguheng
using namespace drogon;
using namespace std::chrono_literals;

// Tuning knobs for the cache pipeline.
// NOTE(review): none of these three constants is referenced in the code
// visible here — presumably queue_limit was meant to bound the
// ConcurrentQueue and delay_time/consumer_wait to pace producers or the
// consumer timer; confirm against the rest of the file before removing.
constexpr auto delay_time{100ms};
constexpr auto consumer_wait{100ms};
constexpr size_t queue_limit{1000000};



void CacheData::initAndStart(const Json::Value &config)
{
    /// Initialize and start the plugin.
    ///
    /// Every second: drain the lock-free queue of samples collected by
    /// producers(), serialize the batch to a JSON array, SET it into the
    /// Redis key "client", then GET the aggregated results the peer
    /// service writes to "server" and publish them atomically into m_
    /// for lock-free reads via get().
    ///
    /// @param config plugin configuration (currently unused here).
    std::cout << "plugins" << std::endl;

    auto loop = app().getLoop();
    this->TimerId = loop->runEvery(std::chrono::seconds(1), [this]() {
        // Drain the queue. Only append items that were actually dequeued:
        // the previous code pushed `item` unconditionally, appending a
        // default-constructed record when the queue was empty and a
        // moved-from record after the final failed try_dequeue().
        CacheDatat item;
        while (this->qs.try_dequeue(item)) {
            this->vv.push_back(std::move(item));
        }

        // Serialize the drained batch into a JSON array.
        Json::Value json(Json::arrayValue);
        for (const auto &rec : this->vv) {  // renamed: the old loop var shadowed the member `vv`
            Json::Value v;
            v["start_time"] = static_cast<Json::Int64>(rec.start_time);
            v["end_time"] = static_cast<Json::Int64>(rec.end_time);
            v["msg"] = rec.msg;
            v["n"] = rec.n;
            v["entity"] = rec.entity;
            json.append(std::move(v));
        }
        this->vv.clear();  // clear() keeps capacity, so the buffer is reused next tick

        Json::StreamWriterBuilder builder;
        builder["indentation"] = "";  // compact single-line output
        auto json_data = Json::writeString(builder, json);

        auto redis_client = app().getRedisClient();

        // Publish the batch (fire-and-forget; errors are only logged).
        redis_client->execCommandAsync(
            [](const nosql::RedisResult &r) {},
            [](const nosql::RedisException &err) { LOG_ERROR << err.what(); },
            "SET client %s", json_data.c_str());

        // Fetch the aggregated results produced by the other service.
        redis_client->execCommandAsync(
            [this](const nosql::RedisResult &r) {
                if (r.isNil()) {
                    return;  // key not written yet — nothing to update
                }
                Json::CharReaderBuilder readerBuilder;
                Json::Value root;
                std::string errors;
                std::unique_ptr<Json::CharReader> reader(readerBuilder.newCharReader());
                auto data = r.asString();
                if (!reader->parse(data.c_str(), data.c_str() + data.size(), &root, &errors)) {
                    LOG_ERROR << errors;
                    return;
                }
                // Build a fresh map and swap it in atomically (RCU-style),
                // so get() readers never observe a partially filled map.
                auto new_map = std::make_shared<std::unordered_map<std::string, double>>();
                for (const auto &dd : root) {
                    (*new_map)[dd["entity"].asString()] = dd["n"].asDouble();
                }
                m_.store(new_map);
            },
            [](const nosql::RedisException &err) { LOG_ERROR << err.what(); },
            "GET server");
    });
}

void CacheData::producers(CacheData::CacheDatat&& ii){
    /// Enqueue one sample for the periodic flush timer to pick up.
    /// Best-effort by design: if try_enqueue fails (allocation/limit),
    /// the sample is silently dropped rather than blocking the caller.
    static_cast<void>(qs.try_enqueue(std::move(ii)));
}

double CacheData::get(const std::string &key){
    /// Look up the cached aggregate for `key`.
    /// Returns 0 when no snapshot has been published yet, or when the
    /// key is absent from the current snapshot.
    const auto snapshot = m_.load();
    if (snapshot) {
        if (auto it = snapshot->find(key); it != snapshot->end()) {
            return it->second;
        }
    }
    return 0;
}

void CacheData::shutdown() 
{
    /// Shutdown the plugin: cancel the periodic flush timer if it was
    /// ever started; a no-op otherwise.
    if (this->TimerId == trantor::InvalidTimerId) {
        return;
    }
    app().getLoop()->invalidateTimer(this->TimerId);
}

我注册了一个插件,在中间件中会使用这个插件producers将访问的数据插入应用的缓存里面,
定时将缓存数据插入到client键中,另一个服务器的应用会不断将这个键里面的数据提取出来不断地累计计算,
将计算结果存到server键里(数据量很小),测试了一下性能,
用rust的tokio的异步任务调度器不断增加并行用户量,到一万左右就上不去了,出现大量的timeout现象,我将threadnum调到4左右性能是最强的,再提升或者降低并行量都会下降,
测试视频,我想了很久,
还是请教一下你这个高手才行,代码仓库在这里,因为是自己手写的,有一些问题请多指正。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions