"""Miscellaneous utilities for learning related stuff.""" from __future__ import division from itertools import izip, combinations, chain from numbers import Number import random import sys import orange from diles.learn import Label from diles.util import memoized @memoized def binom(n, k): """Binomial coefficient of `n` over `k`.""" if k == 0: return 1 if n == 0 or n < k: return 0 if k == n: return 1 b = [0] * (n + 1) b[0] = 1 for i in xrange(1, n + 1): b[i] = 1 j = i - 1 while j > 0: b[j] += b[j - 1] j -= 1 return b[k] def randcomb(iterable, k, m): """ Get `m` distinct randomly chosen `k`-combinations from an iterable. Uses 3 different techniques to generate distinct random combinations: - If `m` is equal or greater than the number of `k`-combinations, *all* combinations are returned, i.e. here `m` is an upper bound of the number of returned combinations. - If the number of combinations is 10000 times bigger than `m`, samples are generated with ``random.sample()`` and a duplicate check. - Otherwise specific combinatations identified by ``random.sample(xrange(numcombinations), k)`` are generated. Combinations are returned as an iterator. """ def generate(): nums = random.sample(xrange(l), m) for num in nums: indice = combnum(n, k, num) yield tuple(seq[i] for i in indice) raise StopIteration def sample(): seen = set() for _ in xrange(m): comb = tuple(random.sample(iterable, k)) if comb not in seen: seen.add(comb) yield comb raise StopIteration def iterate(): return combinations(seq, k) seq = tuple(iterable) n = len(seq) l = binom(n, k) return m >= l and iterate() or l > m * 10000 and sample() or generate() def combnum(n, k, num): """Get combination `num` in an `(n, k)`-combinatorial number system. Raises an IndexError if `num` exceeds the number of combinations. >>> combnum(5, 2, 4) (3, 1) >>> combnum(5, 2, 5) (3, 2) :mod:`itertools` provides similar functionality, ... >>> from itertools import islice, combinations >>> tuple(islice(combinations(range(5), 2), 7, 8))[0] (2, 3) ... but would take a very long time in the following case: >>> combnum(30,15, 155113520) (29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 18, 17, 15, 14, 10) This functions is used by :func:`randcomb`. """ if num >= binom(n, k): raise IndexError(num) indice = [] while k: m = binom(n, k) while m > num: n -= 1 m = binom(n, k) indice.append(n) num -= m k -= 1 return tuple(indice) def jaccarddist(set1, set2): """Calculate the Jaccard distance of two sets. Arguments may be any sequence-like objects. >>> jaccarddist([0],[0]) 0.0 >>> jaccarddist([],[]) 0.0 >>> jaccarddist([0],[1]) 1.0 >>> jaccarddist([0],[0,1]) 0.5 >>> jaccarddist([0],[0,1,2]) 0.66... >>> jaccarddist([0,1],[0,2]) 0.66... """ set1, set2 = set(set1), set(set2) linsec, lunion = len(set1 & set2), len(set1 | set2) return 0.0 if lunion == 0 else (1 - linsec / lunion) #@memoized # may consume too much memory def euclidean_dist(features1, features2, fwl=None, fwt=0): """Calculate Euclidean distance between to feature vectors. Distance between individual features are weighted according to `fwl`. Non-numerical elements have a distance of 0 if they are equal and 1 otherwise (discrete metric). Sequence-like features are interpreted as sets -- their distance is calculated using :func:`jaccarddist`. >>> euclidean_dist([0],[1]) 1.0 >>> euclidean_dist([0,0],[0,1]) 1.0 >>> euclidean_dist([0,0],[1,1]) 1.41... >>> euclidean_dist([0,0.5],[1,1]) 1.11... >>> euclidean_dist(['a','b'],['a','c']) 1.0 >>> euclidean_dist(['a','b','c'],['a','x','c']) 1.0 >>> euclidean_dist(['a','b','c'],['a','x','x']) 1.41... >>> euclidean_dist(['a','b','c'],['x','y','z'], [1, 0, 0]) 1.0 >>> euclidean_dist(['a','b','c'],['x','y','z'], [1, 1, 0]) 1.41... >>> euclidean_dist(['a','b','c'],['x','y','z'], [1, 0.5, 0]) 1.11... >>> euclidean_dist(['a','b','c'],['a','y','z'], [1, 0.5, 0]) 0.5 """ d = 0.0 fwl = fwl or [1] * len(features1) for f1, f2, w in izip(features1, features2, fwl): if w <= fwt: continue if hasattr(f1, '__iter__'): f1, f2 = 0, jaccarddist(f1, f2) elif not hasattr(f1, '__float__'): f1, f2 = f1 == f2 and (0, 0) or (0, 1) d += ((f1 - f2) * w)**2 return d**0.5 def numerical(sample, vtable, fwl, fwt): """Get numerical representation of a polynomial feature vector. `vtable` is a list of sorted value lists (``vtable[i]`` lists all values for feature ``i``). The numerical representation is as long as all those list together, i.e. it s a *flat* representation where each element is ``1`` if the corresponding feature-value maps to the related value in `sample` and ``-1`` otherwise. >>> numerical("aaa", ["abc", "axx", "0a"], [1, 1, 1], 0) [1, -1, -1, 1, -1, -1, -1, 1] >>> numerical("aaa", ["abc", "axx", "0a"], [0, 1, 1], 0) [1, -1, -1, -1, 1] >>> numerical("aaa", ["abc", "axx", "0a"], [1, 1, 0], 0) [1, -1, -1, 1, -1, -1] """ row = [] for t, v, w in izip(vtable, sample, fwl): if w <= fwt: continue elif t: row.extend(1 if v == x else -1 for x in t) else: # already numeric row.append(v) return row def tosvm(samples, labels, fwl, fwt): """Convert samples to LIBSVM format. Returns a 4-tuple: - label numbers - data rows (value numbers for each feature) - ordered labels (the index gives numeric representation of a label) - feature values table (see :func:`numerical`) >>> samples = "a", "c", "b" >>> labels = "x", "x", "z" >>> tosvm(samples, labels, [1], 0) ([0, 0, 1], [[1, -1, -1], [-1, -1, 1], [-1, 1, -1]], ['x', 'z'], [['a', 'b', 'c']]) >>> samples = "b", "c", "a" >>> labels = "z", "x", "x" >>> tosvm(samples, labels, [1], 0) ([0, 1, 1], [[-1, 1, -1], [-1, -1, 1], [1, -1, -1]], ['z', 'x'], [['a', 'b', 'c']]) >>> samples = "abc", "aac", "bbc", "dbc" >>> labels = "x", "x", "z", "y" >>> nums, rows, labix, _ = tosvm(samples, labels, [1, 1, 1], 0) >>> nums [0, 0, 1, 2] >>> rows [[1, -1, -1, -1, 1, 1], [1, -1, -1, 1, -1, 1], [-1, 1, -1, -1, 1, 1], [-1, -1, 1, -1, 1, 1]] >>> samples = ("a", 2), ("a", 1), ("b", 0), ("d", -2) >>> labels = "xxzy" >>> nums, rows, _, _ = tosvm(samples, labels, [1, 1, 1], 0) >>> nums [0, 0, 1, 2] >>> rows [[1, -1, -1, 2], [1, -1, -1, 1], [-1, 1, -1, 0], [-1, -1, 1, -2]] """ valtab = [set() for _ in samples[0]] for sample in samples: for i, v in enumerate(sample): if not isinstance(v, Number) or isinstance(v, bool): valtab[i].add(v) valtab = [sorted(vl) for vl in valtab] labix = [] for l in labels: if l not in labix: labix.append(l) nums = [labix.index(l) for l in labels] rows = [numerical(s, valtab, fwl, fwt) for s in samples] return nums, rows, labix, valtab ORANGEUNKNOWN = "__unkown__" def toorange(samples, labels, fwl, fwt): """Convert samples to *Orange* format. Label are used in their string representation (i.e. an Orange classifier will predict strings! """ samples = [[str(y) for y in x] for x in samples] labels = [repr(x) for x in labels] unique = lambda x: list(sorted(set(x))) classattribute = orange.EnumVariable("label", values=unique(labels)) attributes = [orange.EnumVariable("feature-%d" % i, values=(s + [ORANGEUNKNOWN])) for i, s in enumerate(unique(x) for x in izip(*samples))] examples = [list(s) + [l] for s, l in izip(samples, labels)] domain = orange.Domain(attributes + [classattribute]) data = orange.ExampleTable(domain, examples) return data def solopt(solspace, fitness, gridlim=100, popsize=10, elite=0.4, generations=100, gainwindow=10, mutprob=0.4, newprob=0.3, mutvars=1, debug=False): """Find good (possibly best) solution. The solution space is given by `solspace`, a list of value counts for each variable (i.e. variable ``i`` may range from ``0`` to ``solspace[i] - 1``). A solution is evaluated using the function `fitness` (greater return value indicates greater fitness). If the solution space is smaller than `gridlim`, **all** solutions will be checked (grid search). Otherwise an evolutionary approach is used: The evolution stops after `generations` iterations or if there was no improvement during the last `gainwindow` generations (a gain window of 0 disables this termination condition). In each evolution the `elite` fraction of the population survives and the rest is replaced either by mutating a survived elite individual (probability given by `mutprob`), by entirely new creations (probabilitiy given by `newprob`), or by crossing 2 survived elite individuals (probability is `1 - mutprob - newprob`). A mutation touches `mutvars` variables. >>> random.seed(2010) >>> fitness = lambda s: sum(s) / (s.count(9) + 1.0) >>> solspace = [10, 10, 10, 10, 11] >>> solopt(solspace, fitness, generations=10) ([5, 8, 5, 4, 8], 30.0) >>> solopt(solspace, fitness, generations=10, popsize=20) ([8, 5, 8, 8, 10], 39.0) >>> solopt(solspace, fitness, generations=10, popsize=50) ([8, 8, 8, 8, 10], 42.0) >>> random.seed(None) Check proper handling of empty solution space: >>> solopt([], lambda s: len(s), generations=10, popsize=50, debug=True) Checked complete solution space Final population: [] (0) ([], 0) >>> random.seed(None) """ def produce(): """Produce an individual.""" return [random.randint(0, x-1) for x in solspace] def merge(ind1, ind2): """Merge two individuals.""" return [random.choice(x) for x in izip(ind1, ind2)] def mutate(ind): """Create a mutated copy of an individual.""" ind = ind[:] for i in random.sample(mutix, mutvars): ind[i] = (ind[i] + random.choice([-1, 1])) % solspace[i] return ind def evolve(population): """Create a new generation. The population is assumed to be sorted already (fittest first). Returns a new population with the elite from the old one and new merged, mutated or produced individuals. """ # get n-best distinct individuals (duplicate removal needs sorted pop.) best = [p for i, p in enumerate(population) if i == 0 or p != population[i-1]][:int(popsize * elite)] nextgen = best[:] while len(nextgen) < popsize: r = random.random() if r < newprob: ind = produce() elif r < newprob + mutprob: ind = mutate(random.choice(best)) else: ind1, ind2 = random.sample(best, 2) ind = merge(ind1, ind2) nextgen.append(ind) return nextgen def rank(population): ranked = sorted(((fitness(ind), ind) for ind in population), reverse=True) return [ind for _, ind in ranked] def complete(fixed, var): if not var: return [fixed] results = [] for val in range(var[0]): nfixed = fixed + [val] nvar = var[1:] results += complete(nfixed, nvar) return results fitness = memoized(fitness) sss = reduce(lambda a,b: a*b, solspace, 1) # index of variables which may mutate mutix = [i for i, s in enumerate(solspace) if s > 1] mutvars = min(len(mutix), mutvars) gainwindow = gainwindow or generations if sss < gridlim: evolutions = -1 population = rank(complete([], solspace)) else: assert popsize * 2 < solspace assert int(elite * popsize) >= 2 population = rank([produce() for _ in range(popsize)]) bestval, bestage = fitness(population[0]), 0 # best fitness and its age evolutions = 0 while evolutions < generations and bestage < gainwindow: population = rank(evolve(population)) if fitness(population[0]) > bestval: bestval, bestage = fitness(population[0]), 0 else: bestage += 1 evolutions += 1 if debug: if evolutions == -1: print "Checked complete solution space" elif evolutions == generations: print "Reached maximum number of generations (%d)" % generations else: print "Stopped after %d generations (no more fitness gain)" % evolutions print "Final population:" for ind in population: print("%s (%s)" % (ind, (fitness(ind)))) return population[0], fitness(population[0]) def normalized(values, vrange=None): """Normalize list of values to be within 0 and 1. `values` may also be a named list of values, i.e. a list of 2-tuples with the first element as a name and the second one as a value. The maximum and minimum possible values to use as a reference may be given by `vrange` (e.g. ``(-1, 1)`` - if no range is given `0` and the sum of `values` is used (which *distributes* the values between 0 and 1). >>> normalized([-1, 0, 2], (-2, 2)) [0.25, 0.5, 1.0] >>> normalized([-1, 0, 2], (-1, 2)) [0.0, 0.33..., 1.0] >>> normalized([('a', -1), ('b', 0), ('c', 2)], (-1, 2)) [('a', 0.0), ('b', 0.33...), ('c', 1.0)] >>> values = [0.1, 0.2, 0.4] >>> normalized(values, (0, sum(values))) [0.142..., 0.285..., 0.571...] >>> normalized(values) [0.142..., 0.285..., 0.571...] >>> normalized([0.0, 0.0, 0.0, 0.0]) [0.25, 0.25, 0.25, 0.25] """ if isinstance(values[0], tuple): f, t = vrange or (0, sum([v for _, v in values])) if t - f == 0: return [(x, 1/len(values))for x, _ in values] else: return [(x, (max(min(v, t), f) - f) / (t - f)) for x, v in values] else: f, t = vrange or (0, sum(values)) if t - f == 0: return [1/len(values)] * len(values) else: return [(max(min(v, t), f) - f) / (t - f) for v in values] def relscore(items, n=0): """Convert item scoring to relative scores. Items must be a list of 2-tuples, an item and a corresponding score. Then The best scored items gets a score of 1, and all other items get a score relative to the best one. If `n` is given, only the best `n` items are considered. Example: >>> items = [('a', 0.5), ('b', 0.25), ('c', 0.25), ('d', 0.1)] >>> relscore(items) [('a', 1.0), ('b', 0.5), ('c', 0.5), ('d', 0.200...)] >>> items = [('a', 0.8), ('b', 0.2), ('c', 0.1), ('d', 0.1)] >>> relscore(items, n=2) [('a', 1.0), ('b', 0.25)] """ items = sorted(items, key=lambda x: x[1], reverse=True) items = items[:n] if n else items items = normalized(items, vrange=(0, max(x[1] for x in items))) return items CEPSILON = sys.float_info.epsilon * 2 # confidence value for predictions on hierarchical labels when a non-leaf item # has been predicted class sugconf(float): def __str__(self): return "?%s" % super(sugconf, self).__str__() def __repr__(self): return "?%s" % super(sugconf, self).__repr__() def narrowconf(conf): """Narrow down confidence to be in (0.0, 1.0), both ends exclusive. Ensures confidence is never 1.0 or 0.0 but slightly smaller respectively greater. This makes it possible to find a confidence separator in corner cases. >>> narrowconf(1.0) == 1.0 - CEPSILON True >>> narrowconf(0.0) == 0.0 + CEPSILON True """ return max(min(conf, 1 - CEPSILON), CEPSILON) def ratioconf(ranking): """Calculate confidence based on ratio between first 2 ranked labels. Returned confidence has been narrowed down with :func:`narrowconf`. >>> ratioconf([('x', 0.5)]) == 1 - CEPSILON True >>> ratioconf([('x', 0.0)]) == 0.0 + CEPSILON True >>> ratioconf([('x', 0.1), ('y', 0.0)]) == 1.0 - CEPSILON True >>> ratioconf([('x', 0.0), ('y', 0.0)]) == 0.0 + CEPSILON True >>> ratioconf([('x', 0.2), ('y', 0.1)]) 0.5 >>> ratioconf([('x', 0.9), ('y', 0.6)]) 0.33... >>> ratioconf([('x', 0.3), ('y', 0.2)]) 0.33... """ if ranking[0][1] == 0: c = 0.0 elif len(ranking) == 1 or ranking[1][1] == 0: c = 1.0 else: c =1 - ranking[1][1] / ranking[0][1] return narrowconf(c) def confsep(neg, pos): """Find a conference separating positive and negative predictions. The number of misclassifications (i.e. supported negative predictions and prevented positive predictions) is used as cost criteria for finding an optimal solution. From all optimal solutions, the one preventing the most negative predictions is chosen. >>> confsep([0.1],[0.9]) 0.5 >>> confsep([0.1],[0.4]) 0.25 >>> confsep([],[0.4]) 0.200... >>> confsep([0.5],[]) 0.75 >>> confsep([0.1,0.5], [0.7]) 0.599... >>> confsep([0.4], [0.2]) 0.699... >>> confsep([0.2,0.4], [0.3,0.5]) 0.450... >>> confsep([0.2,0.4], [0.3]) 0.699... >>> confsep([0.2,0.4], [0.3,0.3]) 0.25 >>> confsep([0.4], [0.3,0.5]) 0.450... """ def err(neg, pos, sep): n = len(neg) + len(pos) e = len([x for x in neg if x >= sep]) + len([x for x in pos if x < sep]) return e / n confs = neg + pos assert confs and all([x > 0.0 and x < 1.0 for x in confs]) borders = sorted(set([0.0,1.0] + list(confs))) seps = [(borders[i] + borders[i+1]) / 2 for i in range(len(borders)-1)] ranked = [(x, err(neg, pos, x)) for x in seps] ranked.sort(key=lambda x: (x[1], 1-x[0])) return ranked[0][0] @memoized def bloatlabel(l): """Bloat a hierarchical multi-label. Extends the label to also contain all parents of each single-label item: >>> bloatlabel([]) {} >>> bloatlabel(["0.1.2"]) {'0', '0.1', '0.1.2'} >>> bloatlabel(["0.1.2", "0.1", "0.x", "0.xx", "0.x.yy.z"]) {'0', '0.1', '0.1.2', '0.x', '0.x.yy', '0.x.yy.z', '0.xx'} """ bloated = set() for el in (tuple(es.split(".")) for es in l): bloated.update(el[:i+1] for i in xrange(len(el))) return Label(".".join(el) for el in bloated) # ============================================================================= # tests # ============================================================================= def __doctests_confsep(): """ >>> confsep([0.1,0.2,0.3,0.4], [0.5,0.6,0.7,0.8]) 0.450... >>> confsep([0.1,0.2,0.3,0.4], [0.4,0.5,0.6,0.7]) 0.450... >>> confsep([0.1,0.2,0.3,0.5], [0.4,0.5,0.6,0.7]) 0.3499... >>> confsep([0.1,0.2,0.3,0.4], [0.3,0.5,0.6,0.7]) 0.450... >>> confsep([0.1,0.3], [0.01,0.15]) 0.650... >>> confsep([0.2,0.3], [0.01, 0.1]) 0.650... >>> confsep([0.1,0.4], [0.2,0.3,0.5,0.6]) 0.150... >>> confsep([0.01,0.2,0.3], [0.1,0.4,0.5]) 0.34999... >>> confsep([0.3], [0.01,0.1]) 0.0050... >>> confsep([0.2,0.3], [0.1]) 0.650... >>> confsep([0.01,0.2,0.2,0.2,0.2], [0.1,0.1,0.1,0.1]) 0.599... >>> confsep([0.01,0.2,0.2,0.2], [0.1,0.1,0.1,0.1]) 0.055 >>> confsep([0.01,0.2,0.2,0.2,0.2], [0.1,0.1,0.1]) 0.599... >>> confsep([0.01,0.1,0.1,0.1,0.2], [0.1,0.1,0.1,0.1]) 0.599... >>> confsep([0.999,0.999], [0.5,0.5]) 0.99950... >>> confsep([0.999], [0.5]) 0.99950... >>> confsep((0.5, 0.9999), (0.5, 0.5)) 0.999950... >>> confsep((0.9999,), (0.9999,)) 0.999950... Some random tests (compare randomly chosen separators with calculated one): >>> import random >>> random.seed(2010) >>> err = lambda a,b,s: (len([x for x in a if x >= s]) + len([x for x in b if x < s])) / len(a+b) >>> for _ in range(100): ... # generate random confidences ... ... pop = [random.random() for _ in range(1000)] * 50 ... a = random.sample(pop, random.randint(1,20)) ... b = random.sample(pop, random.randint(1,20)) ... # ... calculate separator and its error ... ... s = confsep(a, b) ... e = err(a,b,s) ... # ... and compare it with random separators ... for s_ in set(pop): ... assert e <= err(a,b,s_) """ def __doctests_combnum(): """ Compare with itertools.combinations: >>> from itertools import combinations >>> for n, k in [(7,2), (6,3), (50,1), (1,1), (10,5)]: ... l1 = [] ... for i in range(binom(6,3)): ... l1.append(combnum(6, 3, i)) ... l1 = [tuple(reversed(x)) for x in l1] ... l2 = list(combinations(range(6), 3)) ... if sorted(l1) != sorted(l2): ... print "failed:", n, k Check IndexError: >>> combnum(5,2,9) (4, 3) >>> combnum(5,2,10) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): IndexError: 10 """ def __doctests_binom(): """ >>> for n in range(9): ... row = [] ... for k in range(n+1): ... row.append(binom(n, k)) ... print row [1] [1, 1] [1, 2, 1] [1, 3, 3, 1] [1, 4, 6, 4, 1] [1, 5, 10, 10, 5, 1] [1, 6, 15, 20, 15, 6, 1] [1, 7, 21, 35, 35, 21, 7, 1] [1, 8, 28, 56, 70, 56, 28, 8, 1] """ def __doctests_randcomb(): """ >>> random.seed(2010) >>> tuple(randcomb(range(5), 2, 3)) ((2, 0), (3, 1), (4, 3)) Check generate versus sample versus combinations: >>> x = randcomb(range(5), 2, 9) # random selection of combinations >>> type(x), x.__name__ (, 'generate') >>> x = randcomb(range(5), 2, 10) # all combinations >>> type(x) >>> y = randcomb(range(5), 2, 11) >>> type(y) == type(x) and tuple(x) == tuple(y) True >>> x = randcomb(range(100), 50, 3) >>> type(x), x.__name__ (, 'sample') Check if all elements occur approximately equally often: >>> random.seed(2010) >>> l = tuple(randcomb(range(30), 15, 100)) >>> xcount = dict((x, 0) for x in range(30)) >>> for comb in l: ... for x in comb: ... xcount[x] += 1 >>> import numpy >>> v = xcount.values() >>> numpy.mean(v), numpy.std(v), max(v), min(v) (50.0, 5.0332..., 61, 42) """ def __doctests_toorange(): """ >>> samples = ["aaa", "aab", "abc", "aab"] >>> labels = [Label(x) for x in "xxyy"] >>> fwl, fwt = [1] * len(samples[0]), 1 >>> et = toorange(samples, labels, fwl, fwt) >>> et.domain [feature-0, feature-1, feature-2, label] >>> for attr in et.domain.attributes: ... print [x.value for x in attr] ['a', '__unkown__'] ['a', 'b', '__unkown__'] ['a', 'b', 'c', '__unkown__'] >>> et.domain.classVar EnumVariable 'label' >>> list(et) [['a', 'a', 'a', '{'x'}'], ['a', 'a', 'b', '{'x'}'], ['a', 'b', 'c', '{'y'}'], ['a', 'a', 'b', '{'y'}']] >>> labels = [0,0,1,1] >>> et = toorange(samples, labels, fwl, fwt) >>> et.domain [feature-0, feature-1, feature-2, label] >>> et.domain.classVar EnumVariable 'label' >>> list(et) [['a', 'a', 'a', '0'], ['a', 'a', 'b', '0'], ['a', 'b', 'c', '1'], ['a', 'a', 'b', '1']] >>> samples = [(True, 10), (True, 0)] >>> labels = [1, 2] >>> et = toorange(samples, labels, fwl, fwt) >>> et.domain [feature-0, feature-1, label] >>> et.domain.classVar EnumVariable 'label' >>> list(et) [['True', '10', '1'], ['True', '0', '2']] """