|
|
import math
import struct
from collections import defaultdict
from enum import Enum, auto
from typing import List, Dict, Tuple, Optional

import numpy as np
|
|
|
|
|
class Rasterizer:
    """Software rasterization pipeline: triangle setup, shading, depth test, blending.

    Data flows through plain dictionaries:

    * ``rasterize_triangle`` produces *fragments* (``x``, ``y``, ``samples``,
      ``coverage``), one per covered pixel.
    * ``process_fragments`` shades every sample of every fragment.
    * ``depth_test`` filters samples against a float32 depth buffer.
    * ``write_to_framebuffer`` resolves, blends and stores RGBA8 colours.

    ``coverage`` is always the fraction of the pixel's sample positions that
    are still alive, so it can be used directly as a resolve weight.
    """

    # Depth value of a cleared depth buffer (far plane).
    _CLEAR_DEPTH = 1.0

    # Sample offsets (in pixels) added to the integer pixel coordinate.
    _SINGLE_SAMPLE = ((0.0, 0.0),)
    _MSAA_SAMPLE_PATTERNS = {
        # 2x and 8x follow the Direct3D standard multisample patterns
        # (offsets expressed in 1/16 pixel steps).
        2: ((0.25, 0.25), (-0.25, -0.25)),
        # 4x keeps the regular grid this rasterizer has always used.
        4: ((-0.375, -0.375), (0.375, -0.375),
            (-0.375, 0.375), (0.375, 0.375)),
        8: ((0.0625, -0.1875), (-0.0625, 0.1875),
            (0.3125, 0.0625), (-0.1875, -0.3125),
            (-0.3125, 0.3125), (-0.4375, -0.0625),
            (0.1875, 0.4375), (0.4375, -0.4375)),
    }

    def __init__(self, driver):
        """Store the driver handle and start without an attached depth buffer."""
        self.driver = driver
        # Populated by depth_test(); _depth_test() treats "no buffer" as cleared.
        self.framebuffer_width = 0
        self.depth_buffer = None
        print("Rasterizer initialized.")

    def _compute_edge_function(self, x0, y0, x1, y1, px, py):
        """
        Compute the edge function of point (px,py) against edge (x0,y0)->(x1,y1).

        The value is the 2D cross product (p - v0) x (v1 - v0). It is positive
        when the point lies to the right of the directed edge in a y-up
        coordinate system (equivalently: to the left when y grows downwards),
        zero on the edge, and negative on the other side.
        """
        return (px - x0) * (y1 - y0) - (py - y0) * (x1 - x0)

    def _is_top_left_edge(self, x0, y0, x1, y1):
        """Return True if the edge owns the pixels lying exactly on it (tie-break rule)."""
        return (y0 == y1 and x0 < x1) or y0 < y1

    def _compute_perspective_w(self, barycentric, w0, w1, w2):
        """Return the perspective-correct w: reciprocal of the interpolated 1/w."""
        return 1.0 / (barycentric[0]/w0 + barycentric[1]/w1 + barycentric[2]/w2)

    def _interpolate_perspective(self, barycentric, attr0, attr1, attr2, w0, w1, w2):
        """
        Perspective-correct attribute interpolation.

        Interpolates attr/w linearly with the screen-space barycentrics and
        multiplies by the interpolated w. Works for any attribute type that
        supports ``*`` and ``/`` with floats (numbers, numpy arrays).
        """
        w = self._compute_perspective_w(barycentric, w0, w1, w2)
        return w * (
            attr0 * barycentric[0]/w0 +
            attr1 * barycentric[1]/w1 +
            attr2 * barycentric[2]/w2
        )

    def rasterize_triangle(self, v0, v1, v2, framebuffer_width, framebuffer_height,
                           msaa_samples=1, conservative=False):
        """
        Rasterize a triangle with edge functions.

        Features:
        - Edge function rasterization with a top-left tie-break rule
        - Perspective-correct interpolation of depth and attributes
        - MSAA sample patterns for 2, 4 and 8 samples
        - Conservative rasterization option

        Args:
            v0, v1, v2: vertex dicts with 'position' (x, y, z, w in clip space)
                and 'attributes' (name -> value). The attribute names of v0
                are looked up in all three vertices.
            framebuffer_width, framebuffer_height: target size in pixels.
            msaa_samples: 2, 4 or 8 selects a multisample pattern; any other
                value rasterizes one sample per pixel.
            conservative: grow the bounding box by one pixel and also accept
                samples lying exactly on any edge.

        Returns:
            List of fragment dicts with keys "x", "y", "samples" (covered
            samples only) and "coverage" (covered samples / samples per
            pixel). Triangles with non-positive signed area, or with a vertex
            whose w is 0, produce an empty list.
        """
        pos0, pos1, pos2 = v0['position'], v1['position'], v2['position']
        w0, w1, w2 = pos0[3], pos1[3], pos2[3]

        # A vertex on the w == 0 plane has no screen position (it would need
        # clipping first); reject instead of dividing by zero.
        if w0 == 0 or w1 == 0 or w2 == 0:
            return []

        def to_screen(pos, w):
            # Perspective divide, then map NDC [-1, 1] to pixel coordinates.
            return [(pos[0]/w + 1)*0.5*framebuffer_width,
                    (pos[1]/w + 1)*0.5*framebuffer_height]

        screen0 = to_screen(pos0, w0)
        screen1 = to_screen(pos1, w1)
        screen2 = to_screen(pos2, w2)

        edge = self._compute_edge_function

        # Signed doubled area; also culls degenerate and back-facing triangles.
        area = edge(screen0[0], screen0[1], screen1[0], screen1[1],
                    screen2[0], screen2[1])
        if area <= 0:
            return []

        # Bounding box. Conservative growth happens BEFORE clamping so that
        # no pixel outside the framebuffer can ever be emitted.
        grow = 1 if conservative else 0
        xs = (screen0[0], screen1[0], screen2[0])
        ys = (screen0[1], screen1[1], screen2[1])
        min_x = max(0, int(min(xs)) - grow)
        max_x = min(framebuffer_width - 1, int(max(xs)) + grow)
        min_y = max(0, int(min(ys)) - grow)
        max_y = min(framebuffer_height - 1, int(max(ys)) + grow)

        # Directed edges in the order 01, 12, 20 and their tie-break flags.
        edges = ((screen0, screen1), (screen1, screen2), (screen2, screen0))
        top_left = tuple(
            self._is_top_left_edge(a[0], a[1], b[0], b[1]) for a, b in edges
        )

        sample_positions = self._MSAA_SAMPLE_PATTERNS.get(
            msaa_samples, self._SINGLE_SAMPLE)

        attrs0, attrs1, attrs2 = v0['attributes'], v1['attributes'], v2['attributes']

        fragments = []
        for y in range(min_y, max_y + 1):
            for x in range(min_x, max_x + 1):
                sample_fragments = []

                for sample_x, sample_y in sample_positions:
                    px, py = x + sample_x, y + sample_y

                    e01, e12, e20 = (
                        edge(a[0], a[1], b[0], b[1], px, py) for a, b in edges
                    )

                    inside = all(
                        e > 0 or (e == 0 and owns_edge)
                        for e, owns_edge in zip((e01, e12, e20), top_left)
                    )
                    on_or_inside = e01 >= 0 and e12 >= 0 and e20 >= 0
                    if not inside and not (conservative and on_or_inside):
                        continue

                    # Each barycentric weight comes from the opposite edge.
                    bary = (e12 / area, e20 / area, e01 / area)

                    z = self._interpolate_perspective(
                        bary, pos0[2], pos1[2], pos2[2], w0, w1, w2)

                    attributes = {
                        name: self._interpolate_perspective(
                            bary, attrs0[name], attrs1[name], attrs2[name],
                            w0, w1, w2)
                        for name in attrs0
                    }

                    sample_fragments.append({
                        "x": x,
                        "y": y,
                        "sample_x": sample_x,
                        "sample_y": sample_y,
                        "depth": z,
                        "attributes": attributes,
                        "barycentric": bary
                    })

                if sample_fragments:
                    fragments.append({
                        "x": x,
                        "y": y,
                        "samples": sample_fragments,
                        "coverage": len(sample_fragments) / len(sample_positions)
                    })

        return fragments

    class HiZBuffer:
        """
        Hierarchical Z-buffer for early depth testing.

        Level 0 holds, per pixel, the nearest depth written so far (which is
        what a LESS depth test leaves in the depth buffer). Every coarser
        level holds the FARTHEST value of its 2x2 children, so a region can
        only be declared occluded when every pixel in it is already nearer
        than the candidate depth. Level sizes use ceiling division so odd
        dimensions stay fully covered; the last level is 1x1.
        """

        def __init__(self, width, height):
            self.width = width
            self.height = height
            self.levels = []

            current_w, current_h = width, height
            while current_w > 0 and current_h > 0:
                self.levels.append(np.full((current_h, current_w), 1.0))
                if current_w == 1 and current_h == 1:
                    break
                current_w = (current_w + 1) // 2
                current_h = (current_h + 1) // 2

        def update_region(self, x, y, z):
            """Record a depth write at pixel (x, y) and refresh the pyramid above it."""
            if not self.levels or not (0 <= x < self.width and 0 <= y < self.height):
                return

            base = self.levels[0]
            if z >= base[y, x]:
                # Nothing nearer was written, so no level can change.
                return
            base[y, x] = z

            for level in range(1, len(self.levels)):
                finer = self.levels[level - 1]
                coarse = self.levels[level]
                cx, cy = x >> level, y >> level
                # Slicing truncates at the array border for odd-sized levels.
                farthest = finer[2 * cy:2 * cy + 2, 2 * cx:2 * cx + 2].max()
                if coarse[cy, cx] == farthest:
                    # Ancestors depend only on this cell; they are unchanged too.
                    break
                coarse[cy, cx] = farthest

        def test_region(self, min_x, min_y, max_x, max_y, z):
            """
            Test if a region could be visible (not occluded).

            Args:
                min_x, min_y, max_x, max_y: inclusive pixel rectangle; it is
                    clamped to the buffer.
                z: nearest depth of the geometry about to be drawn.

            Returns:
                True when at least one covered cell may still be farther than
                ``z``, or when nothing is known about the region (empty
                pyramid, rectangle outside the buffer). False only when the
                whole rectangle is known to be nearer than ``z``.
            """
            if not self.levels:
                return True

            min_x, min_y = max(min_x, 0), max(min_y, 0)
            max_x, max_y = min(max_x, self.width - 1), min(max_y, self.height - 1)
            if min_x > max_x or min_y > max_y:
                return True

            # Coarsest level at which the rectangle spans only a few cells.
            extent = max(max_x - min_x + 1, max_y - min_y + 1)
            level = int(math.log2(extent))
            if level >= len(self.levels):
                return True

            cells = self.levels[level][
                (min_y >> level):(max_y >> level) + 1,
                (min_x >> level):(max_x >> level) + 1,
            ]
            return bool(z <= cells.max())

    def process_fragments(self, fragments, fragment_shader_program, chip_id=0,
                          early_z=True, hierarchical_z=True):
        """
        Process fragments using the fragment shader with early-Z and Hi-Z optimizations.

        Fragments are grouped into 32x32 pixel tiles. With ``hierarchical_z``
        a whole tile is skipped when the Hi-Z pyramid reports it occluded;
        with ``early_z`` fragments are handled front to back and a fragment
        is skipped when none of its samples passes ``_depth_test``.

        Args:
            fragments: List of fragments to process (as produced by
                ``rasterize_triangle``); the list itself is not modified.
            fragment_shader_program: Shader program to execute
            chip_id: GPU chip to use
            early_z: Enable early-Z optimization
            hierarchical_z: Enable hierarchical Z-buffer

        Returns:
            List of processed fragments with keys "x", "y", "samples" (each
            with "sample_x", "sample_y", "depth", "color") and "coverage".
            An empty input yields an empty list.
        """
        if not fragments:
            return []

        # The fragment extent is needed for tile clamping whether or not Hi-Z
        # is enabled.
        fb_width = max(f["x"] for f in fragments) + 1
        fb_height = max(f["y"] for f in fragments) + 1
        hiz = self.HiZBuffer(fb_width, fb_height) if hierarchical_z else None

        TILE_SIZE = 32
        tiles = defaultdict(list)
        for fragment in fragments:
            tile_key = (fragment["x"] // TILE_SIZE, fragment["y"] // TILE_SIZE)
            tiles[tile_key].append(fragment)

        def front_depth(fragment):
            # Sample-less fragments sort last; they are never shaded early.
            samples = fragment["samples"]
            return samples[0]["depth"] if samples else float("inf")

        processed_fragments = []
        for (tile_x, tile_y), tile_fragments in tiles.items():
            if early_z:
                # Front-to-back order lets near fragments occlude far ones.
                tile_fragments.sort(key=front_depth)

            if hiz is not None:
                tile_min_x = tile_x * TILE_SIZE
                tile_min_y = tile_y * TILE_SIZE
                tile_max_x = min(tile_min_x + TILE_SIZE - 1, fb_width - 1)
                tile_max_y = min(tile_min_y + TILE_SIZE - 1, fb_height - 1)
                nearest = min(
                    (s["depth"] for f in tile_fragments for s in f["samples"]),
                    default=float("inf"),
                )
                if not hiz.test_region(tile_min_x, tile_min_y,
                                       tile_max_x, tile_max_y, nearest):
                    continue

            for fragment in tile_fragments:
                x, y = fragment["x"], fragment["y"]
                samples = fragment["samples"]

                if early_z and not any(
                        self._depth_test(s["depth"], x, y) for s in samples):
                    continue

                processed_samples = [
                    {
                        "sample_x": sample["sample_x"],
                        "sample_y": sample["sample_y"],
                        "depth": sample["depth"],
                        "color": self._execute_fragment_shader(
                            sample, fragment_shader_program, chip_id),
                    }
                    for sample in samples
                ]

                # Only a fully covered pixel occludes what lies behind it, and
                # then only up to its farthest sample.
                if hiz is not None and samples and fragment["coverage"] >= 1.0:
                    hiz.update_region(x, y, max(s["depth"] for s in samples))

                processed_fragments.append({
                    "x": x,
                    "y": y,
                    "samples": processed_samples,
                    "coverage": fragment["coverage"]
                })

        return processed_fragments

    def _execute_fragment_shader(self, fragment, fragment_shader_program, chip_id):
        """
        Simulate execution of a fragment shader for a single sample.

        ``fragment`` must provide "x", "y" and "depth" (the per-sample dicts
        from ``rasterize_triangle`` do). The shader program and chip id are
        currently ignored: the colour is a debug pattern derived from the
        pixel position (red/green) and the depth (blue), with opaque alpha.
        """
        r = (fragment["x"] % 256) / 255.0
        g = (fragment["y"] % 256) / 255.0
        b = fragment["depth"]
        a = 1.0

        return (r, g, b, a)

    def _depth_test(self, fragment_depth: float, x: int, y: int,
                    depth_func=lambda a, b: a < b) -> bool:
        """
        Test fragment depth against the attached depth buffer.

        Args:
            fragment_depth: Fragment's depth value
            x, y: Fragment coordinates
            depth_func: Depth comparison function ``f(fragment, stored)``

        Returns:
            bool: True if fragment passes depth test. When no depth buffer is
            attached yet (``depth_test`` has not run) or the pixel lies
            outside it, the fragment is compared against the cleared depth
            1.0 so that early-Z never rejects more than the real test would.
        """
        depth_buffer = getattr(self, "depth_buffer", None)
        width = getattr(self, "framebuffer_width", 0)

        current_depth = self._CLEAR_DEPTH
        if depth_buffer is not None and 0 <= x < width and y >= 0:
            depth_buffer_index = y * width + x
            if depth_buffer_index < len(depth_buffer):
                current_depth = depth_buffer[depth_buffer_index]
        return depth_func(fragment_depth, current_depth)

    def depth_test(self, fragments: List[Dict], depth_buffer_bytes: bytes,
                   framebuffer_width: int,
                   depth_func: str = 'LESS',
                   depth_write: bool = True,
                   stencil_enabled: bool = False) -> Tuple[List[Dict], bytes]:
        """
        Perform depth testing on fragments.

        All samples of a fragment are tested against the depth stored BEFORE
        that fragment, so samples of one fragment never occlude each other.
        Fragments outside the buffer are discarded.

        Args:
            fragments: List of fragments to test (not modified)
            depth_buffer_bytes: Current depth buffer as packed native float32
                values; if empty, a buffer cleared to 1.0 is created with at
                least ``framebuffer_width`` rows and enough rows to hold
                every fragment.
            framebuffer_width: Width of framebuffer
            depth_func: One of 'NEVER', 'LESS', 'EQUAL', 'LEQUAL', 'GREATER',
                'NOTEQUAL', 'GEQUAL', 'ALWAYS'
            depth_write: Whether to write passing fragments to depth buffer
            stencil_enabled: Accepted for interface compatibility; no stencil
                test is implemented.

        Returns:
            Tuple of (passed fragments, modified depth buffer). Passed
            fragments are shallow copies holding only the passing samples,
            with "coverage" scaled by the fraction of samples that passed.

        Raises:
            KeyError: if ``depth_func`` is not a known comparison name.
            struct.error: if ``depth_buffer_bytes`` is not a whole number of
                float32 values.
        """
        self.framebuffer_width = framebuffer_width

        depth_funcs = {
            'NEVER': lambda a, b: False,
            'LESS': lambda a, b: a < b,
            'EQUAL': lambda a, b: abs(a - b) < 1e-6,
            'LEQUAL': lambda a, b: a <= b,
            'GREATER': lambda a, b: a > b,
            'NOTEQUAL': lambda a, b: abs(a - b) >= 1e-6,
            'GEQUAL': lambda a, b: a >= b,
            'ALWAYS': lambda a, b: True
        }
        depth_compare = depth_funcs[depth_func]

        if depth_buffer_bytes:
            raw = bytes(bytearray(depth_buffer_bytes))
            self.depth_buffer = list(struct.unpack("%df" % (len(raw) // 4), raw))
        else:
            # The height is not part of the interface: keep the historical
            # square default but grow it to fit the fragments being tested.
            rows = max([framebuffer_width] + [f["y"] + 1 for f in fragments])
            self.depth_buffer = [self._CLEAR_DEPTH] * (framebuffer_width * rows)
        depth_buffer = self.depth_buffer

        passed_fragments = []
        for fragment in fragments:
            x, y = fragment["x"], fragment["y"]
            # Scissor to the buffer; x outside the row would alias a neighbour.
            if not (0 <= x < framebuffer_width) or y < 0:
                continue
            depth_idx = y * framebuffer_width + x
            if depth_idx >= len(depth_buffer):
                continue

            samples = fragment["samples"]
            passed_samples = [
                sample for sample in samples
                if self._depth_test(sample["depth"], x, y, depth_compare)
            ]
            if not passed_samples:
                continue

            if depth_write:
                # Keep the passing sample that wins under the comparison
                # (nearest for LESS, farthest for GREATER, last for ALWAYS).
                new_depth = depth_buffer[depth_idx]
                for sample in passed_samples:
                    if depth_compare(sample["depth"], new_depth):
                        new_depth = sample["depth"]
                depth_buffer[depth_idx] = new_depth

            surviving = fragment.copy()
            surviving["samples"] = passed_samples
            # Scale by the ORIGINAL sample count so coverage stays a fraction
            # of the pixel's sample positions.
            surviving["coverage"] = (
                fragment.get("coverage", 1.0) * len(passed_samples) / len(samples)
            )
            passed_fragments.append(surviving)

        modified_depth_buffer = struct.pack(
            "%df" % len(depth_buffer), *depth_buffer)

        return passed_fragments, modified_depth_buffer

    class BlendMode(Enum):
        """Blend modes for color blending"""
        ZERO = auto()
        ONE = auto()
        SRC_COLOR = auto()
        ONE_MINUS_SRC_COLOR = auto()
        DST_COLOR = auto()
        ONE_MINUS_DST_COLOR = auto()
        SRC_ALPHA = auto()
        ONE_MINUS_SRC_ALPHA = auto()
        DST_ALPHA = auto()
        ONE_MINUS_DST_ALPHA = auto()

    class BlendOp(Enum):
        """Blend operations"""
        ADD = auto()
        SUBTRACT = auto()
        REVERSE_SUBTRACT = auto()
        MIN = auto()
        MAX = auto()

    def _blend_factor(self, mode: BlendMode, src_color, dst_color) -> np.ndarray:
        """
        Calculate the RGBA blend factor selected by ``mode``.

        Raises:
            ValueError: if ``mode`` is not a ``BlendMode`` member.
        """
        modes = self.BlendMode
        src = np.asarray(src_color, dtype=float)
        dst = np.asarray(dst_color, dtype=float)

        if mode == modes.ZERO:
            return np.zeros(4)
        if mode == modes.ONE:
            return np.ones(4)
        if mode == modes.SRC_COLOR:
            return src
        if mode == modes.ONE_MINUS_SRC_COLOR:
            return 1.0 - src
        if mode == modes.DST_COLOR:
            return dst
        if mode == modes.ONE_MINUS_DST_COLOR:
            return 1.0 - dst
        if mode == modes.SRC_ALPHA:
            return np.full(4, src[3])
        if mode == modes.ONE_MINUS_SRC_ALPHA:
            return np.full(4, 1.0 - src[3])
        if mode == modes.DST_ALPHA:
            return np.full(4, dst[3])
        if mode == modes.ONE_MINUS_DST_ALPHA:
            return np.full(4, 1.0 - dst[3])
        raise ValueError("Unsupported blend mode: %r" % (mode,))

    def _blend_operation(self, op: BlendOp, src: np.ndarray, dst: np.ndarray) -> np.ndarray:
        """
        Combine the weighted source and destination colours with ``op``.

        Raises:
            ValueError: if ``op`` is not a ``BlendOp`` member.
        """
        ops = self.BlendOp
        if op == ops.ADD:
            return src + dst
        if op == ops.SUBTRACT:
            return src - dst
        if op == ops.REVERSE_SUBTRACT:
            return dst - src
        if op == ops.MIN:
            return np.minimum(src, dst)
        if op == ops.MAX:
            return np.maximum(src, dst)
        raise ValueError("Unsupported blend operation: %r" % (op,))

    def write_to_framebuffer(self, fragments: List[Dict], color_buffer: bytearray,
                             framebuffer_width: int,
                             blend_enable: bool = True,
                             src_blend: BlendMode = BlendMode.SRC_ALPHA,
                             dst_blend: BlendMode = BlendMode.ONE_MINUS_SRC_ALPHA,
                             blend_op: BlendOp = BlendOp.ADD) -> bytearray:
        """
        Write fragments to framebuffer with MSAA resolve and blending.

        Per fragment the sample colours are averaged, blended with the stored
        pixel, and the result is mixed with the stored pixel by the
        fragment's coverage (uncovered samples keep the old colour). This
        coverage-as-weight resolve is an approximation: the buffer holds one
        colour per pixel, not one per sample.

        Args:
            fragments: List of fragments to write; each sample needs an RGBA
                "color" with components in [0, 1]. A missing "coverage" is
                treated as 1.0.
            color_buffer: Current framebuffer contents (RGBA8, row-major);
                modified in place.
            framebuffer_width: Width of framebuffer
            blend_enable: Whether to enable blending
            src_blend: Source blend factor
            dst_blend: Destination blend factor
            blend_op: Blend operation

        Returns:
            Modified color buffer (the same object that was passed in).
            Fragments without samples or outside the buffer are skipped.
        """
        for fragment in fragments:
            x, y = fragment["x"], fragment["y"]
            samples = fragment["samples"]
            if not samples or not (0 <= x < framebuffer_width) or y < 0:
                continue
            buffer_index = (y * framebuffer_width + x) * 4
            if buffer_index + 4 > len(color_buffer):
                continue

            dst_color = np.array(
                [color_buffer[buffer_index + c] / 255.0 for c in range(4)])

            # Resolve: plain average of the covered samples.
            src_color = np.mean(
                [np.asarray(s["color"], dtype=float) for s in samples], axis=0)

            if blend_enable:
                src_factor = self._blend_factor(src_blend, src_color, dst_color)
                dst_factor = self._blend_factor(dst_blend, src_color, dst_color)
                final_color = self._blend_operation(
                    blend_op,
                    src_color * src_factor,
                    dst_color * dst_factor
                )
            else:
                final_color = src_color

            coverage = min(max(float(fragment.get("coverage", 1.0)), 0.0), 1.0)
            if coverage < 1.0:
                final_color = dst_color + coverage * (final_color - dst_color)

            final_color = np.clip(final_color, 0.0, 1.0)
            for c in range(4):
                # Round instead of truncating so an unchanged colour survives
                # the 8-bit -> float -> 8-bit round trip exactly.
                color_buffer[buffer_index + c] = int(round(float(final_color[c]) * 255.0))

        return color_buffer
|
|
|
|
|
|
|
|
|