// Debug trace for a scheduling request that arrives while a scheduling pass is
// already running; per the message text the caller returns right away (the
// guard condition itself is outside this excerpt).
76 DBG_PRV <<
"Scheduling triggered during scheduling, returning immediately." << std::endl;
// Only compiled when the platform defines _SC_NPROCESSORS_ONLN: twice the
// processor count reported by sysconf().
// NOTE(review): the variable this expression initializes is not visible here -
// presumably the CPU worker limit compared against further down; confirm.
81#ifdef _SC_NPROCESSORS_ONLN
82 sysconf(_SC_NPROCESSORS_ONLN) * 2;
// Generic helper lambda: given the map of worker queues and a list of idle
// worker names, it searches for the worker that has been idle the longest,
// i.e. the one with the smallest idleSince() value.
// workerQueues: associative container keyed by worker name (find()/end() are
//               used); the mapped value is pointer-like and offers idleSince().
// idleNames:    iterable list of keys to look up in workerQueues.
88 constexpr auto findLaziestWorker = [](
const auto &workerQueues,
const auto &idleNames ) {
// end() serves as the "nothing selected yet" marker.
89 auto candidate = workerQueues.end();
93 for (
const auto &name : idleNames ) {
94 auto thisElem = workerQueues.find(name);
// Name is unknown to the map (the reaction is not visible in this excerpt).
95 if ( thisElem == workerQueues.end() )
// idleSince() is dereferenced below, so it returns an optional/pointer-like value.
98 const auto idleS = thisElem->second->idleSince();
// Take the first hit, or any hit that went idle earlier than the current best.
100 && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
101 candidateIdleSince = *idleS;
102 candidate = thisElem;
// Trace the selected worker together with its raw idle timestamp.
106 if ( candidate != workerQueues.end() )
107 MIL_PRV <<
"Found idle worker:" << candidate->first <<
" idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
// Media release handling for the entry `iMedia` points to.
// A medium with a positive reference count is not released.
115 if ( iMedia->_refCount > 0 ) {
116 MIL_PRV <<
"Not releasing media " << iMedia->_name <<
" refcount is not zero" << std::endl;
// Media served by a Downloading worker are only detached once they have been
// idle for at least one hour (steady clock, measured from _idleSince).
120 if ( iMedia->_workerType == ProvideQueue::Config::Downloading ) {
122 if ( std::chrono::steady_clock::now() - iMedia->_idleSince >= std::chrono::hours(1) ) {
123 MIL <<
"Detaching medium " << iMedia->_name <<
" for baseUrl " << iMedia->_attachedUrl << std::endl;
127 MIL_PRV <<
"Not releasing media " << iMedia->_name <<
" downloading worker and not timed out yet." << std::endl;
// Other media: obtain the backing queue via lock() and enqueue a detach request.
// NOTE(review): lock() suggests _backingQueue is a weak reference - confirm.
131 auto bQueue = iMedia->_backingQueue.lock();
138 MIL <<
"Detaching medium " << iMedia->_name <<
" for baseUrl " << iMedia->_attachedUrl << std::endl;
139 bQueue->enqueue ( *req );
// Error paths: the detach request could not be created ...
143 ERR <<
"Could not send detach request, creating the request failed" << std::endl;
// ... or there is no backing queue to send it to.
146 ERR <<
"Could not send detach request since no backing queue was defined" << std::endl;
// Remember when this scheduling pass started so its duration can be logged.
155 const auto schedStart = std::chrono::steady_clock::now();
156 MIL_PRV <<
"Start scheduling" << std::endl;
// Elapsed time of the pass, logged in milliseconds.
// NOTE(review): where this runs (e.g. a scope-exit handler) is not visible here.
159 const auto dur = std::chrono::steady_clock::now() - schedStart;
160 MIL_PRV <<
"Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
// Walk over _items; the loop header has no increment expression, so the
// iterator is advanced inside the body (not visible in this excerpt).
164 for (
auto it =
_items.begin (); it !=
_items.end(); ) {
// Process every entry of _queues; each entry carries a scheme name and the
// list of requests queued for that scheme.
180 for(
auto queueIter =
_queues.begin(); queueIter !=
_queues.end(); queueIter ++ ) {
182 const auto &scheme = queueIter->_schemeName;
183 auto &queue = queueIter->_requests;
190 MIL_PRV <<
"Start scheduling for scheme:" << scheme <<
" queue size is: " << queue.size() << std::endl;
// Reported when no valid worker configuration could be obtained for the scheme.
194 ERR <<
"Scheme: " << scheme <<
" failed to return a valid configuration." << std::endl;
// Drain the queue, taking ownership of the front request on each round.
// NOTE(review): what happens to each item is not visible here - presumably
// the requests are failed because of the configuration error above; confirm.
196 while( queue.size() ) {
197 auto item = std::move( queue.front() );
// Worker configuration for the current scheme.
207 const auto &config = configOpt.get();
// True when the SingleInstance bit is set in the configuration flags.
208 const auto isSingleInstance = ( (config.cfg_flags() & ProvideQueue::Config::SingleInstance) == ProvideQueue::Config::SingleInstance );
// Branch 1: Downloading workers that are not single-instance.
209 if ( config.worker_type() == ProvideQueue::Config::Downloading && !isSingleInstance ) {
// Iterate over the requests; the iterator is advanced inside the body.
211 for(
auto i = queue.begin (); i != queue.end(); ) {
// Skip over empty (null) entries in the queue.
215 while ( i != queue.end() && !(*i) ) {
219 if ( i == queue.end() )
222 ProvideRequestRef item = *i;
// Attach and Detach requests are both rejected here by finishing the request
// with an exception; note that the message text only names Attach.
226 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
229 item->owner()->finishReq(
nullptr, item,
ZYPP_EXCPT_PTR(
zypp::Exception(
"Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
233 MIL_PRV <<
"Trying to schedule request: " << item->urls().front() << std::endl;
// Statistics gathered for this request:
// number of workers counted as Downloading type
236 int existingTypeWorkers = 0;
// sum of activeRequests() over those workers
239 int existingConnections = 0;
// (url, worker) pairs where the worker's hostname equals the URL's host
242 std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
// idle workers; later handed to findLaziestWorker
245 std::vector<std::string> idleWorkers;
// accepted mirror URLs that have not been matched to a worker
248 std::vector<zypp::Url> mirrsWithoutWorker;
// Collect the request's URLs that the owner accepts as redirect targets.
249 for (
const auto &url : item->urls() ) {
252 MIL <<
"Mirror URL " << url <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
256 if( item->owner()->canRedirectTo( item, url ) )
257 mirrsWithoutWorker.push_back( url );
259 MIL_PRV <<
"URL was rejected" << url << std::endl;
// No URL survived the filtering above.
264 if( mirrsWithoutWorker.size() == 0 ) {
265 MIL <<
"Request has NO usable URLs" << std::endl;
// Inspect the existing worker queues; the test below compares a worker's type
// against Downloading (the statement it guards is not visible in this excerpt).
274 if ( ProvideQueue::Config::Downloading != workerQueue->workerConfig().worker_type() )
277 existingTypeWorkers ++;
278 existingConnections += workerQueue->activeRequests();
280 if ( workerQueue->isIdle() )
// Move every mirror whose host matches this worker from mirrsWithoutWorker
// into possibleHostWorkers.
286 for (
auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
288 if ( u.getHost() == workerQueue->hostname() ) {
290 possibleHostWorkers.push_back( {u, workerQueue.get()} );
291 i = mirrsWithoutWorker.erase( i );
300 MIL <<
"Current stats: " << std::endl;
301 MIL <<
"Existing type workers: " << existingTypeWorkers << std::endl;
302 MIL <<
"Existing active connections: " << existingConnections << std::endl;
303 MIL <<
"Possible host workers: "<< possibleHostWorkers.size() << std::endl;
304 MIL <<
"Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
// Connection limit reached (the comparison itself is not visible here).
309 MIL_PRV <<
"Reached maximum nr of connections, break" << std::endl;
// Option 1: start a new worker for one of the mirrors without a worker.
316 && mirrsWithoutWorker.size() ) {
318 MIL_PRV <<
"Free worker slots and available mirror URLs, starting a new worker" << std::endl;
322 for(
const auto &url : mirrsWithoutWorker ) {
// The owner has to approve the redirect to this URL.
325 if ( !item->owner()->safeRedirectTo ( item, url ) )
// The new worker is started with a working directory of
// _workDir/<scheme>/<host> and the URL's host name.
328 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
329 if ( !q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
333 MIL_PRV <<
"Started worker for " << url.getHost() <<
" enqueing request" << std::endl;
335 item->setActiveUrl(url);
// Option 2: reuse an existing worker that already serves one of the hosts.
352 if ( possibleHostWorkers.size() ) {
354 MIL_PRV <<
"No free worker slots, looking for the best existing worker" << std::endl;
// Repeatedly pick a candidate by comparing activeRequests(); a candidate the
// owner refuses as redirect target is removed from the list.
356 while( possibleHostWorkers.size () ) {
357 std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
358 for (
auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
359 if ( i->second->activeRequests () < candidate->second->activeRequests () )
363 if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
364 possibleHostWorkers.erase( candidate );
368 MIL_PRV <<
"Using existing worker " << candidate->first.getHost() <<
" to download request" << std::endl;
371 item->setActiveUrl( candidate->first );
372 candidate->second->enqueue( item );
// Option 3: look up the longest-idle worker and start a worker for one of the
// remaining mirrors (the log messages call this decommissioning / replacing).
384 if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
386 MIL_PRV <<
"No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
388 auto candidate = findLaziestWorker(
_workerQueues, idleWorkers );
397 for(
const auto &url : mirrsWithoutWorker ) {
399 if ( !item->owner()->safeRedirectTo ( item, url ) )
402 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
403 if ( !q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
407 MIL_PRV <<
"Replaced worker for " << url.getHost() <<
", enqueing request" << std::endl;
409 item->setActiveUrl(url);
// None of the options applied: the request is left for a later scheduling run.
426 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
430 }
else if ( config.worker_type() == ProvideQueue::Config::CPUBound && !isSingleInstance ) {
// Branch 2: CPUBound workers that are not single-instance.
// Iterate over the requests; the iterator is advanced inside the body.
432 for(
auto i = queue.begin (); i != queue.end(); ) {
// Skip over empty (null) entries in the queue.
436 while ( i != queue.end() && !(*i) ) {
440 if ( i == queue.end() )
444 ProvideRequestRef item = *i;
// Attach and Detach requests are rejected; unlike the Downloading branch the
// owner is checked for null before finishReq() is called on it.
448 if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
450 if ( item->owner () )
451 item->owner()->finishReq(
nullptr, item,
ZYPP_EXCPT_PTR(
zypp::Exception(
"CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
455 MIL_PRV <<
"Trying to schedule request: " << item->urls().front() << std::endl;
// Statistics gathered for this request:
// number of workers counted as CPUBound type
458 int existingTypeWorkers = 0;
// NOTE(review): judging by the name, the subset of those workers that serve
// the current scheme - the guarding condition is not visible; confirm.
459 int existingSchemeWorkers = 0;
// workers for which canScheduleMore() returned true
462 std::vector< ProvideQueue* > possibleWorkers;
// idle workers; later handed to findLaziestWorker
465 std::vector<std::string> idleWorkers;
// Go through the request's URLs; the visible part only logs the ones that do
// not fit the current scheme.
471 for (
const auto &tmpurl : item->urls() ) {
473 MIL <<
"Mirror URL " << tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
482 MIL <<
"Request has NO usable URLs" << std::endl;
// Inspect the existing worker queues; the test below compares a worker's type
// against CPUBound (the statement it guards is not visible in this excerpt).
491 if ( ProvideQueue::Config::CPUBound != workerQueue->workerConfig().worker_type() )
496 existingTypeWorkers ++;
498 existingSchemeWorkers++;
499 if ( workerQueue->canScheduleMore() )
500 possibleWorkers.push_back(workerQueue.get());
503 if ( workerQueue->isIdle() )
508 MIL <<
"Current stats: " << std::endl;
509 MIL <<
"Existing type workers: " << existingTypeWorkers << std::endl;
510 MIL <<
"Possible CPU workers: "<< possibleWorkers.size() << std::endl;
// Option 1: hand the request to one of the possible workers (per the log
// message an idle one; the selecting condition is not visible here).
514 if ( possibleWorkers.size() ) {
516 for (
auto &w : possibleWorkers ) {
518 MIL_PRV <<
"Using existing idle worker to provide request" << std::endl;
520 item->owner()->redirectTo ( item, url );
521 item->setActiveUrl( url );
// Option 2: fewer CPUBound workers than cpuLimit, so a new one is started.
533 if ( existingTypeWorkers < cpuLimit ) {
535 MIL_PRV <<
"Free CPU slots, starting a new worker" << std::endl;
538 item->owner()->redirectTo ( item, url );
// Here the working directory is _workDir/<scheme>, without a host component.
540 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
541 if ( q->startup( scheme,
_workDir / scheme ) ) {
543 item->setActiveUrl(url);
// Option 3: pick one of the possible workers by comparing activeRequests().
560 if ( possibleWorkers.size() ) {
561 MIL_PRV <<
"No free CPU slots, looking for the best existing worker" << std::endl;
563 if( possibleWorkers.size () ) {
564 std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
565 for (
auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
566 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
571 item->owner()->redirectTo ( item, url );
573 MIL_PRV <<
"Using existing worker to provide request" << std::endl;
574 item->setActiveUrl( url );
575 (*candidate)->enqueue( item );
// Option 4: look up the longest-idle worker and start a new worker (the log
// messages call this decommissioning / replacing).
583 if ( idleWorkers.size() ) {
585 MIL_PRV <<
"No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
587 auto candidate = findLaziestWorker(
_workerQueues, idleWorkers );
593 item->owner()->redirectTo ( item, url );
595 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
596 if ( q->startup( scheme,
_workDir / scheme ) ) {
598 MIL_PRV <<
"Replaced worker, enqueing request" << std::endl;
600 item->setActiveUrl(url);
// Nothing could be done now: the request is left for a later scheduling run.
616 MIL_PRV <<
"No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
621 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
// Third request loop. NOTE(review): presumably the fallback branch for every
// configuration not handled by the two branches above (e.g. single-instance
// workers) - the enclosing condition is not visible in this excerpt; confirm.
628 for(
auto i = queue.begin (); i != queue.end(); ) {
// Skip over empty (null) entries in the queue.
632 while ( i != queue.end() && !(*i) ) {
636 if ( i == queue.end() )
640 ProvideRequestRef item = *i;
641 MIL_PRV <<
"Trying to schedule request: " << item->urls().front() << std::endl;
// Go through the request's URLs; the visible part only logs the ones that do
// not fit the current scheme.
646 for (
const auto &tmpurl : item->urls() ) {
648 MIL <<
"Mirror URL " << tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
657 MIL <<
"Request has NO usable URLs" << std::endl;
// Start a worker with the working directory _workDir/<scheme>; a failed
// startup is logged as an error.
667 ProvideQueueRef q = std::make_shared<ProvideQueue>( *
this );
668 if ( !q->startup( scheme,
_workDir / scheme ) ) {
669 ERR <<
"Worker startup failed!" << std::endl;
// Per the log messages the request is enqueued on either a freshly started
// worker or one that was found (the lookup is not visible in this excerpt).
678 MIL_PRV <<
"Started worker, enqueing request" << std::endl;
682 MIL_PRV <<
"Found worker, enqueing request" << std::endl;
// Tell the owner about the redirect and record the URL on the request.
687 item->owner()->redirectTo ( item, url );
689 item->setActiveUrl(url);