"""Wrappers exploiting social patterns for improved disclosure prediction.""" from __future__ import division from itertools import izip, combinations, product, chain import operator from diles.learn import Label, Learner from diles.stats import minimals, maximals, mean, quartiles, atchain from diles.stats import posetwidth, posetchains, majorposetchains, inchain from diles.learn.util import jaccarddist, sugconf, narrowconf DOCTESTDEBUG = [] # ============================================================================= # misc utility functions # ============================================================================= def _maxitems(dic, keys=None, val=None, _maxfn=max): """Get those items in `dic` whose values are maximal. `val` may be a function which converts values to something to be used for comparison. Returns these maximal items as a new dictionary. Optionally the items to consider maybe limited by `keys`, a positive key filter list. >>> d = {1: range(4), 2: range(5), 3: range(5)} >>> sorted(_maxitems(d, val=lambda x: len(x)).items()) [(2, [0, 1, 2, 3, 4]), (3, [0, 1, 2, 3, 4])] >>> sorted(_maxitems(d, [1,2], val=lambda x: len(x)).items()) [(2, [0, 1, 2, 3, 4])] >>> sorted(_maxitems(d, val=lambda x: -x[-1]).items()) [(1, [0, 1, 2, 3])] """ keys = keys or dic.keys() val = val or (lambda x: x) maxcount = _maxfn(val(dic[k]) for k in keys) longest = [k for k in keys if val(dic[k]) == maxcount] return dict((k, dic[k]) for k in longest) def _minitems(dic, keys=None, val=None): """Similar as :func:`_maxitems()`, but for minimal items.""" return _maxitems(dic, keys, val, _maxfn=min) def _omcounts(sits, pairs=False): omc = dict(preserving=0, reversing=0, ignoring=0, loosing=0) for s1, s2 in sits if pairs else combinations(sits, 2): try: omc[s1.ordermapping(s2)] += 1 except ValueError: continue # situations not in same community or not comparable return omc # ============================================================================= # interpolation # 
class _InterpolationRefused(Exception):
    """Raised when no sensible interpolation can be derived; callers then
    fall back to the wrapped base learner."""
    pass


def _homogenize(newsit, pairs, gomc, strategies):
    """Get a subset of `pairs` where all have the same order mapping (which
    is not an order loosing mapping).

    If no such subset could be generated or if the subset would have pairs
    with an order loosing mapping, an _InterpolationRefused exception is
    raised.

    Homogenization may follow different `strategies`: reasonable values for
    strategy chains are:

    - empty chain
    - *droploosing*
    - 1-, and 2-element permutations of strategies *distance* and *counts*
    - *droploosing* followed by one of the above permutations

    Finally there are 10 possible chains.
    """
    def checkloosing(bytype):
        # `bytype` holds exactly one bucket here; refuse if it is 'loosing'
        omt, pairs = bytype.popitem()
        if omt == 'loosing':
            raise _InterpolationRefused("only order loosing")
        return omt, pairs
    # arrange pairs by OMT
    bytype = dict(preserving=[], reversing=[], ignoring=[], loosing=[])
    for s1, s2 in pairs:
        bytype[s1.ordermapping(s2)].append((s1, s2))
    # drop empty buckets (.items() is a list in Python 2, so deleting
    # while iterating is safe here)
    for k, v in bytype.items():
        if len(v) == 0:
            del bytype[k]
    # check if already done or only loosing
    if len(bytype) == 1:
        if DOCTESTDEBUG:
            print "no need to homogenize"
        return checkloosing(bytype)
    # apply strategies
    for strategy in strategies:
        if strategy == 'droploosing':
            bytype.pop('loosing', None)
            if len(bytype) == 1:
                if DOCTESTDEBUG:
                    print "win by drop loosing"
                return checkloosing(bytype)
        elif strategy == 'distance':
            # compare sorted distances
            bytype = _minitems(bytype, val=lambda x:
                               newsit.dists(*chain(*x), combine=sorted))
            if len(bytype) == 1:
                if DOCTESTDEBUG:
                    print "win by distance"
                return checkloosing(bytype)
        elif strategy == 'counts':
            # get pairs with most often occurring type
            bytype = _maxitems(bytype, val=len)
            if len(bytype) == 1:
                if DOCTESTDEBUG:
                    print "win by local count"
                return checkloosing(bytype)
            # get pairs whose type globally occurs more often than others
            rank = _maxitems(gomc, bytype.keys())
            bytype = dict((omt, bytype[omt]) for omt in bytype if omt
                          in rank)
            if len(bytype) == 1:
                if DOCTESTDEBUG:
                    print "win by global count"
                return checkloosing(bytype)
        else:
            assert False  # invalid strategy
    raise _InterpolationRefused("could not homogenize")


def _ipoljoin(newsit, lower, upper, gomc, strategies):
    """Interpolation when joining existing chains."""
    pairs = tuple(product(lower, upper))
    omt, pairs = _homogenize(newsit, pairs, gomc, strategies)
    # keep only the neighbors that survived homogenization
    lower, upper = (set(x) for x in zip(*pairs))
    # distance to the combined lower group vs. the common upper group
    ldist = jaccarddist(newsit.group,
                        frozenset.union(*(s.group for s in lower)))
    udist = jaccarddist(newsit.group,
                        frozenset.intersection(*(s.group for s in upper)))
    #print omt, ldist, udist, [s.label for s in upper]
    # interpolate from the nearer side; the combiner direction depends on
    # whether the order mapping preserves or reverses the group order
    if omt == 'reversing':
        if ldist < udist:
            combine, sits = Label.hintersection, lower
        else:
            combine, sits = Label.hunion, upper
    elif omt == 'preserving':
        if ldist < udist:
            combine, sits = Label.hunion, lower
        else:
            combine, sits = Label.hintersection, upper
    elif omt == 'ignoring':
        combine, sits = Label, [lower.pop()]  # just pick one
    else:
        assert False
    return combine(*(s.label for s in sits))


def _ipolextend(newsit, pairs, gomc, strategies):
    """Interpolation when extending existing chains."""
    # newsit extends the chains at their bottom end if even the chains'
    # lower elements have larger person groups
    atbottom = pairs[0][0].group > newsit.group
    omt, pairs = _homogenize(newsit, pairs, gomc, strategies)
    neighbors = tuple(p[0] if atbottom else p[1] for p in pairs)
    return _ipolextendorcreate(newsit, neighbors, omt, atbottom, gomc)


def _ipolcreate(newsit, neighbors, gomc):
    """Interpolation when creating new chains."""
    atbottom = neighbors[0].group > newsit.group
    # with no local pairs available, the order mapping type must be the
    # unique global majority (other than 'loosing')
    xgomc = _maxitems(gomc)
    xgomc.pop('loosing', None)
    if not xgomc:
        raise _InterpolationRefused("mostly order loosing (create)")
    if len(xgomc) > 1:
        raise _InterpolationRefused("no global major type (create)")
    omt = xgomc.popitem()[0]
    return _ipolextendorcreate(newsit, neighbors, omt, atbottom, gomc)


def _ipolextendorcreate(newsit, neighbors, omt, atbottom, gomc):
    """Common part of interpolation when extending or creating chains."""
    if omt == 'ignoring':
        # choose label of closest neighbors
        bydists = {}
        for s in neighbors:
            d = newsit.dists(s)
            bydists.setdefault(d, []).append(s)
        neighbors = bydists[min(bydists)]
        candidates = tuple(s.label for s in neighbors)
        if len(set(candidates)) == 1:
            return candidates[0]
        # several labels at the same distance: take the unique most
        # frequent one, if any
        bycounts = dict((l, candidates.count(l)) for l in candidates)
        bycounts = _maxitems(bycounts)
        if len(bycounts) == 1:
            return bycounts.popitem()[0]
        raise _InterpolationRefused("no best disclosure for ignoring mapping")
    if omt == 'reversing':
        if atbottom:
            combine = Label.hunion
        else:
            combine = Label.hintersection
    elif omt == 'preserving':
        if atbottom:
            combine = Label.hintersection
        else:
            combine = Label.hunion
    else:
        assert False
    return combine(*(s.label for s in neighbors))


# =============================================================================
# situation wrapper
# =============================================================================


class _Situation(object):
    """Convenience wrapper around a sample-label tuple providing order
    mapping related functionality.
    """

    @staticmethod
    def dropconflicts(sits):
        """From these situations which only differ in label, only keep the
        most recent one (assuming `sits` is sorted by time).
        """
        reduced = sits[:]
        seen = set()
        # walk backwards so the last (most recent) occurrence of each
        # sample survives
        for i, s in reversed(tuple(enumerate(sits))):
            if tuple(s.sample) in seen:
                del reduced[i]
            else:
                seen.add(tuple(s.sample))
        return reduced

    def __init__(self, sample, label, fwl, fwt, pfeatures):
        """A situation is given by a `sample` (feature tuple), a `label` and
        a list of indices specifying person existence related features
        (`pfeatures`).  `fwl`/`fwt` are feature weights and the weight
        threshold used to filter the non-person features.
        """
""" self.sample = sample self.label = label self.group = frozenset(i for i, j in enumerate(pfeatures) if sample[j]) self.xsample = tuple(s for i, s in enumerate(sample) if i not in pfeatures and fwl[i] > fwt) def __str__(self): return str((self.xsample, Label(self.group), self.label)) def __repr__(self): return self.__str__() def __hash__(self): return hash((self.label, self.group, self.xsample)) def community(self, sits): """Get those situation in `sits` which differ from `self` only in the person features. """ return tuple(s for s in sits if self.xsample == s.xsample) def match(self, sits): """If there is one, returns the label of a matching situation (i.e. all features equal), otherwise `None` is returned. """ for s in sits: if self.sample == s.sample: return s.label return None def neighbors(self, sits, nearestonly=False): """Get lower and upper neighbors of `self` in `sits`, i.e. those situations which are in `self`'s :meth:`community` and whose person groups is smaller/greater than `self`'s person group and, among these, where there isn't any other situation with a greater/smaller person group. """ def nearest(sits): if not sits: return () dists = [(jaccarddist(s.group, self.group), s) for s in sits] mindist = min(dists) return tuple(s for d, s in dists if d == mindist) community = self.community(sits) lower = tuple(s for s in community if self.group > s.group) lower = maximals(lower, itemgetter=lambda s: s.group) upper = tuple(s for s in community if self.group < s.group) upper = minimals(upper, itemgetter=lambda s: s.group) if nearestonly: lower, upper = nearest(lower), nearest(upper) return lower, upper def ordermapping(self, sit): """Get the order mapping type for two situations (from the same community and with comparable person groups). 
""" s1, s2 = self, sit if s1.xsample != s2.xsample: raise ValueError if s1.group > s2.group: s1, s2 = s2, s1 if not s1.group < s2.group: raise ValueError if s1.label < s2.label: return 'preserving' if s1.label > s2.label: return 'reversing' if s1.label == s2.label: return 'ignoring' return 'loosing' def dists(self, *others, **kwargs): """Get distances to other situations (Jaccard distance of person groups features). Assumes all situations are in the same community. Distances may be combined according to the function provided by the `combine` keyword (which uses `tuple` as default if there multiple other situations and ``lambda x: x`` if there is one other situation). """ assert others combine = kwargs.get('combine', tuple if len(others) > 1 else next) return combine(jaccarddist(self.group, o.group) for o in others) # ============================================================================= # learner # ============================================================================= class _BaseSocialWrapper(Learner): # pylint: disable-msg=W0223 def __init__(self, bcls, **params): super(_BaseSocialWrapper, self).__init__() self.pfeatures = set(params.pop('pfeatures')) self.getgroupft = operator.itemgetter(*self.pfeatures) self.getotherft = lambda s: tuple(f for i, f in enumerate(s) if i not in self.pfeatures) self.learner = bcls(**params) self.stats = {} self.tlab = None def settlab(self, tlab): self.tlab = tlab def train(self, samples, labels, fwl=None, fwt=0): self.fwl, self.fwt = fwl or [1] * len(samples[0]), fwt # pylint: disable-msg=W0201 self.learner.train(samples, labels, self.fwl, self.fwt) sits = [_Situation(s, l, self.fwl, self.fwt, self.pfeatures) for s, l in izip(samples, labels)] self.sits = _Situation.dropconflicts(sits) # pylint: disable-msg=W0201 self.stats['droppedconflicts'] = len(sits) - len(self.sits) class OMIWrapper(_BaseSocialWrapper): paramspace = { 'variants': [ (), ('globalom',), ('ipolextend',), ('ipolextend', 'ipolcreate'), ], 
'minneighbors': [1,2,3], 'nearestonly': [True, False], 'homogenize': [(), ('droploosing',), ('distance',), ('counts',), #('droploosing', 'distance'), #('droploosing', 'distance', 'counts'), #('droploosing', 'counts'), #('droploosing', 'counts', 'distance'), #('distance', 'counts'), #('counts', 'distance') ], 'pfeatures': None # will be set externally } def __init__(self, bcls, **params): self.strategies = params.pop('homogenize', ()) variants = params.pop('variants', ()) self.ipolcreate = 'ipolcreate' in variants self.ipolextend = 'ipolextend' in variants self.globalom = 'globalom' in variants self.nearestonly = params.pop('nearestonly', False) self.minneighbors = params.pop('minneighbors', 2) super(OMIWrapper, self).__init__(bcls, **params) def _count(self, key): self.stats.setdefault(key, 0) self.stats[key] += 1 def _interpolate(self, sample, newsit, community): gomc = _omcounts(community) lower, upper = newsit.neighbors(community, nearestonly=self.nearestonly) if self.globalom: gomc.pop('ignoring', None) omts = _maxitems(gomc).keys() if "loosing" in omts: return self._fallback(sample) if "preserving" in omts and "reversing" in omts: return self._fallback(sample) if not "preserving" in omts and not "reversing" in omts: return self._fallback(sample) omt = "preserving" if "preserving" in omts else "reversing" if len(lower) >= self.minneighbors and len(lower) >= len(upper): self._count('scheme-global') self._count('interpolated') if omt == 'preserving': return Label.hunion(*(x.label for x in lower)), 2/3, None elif omt == 'reversing': return Label.hintersection(*(x.label for x in lower)), 2/3, None else: assert False if len(upper) >= self.minneighbors: self._count('scheme-global') self._count('interpolated') if omt == 'preserving': return Label.intersection(*(x.label for x in upper)), 2/3, None elif omt == 'reversing': return Label.hunion(*(x.label for x in upper)), 2/3, None else: assert False return self._fallback(sample) if len(upper) >= self.minneighbors and 
len(lower) >= self.minneighbors: # join existing chains self._count('scheme-join') ipol = lambda: _ipoljoin(newsit, lower, upper, gomc, self.strategies) elif len(upper) >= self.minneighbors or len(lower) >= self.minneighbors: # either lower or upper neighbors neighbors = upper or lower pairs = [] if len(lower) >= self.minneighbors: for nb in lower: pairs += ((nbx, nb) for nbx in nb.neighbors(community)[0]) else: # len(upper) >= self.minneighbors for nb in upper: pairs += ((nb, nbx) for nbx in nb.neighbors(community)[1]) if pairs: # extend existing chains if self.ipolextend: self._count('scheme-extend') ipol = lambda: _ipolextend(newsit, pairs, gomc, self.strategies) else: return self._fallback(sample) else: # create new chains if self.ipolcreate: self._count('scheme-create') ipol = lambda: _ipolcreate(newsit, neighbors, gomc) else: return self._fallback(sample) else: # no neighbors self._count('scheme-noneighbors') return self._fallback(sample) try: l = ipol() self._count('interpolated') return l, 2/3, None except _InterpolationRefused: return self._fallback(sample) def _fallback(self, sample, sugg=False): self._count('refused') label, conf, ranking = self.learner.predict(sample) return label, sugconf(conf) if sugg else conf, ranking def predict(self, sample): """Predict the label for the given sample. Returns: - predicted label - confidence in prediction, between 0 and 1 (exclusive), not comparable across different learner implementations - a prediction ranking of all known labels (also not comparable across different learners), e.g. 
        # exact match wins with (near-)certain confidence, otherwise try
        # order mapping interpolation within the sample's community.
        # NOTE(review): an empty matching label would be falsy here and
        # fall through to interpolation -- verify this is intended.
        newsit = _Situation(sample, None, self.fwl, self.fwt, self.pfeatures)
        community = newsit.community(self.sits)
        match = newsit.match(community)
        if match:
            return match, narrowconf(1), None
        return self._interpolate(sample, newsit, community)


# =============================================================================
# tests
# =============================================================================

def __doctests_situation():
    """Doctests for the _Situation wrapper.

    >>> samples = (
    ...     (1,2,True,False,False,True,3),
    ...     (0,1,True,True,False,True,3),
    ...     (1,2,True,False,True,True,3),
    ...     (1,2,True,True,False,True,3),
    ...     (1,2,True,True,True,True,3),
    ...     (0,1,True,False,True,True,4),
    ... )
    >>> labels = [Label(i) for i in range(len(samples))]
    >>> pfeatures = 5,4,3,2
    >>> fwl, fwt = [1] * len(samples[0]), 0
    >>> sits = [_Situation(s, l, fwl, fwt, pfeatures)
    ...         for s, l in zip(samples, labels)]
    >>> sits[0].community(sits[1:])
    (((1, 2, 3), {0, 1, 3}, {2}), ((1, 2, 3), {0, 2, 3}, {3}), ((1, 2, 3), {0, 1, 2, 3}, {4}))
    >>> sits[-1].community(sits[:-1])
    ()
    >>> sits[0].neighbors(sits[1:])
    ((), (((1, 2, 3), {0, 1, 3}, {2}), ((1, 2, 3), {0, 2, 3}, {3})))
    >>> _Situation.ordermapping(sits[0], sits[2])
    'loosing'
    >>> sits[2].label = Label([0,2])
    >>> _Situation.ordermapping(sits[0], sits[2])
    'preserving'
    >>> sits[2].label = Label([])
    >>> _Situation.ordermapping(sits[0], sits[2])
    'reversing'
    >>> sits[2].label = Label(0)
    >>> _Situation.ordermapping(sits[0], sits[2])
    'ignoring'

    Check dropconflicts():

    >>> pf = range(5)
    >>> samlabs = [
    ...     ([1,0,0,0,0,2], "A"),
    ...     ([1,0,0,0,0,2], "B"),
    ...     ([1,0,1,0,0,2], "C"),
    ...     ([1,0,0,0,0,2], "D"),
    ...     ([1,0,0,0,0,3], "E"),
    ... ]
    >>> fwl, fwt = [1] * len(samlabs[0][0]), 0
    >>> sits = [_Situation(tuple(s), Label(l), fwl, fwt, pf)
    ...         for s, l in samlabs]
    >>> _Situation.dropconflicts(sits)
    [((2,), {0, 2}, {'C'}), ((2,), {0}, {'D'}), ((3,), {0}, {'E'})]
    """


def __doctests_interpolate():
    """Doctests for the _ipoljoin/_ipolextend/_ipolcreate helpers.

    >>> DOCTESTDEBUG[:] = ["on"]
    >>> persons = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    >>> pf = range(len(persons))
    >>> fwl, fwt = [1] * len(persons), 0
    >>> ps = lambda p: [int(x in p) for x in persons]
    >>> lower = [
    ...     _Situation(ps("A"), Label("12"), fwl, fwt, pf),
    ...     _Situation(ps("B"), Label("23"), fwl, fwt, pf),
    ... ]
    >>> upper = [
    ...     _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf),
    ...     _Situation(ps("ABCEF"), Label("23"), fwl, fwt, pf),
    ...     _Situation(ps("ABCGH"), Label("1234"), fwl, fwt, pf),
    ...     _Situation(ps("ABCIJ"), Label("12345"), fwl, fwt, pf),
    ... ]
    >>> strategies = ('droploosing', 'counts', 'distance')
    >>> gomc = dict(preserving=2, reversing=2, ignoring=2, loosing=8)

    Preserving (win by local type), interpolate with upper neighbors:

    >>> newsit = _Situation(ps("ABC"), Label(""), fwl, fwt, pf)
    >>> _ipoljoin(newsit, lower, upper, gomc, strategies)
    win by local count
    {'1', '2', '3', '4'}

    Preserving (win by local type), interpolate with lower neighbors:

    >>> newsit = _Situation(ps("AB"), Label(""), fwl, fwt, pf)
    >>> _ipoljoin(newsit, lower, upper, gomc, strategies)
    win by local count
    {'1', '2', '3'}

    Reversing (win by dist), interpolate with lower neighbors:

    >>> newsit = _Situation(ps("AB"), Label(""), fwl, fwt, pf)
    >>> _ipoljoin(newsit, lower, upper[:3], gomc, strategies)
    win by distance
    {'2'}

    Reversing (win by dist), interpolate with upper neighbors:

    >>> newsit = _Situation(ps("ABC"), Label(""), fwl, fwt, pf)
    >>> _ipoljoin(newsit, lower, upper[:3], gomc, strategies)
    win by distance
    {'2'}

    Ignoring (win by drop loosing):

    >>> _ipoljoin(newsit, lower, upper[1:2], gomc, strategies)
    win by drop loosing
    {'2', '3'}
    >>> _ipoljoin(newsit, lower[0:1], upper[1:2], gomc, strategies)
    Traceback (most recent call last):
    _InterpolationRefused: only order loosing

    Extending chains
    ----------------

    >>> pairs = [
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCE"), Label("123"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABDE"), Label("123"), fwl, fwt, pf)),
    ... ]
    >>> gomc = dict(preserving=2, reversing=2, ignoring=2, loosing=8)
    >>> newsit = _Situation(ps("A"), Label(""), fwl, fwt, pf)
    >>> _ipolextend(newsit, pairs[:3], gomc, strategies)
    win by local count
    {'1', '2', '3'}

    Check influence of global major type:

    >>> _ipolextend(newsit, pairs, gomc, strategies)
    Traceback (most recent call last):
    _InterpolationRefused: could not homogenize
    >>> gomc = dict(preserving=3, reversing=2, ignoring=2, loosing=8)
    >>> _ipolextend(newsit, pairs, gomc, strategies)
    win by global count
    {'2'}
    >>> gomc = dict(preserving=2, reversing=3, ignoring=4, loosing=8)
    >>> _ipolextend(newsit, pairs, gomc, strategies)
    win by global count
    {'1', '2', '3'}

    Check distance based selection:

    >>> pairs = [
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCEX"), Label("123"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABDEX"), Label("123"), fwl, fwt, pf)),
    ... ]
    >>> gomc = dict(preserving=2, reversing=2, ignoring=2, loosing=8)
    >>> newsit = _Situation(ps("A"), Label(""), fwl, fwt, pf)
    >>> _ipolextend(newsit, pairs, gomc, strategies)  # reversing
    win by distance
    {'1', '2', '3'}

    Check lower neighbors:

    >>> newsit = _Situation(ps("ABCDEX"), Label(""), fwl, fwt, pf)
    >>> _ipolextend(newsit, pairs, gomc, strategies)  # reversing
    win by distance
    {'1', '2', '3'}
    >>> pairs = [
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf)),
    ...     (_Situation(ps("ABC"), Label("12"), fwl, fwt, pf),
    ...      _Situation(ps("ABCE"), Label("123"), fwl, fwt, pf)),
    ...     (_Situation(ps("AD"), Label("23"), fwl, fwt, pf),
    ...      _Situation(ps("ABDE"), Label("123"), fwl, fwt, pf)),
    ... ]
    >>> newsit = _Situation(ps("ABCDEX"), Label(""), fwl, fwt, pf)
    >>> _ipolextend(newsit, pairs, gomc, strategies)  # reversing
    Traceback (most recent call last):
    _InterpolationRefused: could not homogenize
    >>> gomc = dict(preserving=1, reversing=2, ignoring=2, loosing=8)
    >>> _ipolextend(newsit, pairs, gomc, strategies)  # reversing
    win by global count
    {'2'}

    Creating chains
    ---------------

    >>> neighbors = [
    ...     _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf),
    ...     _Situation(ps("ABCEF"), Label("23"), fwl, fwt, pf),
    ...     _Situation(ps("ABCGH"), Label("1234"), fwl, fwt, pf),
    ...     _Situation(ps("ABCIJ"), Label("12345"), fwl, fwt, pf),
    ... ]
    >>> newsit = _Situation(ps("ABC"), Label(""), fwl, fwt, pf)
    >>> gomc = dict(preserving=1, reversing=2, ignoring=2, loosing=8)
    >>> _ipolcreate(newsit, neighbors, gomc)
    Traceback (most recent call last):
    _InterpolationRefused: mostly order loosing (create)
    >>> gomc = dict(preserving=1, reversing=2, ignoring=2, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    Traceback (most recent call last):
    _InterpolationRefused: no global major type (create)
    >>> gomc = dict(preserving=1, reversing=2, ignoring=3, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    {'2'}
    >>> neighbors = [
    ...     _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf),
    ...     _Situation(ps("ABCEF"), Label("23"), fwl, fwt, pf),
    ...     _Situation(ps("ABCG"), Label("1234"), fwl, fwt, pf),
    ...     _Situation(ps("ABCIJ"), Label("12345"), fwl, fwt, pf),
    ... ]
    >>> gomc = dict(preserving=1, reversing=2, ignoring=3, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    Traceback (most recent call last):
    _InterpolationRefused: no best disclosure for ignoring mapping
    >>> _ipolcreate(newsit, neighbors[1:], gomc)
    {'1', '2', '3', '4'}
    >>> gomc = dict(preserving=2, reversing=2, ignoring=3, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    Traceback (most recent call last):
    _InterpolationRefused: no best disclosure for ignoring mapping
    >>> gomc = dict(preserving=1, reversing=2, ignoring=1, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    {'1', '2', '3', '4', '5'}
    >>> gomc = dict(preserving=5, reversing=2, ignoring=1, loosing=1)
    >>> _ipolcreate(newsit, neighbors, gomc)
    {'2'}
    """


def __doctests_homogenize():
    """Doctests for _homogenize().

    >>> pf = range(1,11)
    >>> fwl, fwt = [1] * len(pf), 0
    >>> newsit = _Situation([2,1,1,0,0,0,0,0,0,0,0], Label("12"),
    ...                     fwl, fwt, pf)
    >>> lower = [
    ...     _Situation([2,1,0,0,0,0,0,0,0,0,0], Label("12"), fwl, fwt, pf),
    ...     _Situation([2,0,1,0,0,0,0,0,0,0,0], Label("23"), fwl, fwt, pf),
    ... ]
    >>> upper = [
    ...     _Situation([2,1,1,1,1,0,0,0,0,0,0], Label("2"), fwl, fwt, pf),
    ...     _Situation([2,1,1,1,0,1,1,0,0,0,0], Label("23"), fwl, fwt, pf),
    ...     _Situation([2,1,1,1,0,0,0,1,1,0,0], Label("123"), fwl, fwt, pf),
    ...     _Situation([2,1,1,1,0,1,1,0,0,1,1], Label("1234"), fwl, fwt, pf),
    ... ]
    >>> strategies = ('droploosing', 'counts', 'distance')
    >>> gomc = dict(preserving=2, reversing=2, ignoring=2, loosing=8)
    >>> #import diles.learn.pattern
    >>> #diles.learn.pattern.DOCTESTDEBUG = True
    >>> DOCTESTDEBUG[:] = ["on"]

    >>> pairs = tuple(product(lower, upper))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by local count
    >>> omt
    'preserving'
    >>> for a,b in xpairs: print a, "-", b
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 4, 5, 8, 9}, {'1', '2', '3', '4'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 4, 5, 8, 9}, {'1', '2', '3', '4'})
    >>> pairs = tuple(product(lower, upper[:2]))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by local count
    >>> omt
    'reversing'
    >>> for a,b in xpairs: print a, "-", b
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 3}, {'2'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 3}, {'2'})
    >>> pairs = tuple(product(lower, upper[1:2]))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by drop loosing
    >>> omt
    'ignoring'
    >>> for a,b in xpairs: print a, "-", b
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 4, 5}, {'2', '3'})

    Check only loosing pairs:

    >>> pairs = tuple(product(lower[0:1], upper[1:2]))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    Traceback (most recent call last):
    _InterpolationRefused: only order loosing

    Check no major type:

    >>> pairs = tuple(product(lower[1:2], upper[1:3]))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    Traceback (most recent call last):
    _InterpolationRefused: could not homogenize

    Check influence of distances:

    >>> pairs = tuple(product(lower, upper[:3]))
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by distance
    >>> omt
    'reversing'
    >>> for a,b in xpairs: print a, "-", b  # no output -> no major type
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 3}, {'2'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 3}, {'2'})
    >>> pairs = tuple(product(lower, upper[:3]))
    >>> newsit = _Situation([2,1,1,1,0,0,0,1,1,0,0], Label("12"),
    ...                     fwl, fwt, pf)  # practically nonsense
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by distance
    >>> omt
    'preserving'
    >>> for a,b in xpairs: print a, "-", b  # no output -> no major type
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})

    Check influence of global order mapping counts:

    >>> pairs = tuple(product(lower, upper[:3]))
    >>> gomc = dict(preserving=3, reversing=2, ignoring=5, loosing=8)
    >>> omt, xpairs = _homogenize(newsit, pairs, gomc, strategies)
    win by global count
    >>> omt
    'preserving'
    >>> for a, b in xpairs: print a, "-", b
    ((2,), {0}, {'1', '2'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})
    ((2,), {1}, {'2', '3'}) - ((2,), {0, 1, 2, 6, 7}, {'1', '2', '3'})
    """


def __doctests_omlearner():
    """Doctests for OMIWrapper end to end (with a guessing base learner).

    >>> from diles.learn.learners import GuessLearner
    >>> DOCTESTDEBUG[:] = ["on"]
    >>> pf = range(5)
    >>> fwl, fwt = [1] * (len(pf) +1), 0
    >>> samlabs = [
    ...     ([1,0,0,0,0,2], "A"),
    ...     ([0,1,0,0,0,2], "B"),
    ...     ([1,1,1,0,0,2], "ABC"),
    ...     ([1,1,1,1,0,2], "ABCD"),
    ...     ([0,0,0,0,1,3], "E"),
    ... ]
    >>> samples, labels = zip(*samlabs)
    >>> labels = [Label(l) for l in labels]
    >>> oml = OMIWrapper(GuessLearner,
    ...                  variants=('ipolcreate', 'ipolextend'),
    ...                  nearestonly=False, minneighbors=1,
    ...                  homogenize=('droploosing', 'counts', 'distance'),
    ...                  pfeatures=pf)

    Joining chains:

    >>> oml.train(samples, labels)
    >>> oml.predict([1,1,0,0,0,2])
    no need to homogenize
    ({'A', 'B'}, 0.666..., None)
    >>> sorted(oml.stats.items())
    [('droppedconflicts', 0), ('interpolated', 1), ('scheme-join', 1)]
    >>> oml.stats.clear()
    >>> oml.predict([1,1,1,0,0,2])
    ({'A', 'B', 'C'}, 0.999..., None)
    >>> oml.stats
    {}
    >>> oml.predict([1,1,0,1,0,2])
    no need to homogenize
    ({'A', 'B', 'C', 'D'}, 0.666..., None)
    >>> sorted(oml.stats.items())
    [('interpolated', 1), ('scheme-join', 1)]

    Extending chains:

    >>> samlabs = [
    ...     ([1,1,1,0,0,2], "ABC"),
    ...     ([1,1,0,1,0,2], "ABD"),
    ...     ([1,1,1,1,0,2], "ABCD"),
    ...     ([0,0,0,0,1,3], "E"),
    ... ]
    >>> samples, labels = zip(*samlabs)
    >>> labels = [Label(l) for l in labels]
    >>> oml.train(samples, labels)
    >>> oml.stats.clear()
    >>> oml.predict([1,1,0,0,0,2])
    no need to homogenize
    ({'A', 'B'}, 0.66666666666666663, None)
    >>> sorted(oml.stats.items())
    [('interpolated', 1), ('scheme-extend', 1)]
    """