"""Gne pset from cmat. Find pairs for a given cmat.
tinybee.find_pairs.py with fixed estimator='dbscan' eps=eps, min_samples=min_samples
"""
# pylint: disable=too-many-locals, unused-import, invalid-name
from typing import List, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
import logzero
from logzero import logger
from radiobee.cmat2tset import cmat2tset
from radiobee.interpolate_pset import interpolate_pset


def _gen_pset(
cmat1: Union[List[List[float]], np.ndarray, pd.DataFrame],
eps: float = 10,
min_samples: int = 6,
delta: float = 7,
verbose: Union[bool, int] = False,
# ) -> List[Tuple[int, int, Union[float, str]]]:
) -> List[Tuple[Union[float, str], Union[float, str], Union[float, str]]]:
"""Gen pset from cmat.
Find pairs for a given cmat.
Args:
cmat: correlation/similarity matrix
eps: min epsilon for DBSCAN (10)
min_samples: minimum # of samples for DBSCAN (6)
delta: tolerance (7)
Returns:
pairs + "" or metric (float)
dbscan_pairs' setup
if eps is None:
eps = src_len * .01
if eps < 3:
eps = 3
if min_samples is None:
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
def gen_eps_minsamples(src_len, tgt_len):
eps = src_len * .01
if eps < 3:
eps = 3
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
return {"eps": eps, "min_samples": min_samples}
"""
if isinstance(verbose, bool):
if verbose:
verbose = 10
else:
verbose = 20
logzero.loglevel(verbose)
# if isinstance(cmat, list):
cmat = np.array(cmat1)
src_len, tgt_len = cmat.shape
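    # cmat has shape (src_len, tgt_len)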
# tset = cmat2tset(cmat)
tset = cmat2tset(cmat).tolist()
logger.debug("tset: %s", tset)
# iset = gen_iset(cmat, verbose=verbose, estimator=estimator)
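    # cluster the (x, y, cos) triples; DBSCAN labels noise points as -1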
labels = DBSCAN(eps=eps, min_samples=min_samples).fit(tset).labels_
df_tset = pd.DataFrame(tset, columns=["x", "y", "cos"])
cset = df_tset[labels > -1].to_numpy()
# sort cset
_ = sorted(cset.tolist(), key=lambda x: x[0])
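    # interpolate the sorted cluster points so every target position gets an anchor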
iset = interpolate_pset(_, tgt_len)
# *_, ymax = zip(*tset)
# ymax = list(ymax)
# low_ = np.min(ymax) - 1 # reset to minimum_value - 1
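    # sentinel entries at both ends keep inserted pairs in range and monotonic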
buff = [(-1, -1, ""), (tgt_len, src_len, "")]
# for idx, tset_elm in enumerate(tset):
for tset_elm in tset:
logger.debug("buff: %s", buff)
        # position the max in ymax and insert it into buff
        # if it falls within the range given by iset +- delta
        # and is valid (does not exceed the constraint set by
        # neighboring points)
# argmax = int(np.argmax(ymax))
# logger.debug("=== %s,%s === %s", _, argmax, tset[_])
logger.debug("=== %s === %s", _, tset_elm)
# ymax[_] = low_
# elm = tset[argmax]
# elm0, *_ = elm
elm0, *_ = tset_elm
# position elm in buff
idx = -1 # for making pyright happy
for idx, loc in enumerate(buff):
if loc[0] > elm0:
break
else:
idx += 1 # last
# insert elm in for valid elm
# (within range inside two neighboring points)
# pos = int(tset[argmax][0])
pos = int(tset_elm[0])
logger.debug(" %s <=> %s ", tset_elm, iset[pos])
# if abs(tset[argmax][1] - iset[pos][1]) <= delta:
if abs(tset_elm[1] - iset[pos][1]) <= delta:
            if buff[idx - 1][1] < tset_elm[1] < buff[idx][1]:
buff.insert(idx, tset_elm)
logger.debug("idx: %s, tset_elm: %s", idx, tset_elm)
else:
logger.debug("\t***\t idx: %s, tset_elm: %s", idx, tset_elm)
_ = """
if abs(tset[loc][1] - iset[loc][1]) <= delta:
if tset[loc][1] > buff[idx][1] and tset[loc][1] < buff[idx + 1][1]:
buff.insert(idx + 1, tset[loc])
# """
# remove first and last entry in buff
buff.pop(0)
buff.pop()
# return [(1, 1, "")]
return [(int(elm0), int(elm1), elm2) for elm0, elm1, elm2 in buff]


def gen_pset(
cmat1: Union[List[List[float]], np.ndarray, pd.DataFrame],
eps: float = 10,
min_samples: int = 6,
delta: float = 7,
verbose: Union[bool, int] = False,
) -> List[Tuple[Union[float, str], Union[float, str], Union[float, str]]]:
"""Gen pset.
Refer to _gen_pset.
"""
del verbose
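    # record the min_samples actually used as a function attribute so that
    # callers can inspect it after the retry loop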
gen_pset.min_samples = min_samples
for min_s in range(min_samples):
logger.debug(" min_samples, try %s", min_samples - min_s)
try:
pset = _gen_pset(
cmat1,
eps=eps,
min_samples=min_samples - min_s,
delta=delta,
)
break
except ValueError:
logger.debug(" decrease min_samples by %s", min_s + 1)
continue
except Exception as e:
logger.error(e)
continue
else:
# break should happen above when min_samples = 2
raise Exception("bummer, this shouldn't happen, probably another bug")
# store new min_samples
gen_pset.min_samples = min_samples - min_s
return pset
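

if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming rows index source segments and
    # columns index target segments: a synthetic similarity matrix with a
    # strong, scaled diagonal should yield a roughly diagonal pset.
    rng = np.random.default_rng(42)
    src_len_, tgt_len_ = 100, 120
    cmat_demo = rng.random((src_len_, tgt_len_)) * 0.2  # low background noise
    for i_ in range(src_len_):
        j_ = min(int(i_ * tgt_len_ / src_len_), tgt_len_ - 1)
        cmat_demo[i_, j_] = 0.9  # strong signal along the scaled diagonal
    pset_demo = gen_pset(cmat_demo)
    logger.info("pairs found: %s, min_samples used: %s", len(pset_demo), gen_pset.min_samples)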