MediaMultiCurl.cc
/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /| _/ _/                           |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/
#include <ctype.h>
#include <sys/types.h>
#include <signal.h>
#include <sys/wait.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <glib.h>

#include <vector>
#include <iostream>
#include <algorithm>


#include <zypp/ManagedFile.h>
#include <zypp/ZConfig.h>
#include <zypp/base/Logger.h>
#include <zypp/media/MediaMultiCurl.h>
#include <zypp-core/zyppng/base/private/linuxhelpers_p.h>
#include <zypp-curl/parser/MetaLinkParser.h>
#include <zypp-curl/parser/ZsyncParser.h>
#include <zypp-curl/auth/CurlAuthData.h>

using std::endl;
using namespace zypp::base;

#undef CURLVERSION_AT_LEAST
#define CURLVERSION_AT_LEAST(M,N,O) LIBCURL_VERSION_NUM >= ((((M)<<8)+(N))<<8)+(O)
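
// Worked example: CURLVERSION_AT_LEAST(7,15,5) expands to
// LIBCURL_VERSION_NUM >= ((((7)<<8)+(15))<<8)+(5), i.e. 0x070F05 (460549),
// the same 0xXXYYZZ packing libcurl itself uses for LIBCURL_VERSION_NUM.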

namespace zypp {
  namespace media {

struct Stripe {

  enum RState {
    PENDING,   //< Pending Range
    FETCH,     //< Fetch is running!
    COMPETING, //< Competing workers, needs checksum recheck
    FINALIZED, //< Done, don't write to it anymore
    REFETCH    //< This block needs a refetch
  };

  std::vector<off_t> blocks;       //< required block numbers from blocklist
  std::vector<RState> blockStates; //< current state of each block in blocks
};
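
// Block state transitions, as driven by beginRange()/finishedRange() below:
//   PENDING   -> FETCH      first worker claims the block
//   FETCH     -> COMPETING  a second (stealing) worker starts writing to it
//   FETCH     -> FINALIZED  the sole writer validated the block checksum
//   COMPETING -> FINALIZED  block re-read from disk, checksum still good
//   COMPETING -> REFETCH    competing writers corrupted the block on disk
//   REFETCH   -> PENDING    reset by the scheduler so the block is fetched again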

// Hack: we derive from MediaCurl just to get the storage space for
// settings, url, curlerrors and the like

  friend class multifetchrequest;

public:
  multifetchworker(int no, multifetchrequest &request, const Url &url);
  ~multifetchworker();

  void nextjob();

  void runjob();

  bool continueJob();

  bool recheckChecksum( off_t blockIdx );

  void disableCompetition();

  void checkdns();
  void adddnsfd( std::vector<GPollFD> &waitFds );
  void dnsevent(const std::vector<GPollFD> &waitFds );

  const int _workerno;

  MultiFetchWorkerState _state;
  bool _competing = false;

  std::vector<MultiByteHandler::Range> _blocks;
  std::vector<off_t> _rangeToStripeBlock;

  MultiByteHandler::ProtocolMode _protocolMode;
  std::unique_ptr<MultiByteHandler> _multiByteHandler;

  off_t _stripe = 0;        //< The current stripe we are downloading
  size_t _datasize = 0;     //< The nr of bytes we need to download overall

  double _starttime = 0;    //< When was the current job started
  size_t _datareceived = 0; //< Data downloaded in the current job only
  off_t _received = 0;      //< Overall data fetched by this worker

  double _avgspeed = 0;
  double _maxspeed = 0;

  double _sleepuntil = 0;

private:
  void run();
  void stealjob();
  bool setupHandle();
  MultiByteHandler::Range rangeFromBlock( off_t blockNo ) const;

  size_t writefunction ( char *ptr, std::optional<off_t> offset, size_t bytes ) override;
  size_t headerfunction ( char *ptr, size_t bytes ) override;
  bool beginRange ( off_t range, std::string &cancelReason ) override;
  bool finishedRange ( off_t range, bool validated, std::string &cancelReason ) override;

  multifetchrequest *_request = nullptr;
  int _pass = 0;
  std::string _urlbuf;

  pid_t _pid = 0;
  int _dnspipe = -1;
};
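
// Worker lifecycle as used throughout this file: a freshly spawned worker
// may first sit in WORKER_LOOKUP while its forked DNS check runs, fetches
// stripes in WORKER_FETCH, is parked in WORKER_SLEEP when it is much slower
// than its peers, switches to WORKER_DISCARD when a competitor finalized
// the stripe first, and ends in WORKER_DONE or, on unrecoverable errors,
// WORKER_BROKEN (with the reason kept in _curlError).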

class multifetchrequest : public internal::CurlPollHelper::CurlPoll {
public:
  multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList &&blklist, off_t filesize);

  void run(std::vector<Url> &urllist);
  static ByteCount makeBlksize( uint maxConns, size_t filesize );

  MediaBlockList &blockList() {
    return _blklist;
  }

protected:
  friend class multifetchworker;

  const MediaMultiCurl *_context;
  const Pathname _filename;
  Url _baseurl;

  FILE *_fp = nullptr;
  callback::SendReport<DownloadProgressReport> *_report = nullptr;
  MediaBlockList _blklist;

  std::vector<Stripe> _requiredStripes; // all the data we need

  off_t _filesize = 0; //< size of the file we want to download

  std::list< std::unique_ptr<multifetchworker> > _workers;
  bool _stealing = false;
  bool _havenewjob = false;

  zypp::ByteCount _defaultBlksize = 0; //< The blocksize to use if the metalink file does not specify one
  off_t _stripeNo = 0; //< next stripe to download

  size_t _activeworkers = 0;
  size_t _lookupworkers = 0;
  size_t _sleepworkers = 0;
  double _minsleepuntil = 0;
  bool _finished = false;

  off_t _totalsize = 0; //< nr of bytes we need to download ( e.g. filesize - ( bytes reused from deltafile ) )
  off_t _fetchedsize = 0;
  off_t _fetchedgoodsize = 0;

  double _starttime = 0;
  double _lastprogress = 0;

  double _lastperiodstart = 0;
  double _lastperiodfetched = 0;
  double _periodavg = 0;

public:
  double _timeout = 0;
  double _connect_timeout = 0;
  double _maxspeed = 0;
  int _maxworkers = 0;
};

constexpr auto MIN_REQ_MIRRS = 4;
constexpr auto MAXURLS = 10;


static double
currentTime()
{
#if _POSIX_C_SOURCE >= 199309L
  struct timespec ts;
  if ( clock_gettime( CLOCK_MONOTONIC, &ts) )
    return 0;
  return ts.tv_sec + ts.tv_nsec / 1000000000.;
#else
  struct timeval tv;
  if (gettimeofday(&tv, NULL))
    return 0;
  return tv.tv_sec + tv.tv_usec / 1000000.;
#endif
}
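
// Note: CLOCK_MONOTONIC is used when available so that speed averages and
// sleep deadlines computed from currentTime() are not skewed by wall-clock
// adjustments (NTP steps, manual date changes); gettimeofday() is only the
// fallback.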

size_t
multifetchworker::writefunction(char *ptr, std::optional<off_t> offset, size_t bytes)
{
  if ( _state == WORKER_BROKEN )
    return bytes ? 0 : 1;

  double now = currentTime();

  // update stats of overall data
  _datareceived += bytes;
  _received += bytes;
  _request->_lastprogress = now;

  const auto &currRange = _multiByteHandler->currentRange();
  if (!currRange)
    return 0; // we always write to a range

  auto &stripeDesc = _request->_requiredStripes[_stripe];
  if ( !_request->_fp || stripeDesc.blockStates[ _rangeToStripeBlock[*currRange] ] == Stripe::FINALIZED ) {
    // someone else finished our block first!
    // we stop here and fetch new jobs if there are still some
    _state = WORKER_DISCARD;
    _competing = false;
    return 0;
  }

  const auto &blk = _blocks[*currRange];
  off_t seekTo = blk.start + blk.bytesWritten;

  if ( ftell( _request->_fp ) != seekTo ) {
    // if we can't seek the file there is no purpose in trying again
    if (fseeko(_request->_fp, seekTo, SEEK_SET))
      return bytes ? 0 : 1;
  }

  size_t cnt = fwrite(ptr, 1, bytes, _request->_fp);
  _request->_fetchedsize += cnt;
  return cnt;
}
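
// Reminder on the curl write-callback contract assumed above: returning a
// value different from 'bytes' makes libcurl abort the transfer with
// CURLE_WRITE_ERROR. 'return bytes ? 0 : 1;' therefore always signals an
// abort, no matter how much data was passed in.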

bool multifetchworker::beginRange ( off_t workerRangeOff, std::string &cancelReason )
{
  auto &stripeDesc = _request->_requiredStripes[_stripe];
  auto stripeRangeOff = _rangeToStripeBlock[workerRangeOff];
  const auto &currRangeState = stripeDesc.blockStates[stripeRangeOff];

  if ( currRangeState == Stripe::FINALIZED ){
    cancelReason = "Cancelled because stripe block is already finalized";

    WAR << "#" << _workerno << ": trying to start a range (" << stripeRangeOff << "[" << _blocks[workerRangeOff].start << " : " << _blocks[workerRangeOff].len << "]) that was already finalized, cancelling. Stealing was: " << _request->_stealing << endl;
    return false;
  }
  stripeDesc.blockStates[stripeRangeOff] = currRangeState == Stripe::PENDING ? Stripe::FETCH : Stripe::COMPETING;
  return true;
}

bool multifetchworker::finishedRange ( off_t workerRangeOff, bool validated, std::string &cancelReason )
{
  auto &stripeDesc = _request->_requiredStripes[_stripe];
  auto stripeRangeOff = _rangeToStripeBlock[workerRangeOff];
  const auto &currRangeState = stripeDesc.blockStates[stripeRangeOff];

  if ( !validated ) {
    // fail, worker will go into WORKER_BROKEN
    cancelReason = "Block failed to validate";
    return false;
  }

  if ( currRangeState == Stripe::FETCH ) {
    // only we wrote here, block is finalized
    stripeDesc.blockStates[stripeRangeOff] = Stripe::FINALIZED;
    _request->_fetchedgoodsize += _blocks[workerRangeOff].len;
  } else {
    // others wrote here, we need to check the full checksum
    if ( recheckChecksum ( workerRangeOff ) ) {
      stripeDesc.blockStates[stripeRangeOff] = Stripe::FINALIZED;
      _request->_fetchedgoodsize += _blocks[workerRangeOff].len;
    } else {
      // someone messed that block up, set it to refetch but continue since our
      // data is valid
      WAR << "#" << _workerno << ": Broken data in COMPETING block, requesting refetch. Stealing is: " << _request->_stealing << endl;
      stripeDesc.blockStates[stripeRangeOff] = Stripe::REFETCH;
    }
  }
  return true;
}

size_t
multifetchworker::headerfunction( char *p, size_t bytes )
{
  size_t l = bytes;
  if (l > 9 && !strncasecmp(p, "Location:", 9)) {
    std::string line(p + 9, l - 9);
    if (line[l - 10] == '\r')
      line.erase(l - 10, 1);
    XXX << "#" << _workerno << ": redirecting to " << line << endl;
    return bytes;
  }

  const auto &repSize = _multiByteHandler->reportedFileSize ();
  if ( repSize && *repSize != _request->_filesize ) {
    XXX << "#" << _workerno << ": filesize mismatch" << endl;
    _state = WORKER_BROKEN;
    strncpy(_curlError, "filesize mismatch", CURL_ERROR_SIZE);
    return 0;
  }

  return bytes;
}

multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
: MediaCurl(url, Pathname())
, _workerno( no )
, _maxspeed( request._maxspeed )
, _request ( &request )
{
  Url curlUrl( clearQueryString(url) );
  _urlbuf = curlUrl.asString();
  _curl = _request->_context->fromEasyPool(_url.getHost());
  if (_curl)
    XXX << "reused worker from pool" << endl;
  if (!_curl && !(_curl = curl_easy_init()))
  {
    _state = WORKER_BROKEN;
    strncpy(_curlError, "curl_easy_init failed", CURL_ERROR_SIZE);
    return;
  }

  if ( url.getScheme() == "http" || url.getScheme() == "https" )
    _protocolMode = MultiByteHandler::ProtocolMode::HTTP;

  setupHandle();
  checkdns();
}

bool multifetchworker::setupHandle()
{
  try {
    setupEasy();
  } catch (Exception &ex) {
    curl_easy_cleanup(_curl);
    _curl = 0;
    _state = WORKER_BROKEN;
    strncpy(_curlError, "curl_easy_setopt failed", CURL_ERROR_SIZE);
    return false;
  }
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
  curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());

  // if this is the same host copy authorization
  // (the host check is also what curl does when doing a redirect)
  // (note also that unauthorized exceptions are thrown with the request host)
  if ( _url.getHost() == _request->_context->_url.getHost()) {
    _settings.setUsername(_request->_context->_settings.username());
    _settings.setPassword(_request->_context->_settings.password());
    _settings.setAuthType(_request->_context->_settings.authType());
    if ( _settings.userPassword().size() ) {
      curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
      std::string use_auth = _settings.authType();
      if (use_auth.empty())
        use_auth = "digest,basic"; // our default
      long auth = CurlAuthData::auth_type_str2long(use_auth);
      if( auth != CURLAUTH_NONE)
      {
        XXX << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
            << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
        curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
      }
    }
  }
  return true;
}

multifetchworker::~multifetchworker()
{
  if (_curl)
  {
    if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
      curl_multi_remove_handle(_request->_multi, _curl);
    if (_state == WORKER_DONE || _state == WORKER_SLEEP)
    {
#if CURLVERSION_AT_LEAST(7,15,5)
      curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
#endif
      curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
      curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
      curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
      curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
      curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
      _request->_context->toEasyPool(_url.getHost(), _curl);
    }
    else
      curl_easy_cleanup(_curl);
    _curl = 0;
  }
  if (_pid)
  {
    kill(_pid, SIGKILL);
    int status;
    while (waitpid(_pid, &status, 0) == -1)
      if (errno != EINTR)
        break;
    _pid = 0;
  }
  if (_dnspipe != -1)
  {
    close(_dnspipe);
    _dnspipe = -1;
  }
  // the destructor in MediaCurl doesn't call disconnect() if
  // the media is not attached, so we do it here manually
  disconnectFrom();
}

static inline bool env_isset(std::string name)
{
  const char *s = getenv(name.c_str());
  return s && *s ? true : false;
}

void
multifetchworker::checkdns()
{
  std::string host = _url.getHost();

  if (host.empty())
    return;

  if (_request->_context->isDNSok(host))
    return;

  // no need to do dns checking for numeric hosts
  char addrbuf[128];
  if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
    return;
  if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
    return;

  // no need to do dns checking if we use a proxy
  if (!_settings.proxy().empty())
    return;
  if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
    return;
  std::string schemeproxy = _url.getScheme() + "_proxy";
  if (env_isset(schemeproxy))
    return;
  if (schemeproxy != "http_proxy")
  {
    std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
    if (env_isset(schemeproxy))
      return;
  }

  XXX << "checking DNS lookup of " << host << endl;
  int pipefds[2];
  if (pipe(pipefds))
  {
    _state = WORKER_BROKEN;
    strncpy(_curlError, "DNS pipe creation failed", CURL_ERROR_SIZE);
    return;
  }
  _pid = fork();
  if (_pid == pid_t(-1))
  {
    close(pipefds[0]);
    close(pipefds[1]);
    _pid = 0;
    _state = WORKER_BROKEN;
    strncpy(_curlError, "DNS checker fork failed", CURL_ERROR_SIZE);
    return;
  }
  else if (_pid == 0)
  {
    close(pipefds[0]);
    // XXX: close all other file descriptors
    struct addrinfo *ai, aihints;
    memset(&aihints, 0, sizeof(aihints));
    aihints.ai_family = PF_UNSPEC;
    int tstsock = socket(PF_INET6, SOCK_DGRAM | SOCK_CLOEXEC, 0);
    if (tstsock == -1)
      aihints.ai_family = PF_INET;
    else
      close(tstsock);
    aihints.ai_socktype = SOCK_STREAM;
    aihints.ai_flags = AI_CANONNAME;
    unsigned int connecttimeout = _request->_connect_timeout;
    if (connecttimeout)
      alarm(connecttimeout);
    signal(SIGALRM, SIG_DFL);
    if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
      _exit(1);
    _exit(0);
  }
  close(pipefds[1]);
  _dnspipe = pipefds[0];
  _state = WORKER_LOOKUP;
}
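
// The lookup result is signalled through process lifetime rather than pipe
// payload: the child never writes to pipefds[1], it merely inherits it, so
// once the child exits (getaddrinfo() done, or cut short by SIGALRM) the
// write end closes and _dnspipe becomes readable/HUP for the poll loop;
// dnsevent() then reaps the child and inspects its exit status.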

void
multifetchworker::adddnsfd(std::vector<GPollFD> &waitFds)
{
  if (_state != WORKER_LOOKUP)
    return;

  waitFds.push_back (
    GPollFD {
      .fd = _dnspipe,
      .events = G_IO_IN | G_IO_HUP | G_IO_ERR,
      .revents = 0
    });
}

void
multifetchworker::dnsevent( const std::vector<GPollFD> &waitFds )
{
  bool hasEvent = std::any_of( waitFds.begin (), waitFds.end(),[this]( const GPollFD &waitfd ){
    return ( waitfd.fd == _dnspipe && waitfd.revents != 0 );
  });

  if (_state != WORKER_LOOKUP || !hasEvent)
    return;
  int status;
  while (waitpid(_pid, &status, 0) == -1)
  {
    if (errno != EINTR)
      return;
  }
  _pid = 0;
  if (_dnspipe != -1)
  {
    close(_dnspipe);
    _dnspipe = -1;
  }
  if (!WIFEXITED(status))
  {
    _state = WORKER_BROKEN;
    strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
    _request->_activeworkers--;
    return;
  }
  int exitcode = WEXITSTATUS(status);
  XXX << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
  if (exitcode != 0)
  {
    _state = WORKER_BROKEN;
    strncpy(_curlError, "DNS lookup failed", CURL_ERROR_SIZE);
    _request->_activeworkers--;
    return;
  }
  _request->_context->setDNSok(_url.getHost());
  nextjob();
}

bool multifetchworker::recheckChecksum( off_t workerRangeIdx )
{
  // XXX << "recheckChecksum block " << _blkno << endl;
  if (!_request->_fp || !_datasize || !_blocks.size() )
    return true;

  auto &blk = _blocks[workerRangeIdx];
  if ( !blk._digest )
    return true;

  const auto currOf = ftell( _request->_fp );
  if ( currOf == -1 )
    return false;

  if (fseeko(_request->_fp, blk.start, SEEK_SET))
    return false;

  zypp::Digest newDig = blk._digest->clone();

  char buf[4096];
  size_t l = blk.len;
  while (l) {
    size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
    if (fread(buf, cnt, 1, _request->_fp) != 1)
      return false;
    newDig.update(buf, cnt);
    l -= cnt;
  }

  if (fseeko(_request->_fp, currOf, SEEK_SET))
    return false;

  blk._digest = std::move(newDig);
  if (!_multiByteHandler->validateRange(blk)) {
    WAR << "#" << _workerno << " Stripe: " << _stripe << ": Stripe-Block: " << _rangeToStripeBlock[workerRangeIdx] << " failed to validate" << endl;
    return false;
  }

  return true;
}

MultiByteHandler::Range multifetchworker::rangeFromBlock( off_t blockNo ) const
{
  UByteArray sum;
  std::optional<zypp::Digest> digest;
  std::optional<size_t> relDigLen;
  std::optional<size_t> blkSumPad;

  const auto &blk = _request->_blklist.getBlock( blkNo );
  if ( _request->_blklist.haveChecksum ( blkNo ) ) {
    sum = _request->_blklist.getChecksum( blkNo );
    relDigLen = sum.size( );
    blkSumPad = _request->_blklist.checksumPad( );
    digest = zypp::Digest();
    digest->create( _request->_blklist.getChecksumType() );
  }

  return MultiByteHandler::Range::make(
    blk.off,
    blk.size,
    std::move(digest),
    std::move(sum),
    {}, // empty user data
    std::move(relDigLen),
    std::move(blkSumPad) );
}

void multifetchworker::stealjob()
{
  if (!_request->_stealing)
  {
    XXX << "start stealing!" << endl;
    _request->_stealing = true;
  }

  multifetchworker *best = 0; // best choice for the worker we want to compete with
  double now = 0;

  // look through all currently running workers to find the best candidate we
  // could steal from
  for (auto workeriter = _request->_workers.begin(); workeriter != _request->_workers.end(); ++workeriter)
  {
    multifetchworker *worker = workeriter->get();
    if (worker == this)
      continue;
    if (worker->_pass == -1)
      continue; // do not steal!
    if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_datasize)
      continue; // do not steal finished jobs
    if (!worker->_avgspeed && worker->_datareceived)
    {
      // calculate avg speed for the worker if that was not done yet
      if (!now)
        now = currentTime();
      if (now > worker->_starttime)
        worker->_avgspeed = worker->_datareceived / (now - worker->_starttime);
    }
    // only consider workers that still have work
    if ( worker->_datasize - worker->_datareceived <= 0 )
      continue;
    if (!best || best->_pass > worker->_pass)
    {
      best = worker;
      continue;
    }
    if (best->_pass < worker->_pass)
      continue;
    // if it is the same stripe, our current best choice is competing with the worker we are looking at
    // we need to check if we are faster than the fastest one competing for this stripe, so we want the best.
    // Otherwise the worst.
    if (worker->_stripe == best->_stripe)
    {
      if ((worker->_datasize - worker->_datareceived) * best->_avgspeed < (best->_datasize - best->_datareceived) * worker->_avgspeed)
        best = worker;
    }
    else
    {
      if ((worker->_datasize - worker->_datareceived) * best->_avgspeed > (best->_datasize - best->_datareceived) * worker->_avgspeed)
        best = worker;
    }
  }
  if (!best)
  {
    _state = WORKER_DONE;
    _request->_activeworkers--;
    _request->_finished = true;
    return;
  }
  // do not sleep twice
  if (_state != WORKER_SLEEP)
  {
    if (!_avgspeed && _datareceived)
    {
      if (!now)
        now = currentTime();
      if (now > _starttime)
        _avgspeed = _datareceived / (now - _starttime);
    }

    // lets see if we should sleep a bit
    XXX << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_datasize << endl;
    XXX << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_datasize - best->_datareceived) << endl;

    // check if we could download the full data from best faster than best could download its remaining data
    if ( _avgspeed && best->_avgspeed // we and best have average speed information
      && _avgspeed <= best->_avgspeed ) // and we are not faster than best
    {
      if (!now)
        now = currentTime();
      double sl = (best->_datasize - best->_datareceived) / best->_avgspeed * 2;
      if (sl > 1)
        sl = 1;
      XXX << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
      _sleepuntil = now + sl;
      _state = WORKER_SLEEP;
      _request->_sleepworkers++;
      return;
    }
  }

  _competing = true;
  best->_competing = true;
  _stripe = best->_stripe;

  best->_pass++;
  _pass = best->_pass;

  runjob();
}
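
// Illustration of the cross-multiplied comparison above: the remaining time
// of a worker is (datasize - datareceived) / avgspeed, so "worker finishes
// later than best" is tested as
//   (worker remaining) * (best speed) > (best remaining) * (worker speed)
// which avoids divisions (and division by zero for workers without a speed
// estimate yet). E.g. 4 MiB left at 2 MiB/s (2 s) loses against 1 MiB left
// at 1 MiB/s (1 s), since 4*1 > 1*2.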

void
multifetchworker::disableCompetition()
{
  for ( auto workeriter = _request->_workers.begin(); workeriter != _request->_workers.end(); ++workeriter)
  {
    multifetchworker *worker = workeriter->get();
    if (worker == this)
      continue;
    if (worker->_stripe == _stripe)
    {
      if (worker->_state == WORKER_FETCH)
        worker->_state = WORKER_DISCARD;
      worker->_pass = -1; /* do not steal this one, we already have it */
    }
  }
}

void multifetchworker::nextjob()
{
  _datasize = 0;
  _blocks.clear();

  // claim the next stripe for us, or steal if there is nothing left to claim
  if ( _request->_stripeNo >= off_t(_request->_requiredStripes.size()) ) {
    stealjob();
    return;
  }

  _stripe = _request->_stripeNo++;
  runjob();
}

void multifetchworker::runjob()
{
  _datasize = 0;
  _blocks.clear ();
  _rangeToStripeBlock.clear ();

  auto &stripeDesc = _request->_requiredStripes[_stripe];
  for ( uint i = 0; i < stripeDesc.blocks.size(); i++ ) {
    // ignore verified and finalized ranges
    if( stripeDesc.blockStates[i] == Stripe::FINALIZED ) {
      continue;
    } else {
      _blocks.push_back( rangeFromBlock(stripeDesc.blocks[i]) );
      _rangeToStripeBlock.push_back( i );
      _datasize += _blocks.back().len;
    }
  }

  if ( _datasize == 0 ) {
    // no blocks left in this stripe
    _state = WORKER_DONE;
    _request->_activeworkers--;
    if ( !_request->_activeworkers )
      _request->_finished = true;
    return;
  }

  DBG << "#" << _workerno << ": done adding blocks, going to download " << _blocks.size() << " blocks with " << _datasize << " bytes" << std::endl;

  _multiByteHandler.reset( nullptr );
  _multiByteHandler = std::make_unique<MultiByteHandler>(_protocolMode, _curl, _blocks, *this );
  _starttime = currentTime();
  _datareceived = 0;
  run();
}

bool multifetchworker::continueJob()
{
  bool hadRangeFail = _multiByteHandler->lastError() == MultiByteHandler::Code::RangeFail;
  if ( !_multiByteHandler->prepareToContinue() ) {
    strncpy( _curlError, _multiByteHandler->lastErrorMessage().c_str(), CURL_ERROR_SIZE );
    return false;
  }

  if ( hadRangeFail ) {
    // we reset the handle to default values. We do this to not run into the
    // "transfer closed with outstanding read data remaining" error CURL sometimes returns when
    // we cancel a connection because of a range error to request a smaller batch.
    // The error will still happen but much less frequently than without resetting the handle.
    //
    // Note: Even creating a new handle will NOT fix the issue
    curl_easy_reset( _curl );
    if ( !setupHandle())
      return false;
  }

  run();
  return true;
}

void
multifetchworker::run()
{
  if ( _state == WORKER_BROKEN )
    return; // just in case...

  if ( !_multiByteHandler->prepare() ) {
    _state = WORKER_BROKEN;
    strncpy( _curlError, _multiByteHandler->lastErrorMessage ().c_str() , CURL_ERROR_SIZE );
    return;
  }

  if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK) {
    _state = WORKER_BROKEN;
    strncpy( _curlError, "curl_multi_add_handle failed", CURL_ERROR_SIZE );
    return;
  }

  _request->_havenewjob = true;
  _state = WORKER_FETCH;
}


multifetchrequest::multifetchrequest(const MediaMultiCurl *context, const Pathname &filename, const Url &baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList &&blklist, off_t filesize)
  : internal::CurlPollHelper::CurlPoll{ multi }
  , _context(context)
  , _filename(filename)
  , _baseurl(baseurl)
  , _fp(fp)
  , _report(report)
  , _blklist(std::move(blklist))
  , _filesize(filesize)
  , _starttime(currentTime())
  , _timeout(context->_settings.timeout())
  , _connect_timeout(context->_settings.connectTimeout())
  , _maxspeed(context->_settings.maxDownloadSpeed())
  , _maxworkers(context->_settings.maxConcurrentConnections())
{
  _lastprogress = _lastperiodstart = _starttime;
  if (_maxworkers > MAXURLS)
    _maxworkers = MAXURLS;
  if (_maxworkers <= 0)
    _maxworkers = 1;

  // calculate the total size of our download
  for (size_t blkno = 0; blkno < _blklist.numBlocks(); blkno++)
    _totalsize += _blklist.getBlock(blkno).size;

  // equally distribute the data we want to download over all workers
  _defaultBlksize = makeBlksize( _maxworkers, _totalsize );

  // lets build stripe information
  zypp::ByteCount currStripeSize = 0;
  for (size_t blkno = 0; blkno < _blklist.numBlocks(); blkno++) {

    const MediaBlock &blk = _blklist.getBlock(blkno);
    if ( _requiredStripes.empty() || currStripeSize >= _defaultBlksize ) {
      _requiredStripes.push_back( Stripe{} );
      currStripeSize = 0;
    }

    _requiredStripes.back().blocks.push_back(blkno);
    _requiredStripes.back().blockStates.push_back(Stripe::PENDING);
    currStripeSize += blk.size;
  }

  MIL << "Downloading " << _blklist.numBlocks() << " blocks via " << _requiredStripes.size() << " stripes on " << _maxworkers << " connections." << endl;
}
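
// Illustrative numbers (assuming the makeBlksize() defined further below):
// a 100 MiB download on 5 connections yields _defaultBlksize = 20 MiB, so
// the metalink blocks are grouped into roughly 5 stripes of about 20 MiB
// each; every stripe starts out with all of its blocks in Stripe::PENDING.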


void
multifetchrequest::run(std::vector<Url> &urllist)
{
  int workerno = 0;
  std::vector<Url>::iterator urliter = urllist.begin();

  internal::CurlPollHelper _curlHelper(*this);

  // kickstart curl
  CURLMcode mcode = _curlHelper.handleTimout();
  if (mcode != CURLM_OK)
    ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));

  for (;;)
  {
    // list of all fds we want to poll
    std::vector<GPollFD> waitFds;
    int dnsFdCount = 0;

    if (_finished)
    {
      XXX << "finished!" << endl;
      break;
    }

    if ((int)_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
    {
      // spawn another worker!
      _workers.push_back(std::make_unique<multifetchworker>(workerno++, *this, *urliter));
      auto &worker = _workers.back();
      if (worker->_state != WORKER_BROKEN)
      {
        _activeworkers++;
        if (worker->_state != WORKER_LOOKUP)
        {
          worker->nextjob();
        }
        else
          _lookupworkers++;
      }
      ++urliter;
      continue;
    }
    if (!_activeworkers)
    {
      WAR << "No more active workers!" << endl;
      // show the first worker error we find
      for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
      {
        if ((*workeriter)->_state != WORKER_BROKEN)
          continue;
        ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->_curlError));
      }
      break;
    }

    if (_lookupworkers)
      for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
        (*workeriter)->adddnsfd( waitFds );

    // if we added a new job we have to call multi_perform once
    // to make it show up in the fd set. do not sleep in this case.
    int timeoutMs = _havenewjob ? 0 : 200;
    if ( _sleepworkers && !_havenewjob ) {
      if (_minsleepuntil == 0) {
        for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter) {
          multifetchworker *worker = workeriter->get();
          if (worker->_state != WORKER_SLEEP)
            continue;
          if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
            _minsleepuntil = worker->_sleepuntil;
        }
      }
      double sl = _minsleepuntil - currentTime();
      if (sl < 0) {
        sl = 0;
        _minsleepuntil = 0;
      }
      if (sl < .2)
        timeoutMs = sl * 1000;
    }

    if ( _curlHelper.timeout_ms.has_value() )
      timeoutMs = std::min<long>( timeoutMs, _curlHelper.timeout_ms.value() );

    dnsFdCount = waitFds.size(); // remember how many dns fd's we have
    waitFds.insert( waitFds.end(), _curlHelper.socks.begin(), _curlHelper.socks.end() ); // add the curl fd's to the poll data

    int r = zypp_detail::zypp_poll( waitFds, timeoutMs );
    if ( r == -1 )
      ZYPP_THROW(MediaCurlException(_baseurl, "zypp_poll() failed", "unknown error"));
    if ( r != 0 && _lookupworkers ) {
      for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
      {
        multifetchworker *worker = workeriter->get();
        if (worker->_state != WORKER_LOOKUP)
          continue;
        (*workeriter)->dnsevent( waitFds );
        if (worker->_state != WORKER_LOOKUP)
          _lookupworkers--;
      }
    }
    _havenewjob = false;

    // run curl
    if ( r == 0 ) {
      CURLMcode mcode = _curlHelper.handleTimout();
      if (mcode != CURLM_OK)
        ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));
    } else {
      CURLMcode mcode = _curlHelper.handleSocketActions( waitFds, dnsFdCount );
      if (mcode != CURLM_OK)
        ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));
    }

    double now = currentTime();

    // update periodavg
    if (now > _lastperiodstart + .5)
    {
      if (!_periodavg)
        _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
      else
        _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
      _lastperiodfetched = _fetchedsize;
      _lastperiodstart = now;
    }

    // wake up sleepers
    if (_sleepworkers)
    {
      for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
      {
        multifetchworker *worker = workeriter->get();
        if (worker->_state != WORKER_SLEEP)
          continue;
        if (worker->_sleepuntil > now)
          continue;
        if (_minsleepuntil == worker->_sleepuntil)
          _minsleepuntil = 0;
        XXX << "#" << worker->_workerno << ": sleep done, wake up" << endl;
        _sleepworkers--;
        // nextjob changes the state
        worker->nextjob();
      }
    }

    // collect all curl results, (re)schedule jobs
    CURLMsg *msg;
    int nqueue;
    while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
    {
      if (msg->msg != CURLMSG_DONE)
        continue;
      CURL *easy = msg->easy_handle;
      CURLcode cc = msg->data.result;
      multifetchworker *worker;

      if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
        ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));

      if (worker->_datareceived && now > worker->_starttime) {
        if (worker->_avgspeed)
          worker->_avgspeed = (worker->_avgspeed + worker->_datareceived / (now - worker->_starttime)) / 2;
        else
          worker->_avgspeed = worker->_datareceived / (now - worker->_starttime);
      }

      XXX << "#" << worker->_workerno << " done code " << cc << " speed " << worker->_avgspeed << endl;
      curl_multi_remove_handle(_multi, easy);

      const auto &setWorkerBroken = [&]( const std::string &str = {} ){
        worker->_state = WORKER_BROKEN;
        if ( !str.empty () )
          strncpy(worker->_curlError, str.c_str(), CURL_ERROR_SIZE);
        _activeworkers--;

        if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS)) {
          // end of workers reached! goodbye!
          worker->evaluateCurlCode(Pathname(), cc, false);
        }
      };

      if ( !worker->_multiByteHandler ) {
        WAR << "#" << worker->_workerno << ": has no multibyte handler, this is a bug" << endl;
        setWorkerBroken("Multibyte handler error");
        continue;
      }

      // tell the worker to finalize the current block
      worker->_multiByteHandler->finalize();

      if ( worker->_multiByteHandler->hasMoreWork() && ( cc == CURLE_OK || worker->_multiByteHandler->canRecover() ) ) {

        WAR << "#" << worker->_workerno << ": still has work to do or can recover from an error, continuing the job!" << endl;
        // the current job is not done, or we failed and need to try more, enqueue and start again
        if ( !worker->continueJob() ) {
          WAR << "#" << worker->_workerno << ": failed to continue (" << worker->_multiByteHandler->lastErrorMessage() << ")" << endl;
          setWorkerBroken( worker->_multiByteHandler->lastErrorMessage() );
        }
        continue;
      }

      // --- from here on the worker has no more ranges in its current job, or had an error it can't recover from ---

      if ( cc != CURLE_OK ) {
        if ( worker->_state != WORKER_DISCARD ) {
          // something went wrong and we can not recover, broken worker!
          setWorkerBroken();
          continue;
        } else {
          WAR << "#" << worker->_workerno << ": failed, but was set to discard, reusing for new requests" << endl;
        }
      } else {

        // we got what we asked for, maybe. Let's see if all ranges have been marked as finalized
        if( !worker->_multiByteHandler->verifyData() ) {
          WAR << "#" << worker->_workerno << ": error: " << worker->_multiByteHandler->lastErrorMessage() << ", disable worker" << endl;
          setWorkerBroken();
          continue;
        }

        // from here on we know THIS worker only got data that verified
        // now lets see if the stripe was finished too
        // stripe blocks can now be only in FINALIZED or ERROR states
        if (worker->_state == WORKER_FETCH ) {
          if ( worker->_competing ) {
            worker->disableCompetition ();
          }
          auto &wrkerStripe = _requiredStripes[worker->_stripe];
          bool done = std::all_of( wrkerStripe.blockStates.begin(), wrkerStripe.blockStates.end(), []( const Stripe::RState s ) { return s == Stripe::FINALIZED; } );
          if ( !done ) {
            // all ranges that are not finalized are in a bogus state, refetch them
            std::for_each( wrkerStripe.blockStates.begin(), wrkerStripe.blockStates.end(), []( Stripe::RState &s ) {
              if ( s != Stripe::FINALIZED)
                s = Stripe::PENDING;
            });

            _finished = false; //reset finished flag
            worker->runjob();
            continue;
          }
        }

        // make bad workers ( bad as in really slow ) sleep a little
        double maxavg = 0;
        int maxworkerno = 0;
        int numbetter = 0;
        for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
        {
          multifetchworker *oworker = workeriter->get();
          if (oworker->_state == WORKER_BROKEN)
            continue;
          if (oworker->_avgspeed > maxavg)
          {
            maxavg = oworker->_avgspeed;
            maxworkerno = oworker->_workerno;
          }
          if (oworker->_avgspeed > worker->_avgspeed)
            numbetter++;
        }
        if (maxavg && !_stealing)
        {
          double ratio = worker->_avgspeed / maxavg;
          ratio = 1 - ratio;
          if (numbetter < 3) // don't sleep that much if we're in the top two
            ratio = ratio * ratio;
          if (ratio > .01)
          {
            XXX << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
            worker->_sleepuntil = now + ratio;
            worker->_state = WORKER_SLEEP;
            _sleepworkers++;
            continue;
          }
        }

        // do rate control (if requested)
        // should use periodavg, but that's not what libcurl does
        if (_maxspeed && now > _starttime)
        {
          double avg = _fetchedsize / (now - _starttime);
          avg = worker->_maxspeed * _maxspeed / avg;
          if (avg < _maxspeed / _maxworkers)
            avg = _maxspeed / _maxworkers;
          if (avg > _maxspeed)
            avg = _maxspeed;
          if (avg < 1024)
            avg = 1024;
          worker->_maxspeed = avg;
#if CURLVERSION_AT_LEAST(7,15,5)
          curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
#endif
        }

        worker->nextjob();
      }

      if ( _filesize > 0 && _fetchedgoodsize > _filesize ) {
        ZYPP_THROW(MediaFileSizeExceededException(_baseurl, _filesize));
      }
    }

    // send report
    if (_report)
    {
      int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;

      double avg = 0;
      if (now > _starttime)
        avg = _fetchedsize / (now - _starttime);
      if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
        ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
    }

    if (_timeout && now - _lastprogress > _timeout)
      break;
  }

  if (!_finished)
    ZYPP_THROW(MediaTimeoutException(_baseurl));

  // print some download stats
  WAR << "overall result" << endl;
  for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
  {
    multifetchworker *worker = workeriter->get();
    WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
  }
}

inline zypp::ByteCount multifetchrequest::makeBlksize ( uint maxConns, size_t filesize )
{
  return std::max<zypp::ByteCount>( filesize / std::min( std::max<int>( 1, maxConns ) , MAXURLS ), zypp::ByteCount(4, zypp::ByteCount::K) );
}
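
// Worked examples: makeBlksize(5, 100 MiB) = 100 MiB / min(5, MAXURLS) =
// 20 MiB; makeBlksize(0, 1 MiB) clamps the connection count to 1 and yields
// 1 MiB; for a tiny file, makeBlksize(10, 8 KiB) hits the 4 KiB floor.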



MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
  : MediaCurl(url_r, attach_point_hint_r)
{
  MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
  _multi = 0;
  _customHeadersMetalink = 0;
}

MediaMultiCurl::~MediaMultiCurl()
{
  if (_customHeadersMetalink)
  {
    curl_slist_free_all(_customHeadersMetalink);
    _customHeadersMetalink = 0;
  }
  if (_multi)
  {
    curl_multi_cleanup(_multi);
    _multi = 0;
  }
  std::map<std::string, CURL *>::iterator it;
  for (it = _easypool.begin(); it != _easypool.end(); it++)
  {
    CURL *easy = it->second;
    if (easy)
    {
      curl_easy_cleanup(easy);
      it->second = NULL;
    }
  }
}

void MediaMultiCurl::setupEasy()
{
  MediaCurl::setupEasy();

  if (_customHeadersMetalink)
  {
    curl_slist_free_all(_customHeadersMetalink);
    _customHeadersMetalink = 0;
  }
  struct curl_slist *sl = _customHeaders;
  for (; sl; sl = sl->next)
    _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
  //, application/x-zsync
  _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/x-zsync, application/metalink+xml, application/metalink4+xml");
}

// here we try to suppress all progress coming from a metalink download
// bsc#1021291: Nevertheless send alive trigger (without stats), so UIs
// are able to abort a hanging metalink download via callback response.
int MediaMultiCurl::progressCallback( void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
{
  CURL *_curl = MediaCurl::progressCallback_getcurl(clientp);
  if (!_curl)
    return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);

  // bsc#408814: Don't report any sizes before we have data on disk. Data reported
  // due to redirection etc. are not interesting, but may disturb filesize checks.
  FILE *fp = 0;
  if ( curl_easy_getinfo( _curl, CURLINFO_PRIVATE, &fp ) != CURLE_OK || !fp )
    return MediaCurl::aliveCallback( clientp, dltotal, dlnow, ultotal, ulnow );
  if ( ftell( fp ) == 0 )
    return MediaCurl::aliveCallback( clientp, dltotal, 0.0, ultotal, ulnow );

  // (no longer needed due to the filesize check above?)
  // work around curl bug that gives us old data
  long httpReturnCode = 0;
  if (curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode ) != CURLE_OK || httpReturnCode == 0)
    return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);

  char *ptr = NULL;
  bool ismetalink = false;
  if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
  {
    std::string ct = std::string(ptr);
    if (ct.find("application/x-zsync") == 0 || ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
      ismetalink = true;
  }
  if (!ismetalink && dlnow < 256)
  {
    // can't tell yet, ...
    return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
  }
  if (!ismetalink)
  {
    fflush(fp);
    ismetalink = looks_like_meta_file(fp) != MetaDataType::None;
    DBG << "looks_like_meta_file: " << ismetalink << endl;
  }
  if (ismetalink)
  {
    // this is a metalink file, change the expected filesize
    MediaCurl::resetExpectedFileSize( clientp, ByteCount( 2, ByteCount::MB ) );
    // we're downloading the metalink file. Just trigger aliveCallbacks
    curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::aliveCallback);
    return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
  }
  curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::progressCallback);
  return MediaCurl::progressCallback(clientp, dltotal, dlnow, ultotal, ulnow);
}
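
// Once the payload is classified, the sniffing cost disappears: the callback
// reinstalls either MediaCurl::aliveCallback (metalink detected, suppress
// stats) or MediaCurl::progressCallback (plain payload) via
// CURLOPT_XFERINFOFUNCTION, so subsequent progress events bypass this
// function entirely.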

void MediaMultiCurl::doGetFileCopy( const OnMediaLocation &srcFile , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
{
  Pathname dest = target.absolutename();
  if( assert_dir( dest.dirname() ) )
  {
    DBG << "assert_dir " << dest.dirname() << " failed" << endl;
    ZYPP_THROW( MediaSystemException(getFileUrl(srcFile.filename()), "System error on " + dest.dirname().asString()) );
  }

  ManagedFile destNew { target.extend( ".new.zypp.XXXXXX" ) };
  AutoFILE file;
  {
    AutoFREE<char> buf { ::strdup( (*destNew).c_str() ) };
    if( ! buf )
    {
      ERR << "out of memory for temp file name" << endl;
      ZYPP_THROW(MediaSystemException(getFileUrl(srcFile.filename()), "out of memory for temp file name"));
    }

    AutoFD tmp_fd { ::mkostemp( buf, O_CLOEXEC ) };
    if( tmp_fd == -1 )
    {
      ERR << "mkstemp failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
    destNew = ManagedFile( (*buf), filesystem::unlink );

    file = ::fdopen( tmp_fd, "we" );
    if ( ! file )
    {
      ERR << "fopen failed for file '" << destNew << "'" << endl;
      ZYPP_THROW(MediaWriteException(destNew));
    }
    tmp_fd.resetDispose(); // don't close it here! ::fdopen moved ownership to file
  }

  DBG << "dest: " << dest << endl;
  DBG << "temp: " << destNew << endl;

  // set IFMODSINCE time condition (no download if not modified)
  if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
  }
  else
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  }
  // change header to include Accept: metalink
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
  // change to our own progress function
  curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &progressCallback);
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, (*file) ); // important to pass the FILE* explicitly (passing through varargs)
  try
  {
    MediaCurl::doGetFileCopyFile( srcFile, dest, file, report, options );
  }
  catch (Exception &ex)
  {
    curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
    curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
    curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
    curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
    ZYPP_RETHROW(ex);
  }
  curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
  curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
  curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
  curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
  long httpReturnCode = 0;
  CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
  if (infoRet == CURLE_OK)
  {
    DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
    if ( httpReturnCode == 304
         || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
    {
      DBG << "not modified: " << PathInfo(dest) << endl;
      return;
    }
  }
  else
  {
    WAR << "Could not get the response code." << endl;
  }

  MetaDataType ismetalink = MetaDataType::None;

  char *ptr = NULL;
  if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
  {
    std::string ct = std::string(ptr);
    if (ct.find("application/x-zsync") == 0 )
      ismetalink = MetaDataType::Zsync;
    else if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
      ismetalink = MetaDataType::MetaLink;
  }

  if ( ismetalink == MetaDataType::None )
  {
    // some proxies do not store the content type, so also look at the file to find
    // out if we received a metalink (bnc#649925)
    fflush(file);
    ismetalink = looks_like_meta_file(destNew);
  }

  if ( ismetalink != MetaDataType::None )
  {
    bool userabort = false;
    Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
    file = nullptr; // explicitly close destNew before the parser reads it.
    try
    {
      MediaBlockList bl;
      std::vector<Url> urls;
      if ( ismetalink == MetaDataType::Zsync ) {
        ZsyncParser parser;
        parser.parse( destNew );
        bl = parser.getBlockList();
        urls = parser.getUrls();

        XXX << getFileUrl(srcFile.filename()) << " returned zsync meta data." << std::endl;
      } else {
        MetaLinkParser mlp;
        mlp.parse(destNew);
        bl = mlp.getBlockList();
        urls = mlp.getUrls();

        XXX << getFileUrl(srcFile.filename()) << " returned metalink meta data." << std::endl;
      }

      if ( bl.numBlocks() )
        XXX << "With " << bl.numBlocks() << " blocks and a blocksize of " << bl.getBlock(0).size << std::endl;
      else
        XXX << "With no blocks" << std::endl;

      /*
       * github issue libzypp:#277 Multicurl backend breaks with MirrorCache and Metalink with unknown filesize.
       * Fall back to a normal download if we have no knowledge about the filesize we want to download.
       */
      if ( !bl.haveFilesize() && ! srcFile.downloadSize() ) {
        XXX << "No filesize in metalink file and no expected filesize, aborting multicurl." << std::endl;
        ZYPP_THROW( MediaException("Multicurl requires filesize but none was provided.") );
      }

#if 0
      Disabling this workaround for now, since we now do zip ranges into bigger requests
      /*
       * bsc#1191609 In certain locations we do not receive a suitable number of metalink mirrors, and might even
       * download chunks serially from one and the same server. In those cases we need to fall back to a normal download.
       */
      if ( urls.size() < MIN_REQ_MIRRS ) {
        ZYPP_THROW( MediaException("Multicurl enabled but not enough mirrors provided") );
      }
#endif

      // XXX << bl << endl;
      file = fopen((*destNew).c_str(), "w+e");
      if (!file)
        ZYPP_THROW(MediaWriteException(destNew));
      if (PathInfo(target).isExist())
      {
        XXX << "reusing blocks from file " << target << endl;
        bl.reuseBlocks(file, target.asString());
        XXX << bl << endl;
      }
      if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
      {
        XXX << "reusing blocks from file " << failedFile << endl;
        bl.reuseBlocks(file, failedFile.asString());
        XXX << bl << endl;
        filesystem::unlink(failedFile);
      }
      Pathname df = srcFile.deltafile();
      if (!df.empty())
      {
        XXX << "reusing blocks from file " << df << endl;
        bl.reuseBlocks(file, df.asString());
        XXX << bl << endl;
      }
      try
      {
        multifetch(srcFile.filename(), file, &urls, std::move(bl), &report, srcFile.downloadSize());
      }
      catch (MediaCurlException &ex)
      {
        userabort = ex.errstr() == "User abort";
        ZYPP_RETHROW(ex);
      }
    }
    catch (MediaFileSizeExceededException &ex) {
      ZYPP_RETHROW(ex);
    }
    catch (Exception &ex)
    {
      // something went wrong. fall back to normal download
      file = nullptr; // explicitly close destNew before moving it
      WAR << "Failed to multifetch file " << ex << " falling back to single Curl download!" << std::endl;
      if (PathInfo(destNew).size() >= 63336)
      {
        ::unlink(failedFile.asString().c_str());
        filesystem::hardlinkCopy(destNew, failedFile);
      }
      if (userabort)
      {
        ZYPP_RETHROW(ex);
      }
      file = fopen((*destNew).c_str(), "w+e");
      if (!file)
        ZYPP_THROW(MediaWriteException(destNew));

      // use the default progressCallback
      curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::progressCallback);
      MediaCurl::doGetFileCopyFile(srcFile, dest, file, report, options | OPTION_NO_REPORT_START);
    }
  }

  if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
  {
    ERR << "Failed to chmod file " << destNew << endl;
  }

  file.resetDispose(); // we're going to close it manually here
  if (::fclose(file))
  {
    filesystem::unlink(destNew);
    ERR << "Fclose failed for file '" << destNew << "'" << endl;
    ZYPP_THROW(MediaWriteException(destNew));
  }

  if ( rename( destNew, dest ) != 0 )
  {
    ERR << "Rename failed" << endl;
    ZYPP_THROW(MediaWriteException(dest));
  }
  destNew.resetDispose(); // no more need to unlink it

  DBG << "done: " << PathInfo(dest) << endl;
}

void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, MediaBlockList &&blklist, callback::SendReport<DownloadProgressReport> *report, off_t filesize) const
{
  Url baseurl(getFileUrl(filename));
  if (filesize == off_t(-1) && blklist.haveFilesize())
    filesize = blklist.getFilesize();
  if (!blklist.haveBlocks() && filesize != 0) {
    if ( filesize == -1 ) {
      ZYPP_THROW(MediaException("No filesize and no blocklist, falling back to normal download."));
    }

    // build a blocklist on demand just so that we have ranges
    MIL << "Generate blocklist, since there was none in the metalink file." << std::endl;

    off_t currOff = 0;
    const auto prefSize = multifetchrequest::makeBlksize( _settings.maxConcurrentConnections(), filesize );

    while ( currOff < filesize ) {

      auto blksize = filesize - currOff ;
      if ( blksize > prefSize )
        blksize = prefSize;

      blklist.addBlock( currOff, blksize );
      currOff += blksize;
    }

    XXX << "Generated blocklist: " << std::endl << blklist << std::endl << " End blocklist " << std::endl;

  }
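
  // Example of the on-demand blocklist above: filesize = 10 MiB with
  // prefSize = 4 MiB produces the ranges [0,4 MiB), [4,8 MiB) and the
  // 2 MiB remainder [8,10 MiB) - evenly sized blocks except the tail.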
  if (filesize == 0 || !blklist.numBlocks()) {
    checkFileDigest(baseurl, fp, blklist);
    return;
  }
  if (filesize == 0)
    return;

  if (!_multi)
  {
    _multi = curl_multi_init();
    if (!_multi)
      ZYPP_THROW(MediaCurlInitException(baseurl));
  }

  multifetchrequest req(this, filename, baseurl, _multi, fp, report, std::move(blklist), filesize);
  std::vector<Url> myurllist;
  for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
  {
    try
    {
      std::string scheme = urliter->getScheme();
      if (scheme == "http" || scheme == "https" || scheme == "ftp" || scheme == "tftp")
      {
        checkProtocol(*urliter);
        myurllist.push_back(internal::propagateQueryParams(*urliter, _url));
      }
    }
    catch (...)
    {
    }
  }
  if (!myurllist.size())
    myurllist.push_back(baseurl);
  req.run(myurllist);
  checkFileDigest(baseurl, fp, req.blockList() );
}

void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList &blklist) const
{
  if ( !blklist.haveFileChecksum() )
    return;
  if (fseeko(fp, off_t(0), SEEK_SET))
    ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
  Digest dig;
  blklist.createFileDigest(dig);
  char buf[4096];
  size_t l;
  while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
    dig.update(buf, l);
  if (!blklist.verifyFileDigest(dig))
    ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
}
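
// checkFileDigest() is the final gate: even after every block passed its
// per-range checksum, the whole file is re-read and compared against the
// blocklist's full-file digest (when the metalink provided one), so a bad
// reassembly cannot go unnoticed.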

bool MediaMultiCurl::isDNSok(const std::string &host) const
{
  return _dnsok.find(host) == _dnsok.end() ? false : true;
}

void MediaMultiCurl::setDNSok(const std::string &host) const
{
  _dnsok.insert(host);
}

CURL *MediaMultiCurl::fromEasyPool(const std::string &host) const
{
  if (_easypool.find(host) == _easypool.end())
    return 0;
  CURL *ret = _easypool[host];
  _easypool.erase(host);
  return ret;
}

void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
{
  CURL *oldeasy = _easypool[host];
  _easypool[host] = easy;
  if (oldeasy)
    curl_easy_cleanup(oldeasy);
}

  } // namespace media
} // namespace zypp