import logging
import typing
import warnings
import geopandas as gpd
import numpy as np
import pandas as pd
import shapely
from geopandas.testing import assert_geoseries_equal
from libpysal import graph
from scipy import sparse
from .artifacts import (
get_artifacts,
n1_g1_identical,
nx_gx,
nx_gx_cluster,
nx_gx_identical,
)
from .continuity import continuity, get_stroke_info
from .nodes import (
_first_non_null,
_nodes_degrees_from_edges,
_nodes_from_edges,
_status,
consolidate_nodes,
fix_topology,
induce_nodes,
remove_interstitial_nodes,
split,
)
DEBUGGING = False
logger = logging.getLogger(__name__)
def _check_input_crs(streets: gpd.GeoDataFrame, exclusion_mask: gpd.GeoSeries):
    """Ensure input data is in appropriate Coordinate reference systems.

    Warns when the CRS is missing or not metric; raises when the CRS is
    geographic or when ``streets`` and ``exclusion_mask`` disagree.
    """
    crs = streets.crs
    if crs is None:
        # No CRS assigned -- proceed, but tell the user what is assumed.
        warnings.warn(
            (
                "The input `streets` data does not have an assigned "
                "coordinate reference system. Assuming a projected CRS in meters."
            ),
            category=UserWarning,
            stacklevel=2,
        )
    elif not crs.is_projected:
        # A geographic CRS breaks all distance-based defaults -- hard stop.
        raise ValueError(
            "The input `streets` data are not in a projected "
            "coordinate reference system. Reproject and rerun."
        )
    elif crs.axis_info[0].unit_name != "metre":
        # Projected but not metric -- defaults may be inappropriate.
        warnings.warn(
            (
                "The input `streets` data coordinate reference system is projected "
                "but not in meters. All `neatnet` defaults assume meters. "
                "Either reproject and rerun or proceed with caution."
            ),
            category=UserWarning,
            stacklevel=2,
        )
    # The mask (when given) must share the street network's CRS.
    if exclusion_mask is not None and exclusion_mask.crs != crs:
        raise ValueError(
            "The input `streets` and `exclusion_mask` data are in "
            "different coordinate reference systems. Reproject and rerun."
        )
def _link_nodes_artifacts(
    step: str,
    streets: gpd.GeoDataFrame,
    artifacts: gpd.GeoDataFrame,
    eps: None | float,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """Helper to prep nodes & artifacts when simplifying singletons & pairs."""
    # Derive network nodes (with degree information) from street edges.
    nodes = _nodes_degrees_from_edges(streets.geometry)

    # Singletons match nodes within ``eps``; the pairs step matches a
    # small fixed buffer around each node instead.
    if step == "singletons":
        query_geoms = nodes.geometry
        query_kwargs = {"predicate": "dwithin", "distance": eps}
    else:
        query_geoms = nodes.buffer(0.1)
        query_kwargs = {"predicate": "intersects"}

    # Link nodes to artifacts through the artifacts' spatial index.
    node_idx, artifact_idx = artifacts.sindex.query(query_geoms, **query_kwargs)
    hits = sparse.coo_array(
        (np.ones(len(node_idx), dtype=np.bool_), (node_idx, artifact_idx)),
        shape=(len(nodes), len(artifacts)),
        dtype=np.bool_,
    )

    # Record how many nodes touch each artifact.
    artifacts["node_count"] = hits.sum(axis=0)
    return nodes, artifacts
def _classify_strokes(
    artifacts: gpd.GeoDataFrame, streets: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Classify artifacts with ``{C,E,S}`` typology."""
    # ``get_stroke_info`` yields per-artifact stroke totals followed by
    # the continuing (C), ending (E), and single (S) counts, in order.
    stroke_info = get_stroke_info(artifacts, streets)
    for column, values in zip(("stroke_count", "C", "E", "S"), stroke_info):
        artifacts[column] = values
    return artifacts
def _identify_non_planar(
    artifacts: gpd.GeoDataFrame, streets: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Filter artifacts caused by non-planar intersections."""
    # Note from within `neatify_singletons()`
    # TODO: This is not perfect.
    # TODO: Some 3CC artifacts were non-planar but not captured here.
    # More strokes than nodes implies edges crossing without intersecting.
    artifacts["non_planar"] = artifacts["stroke_count"] > artifacts["node_count"]

    # Also flag artifacts whose boundary overlaps a street edge.
    overlap_idx, _ = streets.sindex.query(
        artifacts.geometry.boundary, predicate="overlaps"
    )
    non_planar_loc = artifacts.columns.get_loc("non_planar")
    artifacts.iloc[np.unique(overlap_idx), non_planar_loc] = True
    return artifacts
[docs]
def neatify_singletons(
    artifacts: gpd.GeoDataFrame,
    streets: gpd.GeoDataFrame,
    *,
    max_segment_length: float | int = 1,
    compute_coins: bool = True,
    angle_threshold: float = 120,
    min_dangle_length: float | int = 10,
    eps: float = 1e-4,
    clip_limit: float | int = 2,
    simplification_factor: float | int = 2,
    consolidation_tolerance: float | int = 10,
) -> gpd.GeoDataFrame:
    """Simplification of singleton face artifacts – the first simplification step in
    the procedure detailed in ``simplify.neatify_loop()``.

    This process extracts nodes from network edges before computing and labeling
    face artifacts with a ``{C, E, S}`` typology through ``momepy.COINS`` via the
    constituent street geometries.

    Next, each artifact is iterated over and constituent line geometries are either
    dropped or added in the following order of typologies:

    1. 1 node and 1 continuity group
    2. more than 1 node and 1 or more identical continuity groups
    3. 2 or more nodes and 2 or more continuity groups

    Non-planar geometries are ignored.

    Parameters
    ----------
    artifacts : geopandas.GeoDataFrame
        Face artifact polygons.
    streets : geopandas.GeoDataFrame
        Preprocessed street network data.
    max_segment_length : float | int = 1
        Additional nodes will be added so that all line segments
        are no longer than this value. Must be greater than 0.
        Used in multiple internal geometric operations.
    compute_coins : bool = True
        Flag for computing and labeling artifacts with a ``{C, E, S}`` typology
        through :class:`momepy.COINS` via the constituent street geometries.
    angle_threshold : float = 120
        See the ``angle_threshold`` keyword argument in :class:`momepy.COINS`.
    min_dangle_length : float | int = 10
        The threshold for determining if linestrings are dangling slivers to be
        removed or not.
    eps : float = 1e-4
        Tolerance epsilon used in multiple internal geometric operations.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework, we clip to fit inside the
        polygon. To ensure we get a space to make proper topological connections
        from the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius of
        maximum inscribed circle, whichever is smaller.
    simplification_factor : float | int = 2
        The factor by which singles, pairs, and clusters are simplified. The
        ``max_segment_length`` is multiplied by this factor to get the
        simplification epsilon.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation when generating Voronoi skeletons.

    Returns
    -------
    geopandas.GeoDataFrame
        The street network line data following the singleton procedure.
    """
    # Extract network nodes and relate to artifacts
    nodes, artifacts = _link_nodes_artifacts("singletons", streets, artifacts, eps)

    # Compute number of stroke groups per artifact
    if compute_coins:
        streets, _ = continuity(streets, angle_threshold=angle_threshold)
    artifacts = _classify_strokes(artifacts, streets)

    # Filter artifacts caused by non-planar intersections
    artifacts = _identify_non_planar(artifacts, streets)

    # Count interstitial nodes (primes): nodes not accounted for by C/E/S edges
    _prime_count = artifacts["node_count"] - artifacts[["C", "E", "S"]].sum(axis=1)
    artifacts["interstitial_nodes"] = _prime_count

    # Define the type label, e.g. ``'3CES'``
    # NOTE(review): ``cestype[1:]`` below assumes a single-digit node count —
    # verify behavior for artifacts with 10+ nodes.
    artifacts["ces_type"] = [
        f"{x.node_count}{'C' * x.C}{'E' * x.E}{'S' * x.S}"
        for x in artifacts[["node_count", "C", "E", "S"]].itertuples()
    ]

    # Collect changes
    to_drop: list[int] = []  # street edge indices to remove
    to_add: list[shapely.Geometry] = []  # replacement line geometries
    split_points: list[shapely.Point] = []  # new nodes to split lines on

    # Isolate planar artifacts
    planar = artifacts[~artifacts["non_planar"]].copy()
    planar["buffered"] = planar.buffer(eps)
    if artifacts["non_planar"].any():
        logger.debug(f"IGNORING {artifacts.non_planar.sum()} non planar artifacts")

    # Iterate over each singleton planar artifact and simplify based on typology
    for artifact in planar.itertuples():
        n_nodes = artifact.node_count
        n_strokes = artifact.stroke_count
        cestype = artifact.ces_type

        # Get edges relevant for an artifact
        edges = streets.iloc[
            streets.sindex.query(artifact.buffered, predicate="covers")
        ]

        # Dispatch by typology
        try:
            # 1 node and 1 continuity group
            if (n_nodes == 1) and (n_strokes == 1):
                logger.debug("FUNCTION n1_g1_identical")
                n1_g1_identical(
                    edges,
                    to_drop=to_drop,
                    to_add=to_add,
                    geom=artifact.geometry,
                    max_segment_length=max_segment_length,
                    clip_limit=clip_limit,
                )
            # More than 1 node and 1 or more identical continuity groups
            elif (n_nodes > 1) and (len(set(cestype[1:])) == 1):
                logger.debug("FUNCTION nx_gx_identical")
                nx_gx_identical(
                    edges,
                    geom=artifact.geometry,
                    to_add=to_add,
                    to_drop=to_drop,
                    nodes=nodes,
                    angle=75,
                    max_segment_length=max_segment_length,
                    clip_limit=clip_limit,
                    consolidation_tolerance=consolidation_tolerance,
                )
            # 2 or more nodes and 2 or more continuity groups
            elif (n_nodes > 1) and (len(cestype) > 2):
                logger.debug("FUNCTION nx_gx")
                nx_gx(
                    edges,
                    artifact=artifact,
                    to_drop=to_drop,
                    to_add=to_add,
                    split_points=split_points,
                    nodes=nodes,
                    max_segment_length=max_segment_length,
                    clip_limit=clip_limit,
                    min_dangle_length=min_dangle_length,
                    consolidation_tolerance=consolidation_tolerance,
                )
            else:
                logger.debug("NON PLANAR")
        except Exception as e:
            if DEBUGGING:
                # Bare ``raise`` preserves the original traceback.
                raise
            warnings.warn(
                f"An error occured at location {artifact.geometry.centroid}. "
                f"The artifact has not been simplified. The original message:\n{e}",
                UserWarning,
                stacklevel=2,
            )

    # Drop the consumed edges and split remaining lines on any new nodes.
    # (The former redundant pre-assignment of ``streets.drop(to_drop)`` was
    # removed — ``split`` receives the dropped frame directly.)
    cleaned_streets = split(split_points, streets.drop(to_drop), streets.crs)

    if to_add:
        # Create new streets with fixed geometry.
        # Note: ``to_add`` and ``to_drop`` lists shall be global and
        # this step should happen only once, not for every artifact
        _add_merged = gpd.GeoSeries(to_add).line_merge()
        new = gpd.GeoDataFrame(geometry=_add_merged, crs=streets.crs).explode()
        new = new[~new.normalize().duplicated()].copy()
        new["_status"] = "new"
        new.geometry = new.simplify(max_segment_length * simplification_factor)
        new_streets = pd.concat([cleaned_streets, new], ignore_index=True)
        agg: dict[str, str | typing.Callable] = {"_status": _status}
        for c in cleaned_streets.columns.drop(cleaned_streets.active_geometry_name):
            if c != "_status":
                # returning first non null as we may be joining new with existing
                # and want attributes from existing
                agg[c] = _first_non_null
        non_empties = new_streets[~(new_streets.is_empty | new_streets.geometry.isna())]
        final = remove_interstitial_nodes(non_empties, aggfunc=agg)
    else:
        final = cleaned_streets

    # Strip temporary COINS bookkeeping columns before returning.
    if "coins_group" in final.columns:
        final = final.drop(
            columns=[c for c in streets.columns if c.startswith("coins_")]
        )
    return final
[docs]
def neatify_pairs(
    artifacts: gpd.GeoDataFrame,
    streets: gpd.GeoDataFrame,
    *,
    max_segment_length: float | int = 1,
    min_dangle_length: float | int = 20,
    clip_limit: float | int = 2,
    simplification_factor: float | int = 2,
    consolidation_tolerance: float | int = 10,
) -> gpd.GeoDataFrame:
    """Simplification of pairs of face artifacts – the second simplification step in
    the procedure detailed in ``simplify.neatify_loop()``.

    This process extracts nodes from network edges before identifying non-planarity
    and cluster information.

    If paired artifacts are present we further classify them as grouped by
    first vs. last instance of duplicated component label, and whether
    or not to be simplified with the clustered process.

    Finally, simplification is performed based on the following order of typologies:

    1. Singletons – merged pairs & first instance (w/o COINS)
    2. Singletons – Second instance – w/ COINS
    3. Clusters

    Parameters
    ----------
    artifacts : geopandas.GeoDataFrame
        Face artifact polygons.
    streets : geopandas.GeoDataFrame
        Preprocessed street network data.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments
        are no longer than this value. Must be greater than 0.
        Used in multiple internal geometric operations.
    min_dangle_length : float | int = 20
        The threshold for determining if linestrings are dangling slivers to be
        removed or not.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework, we clip to fit inside the
        polygon. To ensure we get a space to make proper topological connections
        from the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius of
        maximum inscribed circle, whichever is smaller.
    simplification_factor : float | int = 2
        The factor by which singles, pairs, and clusters are simplified. The
        ``max_segment_length`` is multiplied by this factor to get the
        simplification epsilon.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation when generating Voronoi skeletons.

    Returns
    -------
    geopandas.GeoDataFrame
        The street network line data following the pairs procedure.
    """
    # Extract network nodes and relate to artifacts
    nodes, artifacts = _link_nodes_artifacts("pairs", streets, artifacts, None)

    # Compute number of stroke groups per artifact
    streets, _ = continuity(streets)
    artifacts = _classify_strokes(artifacts, streets)

    # Filter artifacts caused by non-planar intersections
    artifacts = _identify_non_planar(artifacts, streets)

    # Identify non-planar clusters: count of non-planar members per component
    _id_np = lambda x: sum(artifacts.loc[artifacts["comp"] == x.comp]["non_planar"])  # noqa: E731
    artifacts["non_planar_cluster"] = artifacts.apply(_id_np, axis=1)

    # Subset non-planar clusters and planar artifacts
    np_clusters = artifacts[artifacts.non_planar_cluster > 0]
    artifacts_planar = artifacts[artifacts.non_planar_cluster == 0]

    # Isolate planar artifacts and determine a solution per component
    _planar_grouped = artifacts_planar.groupby("comp")[artifacts_planar.columns]
    _solutions = _planar_grouped.apply(get_solution, streets=streets)
    artifacts_w_info = artifacts.merge(_solutions, left_on="comp", right_index=True)

    # Isolate non-planar clusters of value 2 – e.g., artifact under highway
    _np_clust_2 = np_clusters["non_planar_cluster"] == 2
    artifacts_under_np = np_clusters[_np_clust_2].dissolve("comp", as_index=False)

    # Determine typology dispatch if artifacts are present
    if not artifacts_w_info.empty:
        # Aggregation rules used when collapsing edges after dropping shared ones
        agg = {
            "coins_group": "first",
            "coins_end": lambda x: x.any(),
            "_status": _status,
        }
        for c in streets.columns.drop(
            [streets.active_geometry_name, "coins_count"], errors="ignore"
        ):
            if c not in agg:
                agg[c] = "first"
        sol_drop = "solution == 'drop_interline'"
        sol_iter = "solution == 'iterate'"

        # Determine artifacts and street edges to drop
        _to_drop = artifacts_w_info.drop_duplicates("comp").query(sol_drop).drop_id
        _drop_streets = streets.drop(_to_drop.dropna().values)

        # Re-run node cleaning on subset of fresh street edges
        streets_cleaned = remove_interstitial_nodes(
            _drop_streets,
            aggfunc=agg,
        )

        # Isolate drops to create merged pairs
        merged_pairs = artifacts_w_info.query(sol_drop).dissolve("comp", as_index=False)

        # Sort artifacts by their node count high-to-low
        sorted_node_count = artifacts_w_info.sort_values("node_count", ascending=False)

        # Isolate artifacts to process as singletons – first instance
        _1st = sorted_node_count.query(sol_iter).drop_duplicates("comp", keep="first")
        # Planar members of otherwise non-planar clusters join the first pass
        _planar_clusters = np_clusters[~np_clusters["non_planar"]]
        _1st = pd.concat([_1st, _planar_clusters], ignore_index=True)

        # Isolate artifacts to process as singletons – last instance
        _2nd = sorted_node_count.query(sol_iter).drop_duplicates("comp", keep="last")

        # Isolate artifacts to process as clusters
        for_skeleton = artifacts_w_info.query("solution == 'skeleton'")

    # Otherwise instantiate artifact containers as empty
    else:
        merged_pairs = pd.DataFrame()
        _1st = pd.DataFrame()
        _2nd = pd.DataFrame()
        for_skeleton = pd.DataFrame()
        streets_cleaned = streets

    # Generate counts of COINs groups for edges
    coins_count = (
        streets_cleaned.groupby("coins_group", as_index=False)
        .geometry.count()
        .rename(columns={"geometry": "coins_count"})
    )
    streets_cleaned = streets_cleaned.merge(coins_count, on="coins_group", how="left")

    # Add under non-planars to cluster dispatcher
    if not artifacts_under_np.empty:
        for_skeleton = pd.concat([for_skeleton, artifacts_under_np])

    # Dispatch singleton simplifier
    if not merged_pairs.empty or not _1st.empty:
        # Merged pairs & first instance – w/o COINS
        streets_cleaned = neatify_singletons(
            pd.concat([merged_pairs, _1st]),
            streets_cleaned,
            max_segment_length=max_segment_length,
            clip_limit=clip_limit,
            compute_coins=False,
            min_dangle_length=min_dangle_length,
            simplification_factor=simplification_factor,
            consolidation_tolerance=consolidation_tolerance,
        )

    # Second instance – w/ COINS
    if not _2nd.empty:
        streets_cleaned = neatify_singletons(
            _2nd,
            streets_cleaned,
            max_segment_length=max_segment_length,
            clip_limit=clip_limit,
            compute_coins=True,
            min_dangle_length=min_dangle_length,
            simplification_factor=simplification_factor,
            consolidation_tolerance=consolidation_tolerance,
        )

    # Dispatch cluster simplifier
    if not for_skeleton.empty:
        streets_cleaned = neatify_clusters(
            for_skeleton,
            streets_cleaned,
            max_segment_length=max_segment_length,
            simplification_factor=simplification_factor,
            min_dangle_length=min_dangle_length,
            consolidation_tolerance=consolidation_tolerance,
        )

    return streets_cleaned
[docs]
def neatify_clusters(
    artifacts: gpd.GeoDataFrame,
    streets: gpd.GeoDataFrame,
    *,
    max_segment_length: float | int = 1,
    eps: float = 1e-4,
    simplification_factor: float | int = 2,
    min_dangle_length: float | int = 20,
    consolidation_tolerance: float | int = 10,
) -> gpd.GeoDataFrame:
    """Simplification of clusters of face artifacts – the third simplification step in
    the procedure detailed in ``simplify.neatify_loop()``.

    This process extracts nodes from network edges before iterating over each
    cluster artifact and performing simplification.

    Parameters
    ----------
    artifacts : geopandas.GeoDataFrame
        Face artifact polygons.
    streets : geopandas.GeoDataFrame
        Preprocessed street network data.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments
        are no longer than this value. Must be greater than 0.
        Used in multiple internal geometric operations.
    eps : float = 1e-4
        Tolerance epsilon used in multiple internal geometric operations.
    simplification_factor : float | int = 2
        The factor by which singles, pairs, and clusters are simplified. The
        ``max_segment_length`` is multiplied by this factor to get the
        simplification epsilon.
    min_dangle_length : float | int = 20
        The threshold for determining if linestrings are dangling slivers to be
        removed or not.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation when generating Voronoi skeletons.

    Returns
    -------
    geopandas.GeoDataFrame
        The street network line data following the clusters procedure.
    """
    # Get nodes from the network
    nodes = gpd.GeoSeries(_nodes_from_edges(streets.geometry))

    # Collect changes
    to_drop: list[int] = []  # street edge indices to remove
    to_add: list[shapely.Geometry] = []  # replacement line geometries

    # Process one connected cluster (component) of artifacts at a time
    for _, artifact in artifacts.groupby("comp"):
        # Get artifact cluster polygon
        cluster_geom = artifact.union_all()
        # Get edges relevant for an artifact
        edges = streets.iloc[
            streets.sindex.query(cluster_geom, predicate="intersects")
        ].copy()
        # Clusters of 2 or more nodes and 2 or more continuity groups
        nx_gx_cluster(
            edges=edges,
            cluster_geom=cluster_geom,
            nodes=nodes,
            to_drop=to_drop,
            to_add=to_add,
            eps=eps,
            max_segment_length=max_segment_length,
            min_dangle_length=min_dangle_length,
            consolidation_tolerance=consolidation_tolerance,
        )

    cleaned_streets = streets.drop(to_drop)

    # Create new street with fixed geometry.
    # Note: ``to_add`` and ``to_drop`` lists shall be global and
    # this step should happen only once, not for every artifact
    new = gpd.GeoDataFrame(geometry=to_add, crs=streets.crs)
    new["_status"] = "new"
    new["geometry"] = new.line_merge().simplify(
        max_segment_length * simplification_factor
    )
    new_streets = pd.concat([cleaned_streets, new], ignore_index=True).explode()

    # Collapse interstitial (degree-2) nodes, keeping first non-geometry attributes
    agg: dict[str, str | typing.Callable] = {"_status": _status}
    for c in new_streets.columns.drop(new_streets.active_geometry_name):
        if c != "_status":
            agg[c] = "first"
    new_streets = remove_interstitial_nodes(
        new_streets[~new_streets.is_empty], aggfunc=agg
    ).drop_duplicates("geometry")

    return new_streets
def get_type(edges: "gpd.GeoDataFrame", shared_edge: int) -> str:
    """Classify artifact edges according to the ``{C, E, S}``
    schema when considering solutions for pairs of artifacts.

    Parameters
    ----------
    edges : geopandas.GeoDataFrame
        Artifact edges in consideration.
    shared_edge : int
        The index location of the shared edge of the pair.

    Returns
    -------
    str
        Classification for an edge in ``{C, E, S}``.
    """
    # Roundabout special case: a single stroke group whose edge count
    # matches the group's total COINS count.
    one_group = edges["coins_group"].nunique() == 1
    if one_group and len(edges) == edges["coins_count"].iloc[0]:
        return "S"

    # Groups that terminate within the artifact vs. those passing through.
    ending_groups = edges.loc[edges["coins_end"], "coins_group"]
    through = edges[~edges["coins_group"].isin(ending_groups)]
    if shared_edge in through.index:
        return "C"

    # The shared edge's group is fully contained -> single; otherwise ending.
    shared = edges.loc[shared_edge]
    group_size = (edges["coins_group"] == shared["coins_group"]).sum()
    if shared["coins_count"] == group_size:
        return "S"
    return "E"
def get_solution(group: gpd.GeoDataFrame, streets: gpd.GeoDataFrame) -> pd.Series:
    """Determine the solution for paired planar artifacts.

    Parameters
    ----------
    group : geopandas.GeoDataFrame
        Dissolved group of connected planar artifacts.
    streets : geopandas.GeoDataFrame
        Street network data.

    Returns
    -------
    pandas.Series
        The determined solution and edge to drop.
    """

    def _relate(loc: int) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
        """Isolate intersecting & covering street geometries."""
        poly = group.geometry.iloc[loc]
        intersecting = streets.iloc[
            streets.sindex.query(poly, predicate="intersects")
        ]
        covering = intersecting.iloc[
            intersecting.sindex.query(poly, predicate="covers")
        ]
        return intersecting, covering

    streets_a, covers_a = _relate(0)
    streets_b, covers_b = _relate(1)

    # Find the street segment that is contained within the cluster geometry
    cluster_geom = group.union_all()
    shared = streets.index[streets.sindex.query(cluster_geom, predicate="contains")]

    # Nothing shared / nothing covered implies a non-planar configuration.
    if shared.empty or covers_a.empty or covers_b.empty:
        return pd.Series({"solution": "non_planar", "drop_id": None})

    # Multiple shared edges are handled with the skeleton procedure.
    if len(shared) > 1:
        return pd.Series({"solution": "skeleton", "drop_id": shared})
    shared = shared.item()

    # Exactly one edge unique to either side -> drop the interline.
    one_unique_to_b = (~streets_b.index.isin(covers_a.index)).sum() == 1
    one_unique_to_a = (~streets_a.index.isin(covers_b.index)).sum() == 1
    if one_unique_to_b or one_unique_to_a:
        return pd.Series({"solution": "drop_interline", "drop_id": shared})

    # Otherwise decide based on how each artifact classifies the shared edge.
    type_a = get_type(covers_a, shared)
    type_b = get_type(covers_b, shared)
    if type_a == "C" and type_b == "C":
        return pd.Series({"solution": "iterate", "drop_id": shared})
    if type_a == type_b:
        return pd.Series({"solution": "drop_interline", "drop_id": shared})
    return pd.Series({"solution": "skeleton", "drop_id": shared})
[docs]
def neatify(
    streets: gpd.GeoDataFrame,
    *,
    exclusion_mask: None | gpd.GeoSeries = None,
    predicate: str = "intersects",
    max_segment_length: float | int = 1,
    min_dangle_length: float | int = 20,
    clip_limit: float | int = 2,
    simplification_factor: float | int = 2,
    consolidation_tolerance: float | int = 10,
    artifact_threshold: None | float | int = None,
    artifact_threshold_fallback: float | int = 7,
    area_threshold_blocks: float | int = 1e5,
    isoareal_threshold_blocks: float | int = 0.5,
    area_threshold_circles: float | int = 5e4,
    isoareal_threshold_circles_enclosed: float | int = 0.75,
    isoperimetric_threshold_circles_touching: float | int = 0.9,
    angle_threshold: float = 120,
    eps: float = 1e-4,
    n_loops: int = 2,
) -> gpd.GeoDataFrame:
    """Top-level workflow for simplifying street networks.

    Follows the Adaptive Continuity-Preserving Simplification algorithm proposed by
    :cite:`fleischmann2026Adaptive`. The input raw street network
    data, which must be in a projected coordinate reference system and is expected
    to be in meters, is first preprocessed (topological corrections & node
    consolidation) before two iterations of artifact detection and simplification.

    Each iteration of the simplification procedure which includes (1.) the removal
    of false nodes; (2.) face artifact classification; and (3.) the line-based
    simplification of face artifacts in the order of single artifacts, pairs of
    artifacts, clusters of artifacts.

    For further information on face artifact detection and extraction
    see :cite:`fleischmann_shape-based_2024`.

    This algorithm is designed for use with only "street" network geometries as
    input. While passing in other types of pathing (e.g., sidewalks, canals) will
    likely yield valid geometric results, that behavior is untested.

    Parameters
    ----------
    streets : geopandas.GeoDataFrame
        Raw street network data. This input *must* be in a projected coordinate
        reference system and *should* be in meters. All defaults arguments assume
        meters. The internal algorithm is designed for use with street network
        geometries, not other types of pathing (e.g., sidewalks, canals), which
        should be filtered out.
    exclusion_mask : None | geopandas.GeoSeries = None
        Geometries used to determine face artifacts to exclude from returned output.
    predicate : str = 'intersects'
        The spatial predicate used to exclude face artifacts from returned output.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments
        are no longer than this value. Must be greater than 0.
        Used in multiple internal geometric operations.
    min_dangle_length : float | int = 20
        The threshold for determining if linestrings are dangling slivers to be
        removed or not.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework, we clip to fit inside the
        polygon. To ensure we get a space to make proper topological connections
        from the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius of
        maximum inscribed circle, whichever is smaller.
    simplification_factor : float | int = 2
        The factor by which singles, pairs, and clusters are simplified. The
        ``max_segment_length`` is multiplied by this factor to get the
        simplification epsilon.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation when generating Voronoi skeletons.
    artifact_threshold : None | float | int = None
        When ``artifact_threshold`` is passed, the computed value from
        ``momepy.FaceArtifacts.threshold`` is not used in favor of the
        given value. This is useful for small networks where artifact
        detection may fail or become unreliable.
    artifact_threshold_fallback : float | int = 7
        If artifact threshold detection fails, this value is used as a fallback.
    area_threshold_blocks : float | int = 1e5
        This is the first threshold for detecting block-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is above
        the value passed in ``artifact_threshold``.
        If a polygon has an area below ``area_threshold_blocks``, *and*
        is of elongated shape (see also ``isoareal_threshold_blocks``),
        *and* touches at least one polygon that has already been classified as
        artifact, then it will be classified as an artifact.
    isoareal_threshold_blocks : float | int = 0.5
        This is the second threshold for detecting block-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is above the
        value passed in ``artifact_threshold``. If a polygon has an isoareal
        quotient below ``isoareal_threshold_blocks`` (see
        ``esda.shape.isoareal_quotient``), i.e., if it has an elongated shape;
        *and* it has a sufficiently small area (see also ``area_threshold_blocks``),
        *and* if it touches at least one polygon that has already been detected as
        an artifact, then it will be classified as an artifact.
    area_threshold_circles : float | int = 5e4
        This is the first threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is above the
        value passed in ``artifact_threshold``. If a polygon has an area below
        ``area_threshold_circles``, *and* one of the following 2 cases is given:
        (a) the polygon is touched, but not enclosed by polygons already classified
        as artifacts, *and* with an isoperimetric quotient
        (see ``esda.shape.isoperimetric_quotient``)
        above ``isoperimetric_threshold_circles_touching``, i.e., if its shape
        is close to circular; or (b) the polygon is fully enclosed by polygons
        already classified as artifacts, *and* with an isoareal quotient above
        ``isoareal_threshold_circles_enclosed``, i.e., if its shape is
        close to circular; then it will be classified as an artifact.
    isoareal_threshold_circles_enclosed : float | int = 0.75
        This is the second threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`) is above the
        value passed in ``artifact_threshold``. If a polygon has a sufficiently
        small area (see also ``area_threshold_circles``), *and* the polygon is
        fully enclosed by polygons already classified as artifacts,
        *and* its isoareal quotient (see ``esda.shape.isoareal_quotient``)
        is above the value passed to ``isoareal_threshold_circles_enclosed``,
        i.e., if its shape is close to circular;
        then it will be classified as an artifact.
    isoperimetric_threshold_circles_touching : float | int = 0.9
        This is the third threshold for detecting circle-like artifacts whose
        Face Artifact Index (see :cite:`fleischmann_shape-based_2024`)
        is above the value passed in ``artifact_threshold``.
        If a polygon has a sufficiently small area
        (see also ``area_threshold_circles``), *and* the polygon is touched
        by at least one polygon already classified as artifact,
        *and* its isoperimetric quotient (see ``esda.shape.isoperimetric_quotient``)
        is above the value passed to ``isoperimetric_threshold_circles_touching``,
        i.e., if its shape is close to circular;
        then it will be classified as an artifact.
    angle_threshold : float = 120
        See the ``angle_threshold`` keyword argument in :class:`momepy.COINS`.
    eps : float = 1e-4
        Tolerance epsilon used in multiple internal geometric operations.
    n_loops : int = 2
        Number of loops through the simplification pipeline. It is recommended to
        stick to the default value and increase it only very conservatively.

    Returns
    -------
    geopandas.GeoDataFrame
        The final, simplified street network line data.

    Notes
    -----
    As is noted above, the input network data must be in a projected coordinate
    reference system and is expected to be in meters. However, it may be possible
    to work with network data projected in feet if all default arguments are
    adjusted.
    """
    # Validate CRS expectations before any processing (may warn or raise).
    _check_input_crs(streets, exclusion_mask)

    # Record state of initial input to compare with results from
    # -- topo fix & node consolidation if there are no artifacts
    raw_streets = streets.copy()

    # NOTE: this keeps attributes but resets index
    streets = fix_topology(streets, eps=eps)
    # Merge nearby nodes (up to double of distance used in skeleton).
    streets = consolidate_nodes(streets, tolerance=max_segment_length * 2.1)

    # Identify artifacts
    artifacts, threshold = get_artifacts(
        streets,
        exclusion_mask=exclusion_mask,
        predicate=predicate,
        threshold=artifact_threshold,
        threshold_fallback=artifact_threshold_fallback,
        area_threshold_blocks=area_threshold_blocks,
        isoareal_threshold_blocks=isoareal_threshold_blocks,
        area_threshold_circles=area_threshold_circles,
        isoareal_threshold_circles_enclosed=isoareal_threshold_circles_enclosed,
        isoperimetric_threshold_circles_touching=isoperimetric_threshold_circles_touching,
    )

    # If no artifacts return either the raw streets or topologically-fixed streets
    if artifacts.empty:
        try:
            # Equal geometries imply preprocessing was a no-op.
            assert_geoseries_equal(streets.geometry, raw_streets.geometry)
            warnings.warn(
                (
                    "No topological corrections performed on input `streets` "
                    "and no artifacts were detected. Returning as is."
                ),
                UserWarning,
                stacklevel=2,
            )
            return raw_streets
        except AssertionError:
            warnings.warn(
                (
                    "Topological corrections performed on input `streets` "
                    "but no artifacts were detected. Returning the results of "
                    "`fix_topology()` and `consolidate_nodes()`."
                ),
                UserWarning,
                stacklevel=2,
            )
            return streets

    # Loop 1
    new_streets = neatify_loop(
        streets,
        artifacts,
        max_segment_length=max_segment_length,
        min_dangle_length=min_dangle_length,
        clip_limit=clip_limit,
        simplification_factor=simplification_factor,
        consolidation_tolerance=consolidation_tolerance,
        eps=eps,
        angle_threshold=angle_threshold,
    )

    # This is potentially fixing some minor erroneous edges coming from Voronoi
    new_streets = induce_nodes(new_streets, eps=eps)
    new_streets = new_streets[~new_streets.geometry.normalize().duplicated()].copy()

    # Subsequent loops (2..n_loops): re-detect artifacts on the latest network
    # using the threshold computed in loop 1.
    for _ in range(2, n_loops + 1):
        # Identify artifacts based on the first loop network
        artifacts, _ = get_artifacts(
            new_streets,
            threshold=threshold,
            threshold_fallback=artifact_threshold_fallback,
            area_threshold_blocks=area_threshold_blocks,
            isoareal_threshold_blocks=isoareal_threshold_blocks,
            area_threshold_circles=area_threshold_circles,
            isoareal_threshold_circles_enclosed=isoareal_threshold_circles_enclosed,
            isoperimetric_threshold_circles_touching=isoperimetric_threshold_circles_touching,
            exclusion_mask=exclusion_mask,
            predicate=predicate,
        )
        if artifacts.empty:
            # Nothing left to simplify -- short-circuit with a clean index.
            return new_streets.reset_index(drop=True)
        new_streets = neatify_loop(
            new_streets,
            artifacts,
            max_segment_length=max_segment_length,
            min_dangle_length=min_dangle_length,
            clip_limit=clip_limit,
            simplification_factor=simplification_factor,
            consolidation_tolerance=consolidation_tolerance,
            eps=eps,
            angle_threshold=angle_threshold,
        )
        # This is potentially fixing some minor erroneous edges coming from Voronoi
        new_streets = induce_nodes(new_streets, eps=eps)
        new_streets = new_streets[~new_streets.geometry.normalize().duplicated()].copy()

    return new_streets
[docs]
def neatify_loop(
    streets: gpd.GeoDataFrame,
    artifacts: gpd.GeoDataFrame,
    *,
    max_segment_length: float | int = 1,
    min_dangle_length: float | int = 20,
    clip_limit: float | int = 2,
    simplification_factor: float | int = 2,
    consolidation_tolerance: float | int = 10,
    angle_threshold: float = 120,
    eps: float = 1e-4,
) -> gpd.GeoDataFrame:
    """Run one iteration of the simplification procedure.

    A single pass performs:
        1. Removal of false nodes
        2. Artifact classification
        3. Simplifying artifacts:
            - Single artifacts
            - Pairs of artifacts
            - Clusters of artifacts

    Parameters
    ----------
    streets : geopandas.GeoDataFrame
        Raw street network data.
    artifacts : geopandas.GeoDataFrame
        Face artifact polygons.
    max_segment_length : float | int = 1
        Additional vertices will be added so that all line segments
        are no longer than this value. Must be greater than 0.
        Used in multiple internal geometric operations.
    min_dangle_length : float | int = 20
        The threshold for determining if linestrings are dangling slivers to be
        removed or not.
    clip_limit : float | int = 2
        Following generation of the Voronoi linework, we clip to fit inside the
        polygon. To ensure we get a space to make proper topological connections
        from the linework to the actual points on the edge of the polygon, we clip
        using a polygon with a negative buffer of ``clip_limit`` or the radius of
        maximum inscribed circle, whichever is smaller.
    simplification_factor : float | int = 2
        The factor by which singles, pairs, and clusters are simplified. The
        ``max_segment_length`` is multiplied by this factor to get the
        simplification epsilon.
    consolidation_tolerance : float | int = 10
        Tolerance passed to node consolidation when generating Voronoi skeletons.
    angle_threshold : float = 120
        See the ``angle_threshold`` keyword argument in :class:`momepy.COINS`.
    eps : float = 1e-4
        Tolerance epsilon used in multiple internal geometric operations.

    Returns
    -------
    geopandas.GeoDataFrame
        The street network line data following 1 iteration of simplification.
    """
    # Edges lying entirely inside an artifact polygon are dangles – drop them.
    _, contained_pos = streets.sindex.query(artifacts.geometry, predicate="contains")
    # Removing edges can create new false nodes, so clean those up immediately.
    streets = remove_interstitial_nodes(streets.drop(streets.index[contained_pos]))

    # Classify artifacts via rook contiguity: size of each contiguity component
    # determines whether an artifact is a singleton, part of a pair, or part of
    # a larger cluster.
    rook = graph.Graph.build_contiguity(artifacts, rook=True)
    # Isolates are artifacts not attached to any larger intersection.
    singles = artifacts.loc[artifacts.index.intersection(rook.isolates)].copy()
    artifacts["comp"] = rook.component_labels
    comp_sizes = artifacts["comp"].value_counts()
    pair_labels = comp_sizes[comp_sizes == 2].index
    cluster_labels = comp_sizes[comp_sizes > 2].index
    doubles = artifacts.loc[artifacts["comp"].isin(pair_labels)].copy()
    clusters = artifacts.loc[artifacts["comp"].isin(cluster_labels)].copy()

    # Simplify each artifact category in turn; every step consumes the network
    # produced by the previous one.
    if not singles.empty:
        # NOTE: this drops attributes
        streets = neatify_singletons(
            singles,
            streets,
            max_segment_length=max_segment_length,
            simplification_factor=simplification_factor,
            consolidation_tolerance=consolidation_tolerance,
            angle_threshold=angle_threshold,
        )
    if not doubles.empty:
        streets = neatify_pairs(
            doubles,
            streets,
            max_segment_length=max_segment_length,
            min_dangle_length=min_dangle_length,
            clip_limit=clip_limit,
            simplification_factor=simplification_factor,
            consolidation_tolerance=consolidation_tolerance,
        )
    if not clusters.empty:
        streets = neatify_clusters(
            clusters,
            streets,
            max_segment_length=max_segment_length,
            simplification_factor=simplification_factor,
            eps=eps,
            min_dangle_length=min_dangle_length,
            consolidation_tolerance=consolidation_tolerance,
        )

    # Strip any temporary COINS bookkeeping columns before returning.
    if "coins_group" in streets.columns:
        coins_columns = [c for c in streets.columns if c.startswith("coins_")]
        streets = streets.drop(columns=coins_columns)
    return streets