"""Disclosure prediction validation methods.""" from __future__ import division from itertools import izip, product import re from diles.stats import mean, quartiles, atchain from diles.stats import posetwidth, posetchains, majorposetchains, inchain from diles.learn.wrappers.social import _Situation, _omcounts, _maxitems DOCTESTDEBUG = [] # ============================================================================= # utility functions # ============================================================================= def omcheck(sit, sits, extend=False, create=False, nearestonly=False, minrate=0.5): """Check if a situation harmonizes with order mappings of its neighbors. It harmonizes if at least half of the order mappings between lower and upper neighbors are not broken by the new situation. """ assert not create or extend # create => extend lower, upper = sit.neighbors(sits, nearestonly=nearestonly) if lower and upper: matches = 0 for l, u in product(lower, upper): om1 = l.ordermapping(u) om2 = l.ordermapping(sit) om3 = sit.ordermapping(u) if om1 == "loosing": match = True elif om2 in (om1, "ignoring") and om3 in (om1, "ignoring"): match = True else: match = False if DOCTESTDEBUG: print om1, "->", om2, om3, "->", match matches += int(match) rate = matches / (len(lower) * len(upper)) return rate >= minrate if upper: upperpairs = [] for u in upper: upperpairs += [(u, x) for x in u.neighbors(sits, nearestonly=nearestonly)[1]] if upperpairs and extend: matches = 0 for u, xu in upperpairs: om1 = sit.ordermapping(u) om2 = u.ordermapping(xu) match = om2 == "loosing" or om1 in (om2, "ignoring") if DOCTESTDEBUG: print om1, "->", om2, "->", match matches += int(match) rate = matches / len(upperpairs) return rate >= minrate if create: lomc = {} for u in upper: lomc.setdefault(sit.ordermapping(u), []).append(u) gomc = _omcounts(sit.community(sits)) lw, gw = set(_maxitems(lomc, val=len)), set(_maxitems(gomc)) if DOCTESTDEBUG: print "local:", lw print "global", gw return bool(lw & gw) if lower: lowerpairs = [] for l in lower: lowerpairs += [(x, l) for x in l.neighbors(sits, nearestonly=nearestonly)[0]] if lowerpairs and extend: matches = 0 for xl, l in lowerpairs: om1 = l.ordermapping(sit) om2 = xl.ordermapping(l) match = om2 == "loosing" or om1 in (om2, "ignoring") if DOCTESTDEBUG: print om1, "->", om2, "->", match matches += int(match) rate = matches / len(lowerpairs) return rate >= minrate if create: lomc = {} for l in lower: lomc.setdefault(l.ordermapping(sit), []).append(l) gomc = _omcounts(sit.community(sits)) lw, gw = set(_maxitems(lomc, val=len)), set(_maxitems(gomc)) if DOCTESTDEBUG: print "local:", lw print "global", gw return bool(lw & gw) return True # ============================================================================= # validator class # ============================================================================= def annotate(vid, vtype=None): def decorator(m): m.id = vid m.type = vtype return m return decorator class Validator(object): """Validator for predicted disclosures using social disclosure patterns.""" def __init__(self, pfeatures): """Which of those features in a samples are person (information receiver) features is indicated by `pfeatures`, a list of feature index numbers. """ self.pfeatures = pfeatures def setup(self, samples, labels, fwl, fwt): """Set up the validator for the given history of samples and labels.""" self.fwl, self.fwt = fwl or [1] * len(samples[0]), fwt # pylint: disable-msg=W0201 sits = [_Situation(s, l, fwl, fwt, self.pfeatures) for s, l in izip(samples, labels)] self.sits = _Situation.dropconflicts(sits) # pylint: disable-msg=W0201 def validate(self, sample, label): """Validate the prediction `label` on the given `sample. Returns a dictionary mapping validation names to results (which are booleans specifying if a prediction is supported or not). """ sit = _Situation(sample, label, self.fwl, self.fwt, self.pfeatures) community = sit.community(self.sits) validations = {} vldmethods = [m for m in dir(Validator) if m.startswith("_vld_")] for methname in vldmethods: meth = getattr(self, methname) validations[meth.id] = meth(sit, community) return validations @annotate('none', None) def _vld_none(self, sit, community): # pylint: disable-msg=W0613 """No validation, always support predictions.""" return True # ------------------------------------------------------------------------- # disclosure path based validations # ------------------------------------------------------------------------- # @annotate('spw') # def _vld_same_poset_width(self, sit, community): # """Only support disclosures which do not increase the poset width of # previous disclosures. # """ # disclosures = set(s.label for s in community) # if not disclosures: # return False # pw = posetwidth(disclosures) # xdisclosures = disclosures | set([sit.label]) # xpw = posetwidth(xdisclosures) # assert xpw >= pw # return xpw == pw @annotate('cxdp', 'disclosure-paths') def _vld_at_existing_disclosure_path(self, sit, community): """Predicted disclosure is connected to an existing disclosure path.""" disclosures = set(s.label for s in community) if not disclosures: return False return any(atchain(c, sit.label) for c in posetchains(disclosures)) @annotate('ixdp', 'disclosure-paths') def _vld_on_existing_disclosure_path(self, sit, community): """Predicted disclosure is integrated in an existing disclosure path.""" disclosures = set(s.label for s in community) if not disclosures: return False return any(inchain(c, sit.label) for c in posetchains(disclosures)) @annotate('cmdp', 'disclosure-paths') def _vld_at_major_disclosure_path(self, sit, community): """Predicted disclosure is connected to a major disclosure path (length is at least the mean of all disclosure paths).""" disclosures = set(s.label for s in community) if not disclosures: return False return any(atchain(c, sit.label) for c in majorposetchains(disclosures)) @annotate('imdp', 'disclosure-paths') def _vld_on_major_disclosure_path(self, sit, community): """Predicted disclosure is integrated in a major disclosure path (length is at least the mean of all disclosure paths).""" disclosures = set(s.label for s in community) if not disclosures: return False return any(inchain(c, sit.label) for c in majorposetchains(disclosures)) @annotate('csdp', 'disclosure-paths') def _vld_at_super_disclosure_path(self, sit, community): """Predicted disclosure is connected to a super disclosure path (length is at least the 3rd quartile of all disclosure paths).""" disclosures = set(s.label for s in community) if not disclosures: return False return any(atchain(c, sit.label) for c in majorposetchains(disclosures, quartile=3)) @annotate('isdp', 'disclosure-paths') def _vld_on_super_disclosure_path(self, sit, community): """Predicted disclosure is connected to a super disclosure path (length is at least the 3rd quartile of all disclosure paths).""" disclosures = set(s.label for s in community) if not disclosures: return False return any(inchain(c, sit.label) for c in majorposetchains(disclosures, quartile=3)) # ------------------------------------------------------------------------- # usage count based validations # ------------------------------------------------------------------------- @annotate('uc1', 'usage-counts') def _vld_used_once(self, sit, community): """Absolute usage count of predicted disclosure is at least 1.""" disclosures = [s.label for s in community] return disclosures.count(sit.label) >= 1 @annotate('uc2', 'usage-counts') def _vld_used_twice(self, sit, community): """Absolute usage count of predicted disclosure is at least 2.""" disclosures = [s.label for s in community] return disclosures.count(sit.label) >= 2 @annotate('ucm', 'usage-counts') def _vld_major_disclosure(self, sit, community): """Predicted disclosure is a major disclosure (usage count is at least the mean of all disclosure usage counts).""" disclosures = [s.label for s in community] if not disclosures: return False counts = {} for d in disclosures: counts[d] = counts.get(d, 0) + 1 t = mean(counts.values()) majordiscs = set(k for k, v in counts.items() if v >= t) return sit.label in majordiscs @annotate('ucs', 'usage-counts') def _vld_super_disclosure(self, sit, community): """Predicted disclosure is a super disclosure (usage count is at least the 3rd quartile of all disclosure usage counts).""" disclosures = [s.label for s in community] if not disclosures: return False counts = {} for d in disclosures: counts[d] = counts.get(d, 0) + 1 t = quartiles(counts.values())[2] superdiscs = set(k for k, v in counts.items() if v >= t) return sit.label in superdiscs # ------------------------------------------------------------------------- # complexity based validations # ------------------------------------------------------------------------- @annotate('pw1', 'disclosure-complexity') def _vld_max_posetwidth_1(self, sit, community): # pylint: disable-msg=W0613 """Poset width of previous disclosures is at most 1.""" disclosures = set(s.label for s in community) pw = posetwidth(disclosures) return pw <= 1 @annotate('pw2', 'disclosure-complexity') def _vld_max_posetwidth_2(self, sit, community): # pylint: disable-msg=W0613 """Poset width of previous disclosures is at most 2.""" disclosures = set(s.label for s in community) pw = posetwidth(disclosures) return pw <= 2 @annotate('pwr25', 'disclosure-complexity') def _vld_max_posetwidth_ratio_25(self, sit, community): # pylint: disable-msg=W0613 """Ratio of poset width of previous disclosures to the number of unique previous disclosures is at most 0.25.""" disclosures = set(s.label for s in community) pw = posetwidth(disclosures) return not disclosures or pw / len(disclosures) <= 0.25 @annotate('pwr50', 'disclosure-complexity') def _vld_max_posetwidth_ratio_50(self, sit, community): # pylint: disable-msg=W0613 """Same as `pwr25` but with a maximum ratio of 0.5.""" disclosures = set(s.label for s in community) pw = posetwidth(disclosures) return not disclosures or pw / len(disclosures) <= 0.5 @annotate('pwr75', 'disclosure-complexity') def _vld_max_posetwidth_ratio_75(self, sit, community): # pylint: disable-msg=W0613 """Same as `pwr25` but with a maximum ratio of 0.75.""" disclosures = set(s.label for s in community) pw = posetwidth(disclosures) return not disclosures or pw / len(disclosures) <= 0.75 @annotate('udr25', 'disclosure-complexity') def _vld_max_disclosures_ratio_25(self, sit, community): # pylint: disable-msg=W0613 """Ratio of the number of unique previous disclosures to the number of previous situations is at most 0.25.""" disclosures = set(s.label for s in community) return not community or len(disclosures) / len(community) <= 0.25 @annotate('udr50', 'disclosure-complexity') def _vld_max_disclosures_ratio_50(self, sit, community): # pylint: disable-msg=W0613 """Same as `udr25` but with a maximum ratio of 0.5.""" disclosures = set(s.label for s in community) return not community or len(disclosures) / len(community) <= 0.5 @annotate('udr75', 'disclosure-complexity') def _vld_max_disclosures_ratio_75(self, sit, community): # pylint: disable-msg=W0613 """Same as `udr25` but with a maximum ratio of 0.75.""" disclosures = set(s.label for s in community) return not community or len(disclosures) / len(community) <= 0.75 # ------------------------------------------------------------------------- # order mapping based validations # ------------------------------------------------------------------------- # def _vld_omnearest(self, sit, community): # return omcheck(sit, community, nearestonly=True, minrate=0.5) # return True @annotate('om50', 'order-mapping') def _vld_om_50(self, sit, community): """Situation to predict joins existing situation chains and the predicted disclosure harmonizes with at least 50% of the order mappings among the neighbor situations.""" return omcheck(sit, community, minrate=0.5) @annotate('om75', 'order-mapping') def _vld_om_75(self, sit, community): """Same as `om50` but requires 75% harmonizing order mappings.""" return omcheck(sit, community, minrate=0.75) @annotate('om100', 'order-mapping') def _vld_om_100(self, sit, community): """Same as `om50` but requires 100% harmonizing order mappings.""" return omcheck(sit, community, minrate=1) @annotate('omx50', 'order-mapping') def _vld_omx_50(self, sit, community): """Same as `om50` but also consider the case of extending situation chains.""" return omcheck(sit, community, minrate=0.5, extend=True) @annotate('omx75', 'order-mapping') def _vld_omx_75(self, sit, community): """Same as `omx50` but requires 75% harmonizing order mappings.""" return omcheck(sit, community, minrate=0.75, extend=True) @annotate('omx100', 'order-mapping') def _vld_omx_100(self, sit, community): """Same as `omx50` but requires 100% harmonizing order mappings.""" return omcheck(sit, community, minrate=1, extend=True) @annotate('omxc50', 'order-mapping') def _vld_omxc_50(self, sit, community): """Same as `omx50` but also consider the case of creating new situation chains and then check if the new order mapping is one that globally occurs most often.""" return omcheck(sit, community, minrate=0.5, extend=True, create=True) @annotate('omxc75', 'order-mapping') def _vld_omxc_75(self, sit, community): """Same as `omxc50` but requires 75% harmonizing order mappings.""" return omcheck(sit, community, minrate=0.75, extend=True, create=True) @annotate('omxc100', 'order-mapping') def _vld_omxc_100(self, sit, community): """Same as `omxc75` but requires 100% harmonizing order mappings.""" return omcheck(sit, community, minrate=1, extend=True, create=True) # ------------------------------------------------------------------------- # combinations # ------------------------------------------------------------------------- def _combinedvalidators(self, sit, community, *vlds): """Chained validation.""" for vld in vlds: if not vld(sit, community): return False return True @annotate('uc1 + omx75') def _vld_used_once__omx_75(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_once, self._vld_omx_75, ) @annotate('uc1 + omx100') def _vld_used_once__omx_100(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_once, self._vld_omx_100, ) @annotate('uc2 + omx75') def _vld_used_twice__omx_75(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_twice, self._vld_omx_75, ) @annotate('uc2 + omx100') def _vld_used_twice__omx_100(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_twice, self._vld_omx_100, ) @annotate('uc1 + oxdp') def _vld_used_once__on_existing_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_once, self._vld_on_existing_disclosure_path, ) @annotate('uc1 + omdp') def _vld_used_once__on_major_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_once, self._vld_on_major_disclosure_path, ) @annotate('uc2 + oxdp') def _vld_used_twice__on_existing_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_twice, self._vld_on_existing_disclosure_path, ) @annotate('uc2 + omdp') def _vld_used_twice__on_major_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_used_twice, self._vld_on_major_disclosure_path, ) @annotate('ucm + omx75') def _vld_major_disclosure__omx_75(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_major_disclosure, self._vld_omx_75, ) @annotate('ucm + omx100') def _vld_major_disclosure__omx_100(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_major_disclosure, self._vld_omx_100, ) # def _vld_super_disclosure__omx_75(self, sit, community): # return self._combinedvalidators(sit, community, # self._vld_super_disclosure, # self._vld_omx_75, # ) # # def _vld_super_disclosure__omx_100(self, sit, community): # return self._combinedvalidators(sit, community, # self._vld_super_disclosure, # self._vld_omx_100, # ) @annotate('ucm + ixdp') def _vld_major_disclosure__on_existing_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_major_disclosure, self._vld_on_existing_disclosure_path, ) @annotate('ucm + imdp') def _vld_major_disclosure__on_major_disclosure_path(self, sit, community): """Chained validation.""" return self._combinedvalidators(sit, community, self._vld_major_disclosure, self._vld_on_major_disclosure_path, ) # def _vld_super_disclosure__on_existing_disclosure_path(self, sit, community): # return self._combinedvalidators(sit, community, # self._vld_super_disclosure, # self._vld_on_existing_disclosure_path, # ) # # def _vld_super_disclosure__on_major_disclosure_path(self, sit, community): # return self._combinedvalidators(sit, community, # self._vld_super_disclosure, # self._vld_on_major_disclosure_path, # ) # ----------------------------------------------------------------------------- _v = Validator(None) _vldmethnames = [m for m in dir(_v) if m.startswith("_vld_")] ids = tuple(getattr(_v, m).id for m in _vldmethnames) types = tuple(getattr(_v, m).type for m in _vldmethnames) descs = tuple(re.sub(r'\s+', ' ', getattr(_v, m).__doc__.split("\n\n")[0].strip()) for m in _vldmethnames) # ============================================================================= # tests # ============================================================================= def __doctests_omcheck(): """ >>> DOCTESTDEBUG[:] = ["on"] >>> from diles.learn import Label >>> persons = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" >>> pf = range(len(persons)) >>> fwl, fwt = [1] * len(persons), 0 >>> ps = lambda p: [int(x in p) for x in persons] >>> lower = [ ... _Situation(ps("A"), Label("12"), fwl, fwt, pf), ... _Situation(ps("B"), Label("23"), fwl, fwt, pf), ... ] >>> upper = [ ... _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf), ... _Situation(ps("ABCEF"), Label("23"), fwl, fwt, pf), ... _Situation(ps("ABCGH"), Label("1234"), fwl, fwt, pf), ... _Situation(ps("ABCIJ"), Label("12345"), fwl, fwt, pf), ... ] >>> sit = _Situation(ps("ABC"), Label("2"), fwl, fwt, pf) >>> omcheck(sit, lower + upper) reversing -> reversing ignoring -> True loosing -> reversing preserving -> True preserving -> reversing preserving -> False preserving -> reversing preserving -> False reversing -> reversing ignoring -> True ignoring -> reversing preserving -> False preserving -> reversing preserving -> False preserving -> reversing preserving -> False False >>> omcheck(sit, lower + upper[:2]) reversing -> reversing ignoring -> True loosing -> reversing preserving -> True reversing -> reversing ignoring -> True ignoring -> reversing preserving -> False True >>> sit = _Situation(ps("ABC"), Label("123"), fwl, fwt, pf) >>> omcheck(sit, lower + upper) reversing -> preserving reversing -> False loosing -> preserving reversing -> True preserving -> preserving preserving -> True preserving -> preserving preserving -> True reversing -> preserving reversing -> False ignoring -> preserving reversing -> False preserving -> preserving preserving -> True preserving -> preserving preserving -> True True Also check when extending chains: >>> sit = _Situation(ps("ABCDEF"), Label("123"), fwl, fwt, pf) >>> omcheck(sit, lower + upper) True >>> omcheck(sit, lower + upper, extend=True) preserving -> reversing -> False preserving -> reversing -> False preserving -> loosing -> True preserving -> ignoring -> False False >>> omcheck(sit, lower + upper, extend=True, minrate=0.25) preserving -> reversing -> False preserving -> reversing -> False preserving -> loosing -> True preserving -> ignoring -> False True >>> sit = _Situation(ps("ABCGHIJ"), Label("123"), fwl, fwt, pf) >>> omcheck(sit, lower + upper, extend=True) reversing -> preserving -> False reversing -> preserving -> False reversing -> preserving -> False reversing -> preserving -> False False >>> sit = _Situation(ps("ABCGHIJ"), Label("12345"), fwl, fwt, pf) >>> omcheck(sit, lower + upper, extend=True) preserving -> preserving -> True preserving -> preserving -> True ignoring -> preserving -> True ignoring -> preserving -> True True >>> sit = _Situation(ps(""), Label("1"), fwl, fwt, pf) >>> omcheck(sit, lower + upper, extend=True) preserving -> reversing -> False preserving -> loosing -> True preserving -> preserving -> True preserving -> preserving -> True loosing -> reversing -> False loosing -> ignoring -> False loosing -> preserving -> False loosing -> preserving -> False False >>> sit = _Situation(ps(""), Label("2"), fwl, fwt, pf) >>> omcheck(sit, lower + upper, extend=True) preserving -> reversing -> False preserving -> loosing -> True preserving -> preserving -> True preserving -> preserving -> True preserving -> reversing -> False preserving -> ignoring -> False preserving -> preserving -> True preserving -> preserving -> True True Also check when creating chains: >>> sits = [ ... _Situation(ps("A"), Label("12"), fwl, fwt, pf), ... _Situation(ps("B"), Label("23"), fwl, fwt, pf), ... _Situation(ps("ABCD"), Label("2"), fwl, fwt, pf), ... _Situation(ps("ABCEF"), Label("23"), fwl, fwt, pf), ... _Situation(ps("ABCGH"), Label("1234"), fwl, fwt, pf), ... _Situation(ps("ABCIJ"), Label("12345"), fwl, fwt, pf), ... _Situation(ps("MN"), Label("12"), fwl, fwt, pf), ... _Situation(ps("MO"), Label("12"), fwl, fwt, pf), ... _Situation(ps("MP"), Label(""), fwl, fwt, pf), ... ] >>> sit = _Situation(ps("M"), Label("2"), fwl, fwt, pf) >>> omcheck(sit, sits, extend=True, create=True) local: set(['preserving']) global set(['preserving']) True >>> omcheck(sit, sits[:4] + sits[-3:], extend=True, create=True) local: set(['preserving']) global set(['reversing']) False >>> sit = _Situation(ps("MNOP"), Label("2"), fwl, fwt, pf) >>> omcheck(sit, sits, extend=True, create=True) local: set(['reversing']) global set(['preserving']) False >>> sit = _Situation(ps("MOP"), Label("2"), fwl, fwt, pf) >>> omcheck(sit, sits, extend=True, create=True) local: set(['reversing', 'preserving']) global set(['preserving']) True """