import typing
import geopandas as gpd
import momepy
import networkx as nx
import numpy as np
import pandas as pd
import pyproj
import shapely
from scipy import sparse
def split(
split_points: list | np.ndarray | gpd.GeoSeries,
cleaned_roads: gpd.GeoDataFrame,
crs: str | pyproj.CRS,
eps: float = 1e-4,
) -> gpd.GeoSeries | gpd.GeoDataFrame:
"""Split lines on new nodes.
Parameters
----------
split_points : list | numpy.ndarray
Points to split the ``cleaned_roads``.
cleaned_roads : geopandas.GeoDataFrame
Line geometries to be split with ``split_points``.
crs : str | pyproj.CRS
Anything accepted by ``pyproj.CRS``.
eps : float = 1e-4
Tolerance epsilon for point snapping.
Returns
-------
geopandas.GeoSeries | geopandas.GeoDataFrame
Resultant split line geometries.
"""
split_points = gpd.GeoSeries(split_points, crs=crs)
for split in split_points.drop_duplicates():
_, ix = cleaned_roads.sindex.nearest(split, max_distance=eps)
row = cleaned_roads.iloc[ix]
edge = row.geometry
if edge.shape[0] == 1:
row = row.iloc[0]
lines_split = _snap_n_split(edge.item(), split, eps)
if lines_split.shape[0] > 1:
gdf_split = gpd.GeoDataFrame(geometry=lines_split, crs=crs)
for c in row.index.drop(["geometry", "_status"], errors="ignore"):
gdf_split[c] = row[c]
gdf_split["_status"] = "changed"
cleaned_roads = pd.concat(
[cleaned_roads.drop(edge.index[0]), gdf_split],
ignore_index=True,
)
elif edge.shape[0] > 1:
to_be_dropped = []
to_be_added = []
for i, e in edge.items():
lines_split = _snap_n_split(e, split, eps)
if lines_split.shape[0] > 1:
to_be_dropped.append(i)
to_be_added.append(lines_split)
if to_be_added:
gdf_split = pd.DataFrame(
{"geometry": to_be_added, "_orig": to_be_dropped}
).explode("geometry")
gdf_split = pd.concat(
[
gdf_split.drop(columns="_orig").reset_index(drop=True),
row.drop(columns="geometry")
.loc[gdf_split["_orig"]]
.reset_index(drop=True),
],
axis=1,
)
gdf_split["_status"] = "changed"
cleaned_roads = pd.concat(
[cleaned_roads.drop(to_be_dropped), gdf_split],
ignore_index=True,
)
cleaned_roads = gpd.GeoDataFrame(
cleaned_roads, geometry="geometry", crs=crs
)
return cleaned_roads.reset_index(drop=True)
def _snap_n_split(e: shapely.LineString, s: shapely.Point, tol: float) -> np.ndarray:
"""Snap point to edge and return lines to split."""
snapped = shapely.snap(e, s, tolerance=tol)
_lines_split = shapely.get_parts(shapely.ops.split(snapped, s))
return _lines_split[~shapely.is_empty(_lines_split)]
def _status(x: pd.Series) -> str:
"""Determine the status of edge line(s)."""
if len(x) == 1:
return x.iloc[0]
return "changed"
def get_components(
edgelines: list | np.ndarray | gpd.GeoSeries,
ignore: None | gpd.GeoSeries = None,
) -> np.ndarray:
"""Associate edges with connected component labels and return.
Parameters
----------
edgelines : list | np.ndarray | gpd.GeoSeries
Collection of line objects.
ignore : None | gpd.GeoSeries = None
Nodes to ignore when labeling components.
Returns
-------
np.ndarray
Edge connected component labels.
Notes
-----
See [https://github.com/uscuni/neatnet/issues/56] for detailed explanation of
output.
"""
edgelines = np.array(edgelines)
start_points = shapely.get_point(edgelines, 0)
end_points = shapely.get_point(edgelines, -1)
points = shapely.points(
np.unique(
shapely.get_coordinates(np.concatenate([start_points, end_points])), axis=0
)
)
if ignore is not None:
mask = np.isin(points, ignore)
points = points[~mask]
# query LineString geometry to identify points intersecting 2 geometries
inp, res = shapely.STRtree(shapely.boundary(edgelines)).query(
points, predicate="intersects"
)
unique, counts = np.unique(inp, return_counts=True)
mask = np.isin(inp, unique[counts == 2])
merge_res = res[mask]
merge_inp = inp[mask]
closed = np.arange(len(edgelines))[shapely.is_closed(edgelines)]
mask = np.isin(merge_res, closed) | np.isin(merge_inp, closed)
merge_res = merge_res[~mask]
merge_inp = merge_inp[~mask]
g = nx.Graph(list(zip((merge_inp * -1) - 1, merge_res, strict=True)))
components = {
i: {v for v in k if v > -1} for i, k in enumerate(nx.connected_components(g))
}
component_labels = {value: key for key in components for value in components[key]}
labels = pd.Series(component_labels, index=range(len(edgelines)))
max_label = len(edgelines) - 1 if pd.isna(labels.max()) else int(labels.max())
filling = pd.Series(range(max_label + 1, max_label + len(edgelines) + 1))
labels = labels.fillna(filling)
return labels.values
def weld_edges(
edgelines: list | np.ndarray | gpd.GeoSeries,
ignore: None | gpd.GeoSeries = None,
) -> list | np.ndarray | gpd.GeoSeries:
"""Combine lines sharing an endpoint (if only 2 lines share that point).
Lightweight version of ``remove_false_nodes()``.
Parameters
----------
edgelines : list | np.ndarray | gpd.GeoSeries
Collection of line objects.
ignore : None | gpd.GeoSeries = None
Nodes to ignore when welding components.
Returns
-------
list | np.ndarray | gpd.GeoSeries
Resultant welded ``edgelines`` if more than 1 passed in, otherwise
the original ``edgelines`` object.
"""
if len(edgelines) < 2:
return edgelines
labels = get_components(edgelines, ignore=ignore)
return (
gpd.GeoSeries(edgelines)
.groupby(labels)
.agg(lambda x: shapely.line_merge(shapely.GeometryCollection(x.values)))
).tolist()
[docs]
def induce_nodes(roads: gpd.GeoDataFrame, eps: float = 1e-4) -> gpd.GeoDataFrame:
"""Adding potentially missing nodes on intersections of individual LineString
endpoints with the remaining network. The idea behind is that if a line ends
on an intersection with another, there should be a node on both of them.
Parameters
----------
roads : geopandas.GeoDataFrame
Input LineString geometries.
eps : float = 1e-4
Tolerance epsilon for point snapping passed into ``nodes.split()``.
Returns
-------
geopandas.GeoDataFrame
Updated ``roads`` with (potentially) added nodes.
"""
sindex_kws = {"predicate": "dwithin", "distance": 1e-4}
# identify degree mismatch cases
nodes_degree_mismatch = _identify_degree_mismatch(roads, sindex_kws)
# ensure loop topology cases:
# - loop nodes intersecting non-loops
# - loop nodes intersecting other loops
nodes_off_loops, nodes_on_loops = _makes_loop_contact(roads, sindex_kws)
# all nodes to induce
nodes_to_induce = pd.concat(
[nodes_degree_mismatch, nodes_off_loops, nodes_on_loops]
)
return split(nodes_to_induce.geometry, roads, roads.crs, eps=eps)
def _identify_degree_mismatch(
edges: gpd.GeoDataFrame, sindex_kws: dict
) -> gpd.GeoSeries:
"""Helper to identify difference of observed vs. expected node degree."""
nodes = momepy.nx_to_gdf(momepy.node_degree(momepy.gdf_to_nx(edges)), lines=False)
nix, eix = edges.sindex.query(nodes.geometry, **sindex_kws)
coo_vals = ([True] * len(nix), (nix, eix))
coo_shape = (len(nodes), len(edges))
intersects = sparse.coo_array(coo_vals, shape=coo_shape, dtype=np.bool_)
nodes["expected_degree"] = intersects.sum(axis=1)
return nodes[nodes["degree"] != nodes["expected_degree"]].geometry
def _makes_loop_contact(
edges: gpd.GeoDataFrame, sindex_kws: dict
) -> tuple[gpd.GeoSeries, gpd.GeoSeries]:
"""Helper to identify:
1. loop nodes intersecting non-loops
2. loop nodes intersecting other loops
"""
loops, not_loops = _loops_and_non_loops(edges)
loop_points = shapely.points(loops.get_coordinates().values)
loop_gdf = gpd.GeoDataFrame(geometry=loop_points, crs=edges.crs)
loop_point_geoms = loop_gdf.geometry
# loop points intersecting non-loops
nodes_from_non_loops_ix, _ = not_loops.sindex.query(loop_point_geoms, **sindex_kws)
# loop points intersecting other loops
nodes_from_loops_ix, _ = loops.sindex.query(loop_point_geoms, **sindex_kws)
loop_x_loop, n_loop_x_loop = np.unique(nodes_from_loops_ix, return_counts=True)
nodes_from_loops_ix = loop_x_loop[n_loop_x_loop > 1]
# tease out both varieties
nodes_non_loops = loop_gdf.loc[nodes_from_non_loops_ix]
nodes_loops = loop_gdf.loc[nodes_from_loops_ix]
return nodes_non_loops.geometry, nodes_loops.geometry
def _loops_and_non_loops(
edges: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""Bifurcate edge gdf into loops and non-loops."""
loop_mask = edges.is_ring
not_loops = edges[~loop_mask]
loops = edges[loop_mask]
return loops, not_loops
[docs]
def remove_false_nodes(
gdf: gpd.GeoSeries | gpd.GeoDataFrame, aggfunc: str | dict = "first", **kwargs
) -> gpd.GeoSeries | gpd.GeoDataFrame:
"""Reimplementation of ``momepy.remove_false_nodes()`` that preserves attributes.
Parameters
----------
gdf : geopandas.GeoSeries | geopandas.GeoDataFrame
Input edgelines process. If any edges are ``MultiLineString`` they
will be exploded into constituent ``LineString`` components.
aggfunc : str | dict = 'first'
Aggregate function for processing non-spatial component.
**kwargs
Keyword arguments for ``aggfunc``.
Returns
-------
geopandas.GeoSeries | geopandas.GeoDataFrame
The original input ``gdf`` if only 1 edgeline, otherwise the processed
edgeline without interstitial nodes.
Notes
-----
Any 3D geometries are (potentially) downcast in loops.
"""
def merge_geometries(block: gpd.GeoSeries) -> shapely.LineString:
"""Helper in processing the spatial component."""
return shapely.line_merge(shapely.GeometryCollection(block.values))
if len(gdf) < 2:
return gdf
if isinstance(gdf, gpd.GeoSeries):
gdf = gdf.to_frame("geometry")
gdf = gdf.explode(ignore_index=True)
labels = get_components(gdf.geometry)
# Process non-spatial component
data = gdf.drop(labels=gdf.geometry.name, axis=1)
aggregated_data = data.groupby(by=labels).agg(aggfunc, **kwargs)
aggregated_data.columns = aggregated_data.columns.to_flat_index()
# Process spatial component
g = gdf.groupby(group_keys=False, by=labels)[gdf.geometry.name].agg(
merge_geometries
)
aggregated_geometry = gpd.GeoDataFrame(g, geometry=gdf.geometry.name, crs=gdf.crs)
# Recombine
aggregated = aggregated_geometry.join(aggregated_data)
# Derive nodes
nodes = momepy.nx_to_gdf(
momepy.node_degree(momepy.gdf_to_nx(aggregated[[aggregated.geometry.name]])),
lines=False,
)
# Bifurcate edges into loops and non-loops
loops, not_loops = _loops_and_non_loops(aggregated)
# Ensure:
# - all loops have exactly 1 endpoint; and
# - that endpoint shares a node with an intersecting line
fixed_loops = []
fixed_index = []
node_ix, loop_ix = loops.sindex.query(nodes.geometry, predicate="intersects")
for ix in np.unique(loop_ix):
loop_geom = loops.geometry.iloc[ix]
target_nodes = nodes.geometry.iloc[node_ix[loop_ix == ix]]
if len(target_nodes) == 2:
new_sequence = _rotate_loop_coords(loop_geom, not_loops)
fixed_loops.append(shapely.LineString(new_sequence))
fixed_index.append(ix)
aggregated.loc[loops.index[fixed_index], aggregated.geometry.name] = fixed_loops
return aggregated.reset_index(drop=True)
def _rotate_loop_coords(
loop_geom: shapely.LineString, not_loops: gpd.GeoDataFrame
) -> np.ndarray:
"""Rotate loop node coordinates if needed to ensure topology."""
loop_coords = shapely.get_coordinates(loop_geom)
loop_points = gpd.GeoDataFrame(geometry=shapely.points(loop_coords))
loop_points_ix, _ = not_loops.sindex.query(
loop_points.geometry, predicate="dwithin", distance=1e-4
)
new_start = loop_points.loc[loop_points_ix].geometry.mode().get_coordinates().values
_coords_match = (loop_coords == new_start).all(axis=1)
new_start_idx = np.where(_coords_match)[0].squeeze()
rolled_coords = np.roll(loop_coords[:-1], -new_start_idx, axis=0)
new_sequence = np.append(rolled_coords, rolled_coords[[0]], axis=0)
return new_sequence
[docs]
def fix_topology(
roads: gpd.GeoDataFrame,
eps: float = 1e-4,
**kwargs,
) -> gpd.GeoDataFrame:
"""Fix road network topology. This ensures correct topology of the network by:
1. Adding potentially missing nodes...
on intersections of individual LineString endpoints
with the remaining network. The idea behind is that
if a line ends on an intersection with another, there
should be a node on both of them.
2. Removing nodes of degree 2...
that have no meaning in the network used within our framework.
3. Removing duplicated geometries (irrespective of orientation).
Parameters
----------
roads : geopandas.GeoDataFrame
Input LineString geometries.
eps : float = 1e-4
Tolerance epsilon for point snapping passed into ``nodes.split()``.
**kwargs : dict
Key word arguments passed into ``remove_false_nodes()``.
Returns
-------
gpd.GeoDataFrame
The input roads that now have fixed topology and are ready
to proceed through the simplification algorithm.
"""
roads = roads[~roads.geometry.normalize().duplicated()].copy()
roads_w_nodes = induce_nodes(roads, eps=eps)
return remove_false_nodes(roads_w_nodes, **kwargs)
[docs]
def consolidate_nodes(
gdf: gpd.GeoDataFrame,
tolerance: float = 2.0,
preserve_ends: bool = False,
) -> gpd.GeoSeries:
"""Return geometry with consolidated nodes.
Replace clusters of nodes with a single node (weighted centroid
of a cluster) and snap linestring geometry to it. Cluster is
defined using hierarchical clustering with average linkage
on coordinates cut at a cophenetic distance equal to ``tolerance``.
The use of hierachical clustering avoids the chaining effect of a sequence
of intersections within ``tolerance`` from each other that would happen with
DBSCAN and similar solutions.
Parameters
----------
gdf : geopandas.GeoDataFrame
GeoDataFrame with LineStrings (usually representing street network).
tolerance : float = 2.0
The maximum distance between two nodes for one to be considered
as in the neighborhood of the other. Nodes within tolerance are
considered a part of a single cluster and will be consolidated.
preserve_ends : bool = False
If ``True``, nodes of a degree 1 will be excluded from the consolidation.
Returns
-------
geopandas.GeoSeries
Updated input ``gdf`` of LineStrings with consolidated nodes.
"""
from scipy.cluster import hierarchy
if isinstance(gdf, gpd.GeoSeries):
gdf = gdf.to_frame("geometry")
elif isinstance(gdf, np.ndarray):
gdf = gpd.GeoDataFrame(geometry=gdf)
nodes = momepy.nx_to_gdf(momepy.node_degree(momepy.gdf_to_nx(gdf)), lines=False)
if preserve_ends:
# keep at least one meter of original geometry around each end
ends = nodes[nodes["degree"] == 1].buffer(1)
nodes = nodes[nodes["degree"] > 1].copy()
# if all we have are ends, return the original
# - this is generally when called from within ``geometry._consolidate()``
if nodes.shape[0] < 2:
gdf["_status"] = "original"
return gdf
# get clusters of nodes which should be consolidated
linkage = hierarchy.linkage(nodes.get_coordinates(), method="average")
nodes["lab"] = hierarchy.fcluster(linkage, tolerance, criterion="distance")
unique, counts = np.unique(nodes["lab"], return_counts=True)
actual_clusters = unique[counts > 1]
change = nodes[nodes["lab"].isin(actual_clusters)]
# no change needed, return the original
if change.empty:
gdf["_status"] = "original"
return gdf
gdf = gdf.copy()
# get geometry
geom = gdf.geometry.copy()
status = pd.Series("original", index=geom.index)
# loop over clusters, cut out geometry within tolerance / 2 and replace it
# with spider-like geometry to the weighted centroid of a cluster
spiders = []
midpoints = []
clusters = change.dissolve(change["lab"])
# TODO: not optimal but avoids some MultiLineStrings but not all
cookies = clusters.buffer(tolerance / 2).convex_hull
if preserve_ends:
cookies = cookies.to_frame().overlay(ends.to_frame(), how="difference")
for cluster, cookie in zip(clusters.geometry, cookies.geometry, strict=True):
inds = geom.sindex.query(cookie, predicate="intersects")
pts = shapely.get_coordinates(geom.iloc[inds].intersection(cookie.boundary))
if pts.shape[0] > 0:
# TODO: this may result in MultiLineString - we need to avoid that
# TODO: It is temporarily fixed by that explode in return
geom.iloc[inds] = geom.iloc[inds].difference(cookie)
status.iloc[inds] = "changed"
midpoint = np.mean(shapely.get_coordinates(cluster), axis=0)
midpoints.append(midpoint)
mids = np.array([midpoint] * len(pts))
spider = shapely.linestrings(
np.array([pts[:, 0], mids[:, 0]]).T,
y=np.array([pts[:, 1], mids[:, 1]]).T,
)
spiders.append(spider)
gdf = gdf.set_geometry(geom)
gdf["_status"] = status
if spiders:
# combine geometries
geoms = np.hstack(spiders)
gdf = pd.concat([gdf, gpd.GeoDataFrame(geometry=geoms, crs=geom.crs)])
agg: dict[str, str | typing.Callable] = {"_status": _status}
for c in gdf.columns.drop(gdf.active_geometry_name):
if c != "_status":
agg[c] = "first"
return remove_false_nodes(
gdf[~gdf.geometry.is_empty].explode(),
# NOTE: this aggfunc needs to be able to process all the columns
aggfunc=agg,
)