Source code for neatnet.gaps

import math
import operator

import geopandas as gpd
import numpy as np
import shapely

__all__ = [
    "close_gaps",
    "extend_lines",
]


[docs] def close_gaps( gdf: gpd.GeoSeries | gpd.GeoDataFrame, tolerance: float ) -> gpd.GeoSeries: """Close gaps in LineString geometry where it should be contiguous. Snaps both lines to a centroid of a gap in between. Parameters ---------- gdf : geopandas.GeoSeries | geopandas.GeoDataFrame LineString representations of a network. tolerance : float Nodes within ``tolerance`` will be snapped together. Returns ------- geopandas.GeoSeries See also -------- neatnet.extend_lines neatnet.remove_interstitial_nodes """ geom = gdf.geometry.array coords = shapely.get_coordinates(geom) indices = shapely.get_num_coordinates(geom) # generate a list of start and end coordinates and create point geometries edges = [0] i = 0 for ind in indices: ix = i + ind edges.append(ix - 1) edges.append(ix) i = ix edges = edges[:-1] points = shapely.points(np.unique(coords[edges], axis=0)) buffered = shapely.buffer(points, tolerance / 2) dissolved = shapely.union_all(buffered) exploded = [ shapely.get_geometry(dissolved, i) for i in range(shapely.get_num_geometries(dissolved)) ] centroids = shapely.centroid(exploded) snapped = shapely.snap(geom, shapely.union_all(centroids), tolerance) return gpd.GeoSeries(snapped, crs=gdf.crs)
[docs] def extend_lines( gdf: gpd.GeoDataFrame, tolerance: float, *, target: None | gpd.GeoSeries | gpd.GeoDataFrame = None, barrier: None | gpd.GeoSeries | gpd.GeoDataFrame = None, extension: int | float = 0, ) -> gpd.GeoDataFrame: """Extends lines from ``gdf`` to itself or target within a set tolerance. Extends unjoined ends of LineString segments to join with other segments or target. If ``target`` is passed, extend lines to target. Otherwise extend lines to itself. If ``barrier`` is passed, each extended line is checked for intersection with ``barrier``. If they intersect, extended line is not returned. This can be useful if you don't want to extend street network segments through buildings. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame containing LineString geometry. tolerance : float Tolerance in snapping (by how much could be each segment extended). target : None | geopandas.GeoSeries | geopandas.GeoDataFrame Target geometry to which ``gdf`` gets extended. Has to be (Multi)LineString geometry. barrier : None | geopandas.GeoSeries | geopandas.GeoDataFrame = None Extended line is not used if it intersects barrier. extension : int | float = 0 By how much to extend line beyond the snapped geometry. Useful when creating enclosures to avoid floating point imprecision. Returns ------- geopandas.GeoDataFrame GeoDataFrame with extended geometries. See also -------- neatnet.close_gaps neatnet.remove_interstitial_nodes """ # explode to avoid MultiLineStrings # reset index due to the bug in GeoPandas explode df = gdf.reset_index(drop=True).explode(ignore_index=True) if target is None: target = df itself = True else: itself = False # get underlying shapely geometry geom = df.geometry.array # extract array of coordinates and number per geometry coords = shapely.get_coordinates(geom) indices = shapely.get_num_coordinates(geom) # generate a list of start and end coordinates and create point geometries edges = [0] i = 0 for ind in indices: ix = i + ind edges.append(ix - 1) edges.append(ix) i = ix edges = edges[:-1] points = shapely.points(np.unique(coords[edges], axis=0)) # query LineString geometry to identify points intersecting 2 geometries tree = shapely.STRtree(geom) inp, res = tree.query(points, predicate="intersects") unique, counts = np.unique(inp, return_counts=True) ends = np.unique(res[np.isin(inp, unique[counts == 1])]) new_geoms = [] # iterate over cul-de-sac-like segments and attempt to snap them to street network for line in ends: l_coords = shapely.get_coordinates(geom[line]) start = shapely.points(l_coords[0]) end = shapely.points(l_coords[-1]) first = list(tree.query(start, predicate="intersects")) second = list(tree.query(end, predicate="intersects")) first.remove(line) second.remove(line) t = target if not itself else target.drop(line) if first and not second: snapped = _extend_line(l_coords, t, tolerance) if ( barrier is not None and barrier.sindex.query( shapely.linestrings(snapped), predicate="intersects" ).size > 0 ): new_geoms.append(geom[line]) else: if extension == 0: new_geoms.append(shapely.linestrings(snapped)) else: new_geoms.append( shapely.linestrings( _extend_line(snapped, t, extension, snap=False) ) ) elif not first and second: snapped = _extend_line(np.flip(l_coords, axis=0), t, tolerance) if ( barrier is not None and barrier.sindex.query( shapely.linestrings(snapped), predicate="intersects" ).size > 0 ): new_geoms.append(geom[line]) else: if extension == 0: new_geoms.append(shapely.linestrings(snapped)) else: new_geoms.append( shapely.linestrings( _extend_line(snapped, t, extension, snap=False) ) ) elif not first and not second: one_side = _extend_line(l_coords, t, tolerance) one_side_e = _extend_line(one_side, t, extension, snap=False) snapped = _extend_line(np.flip(one_side_e, axis=0), t, tolerance) if ( barrier is not None and barrier.sindex.query( shapely.linestrings(snapped), predicate="intersects" ).size > 0 ): new_geoms.append(geom[line]) else: if extension == 0: new_geoms.append(shapely.linestrings(snapped)) else: new_geoms.append( shapely.linestrings( _extend_line(snapped, t, extension, snap=False) ) ) df.iloc[ends, df.columns.get_loc(df.geometry.name)] = new_geoms return df
def _extend_line( coords: np.ndarray, target: gpd.GeoDataFrame | gpd.GeoSeries, tolerance: float, snap: bool = True, ) -> np.ndarray: """Extends a line geometry to snap on the target within a tolerance.""" if snap: extrapolation = _get_extrapolated_line( coords[-4:] if len(coords.shape) == 1 else coords[-2:].flatten(), tolerance, ) int_idx = target.sindex.query(extrapolation, predicate="intersects") intersection = shapely.intersection( target.iloc[int_idx].geometry.array, extrapolation ) if intersection.size > 0: if len(intersection) > 1: distances = {} ix = 0 for p in intersection: distance = shapely.distance(p, shapely.points(coords[-1])) distances[ix] = distance ix = ix + 1 minimal = min(distances.items(), key=operator.itemgetter(1))[0] new_point_coords = shapely.get_coordinates(intersection[minimal]) else: new_point_coords = shapely.get_coordinates(intersection[0]) coo = np.append(coords, new_point_coords) new = np.reshape(coo, (len(coo) // 2, 2)) return new return coords extrapolation = _get_extrapolated_line( coords[-4:] if len(coords.shape) == 1 else coords[-2:].flatten(), tolerance, point=True, ) return np.vstack([coords, extrapolation]) def _get_extrapolated_line( coords: np.ndarray, tolerance: float, point: bool = False ) -> tuple[float, float] | shapely.LineString: """Creates a shapely line extrapolated in p1->p2 direction.""" p1 = coords[:2] p2 = coords[2:] a = p2 # defining new point based on the vector between existing points if p1[0] >= p2[0] and p1[1] >= p2[1]: b = ( p2[0] - tolerance * math.cos( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), p2[1] - tolerance * math.sin( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), ) elif p1[0] <= p2[0] and p1[1] >= p2[1]: b = ( p2[0] + tolerance * math.cos( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), p2[1] - tolerance * math.sin( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), ) elif p1[0] <= p2[0] and p1[1] <= p2[1]: b = ( p2[0] + tolerance * math.cos( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), p2[1] + tolerance * math.sin( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), ) else: b = ( p2[0] - tolerance * math.cos( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), p2[1] + tolerance * math.sin( math.atan( math.fabs(p1[1] - p2[1] + 0.000001) / math.fabs(p1[0] - p2[0] + 0.000001) ) ), ) if point: return b return shapely.linestrings([a, b])