File size: 31,652 Bytes
036a2db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
"""
graph_builder.py β€” Build the full dependency graph for a Python repository.

Edge types (v2):
  1. Direct call edges           — f() calls g()  →  f→g
  2. Inheritance override edges  β€” Child.method β†’ Parent.method (child only)
  3. Shared-import consumer edgesβ€” files co-importing same module (capped ≀10)
  4. Decorator edges             — @decorator applied to a function  →  fn→decorator
  5. Annotated return-type edges — def f() -> MyClass: ...  →  f→MyClass.__init__
  6. Same-directory sibling edgesβ€” one representative per file, light connectivity

Performance:
  - Import maps built ONCE in pre-pass.
  - _resolve_owner_type results memoized (manual dict cache).
"""

import ast
import logging
import os
from typing import Dict, List, Optional, Set, Tuple

from .scanner import find_python_files
from .parser import extract_all_symbols
from .resolver import build_import_map
from .symbols import (
    extract_attribute_ownerships,
    extract_local_var_types,
    extract_param_types,
    _iter_statements,
)
from ._warn_once import warn_syntax_error_once, check_and_warn_encoding

logger = logging.getLogger(__name__)

# Fan-out caps β€” keep shared-import and sibling edges from becoming mega-hubs
_SHARED_IMPORT_MAX_CONSUMERS = 10   # skip if >10 files share the same import
_SAME_DIR_MAX_FILES = 20            # skip same-dir bonus if directory is huge
_DECORATOR_EDGE_MAX = 6             # max decorator edges per function (guards chains)


def build_repository_graph(repo_path: str) -> Dict[str, List[str]]:
    """
    Build the complete call graph for a repository.

    Returns:
        dict mapping function_id -> [list of called function_ids]
    """
    repo_path = os.path.abspath(repo_path)

    functions = extract_all_symbols(repo_path)
    function_ids = set(functions)

    # ── pre-pass: per-file ASTs, import maps, class registry ─────────────
    file_trees:      Dict[str, ast.Module]          = {}
    import_maps:     Dict[str, Dict[str, str]]      = {}
    class_registry:  Dict[str, List[str]]           = {}   # class_name -> [rel_file, ...]
    classes_by_file: Dict[str, List[str]]           = {}   # rel_file -> [class_name, ...]
    inheritance:     Dict[str, List[Tuple]]         = {}   # "rel_file:ClassName" -> bases
    factory_returns: Dict[str, Tuple]               = {}   # "rel_file:func" -> (q, type)
    # Module-level var types: rel_file -> {var_name: (qualifier, bare_name)}
    # e.g. `app = Flask(__name__)` at module scope -> {"app": (None, "Flask")}
    module_var_types: Dict[str, Dict[str, Tuple]]   = {}

    for filename in find_python_files(repo_path):
        with open(filename, "rb") as f:
            raw = f.read()
        check_and_warn_encoding(logger, filename, raw)
        source = raw.decode("utf-8", errors="ignore")

        try:
            tree = ast.parse(source)
        except SyntaxError as e:
            warn_syntax_error_once(logger, filename, e)
            continue

        relative_file = "./" + os.path.relpath(filename, repo_path)
        file_trees[relative_file]  = tree
        import_maps[relative_file] = build_import_map(filename, repo_path)

        mvt: Dict[str, Tuple] = {}
        for node in tree.body:
            if isinstance(node, ast.ClassDef):
                class_registry.setdefault(node.name, []).append(relative_file)
                classes_by_file.setdefault(relative_file, []).append(node.name)

                bases = []
                for b in node.bases:
                    if isinstance(b, ast.Name):
                        bases.append((None, b.id))
                    elif isinstance(b, ast.Attribute) and isinstance(b.value, ast.Name):
                        bases.append((b.value.id, b.attr))
                    elif isinstance(b, ast.Attribute):
                        bases.append((None, b.attr))
                inheritance[f"{relative_file}:{node.name}"] = bases

            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                ref = _find_return_type(node)
                if ref:
                    factory_returns[f"{relative_file}:{node.name}"] = ref
                ann_ref = _find_annotated_return(node)
                if ann_ref and not ref:
                    factory_returns[f"{relative_file}:{node.name}"] = ann_ref

            elif isinstance(node, ast.Assign):
                # Track module-level: `app = Flask(__name__)`
                if isinstance(node.value, ast.Call):
                    func = node.value.func
                    if isinstance(func, ast.Name):
                        type_ref = (None, func.id)
                    elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
                        type_ref = (func.value.id, func.attr)
                    else:
                        type_ref = None
                    if type_ref:
                        for tgt in node.targets:
                            if isinstance(tgt, ast.Name):
                                mvt[tgt.id] = type_ref

            elif isinstance(node, ast.AnnAssign):
                # Track: `db: SQLAlchemy = SQLAlchemy(app)`
                if node.value and isinstance(node.value, ast.Call):
                    func = node.value.func
                    if isinstance(func, ast.Name):
                        type_ref = (None, func.id)
                    elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
                        type_ref = (func.value.id, func.attr)
                    else:
                        type_ref = None
                    if type_ref and isinstance(node.target, ast.Name):
                        mvt[node.target.id] = type_ref

        if mvt:
            module_var_types[relative_file] = mvt

    # ── Resolution cache ──────────────────────────────────────────────────
    # _resolve_owner_type is called O(symbols * calls_per_function) times.
    # The same (qualifier, bare_name, rel_file) triple recurs constantly on
    # large repos. Cache the result to avoid redundant work.
    _resolve_cache: Dict[Tuple, Optional[str]] = {}

    def _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map, _seen=None):
        key = (qualifier, bare_name, relative_file)
        if key in _resolve_cache:
            return _resolve_cache[key]
        result = _resolve_owner_type(
            qualifier, bare_name, relative_file, import_map,
            class_registry, factory_returns, import_maps, repo_path,
            _seen=_seen,
            classes_by_file=classes_by_file,
        )
        _resolve_cache[key] = result
        return result

    # ── Attribute owners ──────────────────────────────────────────────────
    attribute_owners: Dict[str, str] = {}

    # Also register module-level vars as attribute owners so that
    # `app.run()` in a function body resolves to Flask.run.
    for rel_file, mvt in module_var_types.items():
        import_map = import_maps[rel_file]
        for var_name, type_ref in mvt.items():
            qualifier, bare_name = type_ref
            resolved = _resolve_owner_type(
                qualifier, bare_name, rel_file, import_map,
                class_registry, factory_returns, import_maps, repo_path,
                classes_by_file=classes_by_file,
            )
            if resolved:
                # Key format matches attribute_owners: "ClassName.attr"
                # but here var_name is the module-level variable.
                attribute_owners[f"{rel_file}:{var_name}"] = resolved

    for relative_file, tree in file_trees.items():
        import_map = import_maps[relative_file]
        raw_own = extract_attribute_ownerships(tree)

        for key, type_ref in raw_own.items():
            if type_ref is None:
                continue
            qualifier, bare_name = type_ref
            resolved = _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map)
            if resolved:
                attribute_owners[f"{relative_file}:{key}"] = resolved

    # ── Build the call graph ──────────────────────────────────────────────
    graph: Dict[str, List[str]] = {}

    for relative_file, tree in file_trees.items():
        import_map = import_maps[relative_file]

        local_name_to_id = {
            fid.split(":", 1)[1]: fid
            for fid in functions
            if fid.startswith(relative_file + ":")
        }

        function_nodes = _collect_function_nodes(tree)

        for fn_node, is_method, class_name in function_nodes:
            function_name = f"{class_name}.{fn_node.name}" if class_name else fn_node.name
            function_id   = f"{relative_file}:{function_name}"

            graph.setdefault(function_id, [])

            param_types    = extract_param_types(fn_node)
            local_var_types = {**param_types, **extract_local_var_types(fn_node, param_types)}

            for child in ast.walk(fn_node):
                if not isinstance(child, ast.Call):
                    continue

                dep = _resolve_call(
                    child,
                    is_method,
                    class_name,
                    relative_file,
                    local_name_to_id,
                    import_map,
                    functions,
                    function_ids,
                    repo_path,
                    attribute_owners,
                    inheritance,
                    class_registry,
                    factory_returns,
                    import_maps,
                    local_var_types,
                    classes_by_file,
                    _cached_resolve_owner_type,
                )

                if dep and dep != function_id and dep not in graph[function_id]:
                    graph[function_id].append(dep)

    # ── Phase 1A: Inheritance override edges ─────────────────────────────
    # When ChildClass overrides ParentClass.method, add child β†’ parent edge.
    # Only child β†’ parent direction: "if I changed the child, show me the
    # parent contract I might be violating." The reverse (parent β†’ all 400
    # children) would create mega-hubs that destroy ranking.
    for child_key, bases in inheritance.items():
        child_file, child_class = child_key.split(":", 1)
        for qualifier, base_name in bases:
            base_owner = _resolve_owner_type(
                qualifier, base_name, child_file,
                import_maps.get(child_file, {}),
                class_registry, factory_returns, import_maps, repo_path,
                classes_by_file=classes_by_file,
            )
            if not base_owner:
                continue

            # Find all methods in the child class and look for same-named parent methods
            child_prefix = f"{child_key}."
            for fid in function_ids:
                if not fid.startswith(child_prefix):
                    continue
                method_name = fid[len(child_prefix):]
                parent_method = f"{base_owner}.{method_name}"
                if parent_method in function_ids and parent_method != fid:
                    # Child β†’ parent only (avoids mega-hub on parent side)
                    if parent_method not in graph.get(fid, []):
                        graph.setdefault(fid, []).append(parent_method)

    # ── Phase 1B: Decorator edges ─────────────────────────────────────────
    # A function decorated with @my_decorator has an implicit dependency on
    # my_decorator. This is one of the strongest co-change signals:
    # if the decorator changes, all its callsites likely change too.
    for relative_file, tree in file_trees.items():
        import_map = import_maps[relative_file]
        for fn_node, _is_method, _class_name in _collect_function_nodes(tree):
            added = 0
            for deco in fn_node.decorator_list:
                if added >= _DECORATOR_EDGE_MAX:
                    break
                # @plain_name  or  @module.name  or  @name(args)
                deco_func = deco.func if isinstance(deco, ast.Call) else deco
                dep = None
                if isinstance(deco_func, ast.Name):
                    dep = _lookup(
                        deco_func.id,
                        {fid.split(":", 1)[1]: fid
                         for fid in functions
                         if fid.startswith(relative_file + ":")},
                        import_map, functions, repo_path,
                    )
                elif isinstance(deco_func, ast.Attribute) and isinstance(deco_func.value, ast.Name):
                    if deco_func.value.id in import_map:
                        src = import_map[deco_func.value.id]
                        rel_src = "./" + os.path.relpath(src, repo_path)
                        dep = f"{rel_src}:{deco_func.attr}"
                        if dep not in function_ids:
                            dep = None
                if dep:
                    fn_name_local = (
                        f"{_class_name}.{fn_node.name}"
                        if _class_name else fn_node.name
                    )
                    fid = f"{relative_file}:{fn_name_local}"
                    if fid in function_ids and dep != fid and dep not in graph.get(fid, []):
                        graph.setdefault(fid, []).append(dep)
                        added += 1

    # ── Build file_groups for use by shared-import edges ────────────────────
    file_groups: Dict[str, List[str]] = {}
    for fid in function_ids:
        ffile = fid.split(":")[0]
        file_groups.setdefault(ffile, []).append(fid)

    # ── Phase 1C: Shared-import consumer edges ───────────────────────────
    # If file_a and file_b both import from the same internal module,
    # functions in file_a and file_b are likely to co-change when that
    # module changes. Create edges between functions across those files.
    import_consumers: Dict[str, List[str]] = {}
    for rel_file, imap in import_maps.items():
        for _local_name, abs_path in imap.items():
            # Only track internal (in-repo) imports
            rel_imported = "./" + os.path.relpath(abs_path, repo_path)
            if not rel_imported.startswith("./"):
                continue
            import_consumers.setdefault(rel_imported, []).append(rel_file)

    # Deduplicate consumer file lists, then connect representatives
    for _imported_mod, consumer_files in import_consumers.items():
        consumer_files = list(dict.fromkeys(consumer_files))  # deduplicate, preserve order
        if len(consumer_files) < 2 or len(consumer_files) > _SHARED_IMPORT_MAX_CONSUMERS:
            continue
        # Pick a representative symbol from each consumer file
        representatives = []
        for cfile in consumer_files:
            rep_syms = file_groups.get(cfile, [])
            if rep_syms:
                representatives.append(rep_syms[0])
        # Connect representatives pairwise
        for i, a in enumerate(representatives):
            for b in representatives[i + 1:]:
                if b not in graph.get(a, []):
                    graph.setdefault(a, []).append(b)
                if a not in graph.get(b, []):
                    graph.setdefault(b, []).append(a)

    # ── Phase 1D: Same-directory sibling edges ────────────────────────────
    # Files in the same package directory tend to co-change (tests ↔ impl,
    # models ↔ serializers, etc.).  Connect one representative per file to
    # one representative from every other file in the same directory.
    # Cap: skip directories with >_SAME_DIR_MAX_FILES Python files to avoid
    # linking unrelated utility grab-bags.
    dir_files: Dict[str, List[str]] = {}
    for rel_file in file_groups:
        dir_part = os.path.dirname(rel_file)
        dir_files.setdefault(dir_part, []).append(rel_file)

    for _dir, dir_file_list in dir_files.items():
        if len(dir_file_list) < 2 or len(dir_file_list) > _SAME_DIR_MAX_FILES:
            continue
        reps = []
        for df in dir_file_list:
            syms = file_groups.get(df, [])
            if syms:
                reps.append(syms[0])   # one rep per file
        for i, a in enumerate(reps):
            for b in reps[i + 1:]:
                if b not in graph.get(a, []):
                    graph.setdefault(a, []).append(b)
                if a not in graph.get(b, []):
                    graph.setdefault(b, []).append(a)

    # ── Phase 1E: Sliding-window within-file edges ────────────────────────
    # Functions defined near each other in the same file are empirically the
    # strongest co-change signal after direct calls.  A sliding window of
    # WINDOW_SIZE links each function to its closest neighbours in definition
    # order.  Cap: skip files with >FILE_WINDOW_MAX_SYMS symbols (e.g. a
    # 600-function god-file would create O(n*w) noise edges).
    WINDOW_SIZE = 3          # each function linked to Β±3 neighbours
    FILE_WINDOW_MAX_SYMS = 60  # skip window edges in very large files

    for rel_file, syms_in_file in file_groups.items():
        if len(syms_in_file) < 2 or len(syms_in_file) > FILE_WINDOW_MAX_SYMS:
            continue
        for i, a in enumerate(syms_in_file):
            for j in range(i + 1, min(i + 1 + WINDOW_SIZE, len(syms_in_file))):
                b = syms_in_file[j]
                if b not in graph.get(a, []):
                    graph.setdefault(a, []).append(b)
                if a not in graph.get(b, []):
                    graph.setdefault(b, []).append(a)

    # ── Phase 1F: Light parentβ†’child inheritance edges ────────────────────
    # Child→parent already exists (Phase 1A).  For parents with FEW children
    # (≀PARENT_CHILD_MAX_CHILDREN), also add parentβ†’child so that a change
    # to the parent method surfaces its direct overriders.  We skip parents
    # with many children to avoid creating mega-hubs (e.g. BaseModel in
    # pydantic has 400+ subclasses β€” those would destroy ranking).
    PARENT_CHILD_MAX_CHILDREN = 8

    # Build a map: parent_method_id -> [child_method_ids]
    parent_to_children: Dict[str, List[str]] = {}
    for child_key, bases in inheritance.items():
        child_file, child_class = child_key.split(":", 1)
        for qualifier, base_name in bases:
            base_owner = _resolve_owner_type(
                qualifier, base_name, child_file,
                import_maps.get(child_file, {}),
                class_registry, factory_returns, import_maps, repo_path,
                classes_by_file=classes_by_file,
            )
            if not base_owner:
                continue
            child_prefix = f"{child_key}."
            for fid in function_ids:
                if not fid.startswith(child_prefix):
                    continue
                method_name = fid[len(child_prefix):]
                parent_method = f"{base_owner}.{method_name}"
                if parent_method in function_ids and parent_method != fid:
                    parent_to_children.setdefault(parent_method, []).append(fid)

    for parent_method, children in parent_to_children.items():
        if len(children) > PARENT_CHILD_MAX_CHILDREN:
            continue   # too many β€” would create a mega-hub
        for child_fid in children:
            if child_fid not in graph.get(parent_method, []):
                graph.setdefault(parent_method, []).append(child_fid)

    return graph


# ── Internal helpers ──────────────────────────────────────────────────────

def _collect_function_nodes(tree):
    """
    Return list of (function_node, is_method, class_name) for EVERY function
    in the file, including nested functions and closures.

    Mirrors what parser.py's _FunctionCollector does so that the graph covers
    every symbol the parser emits (previously missed ~30-40% of nodes).
    """
    result = []
    _collect_recursive(tree.body, class_stack=[], result=result)
    return result


def _collect_recursive(stmts, class_stack, result):
    """Recursively collect function nodes from a list of statements."""
    for node in stmts:
        if isinstance(node, ast.ClassDef):
            class_stack.append(node.name)
            _collect_recursive(node.body, class_stack, result)
            class_stack.pop()
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if class_stack:
                class_name = ".".join(class_stack)
                is_method = True
            else:
                class_name = None
                is_method = False
            result.append((node, is_method, class_name))
            # Also recurse into the function body to catch closures/nested funcs
            _collect_recursive(node.body, class_stack, result)


def _find_return_type(node):
    found = None
    for stmt in _iter_statements(node.body):
        if isinstance(stmt, ast.Return) and stmt.value is not None:
            if not isinstance(stmt.value, ast.Call):
                return None
            func = stmt.value.func
            if isinstance(func, ast.Name):
                ref = (None, func.id)
            elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
                ref = (func.value.id, func.attr)
            elif isinstance(func, ast.Attribute):
                ref = (None, func.attr)
            else:
                return None
            if found is None:
                found = ref
            elif found != ref:
                return None
    return found


def _find_annotated_return(node):
    """
    Extract (qualifier, bare_name) from a PEP-3107 return annotation.

        def f() -> MyClass: ...         ->  (None, "MyClass")
        def f() -> module.MyClass: ...  ->  ("module", "MyClass")

    Skips primitive annotations (str, int, bool, None, etc.) and generics
    like List[X] where the outer type is not a class we own.
    """
    PRIMITIVES = frozenset({
        "str", "int", "float", "bool", "bytes", "None",
        "list", "dict", "set", "tuple", "Any", "Optional",
        "List", "Dict", "Set", "Tuple", "Iterator", "Generator",
        "Iterable", "Sequence", "Mapping", "Type", "Union",
    })
    ann = node.returns
    if ann is None:
        return None
    if isinstance(ann, ast.Constant):
        return None
    if isinstance(ann, ast.Name):
        if ann.id in PRIMITIVES:
            return None
        return (None, ann.id)
    if isinstance(ann, ast.Attribute) and isinstance(ann.value, ast.Name):
        if ann.attr in PRIMITIVES:
            return None
        return (ann.value.id, ann.attr)
    # Subscript like Optional[Router] β€” unwrap one level
    if isinstance(ann, ast.Subscript):
        return _find_annotated_return(
            type("_Stub", (), {"returns": ann.slice})()  # type: ignore[arg-type]
        )
    return None


def _resolve_owner_type(
    qualifier, bare_name, relative_file, import_map,
    class_registry, factory_returns, import_maps, repo_path, _seen=None,
    classes_by_file=None,
):
    if _seen is None:
        _seen = set()

    cache_key = (qualifier, bare_name, relative_file)
    if cache_key in _seen:
        return None
    _seen.add(cache_key)

    if qualifier:
        if qualifier not in import_map:
            return None
        target_file = "./" + os.path.relpath(import_map[qualifier], repo_path)

        if bare_name in class_registry:
            if target_file in class_registry[bare_name]:
                return f"{target_file}:{bare_name}"
            if target_file.endswith("__init__.py"):
                target_dir = target_file[:-12]
                for cand_file in class_registry[bare_name]:
                    if cand_file.startswith(target_dir + "/"):
                        return f"{cand_file}:{bare_name}"

        factory_key = f"{target_file}:{bare_name}"
        if factory_key in factory_returns:
            return _resolve_owner_type(
                *factory_returns[factory_key], target_file,
                import_maps.get(target_file, {}), class_registry,
                factory_returns, import_maps, repo_path, _seen,
                classes_by_file,
            )
        return None

    # bare name
    if bare_name in class_registry and relative_file in class_registry[bare_name]:
        return f"{relative_file}:{bare_name}"

    if bare_name in import_map:
        target_file = "./" + os.path.relpath(import_map[bare_name], repo_path)

        if bare_name in class_registry:
            if target_file in class_registry[bare_name]:
                return f"{target_file}:{bare_name}"
            if target_file.endswith("__init__.py"):
                target_dir = target_file[:-12]
                for cand_file in class_registry[bare_name]:
                    if cand_file.startswith(target_dir + "/"):
                        return f"{cand_file}:{bare_name}"
        elif classes_by_file:
            file_classes = classes_by_file.get(target_file, [])
            if len(file_classes) == 1:
                return f"{target_file}:{file_classes[0]}"

        factory_key = f"{target_file}:{bare_name}"
        if factory_key in factory_returns:
            return _resolve_owner_type(
                *factory_returns[factory_key], target_file,
                import_maps.get(target_file, {}), class_registry,
                factory_returns, import_maps, repo_path, _seen,
                classes_by_file,
            )
        return None

    factory_key = f"{relative_file}:{bare_name}"
    if factory_key in factory_returns:
        return _resolve_owner_type(
            *factory_returns[factory_key], relative_file,
            import_map, class_registry, factory_returns,
            import_maps, repo_path, _seen,
            classes_by_file=classes_by_file,
        )
    return None


def _resolve_owner_of_expr(
    node, class_name, relative_file, is_method, attribute_owners,
    local_var_types=None, import_map=None, class_registry=None,
    factory_returns=None, import_maps=None, repo_path=None,
    classes_by_file=None, _cached_resolve=None,
):
    if isinstance(node, ast.Name) and node.id == "self" and is_method and class_name:
        return f"{relative_file}:{class_name}"

    if (
        isinstance(node, ast.Name)
        and local_var_types
        and node.id in local_var_types
        and import_map is not None
    ):
        qualifier, bare_name = local_var_types[node.id]
        if _cached_resolve:
            return _cached_resolve(qualifier, bare_name, relative_file, import_map)
        return _resolve_owner_type(
            qualifier, bare_name, relative_file, import_map,
            class_registry or {}, factory_returns or {},
            import_maps or {}, repo_path or "",
            classes_by_file=classes_by_file,
        )

    # Module-level variable fallback: `app = Flask()` at module scope.
    # attribute_owners stores these as "{rel_file}:{var_name}".
    if isinstance(node, ast.Name) and attribute_owners:
        module_key = f"{relative_file}:{node.id}"
        if module_key in attribute_owners:
            return attribute_owners[module_key]

    if isinstance(node, ast.Attribute):
        base_owner = _resolve_owner_of_expr(
            node.value, class_name, relative_file, is_method, attribute_owners,
            local_var_types, import_map, class_registry,
            factory_returns, import_maps, repo_path,
            classes_by_file, _cached_resolve,
        )
        if base_owner is None:
            return None
        return attribute_owners.get(f"{base_owner}.{node.attr}")

    return None


def _resolve_via_inheritance(
    owner_type, method_name, inheritance, class_registry,
    import_maps, function_ids, repo_path, _seen=None,
    classes_by_file=None,
):
    if _seen is None:
        _seen = set()
    if owner_type in _seen:
        return None
    _seen.add(owner_type)

    rel_file, class_name = owner_type.split(":", 1)
    import_map = import_maps.get(rel_file, {})

    for qualifier, base_name in inheritance.get(owner_type, []):
        base_owner = _resolve_owner_type(
            qualifier, base_name, rel_file, import_map,
            class_registry, {}, import_maps, repo_path,
            classes_by_file=classes_by_file,
        )
        if not base_owner:
            continue

        candidate = f"{base_owner}.{method_name}"
        if candidate in function_ids:
            return candidate

        deeper = _resolve_via_inheritance(
            base_owner, method_name, inheritance, class_registry,
            import_maps, function_ids, repo_path, _seen,
            classes_by_file,
        )
        if deeper:
            return deeper

    return None


def _resolve_call(
    call_node, is_method, class_name, relative_file,
    local_name_to_id, import_map, all_functions, function_ids,
    repo_path, attribute_owners, inheritance, class_registry,
    factory_returns, import_maps, local_var_types=None,
    classes_by_file=None, _cached_resolve=None,
):
    func = call_node.func

    if isinstance(func, ast.Name):
        return _lookup(func.id, local_name_to_id, import_map, all_functions, repo_path)

    if isinstance(func, ast.Attribute):
        method_name = func.attr

        owner_type = _resolve_owner_of_expr(
            func.value, class_name, relative_file, is_method, attribute_owners,
            local_var_types, import_map, class_registry,
            factory_returns, import_maps, repo_path,
            classes_by_file, _cached_resolve,
        )

        if owner_type:
            candidate = f"{owner_type}.{method_name}"
            if candidate in function_ids:
                return candidate

            resolved = _resolve_via_inheritance(
                owner_type, method_name, inheritance, class_registry,
                import_maps, function_ids, repo_path,
                classes_by_file=classes_by_file,
            )
            if resolved:
                return resolved
            return None

        if isinstance(func.value, ast.Name) and func.value.id in import_map:
            source_file = import_map[func.value.id]
            relative_source = "./" + os.path.relpath(source_file, repo_path)
            candidate = f"{relative_source}:{method_name}"
            if candidate in all_functions:
                return candidate

    return None


def _lookup(called_name, local_name_to_id, import_map, all_functions, repo_path):
    if called_name in local_name_to_id:
        return local_name_to_id[called_name]
    if called_name in import_map:
        source_file = import_map[called_name]
        relative_source = "./" + os.path.relpath(source_file, repo_path)
        candidate = f"{relative_source}:{called_name}"
        if candidate in all_functions:
            return candidate
    return None