Merge pull request #3232 from cfromknecht/filter-non-target-sessions

watchtower/wtclient: filter non-target towers from candidates
This commit is contained in:
Olaoluwa Osuntokun 2019-06-20 19:44:35 -07:00 committed by GitHub
commit 8ee803e449
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 1 deletions

View File

@ -18,6 +18,10 @@ type TowerCandidateIterator interface {
// to return results in any particular order. If no more candidates are
// available, ErrTowerCandidatesExhausted is returned.
Next() (*wtdb.Tower, error)
// TowerIDs returns the set of tower IDs contained in the iterator,
// which can be used to filter candidate sessions for the active tower.
TowerIDs() map[wtdb.TowerID]struct{}
}
// towerListIterator is a linked-list backed TowerCandidateIterator.
@ -58,6 +62,17 @@ func (t *towerListIterator) Reset() error {
return nil
}
// TowerIDs returns the set of tower IDs contained in the iterator, which can be
// used to filter candidate sessions for the active tower.
func (t *towerListIterator) TowerIDs() map[wtdb.TowerID]struct{} {
ids := make(map[wtdb.TowerID]struct{})
for e := t.candidates.Front(); e != nil; e = e.Next() {
tower := e.Value.(*wtdb.Tower)
ids[tower.ID] = struct{}{}
}
return ids
}
// Next returns the next candidate tower. This iterator will always return
// candidates in the order given when the iterator was instantiated. If no more
// candidates are available, ErrTowerCandidatesExhausted is returned.

View File

@ -151,6 +151,7 @@ type TowerClient struct {
negotiator SessionNegotiator
candidateSessions map[wtdb.SessionID]*wtdb.ClientSession
activeSessions sessionQueueSet
targetTowerIDs map[wtdb.TowerID]struct{}
sessionQueue *sessionQueue
prevTask *backupTask
@ -198,10 +199,14 @@ func New(config *Config) (*TowerClient, error) {
log.Infof("Using private watchtower %s, offering policy %s",
cfg.PrivateTower, cfg.Policy)
candidates := newTowerListIterator(tower)
targetTowerIDs := candidates.TowerIDs()
c := &TowerClient{
cfg: cfg,
pipeline: newTaskPipeline(),
activeSessions: make(sessionQueueSet),
targetTowerIDs: targetTowerIDs,
statTicker: time.NewTicker(DefaultStatInterval),
forceQuit: make(chan struct{}),
}
@ -213,7 +218,7 @@ func New(config *Config) (*TowerClient, error) {
SendMessage: c.sendMessage,
ReadMessage: c.readMessage,
Dial: c.dial,
Candidates: newTowerListIterator(tower),
Candidates: candidates,
MinBackoff: cfg.MinBackoff,
MaxBackoff: cfg.MaxBackoff,
})
@ -526,6 +531,12 @@ func (c *TowerClient) nextSessionQueue() *sessionQueue {
continue
}
// Skip any sessions that are still active, but are not for the
// users currently configured tower.
if _, ok := c.targetTowerIDs[sessionInfo.TowerID]; !ok {
continue
}
candidateSession = sessionInfo
break
}