Fixed not being able to download in parallel

- Removed `transfer` class in `http` namespace
- Consolidated the `http::context` and `http::multi` classes
This commit is contained in:
David Allen 2023-07-15 18:56:14 -06:00
parent 8ac3910730
commit 9d5c52f450
6 changed files with 346 additions and 244 deletions

View file

@ -3,6 +3,7 @@
#include "utils.hpp"
#include "log.hpp"
#include "error.hpp"
#include <atomic>
#include <curl/curl.h>
#include <curl/easy.h>
#include <curl/multi.h>
@ -14,14 +15,17 @@
namespace gdpm::http{
context::context(){
context::context(int max_transfers): max_transfers(max_transfers){
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
cm = curl_multi_init();
set_max_transfers(max_transfers);
}
context::~context(){
curl_easy_cleanup(curl);
curl_multi_cleanup(cm);
curl_global_cleanup();
}
@ -39,11 +43,6 @@ namespace gdpm::http{
utils::memory_buffer buf = utils::make_buffer();
utils::memory_buffer data = utils::make_buffer();
response r;
#if (GDPM_DELAY_HTTP_REQUESTS == 1)
using namespace std::chrono_literals;
utils::delay();
#endif
if(curl){
curl_slist *list = add_headers(curl, params.headers);
if(params.method == method::POST){
@ -87,6 +86,122 @@ namespace gdpm::http{
}
responses context::requests(
const string_list& urls,
const http::request& params
){
if(cm == nullptr){
log::error(error(PRECONDITION_FAILED,
"http::multi::make_downloads(): multi client not initialized.")
);
}
if(urls.size() <= 0){
log::warn("No requests to make.");
}
transfers_left = urls.size();
transfers_index = 0;
auto add_transfer = [this](const string& url, const http::request& params)-> void {
CURL *curl = curl_easy_init();
utils::memory_buffer *data;
if(curl){
curl_slist *list = add_headers(curl, params.headers);
if(params.method == method::POST){
string h;
std::for_each(
params.headers.begin(),
params.headers.end(),
[&h](const string_pair& kv){
h += kv.first + "=" + kv.second + "&";
}
);
h.pop_back();
h = url_escape(h);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, h.size());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, h.c_str());
}
else if(params.method == method::GET){
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
}
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)&data);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_to_buffer);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false);
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, &data);
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, show_download_progress);
curl_easy_setopt(curl, CURLOPT_USERAGENT, constants::UserAgent.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, params.timeout);
}
transfers_index += 1;
};
for(int i = 0; i < max_transfers; i++){
add_transfer(urls.at(i), params);
}
do{
int still_running = 1;
int numfds = 0;
cres = curl_multi_perform(cm, &still_running);
if(cres == CURLM_OK){
/* wait for activity, timeout or "nothing" */
cres = curl_multi_poll(cm, NULL, 0, params.timeout, &numfds);
}
if(cres != CURLM_OK){
log::error(error(
ec::LIBCURL_ERR,
"http::multi::execute(): curl_multi_poll() returned an error"
));
break;
}
int messages_left = -1;
while((cmessage = curl_multi_info_read(cm, &messages_left))){
if(cmessage->msg == CURLMSG_DONE){
char *url;
CURL *eh = cmessage->easy_handle;
int code = 0;
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_EFFECTIVE_URL, &url);
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_RESPONSE_CODE, &code);
if((int)cmessage->data.result != CURLM_OK){
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute({}): {} <url: {}>",
(int)cmessage->data.result, curl_easy_strerror(cmessage->data.result), url))
);
}
log::println("transfers left: {}", transfers_left);
cres = curl_multi_remove_handle(cm, eh);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_remove_handle() returned error {}", cres);
}
curl_easy_cleanup(eh);
transfers_left -= 1;
}
else{
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute(): {}", (int)cmessage->msg))
);
}
if(transfers_index < urls.size()-1){
add_transfer(urls.at(transfers_index), params);
}
}
if(transfers_left){
cres = curl_multi_wait(cm, NULL, 0, params.timeout, NULL);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_wait() returned an error {}", cres);
}
}
}while(transfers_left);
cres = curl_multi_cleanup(cm);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_cleanup() returned an error {}", cres);
}
return responses();
}
response context::download_file(
const string& url,
const string& storage_path,
@ -97,10 +212,6 @@ namespace gdpm::http{
response r;
FILE *fp;
#if (GDPM_DELAY_HTTP_REQUESTS == 1)
using namespace std::chrono_literals;
utils::delay();
#endif
if(curl){
fp = fopen(storage_path.c_str(), "wb");
utils::memory_buffer *data;
@ -131,6 +242,124 @@ namespace gdpm::http{
return r;
}
responses context::download_files(
const string_list &urls,
const string_list &storage_paths,
const http::request& params
){
if(cm == nullptr){
log::error(error(ec::PRECONDITION_FAILED,
"http::multi::make_downloads(): multi client not initialized.")
);
return responses();
}
if(urls.size() != storage_paths.size()){
log::error(error(ec::ASSERTION_FAILED,
"http::context::make_downloads(): urls.size() != storage_paths.size()"
));
}
transfers_left = urls.size();
transfers_index = 0;
auto add_transfer = [this](const string& url, const string& storage_path, const http::request& params){
CURL *curl = curl_easy_init();
utils::memory_buffer *data;
if(curl){
FILE *fp = fopen(storage_path.c_str(), "wb");
curl_slist *list = add_headers(curl, params.headers);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(curl, CURLOPT_HEADER, 0);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(curl, CURLOPT_PRIVATE, fp);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, fp);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_to_stream);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, false);
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, &data);
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, show_download_progress);
curl_easy_setopt(curl, CURLOPT_USERAGENT, constants::UserAgent.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, params.timeout);
if(params.verbose >= log::INFO){
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
}
cres = curl_multi_add_handle(cm, curl);
curl_slist_free_all(list);
if(cres != CURLM_OK){
log::error(ec::LIBCURL_ERR,
std::format("http::context::make_downloads(): {}", curl_multi_strerror(cres))
);
}
}
transfers_index += 1;
};
for(size_t i = 0; i < urls.size(); i++){
add_transfer(urls.at(i), storage_paths.at(i), params);
}
int still_running = 1;
int numfds = 0;
do{
cres = curl_multi_perform(cm, &still_running);
if(cres == CURLM_OK){
/* wait for activity, timeout or "nothing" */
cres = curl_multi_poll(cm, NULL, 0, 1000, &numfds);
}
if(cres != CURLM_OK){
log::error(error(
ec::LIBCURL_ERR,
"http::multi::execute(): curl_multi_poll() returned an error"
));
break;
}
int messages_left = -1;
while((cmessage = curl_multi_info_read(cm, &messages_left))){
if(cmessage->msg == CURLMSG_DONE){
char *url = nullptr;
FILE *fp = nullptr;
CURL *eh = cmessage->easy_handle;
int code = 0;
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_EFFECTIVE_URL, &url);
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_RESPONSE_CODE, &code);
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_PRIVATE, &fp);
if((int)cmessage->data.result != CURLM_OK){
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute({}): {} <url: {}>", (int)cmessage->data.result, curl_easy_strerror(cmessage->data.result), url))
);
}
cres = curl_multi_remove_handle(cm, eh);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_remove_handle() returned error {}", cres);
}
transfers_left -= 1;
curl_easy_cleanup(eh);
if(fp)
fclose(fp);
}
else{
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute(): {}", (int)cmessage->msg))
);
}
if(transfers_index < urls.size()-1){
add_transfer(urls.at(transfers_index), storage_paths.at(transfers_index), params);
}
}
if(transfers_left){
cres = curl_multi_wait(cm, NULL, 0, params.timeout, NULL);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_wait() returned an error {}", cres);
}
}
}while(transfers_left);
cres = curl_multi_cleanup(cm);
if(cres != CURLM_OK){
log::error("http::context::execute(): curl_multi_cleanup() returned an error {}", cres);
}
return responses();
}
long context::get_download_size(const string& url){
CURLcode res;
if(curl){
@ -166,182 +395,55 @@ namespace gdpm::http{
}
multi::multi(long max_allowed_transfers){
curl_global_init(CURL_GLOBAL_ALL);
if(max_allowed_transfers > 1)
cm = curl_multi_init();
curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long)max_allowed_transfers);
}
multi::~multi(){
if(cm != nullptr)
curl_multi_cleanup(cm);
curl_global_cleanup();
void context::set_max_transfers(int max_transfers){
curl_multi_setopt(cm, CURLMOPT_MAX_TOTAL_CONNECTIONS, max_transfers);
}
string multi::url_escape(const string &url){
return curl_easy_escape(cm, url.c_str(), url.size());;
}
// multi::multi(long max_allowed_transfers){
// curl_global_init(CURL_GLOBAL_ALL);
// if(max_allowed_transfers > 1)
// cm = curl_multi_init();
// curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long)max_allowed_transfers);
// }
// multi::~multi(){
// cleanup();
// }
ptr<transfers> multi::make_requests(
const string_list& urls,
const http::request& params
){
if(cm == nullptr){
log::error(error(PRECONDITION_FAILED,
"http::multi::make_downloads(): multi client not initialized.")
);
return std::make_unique<transfers>();
}
if(urls.size() <= 0){
log::warn("No requests to make.");
return std::make_unique<transfers>();
}
ptr<transfers> ts = std::make_unique<transfers>();
for(const auto& url : urls){
transfer t;
if(t.curl){
curl_slist *list = add_headers(t.curl, params.headers);
if(params.method == method::POST){
string h;
std::for_each(
params.headers.begin(),
params.headers.end(),
[&h](const string_pair& kv){
h += kv.first + "=" + kv.second + "&";
}
);
h.pop_back();
h = url_escape(h);
curl_easy_setopt(t.curl, CURLOPT_POSTFIELDSIZE, h.size());
curl_easy_setopt(t.curl, CURLOPT_POSTFIELDS, h.c_str());
}
else if(params.method == method::GET){
curl_easy_setopt(t.curl, CURLOPT_CUSTOMREQUEST, "GET");
}
curl_easy_setopt(t.curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(t.curl, CURLOPT_WRITEDATA, (void*)&t.data);
curl_easy_setopt(t.curl, CURLOPT_WRITEFUNCTION, write_to_buffer);
curl_easy_setopt(t.curl, CURLOPT_NOPROGRESS, false);
curl_easy_setopt(t.curl, CURLOPT_XFERINFODATA, &t.data);
curl_easy_setopt(t.curl, CURLOPT_XFERINFOFUNCTION, show_download_progress);
curl_easy_setopt(t.curl, CURLOPT_USERAGENT, constants::UserAgent.c_str());
curl_easy_setopt(t.curl, CURLOPT_TIMEOUT_MS, params.timeout);
}
}
return ts;
}
// string multi::url_escape(const string &url){
// return curl_easy_escape(cm, url.c_str(), url.size());;
// }
ptr<transfers> multi::make_downloads(
const string_list& urls,
const string_list& storage_paths,
const http::request& params
){
if(cm == nullptr){
log::error(error(ec::PRECONDITION_FAILED,
"http::multi::make_downloads(): multi client not initialized.")
);
return std::make_unique<transfers>();
}
if(urls.size() != storage_paths.size()){
log::error(error(ec::ASSERTION_FAILED,
"http::context::make_downloads(): urls.size() != storage_paths.size()"
));
}
ptr<transfers> ts = std::make_unique<transfers>();
for(size_t i = 0; i < urls.size(); i++){
const string& url = urls.at(i);
const string& storage_path = storage_paths.at(i);
response r;
transfer t;
t.id = i;
if(t.curl){
t.fp = fopen(storage_path.c_str(), "wb");
curl_slist *list = add_headers(t.curl, params.headers);
curl_easy_setopt(t.curl, CURLOPT_URL, url.c_str());
// curl_easy_setopt(t.curl, CURLOPT_PRIVATE, url.c_str());
curl_easy_setopt(t.curl, CURLOPT_FAILONERROR, true);
curl_easy_setopt(t.curl, CURLOPT_HEADER, 0);
curl_easy_setopt(t.curl, CURLOPT_FOLLOWLOCATION, true);
curl_easy_setopt(t.curl, CURLOPT_WRITEDATA, t.fp);
curl_easy_setopt(t.curl, CURLOPT_WRITEFUNCTION, write_to_stream);
curl_easy_setopt(t.curl, CURLOPT_NOPROGRESS, false);
curl_easy_setopt(t.curl, CURLOPT_XFERINFODATA, &t.data);
curl_easy_setopt(t.curl, CURLOPT_XFERINFOFUNCTION, show_download_progress);
curl_easy_setopt(t.curl, CURLOPT_USERAGENT, constants::UserAgent.c_str());
curl_easy_setopt(t.curl, CURLOPT_TIMEOUT_MS, params.timeout);
cres = curl_multi_add_handle(cm, t.curl);
curl_slist_free_all(list);
if(cres != CURLM_OK){
log::error(ec::LIBCURL_ERR,
std::format("http::context::make_downloads(): {}", curl_multi_strerror(cres))
);
}
ts->emplace_back(std::move(t));
/* NOTE: Should the file pointer be closed here? */
}
}
// ptr<responses> multi::execute(
// ptr<transfers> transfers,
// size_t timeout
// ){
// if(cm == nullptr){
// log::error(error(PRECONDITION_FAILED,
// "http::multi::execute(): multi client not initialized")
// );
// return std::make_unique<responses>();
// }
// // if(transfers->empty()){
// // log::debug("http::multi::execute(): no transfers found");
// // return std::make_unique<responses>();
// // }
// // size_t transfers_left = transfers->size();
// ptr<responses> responses = std::make_unique<http::responses>(transfers->size());
// }
return ts;
}
ptr<responses> multi::execute(
ptr<transfers> transfers,
size_t timeout
){
if(cm == nullptr){
log::error(error(PRECONDITION_FAILED,
"http::multi::execute(): multi client not initialized")
);
return std::make_unique<responses>();
}
if(transfers->empty()){
log::debug("http::multi::execute(): no transfers found");
return std::make_unique<responses>();
}
size_t transfers_left = transfers->size();
ptr<responses> responses = std::make_unique<http::responses>(transfers->size());
do{
int still_alive = 1;
cres = curl_multi_perform(cm, &still_alive);
while((cmessage = curl_multi_info_read(cm, &messages_left))){
if(cmessage->msg == CURLMSG_DONE){
char *url = nullptr;
transfer& t = transfers->at(transfers_left-1);
response& r = responses->at(transfers_left-1);
t.curl = cmessage->easy_handle;
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_EFFECTIVE_URL, &url);
curl_easy_getinfo(cmessage->easy_handle, CURLINFO_RESPONSE_CODE, &r.code);
if((int)cmessage->data.result != CURLM_OK){
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute({}): {} <url: {}>", (int)cmessage->data.result, curl_easy_strerror(cmessage->data.result), url))
);
}
curl_multi_remove_handle(cm, t.curl);
curl_easy_cleanup(t.curl);
if(t.fp) fclose(t.fp);
transfers->pop_back();
transfers_left -= 1;
}
else{
log::error(error(ec::LIBCURL_ERR,
std::format("http::context::execute(): {}", (int)cmessage->msg))
);
}
}
if(transfers_left)
curl_multi_wait(cm, NULL, 0, timeout, NULL);
}while(transfers_left);
return responses;
}
// void multi::cleanup(){
// if(cm != nullptr){
// log::println("curl_multi_cleanup");
// curl_multi_cleanup(cm);
// }
// log::println("curl_global_cleanup");
// curl_global_cleanup();
// }
curl_slist* add_headers(
@ -392,7 +494,7 @@ namespace gdpm::http{
){
if(nmemb == 0)
return 0;
return fwrite(ptr, size, nmemb, (FILE*)userdata);
}