Source code for neatnet.artifacts

import logging
import typing
import warnings

import geopandas as gpd
import momepy
import numpy as np
import pandas as pd
import shapely
from esda import shape
from libpysal import graph
from scipy import sparse
from scipy.signal import find_peaks
from scipy.stats import gaussian_kde

from .geometry import (
    _is_within,
    angle_between_two_lines,
    snap_to_targets,
    voronoi_skeleton,
)
from .nodes import weld_edges

logger = logging.getLogger(__name__)


class FaceArtifacts:
    """Identify face artifacts in street networks.

    For a given street network composed of transportation-oriented geometry
    containing features representing things like roundabouts, dual carriageways
    and complex intersections, identify areas enclosed by geometry that is
    considered a `face artifact` as per :cite:`fleischmann_shape-based_2024`.
    Face artifacts highlight areas with a high likelihood of being of
    non-morphological (e.g. transportation) origin and may require
    simplification prior to morphological analysis. See
    :cite:`fleischmann_shape-based_2024` for more details.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing street network represented as
        (Multi)LineString geometry
    index : str, optional
        A type of the shape compactness index to be used. Available are
        ['circular_compactness', 'isoperimetric_quotient', 'diameter_ratio'],
        by default "circular_compactness"
    height_mins : float, optional
        Required depth of valleys, by default -np.inf
    height_maxs : float, optional
        Required height of peaks, by default 0.008
    prominence : float, optional
        Required prominence of peaks, by default 0.00075

    Attributes
    ----------
    threshold : float
        Identified threshold between face polygons and face artifacts
    face_artifacts : GeoDataFrame
        A GeoDataFrame of geometries identified as face artifacts
    polygons : GeoDataFrame
        All polygons resulting from polygonization of the input gdf with the
        face_artifact_index
    kde : scipy.stats._kde.gaussian_kde
        Representation of a kernel-density estimate using Gaussian kernels.
    pdf : numpy.ndarray
        Probability density function
    peaks : numpy.ndarray
        locations of peaks in pdf
    valleys : numpy.ndarray
        locations of valleys in pdf

    Examples
    --------
    >>> fa = neatnet.FaceArtifacts(street_network_prague)
    >>> fa.threshold
    6.9634555986177045
    >>> fa.face_artifacts.head()
                                                 geometry  face_artifact_index
    6   POLYGON ((-744164.625 -1043922.362, -744167.39...             5.112844
    9   POLYGON ((-744154.119 -1043804.734, -744152.07...             6.295660
    10  POLYGON ((-744101.275 -1043738.053, -744103.80...             2.862871
    12  POLYGON ((-744095.511 -1043623.478, -744095.35...             3.712403
    17  POLYGON ((-744488.466 -1044533.317, -744489.33...             5.158554

    Notes
    -----
    In purely synthetic scenarios where all calculated face artifact index
    values are equal (e.g. a regular grid/lattice) a ``LinAlgError`` error
    will be raised. This is virtually guaranteed to not happen in practice.
    """

    def __init__(
        self,
        gdf,
        index="circular_compactness",
        height_mins=-np.inf,
        height_maxs=0.008,
        prominence=0.00075,
    ):
        try:
            # NOTE(review): redundant with the module-level import of
            # ``esda.shape``; kept so the class raises an informative
            # error if used in an environment without ``esda``.
            from esda import shape
        except (ImportError, ModuleNotFoundError) as err:
            raise ImportError(
                "The `esda` package is required. You can install it using "
                "`pip install esda` or `conda install esda -c conda-forge`."
            ) from err

        # Polygonize street network
        polygons = gpd.GeoSeries(
            shapely.polygonize(  # polygonize
                [gdf.dissolve().geometry.item()]
            )
        ).explode(ignore_index=True)

        # Store geometries as a GeoDataFrame
        self.polygons = gpd.GeoDataFrame(geometry=polygons)

        if self.polygons.empty:
            warnings.warn(
                "Input roads could not be polygonized. "
                "Identification of face artifacts not possible.",
                UserWarning,
                stacklevel=2,
            )
            # no polygons -> nothing to estimate; leave all results unset
            self.kde = None
            self.pdf = None
            self.peaks = None
            self.d_peaks = None
            self.valleys = None
            self.d_valleys = None
            self.threshold = None
            self.face_artifacts = None
            return

        # compute the face artifact index (log of shape index * area)
        if index == "circular_compactness":
            self.polygons["face_artifact_index"] = np.log(
                shape.minimum_bounding_circle_ratio(polygons) * polygons.area
            )
        elif index == "isoperimetric_quotient":
            self.polygons["face_artifact_index"] = np.log(
                shape.isoperimetric_quotient(polygons) * polygons.area
            )
        elif index == "diameter_ratio":
            self.polygons["face_artifact_index"] = np.log(
                shape.diameter_ratio(polygons) * polygons.area
            )
        else:
            raise ValueError(
                f"'{index}' is not supported. Use one of ['circular_compactness', "
                "'isoperimetric_quotient', 'diameter_ratio']"
            )

        # parameters for peak/valley finding
        peak_parameters = {
            "height_mins": height_mins,
            "height_maxs": height_maxs,
            "prominence": prominence,
        }
        mylinspace = np.linspace(
            self.polygons["face_artifact_index"].min(),
            self.polygons["face_artifact_index"].max(),
            1000,
        )

        self.kde = gaussian_kde(
            self.polygons["face_artifact_index"], bw_method="silverman"
        )
        self.pdf = self.kde.pdf(mylinspace)

        # find peaks
        self.peaks, self.d_peaks = find_peaks(
            x=self.pdf,
            height=peak_parameters["height_maxs"],
            threshold=None,
            distance=None,
            prominence=peak_parameters["prominence"],
            width=1,
            plateau_size=None,
        )

        # find valleys (peaks of the inverted, shifted pdf)
        self.valleys, self.d_valleys = find_peaks(
            x=-self.pdf + 1,
            height=peak_parameters["height_mins"],
            threshold=None,
            distance=None,
            prominence=peak_parameters["prominence"],
            width=1,
            plateau_size=None,
        )

        # check if we have at least 2 peaks
        condition_2peaks = len(self.peaks) > 1
        # check if we have at least 1 valley
        condition_1valley = len(self.valleys) > 0

        conditions = [condition_2peaks, condition_1valley]

        # if both these conditions are true, we find the artifact index
        if all(conditions):
            # find list order of highest peak
            highest_peak_listindex = np.argmax(self.d_peaks["peak_heights"])
            # find index (in linspace) of highest peak
            highest_peak_index = self.peaks[highest_peak_listindex]

            # define all possible peak ranges fitting our definition
            peak_bounds = list(zip(self.peaks[:-1], self.peaks[1:], strict=True))
            peak_bounds_accepted = [b for b in peak_bounds if highest_peak_index in b]

            # find all valleys that lie between two peaks
            valleys_accepted = [
                v_index
                for v_index in self.valleys
                if any(v_index in range(b[0], b[1]) for b in peak_bounds_accepted)
            ]

            # the value of the first of those valleys is our artifact index
            # get the order of the valley
            valley_index = valleys_accepted[0]

            # derive threshold value for given option from index/linspace
            self.threshold = float(mylinspace[valley_index])
            self.face_artifacts = self.polygons[
                self.polygons.face_artifact_index < self.threshold
            ]
        else:
            warnings.warn(
                "No threshold found. Either your dataset is too small or the "
                "distribution of the face artifact index does not follow the "
                "expected shape.",
                UserWarning,
                stacklevel=2,
            )
            self.threshold = None
            self.face_artifacts = None
def get_artifacts(
    roads: gpd.GeoDataFrame,
    threshold: None | float | int = None,
    threshold_fallback: None | float | int = None,
    area_threshold_blocks: float | int = 1e5,
    isoareal_threshold_blocks: float | int = 0.5,
    area_threshold_circles: float | int = 5e4,
    isoareal_threshold_circles_enclosed: float | int = 0.75,
    isoperimetric_threshold_circles_touching: float | int = 0.9,
    exclusion_mask: None | gpd.GeoSeries = None,
    predicate: str = "intersects",
) -> tuple[gpd.GeoDataFrame, float]:
    """Extract face artifacts and return the FAI threshold.
    See :cite:`fleischmann_shape-based_2024` for more details.

    Parameters
    ----------
    roads : geopandas.GeoDataFrame
        Input roads that have been preprocessed.
    threshold : None | float | int = None
        First option threshold used to determine face artifacts. See the
        ``artifact_threshold`` keyword argument in
        ``simplify.simplify_network()``.
    threshold_fallback : None | float | int = None
        Second option threshold used to determine face artifacts. See the
        ``artifact_threshold_fallback`` keyword argument in
        ``simplify.simplify_network()``.
    area_threshold_blocks : float | int = 1e5
        This is the first threshold for detecting block-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is
        above the value passed in ``artifact_threshold``. If a polygon has
        an area below ``area_threshold_blocks``, *and* is of elongated shape
        (see also ``isoareal_threshold_blocks``), *and* touches at least one
        polygon that has already been classified as artifact, then it will
        be classified as an artifact.
    isoareal_threshold_blocks : float | int = 0.5
        This is the second threshold for detecting block-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is
        above the value passed in ``artifact_threshold``. If a polygon has an
        isoareal quotient below ``isoareal_threshold_blocks``
        (see ``esda.shape.isoareal_quotient``), i.e., if it has an elongated
        shape; *and* it has a sufficiently small area (see also
        ``area_threshold_blocks``), *and* if it touches at least one polygon
        that has already been detected as an artifact, then it will be
        classified as an artifact.
    area_threshold_circles : float | int = 5e4
        This is the first threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is
        above the value passed in ``artifact_threshold``. If a polygon has an
        area below ``area_threshold_circles``, *and* one of the following 2
        cases is given: (a) the polygon is touched, but not enclosed by
        polygons already classified as artifacts, *and* with an isoperimetric
        quotient (see ``esda.shape.isoperimetric_quotient``) above
        ``isoperimetric_threshold_circles_touching``, i.e., if its shape is
        close to circular; or (b) the polygon is fully enclosed by polygons
        already classified as artifacts, *and* with an isoareal quotient above
        ``isoareal_threshold_circles_enclosed``, i.e., if its shape is close
        to circular; then it will be classified as an artifact.
    isoareal_threshold_circles_enclosed : float | int = 0.75
        This is the second threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is
        above the value passed in ``artifact_threshold``. If a polygon has a
        sufficiently small area (see also ``area_threshold_circles``), *and*
        the polygon is fully enclosed by polygons already classified as
        artifacts, *and* its isoareal quotient
        (see ``esda.shape.isoareal_quotient``) is above the value passed to
        ``isoareal_threshold_circles_enclosed``, i.e., if its shape is close
        to circular; then it will be classified as an artifact.
    isoperimetric_threshold_circles_touching : float | int = 0.9
        This is the third threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is
        above the value passed in ``artifact_threshold``. If a polygon has a
        sufficiently small area (see also ``area_threshold_circles``), *and*
        the polygon is touched by at least one polygon already classified as
        artifact, *and* its isoperimetric quotient
        (see ``esda.shape.isoperimetric_quotient``) is above the value passed
        to ``isoperimetric_threshold_circles_touching``, i.e., if its shape
        is close to circular; then it will be classified as an artifact.
    exclusion_mask : None | geopandas.GeoSeries = None
        Polygons used to determine face artifacts to exclude from returned
        output.
    predicate : str = 'intersects'
        The spatial predicate used to exclude face artifacts from returned
        output.

    Returns
    -------
    artifacts : geopandas.GeoDataFrame
        Face artifact polygons.
    threshold : float
        Resultant artifact detection threshold from
        ``FaceArtifacts.threshold``. May also be the returned value of
        ``threshold`` or ``threshold_fallback``.
    """

    def _relate(neighs: tuple, cond: typing.Callable) -> bool:
        """Helper for relating artifacts."""
        return len(neighs) > 0 and cond(polys.loc[list(neighs), "is_artifact"])

    with warnings.catch_warnings():  # the second loop likely won't find threshold
        warnings.filterwarnings("ignore", message="No threshold found")
        fas = FaceArtifacts(roads)
    polys = fas.polygons.set_crs(roads.crs)

    # rook neighbors
    rook = graph.Graph.build_contiguity(polys, rook=True)
    polys["neighbors"] = rook.neighbors

    # polygons are not artifacts...
    polys["is_artifact"] = False
    # ...unless the fai is below the threshold
    if threshold is None:
        # NOTE: compare against None explicitly — a detected threshold of
        # 0.0 is falsy but perfectly valid and must not trigger the fallback
        if fas.threshold is None and threshold_fallback is not None:
            threshold = threshold_fallback
        elif fas.threshold is None and threshold_fallback is None:
            raise ValueError(
                "No threshold for artifact detection found. Pass explicit "
                "`threshold` or `threshold_fallback` to provide the value directly."
            )
        else:
            threshold = fas.threshold
    polys.loc[polys["face_artifact_index"] < threshold, "is_artifact"] = True

    # compute area, isoareal quotient, and isoperimetric quotient
    polys["area_sqm"] = polys.area
    polys["isoareal_index"] = shape.isoareal_quotient(polys.geometry)
    polys["isoperimetric_quotient"] = shape.isoperimetric_quotient(polys.geometry)

    # these three columns are never mutated below, so aliases are safe;
    # ``is_artifact`` IS mutated, so it is always re-read from ``polys``
    # (a stale Series alias would not reflect .loc updates under pandas
    # Copy-on-Write and the loop below would never converge correctly)
    area = polys["area_sqm"]
    isoareal = polys["isoareal_index"]
    isoperimetric = polys["isoperimetric_quotient"]

    # iterate to account for artifacts that become
    # enclosed or touching by new designation
    while True:
        # count number of artifacts to break while loop
        # when no new artifacts are added
        artifact_count_before = int(polys["is_artifact"].sum())

        # polygons that are enclosed by artifacts (at this moment)
        polys["enclosed"] = polys.apply(lambda x: _relate(x["neighbors"], all), axis=1)
        # polygons that are touching artifacts (at this moment)
        polys["touching"] = polys.apply(lambda x: _relate(x["neighbors"], any), axis=1)

        # "block" like artifacts (that are not too big or too rectangular)
        # TODO: there are still some dual carriageway - type blocks
        # TODO: that slip through this one
        cond_geom = polys["enclosed"] | polys["touching"]
        cond_area = area < area_threshold_blocks
        cond_metric = isoareal < isoareal_threshold_blocks
        polys.loc[cond_geom & cond_area & cond_metric, "is_artifact"] = True

        # "circle" like artifacts (that are small and quite circular)
        # -- circles enclosed
        cond_geom = polys["enclosed"]
        cond_area = area < area_threshold_circles
        cond_metric = isoareal > isoareal_threshold_circles_enclosed
        polys.loc[cond_geom & cond_area & cond_metric, "is_artifact"] = True
        # -- circles touching
        cond_geom = polys["touching"]
        cond_area = area < area_threshold_circles
        cond_metric = isoperimetric > isoperimetric_threshold_circles_touching
        polys.loc[cond_geom & cond_area & cond_metric, "is_artifact"] = True

        artifact_count_after = int(polys["is_artifact"].sum())
        if artifact_count_after == artifact_count_before:
            break

    artifacts = polys[polys["is_artifact"]][["geometry"]].reset_index(drop=True)
    artifacts["id"] = artifacts.index

    if exclusion_mask is not None:
        _, art_idx = artifacts.sindex.query(exclusion_mask, predicate=predicate)
        artifacts = artifacts.drop(np.unique(art_idx)).copy()

    return artifacts, threshold
def ccss_special_case( primes: gpd.GeoSeries, conts_groups: gpd.GeoDataFrame, highest_hierarchy: gpd.GeoDataFrame, relevant_nodes: gpd.GeoDataFrame, split_points: list, ) -> np.ndarray: """If there are primes on both ``C``s, connect them. If there's prime on one ``C``, connect it to the other ``C``. If there are no primes, get midpoints on ``C``s and connect those. Parameters ---------- primes : geopandas.GeoSeries Nodes that are external to the artifacts, but need to be kept conts_groups : geopandas.GeoDataFrame All ``C`` labeled edges dissolved by connected component label. highest_hierarchy : geopandas.GeoDataFrame ``edges`` in the ``C`` continuity group – ``edges[~es_mask]``. relevant_targets : geopandas.GeoDataFrame The nodes forming the artifact. split_points : list Points to be used for topological corrections. Returns ------- numpy.ndarray New linestrings for reconnections. """ if primes.empty: # midpoints solution c0 = conts_groups.geometry.iloc[0] c1 = conts_groups.geometry.iloc[1] p0 = shapely.line_interpolate_point(c0, 0.5, normalized=True) p1 = shapely.line_interpolate_point(c1, 0.5, normalized=True) new_connections = [shapely.LineString([p0, p1])] split_points.append(p0) split_points.append(p1) # one prime, get shortest line to the other C elif primes.shape[0] == 1: no_prime_c = conts_groups[conts_groups.disjoint(primes.geometry.item())] sl = shapely.shortest_line(primes.geometry.item(), no_prime_c.geometry.item()) new_connections = [sl] split_points.append(shapely.get_point(sl, -1)) # multiple primes, connect two nearest on distinct Cs else: primes_on_c0 = primes[primes.intersects(conts_groups.geometry.iloc[0])] primes_on_c1 = primes[primes.intersects(conts_groups.geometry.iloc[1])] if primes_on_c0.empty: sl = shapely.shortest_line( conts_groups.geometry.iloc[0], primes_on_c1.union_all() ) new_connections = [sl] split_points.append(shapely.get_point(sl, 0)) elif primes_on_c1.empty: sl = shapely.shortest_line( primes_on_c0.union_all(), 
conts_groups.geometry.iloc[1] ) new_connections = [sl] split_points.append(shapely.get_point(sl, -1)) else: new_connections = [ shapely.shortest_line( primes_on_c0.union_all(), primes_on_c1.union_all() ) ] # some nodes may have ended unconnected. Find them and reconnect them. combined_linework = pd.concat( [ highest_hierarchy, gpd.GeoSeries(new_connections, crs=highest_hierarchy.crs), ] ).union_all() missing = relevant_nodes[relevant_nodes.disjoint(combined_linework)] new_connections.extend( shapely.shortest_line(missing.geometry, combined_linework).tolist() ) return np.array(new_connections) def filter_connections( primes: gpd.GeoSeries, relevant_targets: gpd.GeoDataFrame, conts_groups: gpd.GeoDataFrame, new_connections: np.ndarray, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """The skeleton returns connections to all the nodes. We need to keep only some, if there are multiple connections to a single C. We don't touch the other. Parameters ---------- primes : geopandas.GeoSeries Nodes that are external to the artifacts, but need to be kept relevant_targets : geopandas.GeoDataFrame The nodes forming the artifact. conts_groups : geopandas.GeoDataFrame All ``C`` labeled edges dissolved by connected component label. new_connections : numpy.ndarray New linestrings for reconnections. 
Returns ------- tuple[np.ndarray, np.ndarray, np.ndarray] - Updated ``new_connections`` - Connections intersecting ``C`` - Connections intersecting ``primes`` """ unwanted = [] keeping = [] conn_c = [] conn_p = [] if not primes.empty and not relevant_targets.empty: targets = pd.concat([primes.geometry, relevant_targets.geometry]) elif primes.empty: targets = relevant_targets.geometry else: targets = primes.geometry for c in conts_groups.geometry: int_mask = shapely.intersects(new_connections, c) connections_intersecting_c = new_connections[int_mask] conn_c.append(connections_intersecting_c) if len(connections_intersecting_c) > 1: prime_mask = shapely.intersects( connections_intersecting_c, targets.union_all() ) connections_intersecting_primes = connections_intersecting_c[prime_mask] conn_p.append(connections_intersecting_primes) # if there are multiple connections to a single C, drop them and keep only # the shortest one leading to prime if ( len(connections_intersecting_c) > 1 and len(connections_intersecting_primes) > 0 ): lens = shapely.length(connections_intersecting_primes) unwanted.append(connections_intersecting_c) keeping.append(connections_intersecting_primes[[np.argmin(lens)]]) # fork on two nodes on C elif len(connections_intersecting_c) > 1: lens = shapely.length(connections_intersecting_c) unwanted.append(connections_intersecting_c) if len(unwanted) > 0: wanted_mask = ~np.isin(new_connections, np.concatenate(unwanted)) if len(keeping) > 0: new_connections = np.concatenate( [new_connections[wanted_mask], np.concatenate(keeping)] ) else: new_connections = new_connections[wanted_mask] return ( new_connections, np.concatenate(conn_c) if len(conn_c) > 0 else np.array([]), np.concatenate(conn_p) if len(conn_p) > 0 else np.array([]), ) def avoid_forks( highest_hierarchy: gpd.GeoDataFrame, new_connections: np.ndarray, relevant_targets: gpd.GeoDataFrame, artifact: gpd.GeoDataFrame, split_points: list, ) -> np.ndarray: """Multiple ``C``s that are not 
intersecting. Avoid forks on the ends of a Voronoi skeleton. If one goes to a relevant node, keep it. If not, remove both and replace with a new shortest connection. Parameters ---------- highest_hierarchy : geopandas.GeoDataFrame ``edges`` in the ``C`` continuity group – ``edges[~es_mask]``. new_connections : numpy.ndarray New linestrings for reconnections. relevant_targets : geopandas.GeoDataFrame The nodes forming the artifact. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. split_points : list Points to be used for topological corrections. Returns ------- np.ndarray ``new_connections`` with either 1 prong of the fork if it connects to a relevant node or a new short connection if not. """ int_mask = shapely.intersects(new_connections, highest_hierarchy.union_all()) targets_mask = shapely.intersects(new_connections, relevant_targets.union_all()) new_connections = new_connections[(int_mask * targets_mask) | np.invert(int_mask)] cont_diss = highest_hierarchy.dissolve(highest_hierarchy.coins_group).geometry addition, splitters = snap_to_targets( new_connections, artifact.geometry, cont_diss[cont_diss.disjoint(shapely.union_all(new_connections))], ) split_points.extend(splitters) new_connections = np.concatenate([new_connections, addition]) return new_connections def reconnect( conts_groups: gpd.GeoDataFrame, new_connections: np.ndarray, artifact: gpd.GeoDataFrame, split_points: list, eps: float, ) -> np.ndarray: """Check for disconnected Cs and reconnect. Parameters ---------- conts_groups : geopandas.GeoDataFrame All ``C`` labeled edges dissolved by connected component label. new_connections : numpy.ndarray New linestrings for reconnections. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. split_points : list Points to be used for topological corrections. eps : float Small tolerance epsilon. Returns ------- np.ndarray ``new_connections`` with additional edges. 
""" new_connections_comps = graph.Graph.build_contiguity( gpd.GeoSeries(new_connections), rook=False ).component_labels new_components = gpd.GeoDataFrame(geometry=new_connections).dissolve( new_connections_comps ) additions = [] for c in conts_groups.geometry: mask = new_components.intersects(c.buffer(eps)) if not mask.all(): adds, splitters = snap_to_targets( new_components[~mask].geometry, artifact.geometry, [c] ) additions.extend(adds) split_points.extend(splitters) if len(additions) > 0: new_connections = np.concatenate([new_connections, additions]) return new_connections def remove_dangles( new_connections: np.ndarray, artifact: gpd.GeoDataFrame, eps: float = 1e-4, ) -> np.ndarray: """Dropping lines can introduce dangling edges. Remove those. Parameters ---------- new_connections : np.ndarray New linestrings for reconnections. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. eps : float = 1e-4 Small tolerance epsilon. Returns ------- np.ndarray ``new_connections`` without dangling edges. 
""" new_connections = shapely.line_merge(new_connections) pts0 = shapely.get_point(new_connections, 0) pts1 = shapely.get_point(new_connections, -1) pts = shapely.buffer(np.concatenate([pts0, pts1]), eps) all_idx, pts_idx = shapely.STRtree(pts).query( np.concatenate([pts, [artifact.geometry.boundary]]), predicate="intersects", ) data = [True] * len(all_idx) sp = sparse.coo_array((data, (pts_idx, all_idx)), shape=(len(pts), len(pts) + 1)) dangles = pts[sp.sum(axis=1) == 1] new_connections = new_connections[ shapely.disjoint(new_connections, shapely.union_all(dangles)) ] return new_connections def one_remaining( relevant_targets: gpd.GeoDataFrame, remaining_nodes: gpd.GeoDataFrame, artifact: gpd.GeoDataFrame, edges: gpd.GeoDataFrame, es_mask: pd.Series, max_segment_length: float | int, split_points: list, clip_limit: float | int, consolidation_tolerance: float | int, ) -> gpd.GeoDataFrame: """Resolve situations where there is 1 highest hierarchy and 1 remaining node. This function is called within ``artifacts.nx_gx()``: * first SUBRANCH of BRANCH 2: * relevant node targets exist * only one remaining node Parameters ---------- relevant_targets : geopandas.GeoDataFrame The nodes forming the artifact. remaining_nodes : geopandas.GeoDataFrame Nodes associated with the artifact that are not in group ``C``. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. edges : geopandas.GeoDataFrame Line geometries forming the artifact. es_mask : pandas.Series A mask for ``edges`` in the ``E`` and ``S`` continuity groups. max_segment_length : float | int Additional vertices will be added so that all line segments are no longer than this value. Must be greater than 0. split_points : list Points to be used for topological corrections. clip_limit : float | int Following generation of the Voronoi linework in ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon. 
To ensure we get a space to make proper topological connections from the linework to the actual points on the edge of the polygon, we clip using a polygon with a negative buffer of ``clip_limit`` or the radius of maximum inscribed circle, whichever is smaller. consolidation_tolerance : float | int Tolerance passed to node consolidation within the ``geometry.voronoi_skeleton()``. Returns ------- geopandas.GeoDataFrame Newly resolved edges. The ``split_points`` parameter is also updated inplace. """ # find the nearest relevant target remaining_nearest, target_nearest = relevant_targets.sindex.nearest( remaining_nodes.geometry, return_all=False ) # create a new connection as the shortest straight line new_connections = shapely.shortest_line( remaining_nodes.geometry.iloc[remaining_nearest].values, relevant_targets.geometry.iloc[target_nearest].values, ) # check if the new connection is within the artifact connections_within = _is_within(new_connections, artifact.geometry, 0.1) # if it is not within, discard it and use the skeleton instead if not connections_within.all(): logger.debug("CONDITION _is_within False") new_connections, splitters = voronoi_skeleton( edges[es_mask].geometry, # use edges that are being dropped poly=artifact.geometry, snap_to=relevant_targets.geometry.iloc[target_nearest], # snap to nearest max_segment_length=max_segment_length, buffer=clip_limit, # TODO: figure out if we need this clip_limit=clip_limit, consolidation_tolerance=consolidation_tolerance, ) split_points.extend(splitters) return remove_dangles(new_connections, artifact) def multiple_remaining( edges: gpd.GeoDataFrame, es_mask: pd.Series, artifact: pd.DataFrame, max_segment_length: float | int, highest_hierarchy: gpd.GeoDataFrame, split_points: list, snap_to: gpd.GeoSeries, clip_limit: float | int, consolidation_tolerance: float | int, ) -> gpd.GeoDataFrame: """Resolve situations where there is 1 highest hierarchy and multiple remaining nodes. 
This function is called within ``artifacts.nx_gx()``: * second SUBRANCH of BRANCH 2: * relevant node targets exist * more than one remaining node * second SUBRANCH of BRANCH 3: * no target nodes - snapping to C * more than one remaining node Parameters ---------- edges : geopandas.GeoDataFrame Line geometries forming the artifact. es_mask : pandas.Series A mask for ``edges`` in the ``E`` and ``S`` continuity groups. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. max_segment_length : float | int Additional vertices will be added so that all line segments are no longer than this value. Must be greater than 0. highest_hierarchy : geopandas.GeoDataFrame ``edges`` in the ``C`` continuity group – ``edges[~es_mask]``. split_points : list Points to be used for topological corrections. snap_to : geopandas.GeoSeries Snap to these relevant node targets. clip_limit : float | int Following generation of the Voronoi linework in ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon. To ensure we get a space to make proper topological connections from the linework to the actual points on the edge of the polygon, we clip using a polygon with a negative buffer of ``clip_limit`` or the radius of maximum inscribed circle, whichever is smaller. consolidation_tolerance : float | int Tolerance passed to node consolidation within the ``geometry.voronoi_skeleton()``. Returns ------- geopandas.GeoDataFrame Newly resolved edges. The ``split_points`` parameter is also updated inplace. 
""" # use skeleton to ensure all nodes are naturally connected new_connections, splitters = voronoi_skeleton( edges[es_mask].geometry, # use edges that are being dropped poly=artifact.geometry, snap_to=snap_to, # snap to relevant node targets max_segment_length=max_segment_length, secondary_snap_to=highest_hierarchy.geometry, clip_limit=clip_limit, consolidation_tolerance=consolidation_tolerance, ) split_points.extend(splitters) return remove_dangles(new_connections, artifact) def one_remaining_c( remaining_nodes: gpd.GeoDataFrame, highest_hierarchy: gpd.GeoDataFrame, artifact: gpd.GeoDataFrame, edges: gpd.GeoDataFrame, es_mask: pd.Series, max_segment_length: float | int, split_points: list, clip_limit: float | int, consolidation_tolerance: float | int = 10, ) -> np.ndarray: """Resolve situations where there is 1 highest hierarchy and 1 remaining node. This function is called within ``artifacts.nx_gx()``: * first SUBRANCH of BRANCH 3: * no target nodes - snapping to C * only one remaining node Parameters ---------- remaining_nodes : geopandas.GeoDataFrame Nodes associated with the artifact that are not in group ``C``. highest_hierarchy : geopandas.GeoDataFrame ``edges`` in the ``C`` continuity group – ``edges[~es_mask]``. artifact : geopandas.GeoDataFrame The polygonal representation of the artifact. edges : geopandas.GeoDataFrame Line geometries forming the artifact. es_mask : pandas.Series A mask for ``edges`` in the ``E`` and ``S`` continuity groups. max_segment_length : float | int = 1 Additional vertices will be added so that all line segments are no longer than this value. Must be greater than 0. split_points : list Points to be used for topological corrections. clip_limit : float | int Following generation of the Voronoi linework in ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon. 
        To ensure we get a space to make proper topological
        connections from the linework to the actual points on the edge of
        the polygon, we clip using a polygon with a negative buffer of
        ``clip_limit`` or the radius of maximum inscribed circle, whichever is
        smaller.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation within the
        ``geometry.voronoi_skeleton()``.

    Returns
    -------
    numpy.ndarray
        Newly resolved edges. The ``split_points`` parameter is also updated inplace.
    """

    # create a new connection as the shortest straight line to any C
    new_connections = shapely.shortest_line(
        remaining_nodes.geometry.values,
        highest_hierarchy.union_all(),
    )
    # the far endpoint of each connection later splits the C it snapped onto
    splitters = shapely.get_point(new_connections, -1)
    # check if the new connection is within the artifact
    connections_within = _is_within(new_connections, artifact.geometry, 0.1)
    # if it is not within, discard it and use the skeleton instead
    if not connections_within.all():
        logger.debug("CONDITION _is_within False")

        new_connections, splitters = voronoi_skeleton(
            edges[es_mask].geometry,  # use edges that are being dropped
            poly=artifact.geometry,
            snap_to=highest_hierarchy.dissolve("coins_group").geometry,  # snap to Cs
            max_segment_length=max_segment_length,
            clip_limit=clip_limit,
            consolidation_tolerance=consolidation_tolerance,
        )
    split_points.extend(splitters)

    return new_connections


def loop(
    edges: gpd.GeoDataFrame,
    es_mask: pd.Series,
    highest_hierarchy: gpd.GeoDataFrame,
    artifact: gpd.GeoDataFrame,
    max_segment_length: float | int,
    clip_limit: float | int,
    split_points: list,
    min_dangle_length: float | int,
    eps: float = 1e-4,
) -> list:
    """Replace an artifact formed by a loop with a single line formed
    by a subset of the Voronoi skeleton.

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Line geometries forming the artifact.
    es_mask : pandas.Series
        A mask for ``edges`` in the ``E`` and ``S`` continuity groups.
    highest_hierarchy : geopandas.GeoDataFrame
        ``edges`` in the ``C`` continuity group – ``edges[~es_mask]``.
    artifact : geopandas.GeoDataFrame
        The polygonal representation of the artifact.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments are no
        longer than this value. Must be greater than 0.
    clip_limit : float | int
        Following generation of the Voronoi linework in
        ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon.
        To ensure we get a space to make proper topological connections from
        the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius
        of maximum inscribed circle, whichever is smaller.
    split_points : list
        Points to be used for topological corrections.
    min_dangle_length : float | int
        The threshold for determining if linestrings are dangling slivers to
        be removed or not.
    eps : float = 1e-4
        Small tolerance epsilon.

    Returns
    -------
    to_add : list
        Linestring geometries to be added.
    """
    # check if we need to add a deadend to represent the space
    to_add = []

    # the loop is formed by a single E/S edge; explode it into segments
    dropped = edges[es_mask].geometry.item()
    segments = line_segments(dropped)

    # figure out if there's a snapping node
    # Get nodes on Cs
    bd_points = highest_hierarchy.boundary.explode()
    # Identify nodes on primes
    primes = bd_points[bd_points.duplicated()]
    if primes.empty:
        logger.debug("SNAP TO highest_hierarchy")
        snap_to = highest_hierarchy.dissolve("coins_group").geometry
    else:
        logger.debug("SNAP TO primes")
        snap_to = primes

    _possible_dangle, splitters = voronoi_skeleton(
        segments,  # use edges that are being dropped
        poly=artifact.geometry,
        snap_to=snap_to,
        max_segment_length=max_segment_length,
        clip_limit=clip_limit,
        consolidation_tolerance=0,
    )
    split_points.extend(splitters)

    # keep only skeleton parts that do not touch the dropped loop edge
    possible_dangle = gpd.GeoDataFrame(
        geometry=_possible_dangle[shapely.disjoint(_possible_dangle, dropped)]
    )
    if not possible_dangle.empty:
        comps = graph.Graph.build_contiguity(
            possible_dangle.difference(snap_to.union_all().buffer(eps)), rook=False
        )
        if comps.n_components > 1:
            # NOTE: it is unclear to me what exactly should happen here. I believe that
            # there will be cases when we may want to keep multiple dangles. Now keeping
            # only one.
            logger.debug("LOOP components many")
            comp_labels = comps.component_labels.values
            # keep only the longest connected component of the candidate dangle
            longest_component = possible_dangle.dissolve(
                comps.component_labels
            ).length.idxmax()
            possible_dangle = possible_dangle[comp_labels == longest_component]

        # merge the dangle into continuity strokes and take the longest stroke
        dangle_coins = momepy.COINS(
            possible_dangle,
            flow_mode=True,
        ).stroke_gdf()
        candidate = dangle_coins.loc[dangle_coins.length.idxmax()].geometry

        # keep the candidate only if it connects to the snap target and is
        # long enough to be worth representing
        if candidate.intersects(snap_to.union_all().buffer(eps)) and (
            candidate.length > min_dangle_length
        ):
            logger.debug("LOOP intersects and length > min_dangle_length")
            if not primes.empty:
                points = [
                    shapely.get_point(candidate, 0),
                    shapely.get_point(candidate, -1),
                ]
                distances = shapely.distance(points, highest_hierarchy.union_all())
                if distances.max() > min_dangle_length:
                    logger.debug("LOOP prime check passed")
                    to_add.append(candidate)
            else:
                to_add.append(candidate)

    return to_add


def n1_g1_identical(
    edges: gpd.GeoDataFrame,
    *,
    to_drop: list,
    to_add: list,
    geom: shapely.Polygon,
    max_segment_length: float | int = 1,
    min_dangle_length: float | int = 10,
    clip_limit: float | int = 2,
) -> None:
    """Determine lines within artifacts to drop & add when dealing with
    typologies of 1 node and 1 continuity group – ``{C, E, S}``.

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Line geometries forming the artifact.
    to_drop : list
        List collecting geometries to be dropped.
    to_add : list
        List collecting geometries to be added.
    geom : shapely.Polygon
        The polygonal representation of the artifact.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments are no
        longer than this value. Must be greater than 0.
    min_dangle_length : float | int = 10
        The threshold for determining if linestrings are dangling slivers to
        be removed or not.
    clip_limit : None | float | int = 2
        Following generation of the Voronoi linework in
        ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon.
        To ensure we get a space to make proper topological
        connections from the linework to the actual points on the edge of
        the polygon, we clip using a polygon with a negative buffer of
        ``clip_limit`` or the radius of maximum inscribed circle, whichever is
        smaller.

    Returns
    -------
    None
        ``to_drop`` and ``to_add`` are updated inplace.
    """
    # the artifact is formed by a single self-connecting edge – drop it
    to_drop.append(edges.index[0])
    dropped = edges.geometry.item()
    segments = line_segments(dropped)

    # the entry point of the loop is its first vertex
    snap_to = shapely.get_point(dropped, 0)

    possible_dangle, _ = voronoi_skeleton(
        segments,  # use edges that are being dropped
        poly=geom,
        snap_to=[snap_to],
        max_segment_length=max_segment_length,
        clip_limit=clip_limit,
    )
    # keep skeleton parts that avoid the dropped edge or touch the entry point
    disjoint = shapely.disjoint(possible_dangle, dropped)
    connecting = shapely.intersects(possible_dangle, snap_to)
    dangle = possible_dangle[disjoint | connecting]

    # merge the dangle into continuity strokes
    dangle_geoms = gpd.GeoSeries(shapely.line_merge(dangle)).explode()
    dangle_coins = momepy.COINS(
        dangle_geoms, flow_mode=True, angle_threshold=120
    ).stroke_attribute()

    strokes = gpd.GeoDataFrame({"coin": dangle_coins}, geometry=dangle_geoms).dissolve(
        "coin"
    )

    # keep the stroke connected to the entry point if it is long enough
    entry = strokes.geometry[strokes.intersects(snap_to)].item()
    if entry.length > min_dangle_length:
        to_add.append(entry)


def nx_gx_identical(
    edges: gpd.GeoDataFrame,
    *,
    geom: shapely.Polygon,
    to_drop: list,
    to_add: list,
    nodes: gpd.GeoSeries,
    angle: float | int,
    max_segment_length: float | int = 1,
    clip_limit: float | int = 2,
    consolidation_tolerance: float | int = 10,
    eps: float = 1e-4,
) -> None:
    """Determine lines within artifacts to drop & add when dealing with
    typologies of more than 1 node and 1 or more identical continuity
    groups – ``{C, E, S}``.

    It is used when there are at least two nodes, one or more continuity
    groups but all edges have the same position in their respective continuity
    groups. For example, they all are ``E`` (ending), or ``S`` (single). It
    does not mean that all edges belong to a single continuity group. Here the
    "identical" refers to identical continuity groups – e.g. ``4CCC``,
    ``5EE``, or ``3SSS``.

    Drop all of them and link the entry points to a centroid.

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Line geometries forming the artifact.
    geom : shapely.Polygon
        The polygonal representation of the artifact.
    to_drop : list
        List collecting geometries to be dropped.
    to_add : list
        List collecting geometries to be added.
    nodes : geopandas.GeoSeries
        Node geometries forming the artifact.
    angle : float | int
        Threshold for determination of line intersection angle acuteness.
        If the angle between two lines is too sharp, replace with a direct
        connection between the nodes.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments are no
        longer than this value. Must be greater than 0.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework in
        ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon.
        To ensure we get a space to make proper topological connections from
        the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius
        of maximum inscribed circle, whichever is smaller.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation within the
        ``geometry.voronoi_skeleton()``.
    eps : float = 1e-4
        Small tolerance epsilon.

    Returns
    -------
    None
        ``to_drop`` and ``to_add`` are updated inplace.
    """
    centroid = geom.centroid
    # nodes within (a tiny tolerance of) the artifact polygon
    relevant_nodes = nodes.geometry.iloc[
        nodes.sindex.query(geom, predicate="dwithin", distance=eps)
    ]
    to_drop.extend(edges.index.to_list())
    # candidate replacement: straight lines from each node to the centroid
    lines = shapely.shortest_line(relevant_nodes, centroid)

    if not _is_within(lines, geom).all():
        logger.debug("NOT WITHIN replacing with skeleton")
        lines, _ = voronoi_skeleton(
            edges.geometry,  # use edges that are being dropped
            poly=geom,
            max_segment_length=max_segment_length,
            clip_limit=clip_limit,
            snap_to=relevant_nodes,
            consolidation_tolerance=consolidation_tolerance,
        )
        to_add.extend(weld_edges(lines, ignore=relevant_nodes.geometry))
    # if the angle between two lines is too sharp,
    # replace with a direct connection between the nodes
    elif len(lines) == 2:
        if angle_between_two_lines(lines.iloc[0], lines.iloc[1]) < angle:
            logger.debug(
                "TWO LINES WITH SHARP ANGLE replacing with straight connection"
            )
            to_add.append(
                shapely.LineString([relevant_nodes.iloc[0], relevant_nodes.iloc[1]])
            )
        else:
            to_add.extend(lines.tolist())
    else:
        to_add.extend(lines.tolist())


def nx_gx(
    edges: gpd.GeoDataFrame,
    *,
    artifact: gpd.GeoDataFrame,
    to_drop: list,
    to_add: list,
    split_points: list,
    nodes: gpd.GeoSeries,
    max_segment_length: float | int = 1,
    clip_limit: float | int = 2,
    min_dangle_length: float | int = 10,
    consolidation_tolerance: float | int = 10,
    eps: float = 1e-4,
) -> None:
    """Determine lines within artifacts to drop & add when dealing with
    typologies of 2 or more nodes and 2 or more continuity groups –
    ``{C, E, S}``.

    Drop all but highest hierarchy. If there are unconnected nodes after drop,
    connect to nearest remaining edge or nearest intersection if there are
    more remaining edges. If there are three or more of the highest hierarchy,
    use the roundabout solution.

    If, after dropping, we end up with more than one connected component based
    on remaining edges, create a connection either as a shortest line between
    the two or using a skeleton if that is not inside or there are 3 or more
    components.
    The connection point should ideally be an existing nearest node with
    degree 4 or above.

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Line geometries forming the artifact.
    artifact : geopandas.GeoDataFrame
        The polygonal representation of the artifact.
    to_drop : list
        List collecting geometries to be dropped.
    to_add : list
        List collecting geometries to be added.
    split_points : list
        Points to be used for topological corrections.
    nodes : geopandas.GeoSeries
        Node geometries forming the artifact.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments are no
        longer than this value. Must be greater than 0.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework in
        ``geometry.voronoi_skeleton()``, we clip to fit inside the polygon.
        To ensure we get a space to make proper topological connections from
        the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius
        of maximum inscribed circle, whichever is smaller.
    min_dangle_length : float | int = 10
        The threshold for determining if linestrings are dangling slivers to
        be removed or not.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation within the
        ``geometry.voronoi_skeleton()``.
    eps : float = 1e-4
        Small tolerance epsilon.

    Returns
    -------
    None
        ``to_drop`` and ``to_add`` are updated inplace.
    """
    # filter ends
    all_ends = edges[edges.coins_end]

    # determine if we have C present or not. Based on that, ensure that we
    # correctly pick-up the highest hierarchy and drop all lower
    if artifact.C > 0:
        logger.debug("HIGHEST C")
    else:
        logger.debug("HIGHEST E")
        # with no C, identify continuity groups composed entirely of ends
        singles = set()
        visited = []
        for coins_count, group in zip(
            all_ends.coins_count, all_ends.coins_group, strict=True
        ):
            if (group not in visited) and (
                coins_count == (edges.coins_group == group).sum()
            ):
                singles.add(group)
            visited.append(group)
        # re-filter ends
        all_ends = edges[edges.coins_group.isin(singles)]

    # define mask for E and S strokes
    es_mask = edges.coins_group.isin(all_ends.coins_group)
    # filter Cs
    highest_hierarchy = edges[~es_mask]

    # get nodes forming the artifact
    relevant_nodes = nodes.iloc[
        nodes.sindex.query(artifact.geometry, predicate="dwithin", distance=eps)
    ]
    # filter nodes that lie on Cs (possibly primes)
    nodes_on_cont = relevant_nodes.index[
        relevant_nodes.sindex.query(
            highest_hierarchy.geometry.union_all(), predicate="dwithin", distance=eps
        )
    ]
    # get nodes that are not on Cs
    remaining_nodes = relevant_nodes.drop(nodes_on_cont)

    # get all remaining geometries and determine if they are all
    # connected or new connections need to happen
    remaining_geoms = pd.concat([remaining_nodes.geometry, highest_hierarchy.geometry])
    heads_ix, tails_ix = remaining_geoms.sindex.query(
        remaining_geoms, predicate="intersects"
    )
    n_comps = graph.Graph.from_arrays(heads_ix, tails_ix, 1).n_components

    # add list of existing edges to be removed from the network
    to_drop.extend(edges[es_mask].index.tolist())

    # more than one component in the remaining geometries
    # (either highest_hierarchy or remaining nodes)
    if n_comps > 1:
        logger.debug("CONDITION n_comps > 1 True")

        # get nodes that are relevant snapping targets (degree 4+)
        relevant_targets = relevant_nodes.loc[nodes_on_cont].query("degree > 3")

        cont_comp_labels = graph.Graph.build_contiguity(
            highest_hierarchy, rook=False
        ).component_labels
        conts_groups = highest_hierarchy.dissolve(cont_comp_labels)

        # BRANCH 1 - multiple Cs
        if len(highest_hierarchy) > 1:
            logger.debug("CONDITION len(highest_hierarchy) > 1 True")

            # Get nodes on Cs
            bd_points = highest_hierarchy.boundary.explode()
            # Identify nodes on primes
            primes = bd_points[bd_points.duplicated()]

            # For CCSS we need a special case solution if the length of S is
            # significantly shorter than the length of C. In that case, Voronoi
            # does not create shortest connections but a line that is parallel
            # to Cs.
            if (
                highest_hierarchy.coins_group.nunique() == 2
                and artifact.S == 2
                and artifact.E == 0
                and (highest_hierarchy.length.sum() > all_ends.length.sum())
            ):
                logger.debug("CONDITION for CCSS special case True")
                # this also appends to split_points
                new_connections = ccss_special_case(
                    primes,
                    conts_groups,
                    highest_hierarchy,
                    relevant_nodes,
                    split_points,
                )
            else:
                logger.debug("CONDITION for CCSS special case False")

                # Get new connections via skeleton
                new_connections, splitters = voronoi_skeleton(
                    edges.geometry,  # use all edges as an input
                    poly=artifact.geometry,
                    snap_to=relevant_targets.geometry,  # snap to nodes
                    max_segment_length=max_segment_length,
                    clip_limit=clip_limit,
                    consolidation_tolerance=consolidation_tolerance,
                )

                # If there are multiple components, limit_distance was too
                # drastic and clipped the skeleton in pieces. Re-do it with a
                # tiny epsilon. This may cause tiny sharp angles but at least
                # it will be connected.
                if (
                    graph.Graph.build_contiguity(
                        gpd.GeoSeries(new_connections), rook=False
                    ).n_components
                    > 1
                ):
                    # Get new connections via skeleton
                    new_connections, splitters = voronoi_skeleton(
                        edges.geometry,  # use all edges as an input
                        poly=artifact.geometry,
                        snap_to=relevant_targets.geometry,  # snap to nodes
                        max_segment_length=max_segment_length,
                        clip_limit=eps,
                        consolidation_tolerance=consolidation_tolerance,
                    )
                split_points.extend(splitters)

                # The skeleton returns connections to all the nodes. We need to
                # keep only some, if there are multiple connections to a single
                # C. We don't touch the other.
                (
                    new_connections,
                    connections_intersecting_c,
                    connections_intersecting_primes,
                ) = filter_connections(
                    primes, relevant_targets, conts_groups, new_connections
                )

                # multiple Cs that are not intersecting. Avoid forks on the
                # ends of Voronoi. If one goes to relevant node, keep it. If
                # not, remove both and replace with a new shortest connection
                if (
                    len(connections_intersecting_c) > 1
                    and len(connections_intersecting_primes) == 0
                ):
                    # this also appends to split_points
                    new_connections = avoid_forks(
                        highest_hierarchy,
                        new_connections,
                        relevant_targets,
                        artifact,
                        split_points,
                    )

                # check for disconnected Cs and reconnect
                new_connections = reconnect(
                    conts_groups, new_connections, artifact, split_points, eps
                )

                # the drop above could've introduced dangling edges. Remove those.
                new_connections = remove_dangles(new_connections, artifact)

        # BRANCH 2 - relevant node targets exist
        elif relevant_targets.shape[0] > 0:
            logger.debug("CONDITION relevant_targets.shape[0] > 0 True")
            # SUB BRANCH - only one remaining node
            if remaining_nodes.shape[0] < 2:
                logger.debug("CONDITION remaining_nodes.shape[0] < 2 True")
                # this also appends to split_points
                new_connections = one_remaining(
                    relevant_targets,
                    remaining_nodes,
                    artifact,
                    edges,
                    es_mask,
                    max_segment_length,
                    split_points,
                    clip_limit,
                    consolidation_tolerance,
                )
            # SUB BRANCH - more than one remaining node
            else:
                logger.debug("CONDITION remaining_nodes.shape[0] < 2 False")
                # this also appends to split_points
                new_connections = multiple_remaining(
                    edges,
                    es_mask,
                    artifact,
                    max_segment_length,
                    highest_hierarchy,
                    split_points,
                    relevant_targets.geometry,
                    clip_limit,
                    consolidation_tolerance,
                )

        # BRANCH 3 - no target nodes - snapping to C
        else:
            logger.debug("CONDITION relevant_targets.shape[0] > 0 False, snapping to C")
            # SUB BRANCH - only one remaining node
            if remaining_nodes.shape[0] < 2:
                logger.debug("CONDITION remaining_nodes.shape[0] < 2 True")
                # this also appends to split_points
                new_connections = one_remaining_c(
                    remaining_nodes,
                    highest_hierarchy,
                    artifact,
                    edges,
                    es_mask,
                    max_segment_length,
                    split_points,
                    clip_limit,
                    consolidation_tolerance,
                )
            # SUB BRANCH - more than one remaining node
            else:
                logger.debug("CONDITION remaining_nodes.shape[0] < 2 False")
                # this also appends to split_points
                new_connections = multiple_remaining(
                    edges,
                    es_mask,
                    artifact,
                    max_segment_length,
                    highest_hierarchy,
                    split_points,
                    highest_hierarchy.dissolve("coins_group").geometry,
                    clip_limit,
                    consolidation_tolerance,
                )

            # ensure no C ends up disconnected after snapping to C
            new_connections = reconnect(
                conts_groups, new_connections, artifact, split_points, eps
            )

        # add new connections to a list of features to be added to the network
        to_add.extend(weld_edges(new_connections, ignore=remaining_nodes.geometry))

    # there may be loops or half-loops we are dropping. If they are protruding
    # enough we want to replace them by a dead-end representing their space
    elif artifact.C == 1 and (artifact.E + artifact.S) == 1:
        logger.debug("CONDITION is_loop True")

        sl = shapely.shortest_line(
            relevant_nodes.geometry.iloc[0], relevant_nodes.geometry.iloc[1]
        )
        if (
            (artifact.interstitial_nodes == 0)
            and _is_within(sl, artifact.geometry)
            and (sl.length * 1.1) < highest_hierarchy.length.sum()
        ):
            logger.debug("DEVIATION replacing with shortest")
            to_add.append(sl)
            to_drop.append(highest_hierarchy.index[0])
        else:
            dangles = loop(
                edges,
                es_mask,
                highest_hierarchy,
                artifact,
                max_segment_length,
                clip_limit,
                split_points,
                min_dangle_length,
            )
            if len(dangles) > 0:
                to_add.extend(weld_edges(dangles))

    elif artifact.node_count == 2 and artifact.stroke_count == 2:
        logger.debug("CONDITION is_sausage True")

        sl = shapely.shortest_line(
            relevant_nodes.geometry.iloc[0], relevant_nodes.geometry.iloc[1]
        )
        if (
            _is_within(sl, artifact.geometry)
            and (sl.length * 1.1) < highest_hierarchy.length.sum()
        ):
            logger.debug("DEVIATION replacing with shortest")
            to_add.append(sl)
            to_drop.append(highest_hierarchy.index[0])
    else:
        # nothing to replace – the E/S edges were already marked for dropping
        logger.debug("DROP ONLY")


def nx_gx_cluster(
    edges: gpd.GeoDataFrame,
    *,
    cluster_geom: gpd.GeoSeries,
    nodes: gpd.GeoSeries,
    to_drop: list,
    to_add: list,
    max_segment_length: float | int = 1,
    min_dangle_length: float | int = 20,
    consolidation_tolerance: float | int = 10,
    eps: float = 1e-4,
) -> None:
    """Determine lines within artifacts to drop & add when dealing with
    typologies of *clusters* of 2 or more nodes and 2 or more continuity
    groups – ``{C, E, S}``.

    Here :math:`n`-artifact clusters are treated as follows:

    * merge all artifact polygons;
    * drop all lines fully within the merged polygon;
    * skeletonize and keep only skeletonized edges and connecting nodes

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Line geometries forming the artifact.
    cluster_geom : geopandas.GeoSeries
        The polygonal representation of the artifact cluster.
    nodes : geopandas.GeoSeries
        Node geometries forming the artifact.
    to_drop : list
        List collecting geometries to be dropped.
    to_add : list
        List collecting geometries to be added.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments are no
        longer than this value. Must be greater than 0.
    min_dangle_length : float | int = 20
        The threshold for determining if linestrings are dangling slivers to
        be removed or not.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation within the
        ``geometry.voronoi_skeleton()``.
    eps : float = 1e-4
        Small tolerance epsilon.

    Returns
    -------
    None
        ``to_drop`` and ``to_add`` are updated inplace.
    """
    # edges fully contained in the (buffered) cluster polygon are dropped
    lines_to_drop = edges.iloc[
        edges.sindex.query(cluster_geom.buffer(eps), predicate="contains")
    ].index.to_list()
    connection = edges.drop(lines_to_drop).geometry

    # non-planar lines are not connections
    connection = connection[~connection.crosses(cluster_geom)]

    # if there's nothing to drop due to planarity, there's nothing to replace
    # and we can stop here
    if not lines_to_drop or connection.empty:
        return

    # get edges on boundary
    edges_on_boundary = edges.intersection(cluster_geom.boundary.buffer(eps)).explode(
        ignore_index=True
    )
    edges_on_boundary = edges_on_boundary[
        (~edges_on_boundary.is_empty)
        & (edges_on_boundary.geom_type.str.contains("Line"))
        & (edges_on_boundary.length > 100 * eps)
    ]  # keeping only (multi)linestrings of length>>eps
    edges_on_boundary = edges_on_boundary.to_frame("geometry")

    # find nodes ON the cluster polygon boundary (to be partially kept)
    nodes_on_boundary = nodes.iloc[
        nodes.sindex.query(cluster_geom.boundary.buffer(eps), predicate="intersects")
    ].copy()

    # find edges that cross but do not lie within
    edges_crossing = edges.iloc[
        edges.sindex.query(cluster_geom.buffer(eps), predicate="crosses")
    ]

    # the nodes to keep are those that intersect with these crossing edges
    nodes_to_keep = nodes_on_boundary.iloc[
        nodes_on_boundary.sindex.query(
            edges_crossing.union_all(), predicate="intersects"
        )
    ].copy()

    # merging lines between nodes to keep:
    buffered_nodes_to_keep = nodes_to_keep.buffer(eps).union_all()

    # make queen contiguity graph on MINUSBUFFERED outline road segments,
    # and copy component labels into edges_on_boundary gdf
    edges_on_boundary = edges_on_boundary.explode(ignore_index=True)
    queen = graph.Graph.build_fuzzy_contiguity(
        edges_on_boundary.difference(buffered_nodes_to_keep)
    )
    if len(connection) > 1:
        # dissolve boundary edges by connected component of the contiguity graph
        skeletonization_input = edges_on_boundary.dissolve(
            by=queen.component_labels
        ).geometry
    else:
        # a loop that has only a single entry point - use individual segments
        merged_edges = edges_on_boundary.dissolve().line_merge().item()
        if merged_edges.geom_type != "LineString":
            # this is a fallback for corner cases. It should result in nearly the
            # same skeleton in the end but ensures we work with a single-part geometry
            merged_edges = shapely.concave_hull(merged_edges).exterior
        skeletonization_input = line_segments(merged_edges)

    # skeletonize
    skel, _ = voronoi_skeleton(
        skeletonization_input,
        cluster_geom,
        snap_to=False,
        max_segment_length=max_segment_length,
        clip_limit=1e-4,
        consolidation_tolerance=consolidation_tolerance,
    )

    # if we used only segments, we need to remove dangles
    if len(connection) == 1:
        connection = connection.item()
        _skel = gpd.GeoSeries(skel)
        # keep skeleton parts off the boundary or touching the single entry
        _skel = _skel[
            _skel.disjoint(edges_on_boundary.union_all()) | _skel.intersects(connection)
        ]
        welded = gpd.GeoSeries(weld_edges(_skel))
        # drop short dangles that do not touch the entry connection
        skel = welded[
            ~(
                ((welded.length < min_dangle_length) & (is_dangle(welded)))
                & welded.disjoint(connection)
            )
        ]

    lines_to_add = list(skel)
    to_drop.extend(lines_to_drop)

    ### RECONNECTING NON-PLANAR INTRUDING EDGES TO SKELETON

    # considering only edges that are kept
    # NOTE(review): ``.copy()`` before ``.drop`` looks redundant (drop returns
    # a new frame) — confirm before changing
    edges_kept = edges.copy().drop(lines_to_drop, axis=0)

    to_reconnect = []

    skel_merged = shapely.line_merge(skel)
    skel_merged = gpd.GeoSeries(skel_merged, crs=edges.crs)

    # endpoints of merged skeleton lines serve as reconnection anchors
    skel_nodes = list(shapely.get_point(skel_merged, 0))
    skel_nodes.extend(list(shapely.get_point(skel_merged, -1)))
    skel_nodes = gpd.GeoSeries(skel_nodes, crs=edges.crs).union_all()

    # loop through endpoints of kept edges...
    for i in [0, -1]:  # do the same for "end" points
        endpoints = gpd.GeoSeries(
            shapely.get_point(edges_kept.geometry, i), crs=edges.crs
        )
        # which are contained by artifact...
        endpoints = endpoints.iloc[
            endpoints.sindex.query(cluster_geom, predicate="contains")
        ]
        # ...but NOT on skeleton
        endpoints = endpoints.difference(skel_merged.union_all())
        to_reconnect.extend(endpoints.geometry.drop_duplicates())

    # to_reconnect now contains a list of points which need to be connected to
    # the nearest skel node: from those nodes, we need to add shapely shortest
    # lines between those edges_kept.endpoints and
    non_planar_connections = shapely.shortest_line(skel_nodes, to_reconnect)

    # keep only those that are within
    conn_within = _is_within(non_planar_connections, cluster_geom)
    if not all(conn_within):
        warnings.warn(
            "Could not create a connection as it would lead outside of the artifact.",
            UserWarning,
            stacklevel=2,
        )
    non_planar_connections = non_planar_connections[conn_within]

    ### extend our list "to_add" with this artifact clusters' contribution:
    lines_to_add.extend(non_planar_connections)
    to_add.extend(lines_to_add)


def is_dangle(edgelines: gpd.GeoSeries) -> np.ndarray:
    """Determine, elementwise, whether each edge is dangling or not.

    An edge is dangling when at least one of its endpoints intersects only a
    single line (itself). Returns a boolean array aligned with ``edgelines``.
    """

    def _sum_intersects(loc: int) -> np.ndarray:
        """Sum the number of places linestrings intersect each other."""
        point = shapely.get_point(edgelines, loc)
        ix, edge_ix1 = edgelines.sindex.query(point, predicate="intersects")
        data = ([True] * len(ix), (ix, edge_ix1))
        return sparse.coo_array(data, shape=shape, dtype=np.bool_).sum(axis=1)

    # NOTE: local ``shape`` deliberately shadows the module-level esda import
    shape = (len(edgelines), len(edgelines))
    first_sum = _sum_intersects(0)
    last_sum = _sum_intersects(-1)
    return (first_sum == 1) | (last_sum == 1)


def line_segments(line: shapely.LineString) -> np.ndarray:
    """Explode a linestring into constituent pairwise coordinate segments."""
    xys = shapely.get_coordinates(line)
    # stack consecutive coordinate pairs into (n-1, 2, 2) segment arrays
    return shapely.linestrings(
        np.column_stack((xys[:-1], xys[1:])).reshape(xys.shape[0] - 1, 2, 2)
    )