import collections.abc
import typing
import geopandas as gpd
import momepy
import networkx as nx
import numpy as np
import pandas as pd
import pyproj
import shapely
import shapely.ops
from scipy import sparse
from scipy.cluster import hierarchy
from sklearn.cluster import DBSCAN
def _fill_attrs(gdf: gpd.GeoDataFrame, source_row: pd.Series) -> gpd.GeoDataFrame:
"""Thoughtful attribute assignment to lines split into segments by new nodes –
taking list-like values into consideration. See gh#213. Regarding iterables,
currently only supports list values – others can be added based on input type
in the future on an ad hoc basis as problems arise. Called from within ``split()``.
Parameters
----------
gdf : geopandas.GeoDataFrame
The new frame of split linestrings.
    source_row : pandas.Series
The original source row.
Returns
-------
geopandas.GeoDataFrame
The input ``gdf`` with updated columns based on values in ``source_row``.
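    Examples
    --------
    A minimal sketch of the list-value case, with hypothetical data:
    >>> segs = gpd.GeoDataFrame(
    ...     geometry=[
    ...         shapely.LineString([(0, 0), (1, 0)]),
    ...         shapely.LineString([(1, 0), (2, 0)]),
    ...     ]
    ... )
    >>> src = pd.Series({"highway": ["a", "b"], "geometry": None})
    >>> _fill_attrs(segs, src)["highway"].tolist()
    [['a', 'b'], ['a', 'b']]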
"""
def _populate_column(attr):
"""Return the attribute if scalar, create vector of input if not."""
if isinstance(attr, collections.abc.Sequence) and not isinstance(attr, str):
attr = [attr] * gdf.shape[0]
return attr
for col in source_row.index.drop(["geometry", "_status"], errors="ignore"):
gdf[col] = _populate_column(source_row[col])
return gdf
def split(
split_points: list | np.ndarray | gpd.GeoSeries,
cleaned_streets: gpd.GeoDataFrame,
crs: str | pyproj.CRS,
*,
eps: float = 1e-4,
) -> gpd.GeoSeries | gpd.GeoDataFrame:
"""Split lines on new nodes.
Parameters
----------
    split_points : list | numpy.ndarray | geopandas.GeoSeries
        Points at which to split the ``cleaned_streets``.
cleaned_streets : geopandas.GeoDataFrame
Line geometries to be split with ``split_points``.
crs : str | pyproj.CRS
Anything accepted by ``pyproj.CRS``.
eps : float = 1e-4
Tolerance epsilon for point snapping.
Returns
-------
geopandas.GeoSeries | geopandas.GeoDataFrame
Resultant split line geometries.
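    Examples
    --------
    A minimal sketch with made-up coordinates – splitting one street at its midpoint:
    >>> streets = gpd.GeoDataFrame(
    ...     geometry=[shapely.LineString([(0, 0), (2, 0)])], crs="EPSG:3857"
    ... )
    >>> pts = gpd.GeoSeries([shapely.Point(1, 0)], crs="EPSG:3857")
    >>> split(pts, streets, streets.crs).geometry.tolist()
    [<LINESTRING (0 0, 1 0)>, <LINESTRING (1 0, 2 0)>]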
"""
split_points = gpd.GeoSeries(split_points, crs=crs)
for split in split_points.drop_duplicates():
_, ix = cleaned_streets.sindex.nearest(split, max_distance=eps)
row = cleaned_streets.iloc[ix]
edge = row.geometry
if edge.shape[0] == 1:
row = row.iloc[0]
lines_split = _snap_n_split(edge.item(), split, eps)
if lines_split.shape[0] > 1:
gdf_split = gpd.GeoDataFrame(geometry=lines_split, crs=crs)
gdf_split = _fill_attrs(gdf_split, row)
gdf_split["_status"] = "changed"
cleaned_streets = pd.concat(
[cleaned_streets.drop(edge.index[0]), gdf_split],
ignore_index=True,
)
elif edge.shape[0] > 1:
to_be_dropped = []
to_be_added = []
for i, e in edge.items():
lines_split = _snap_n_split(e, split, eps)
if lines_split.shape[0] > 1:
to_be_dropped.append(i)
to_be_added.append(lines_split)
if to_be_added:
gdf_split = pd.DataFrame(
{"geometry": to_be_added, "_orig": to_be_dropped}
).explode("geometry")
gdf_split = pd.concat(
[
gdf_split.drop(columns="_orig").reset_index(drop=True),
row.drop(columns="geometry")
.loc[gdf_split["_orig"]]
.reset_index(drop=True),
],
axis=1,
)
gdf_split["_status"] = "changed"
cleaned_streets = pd.concat(
[cleaned_streets.drop(to_be_dropped), gdf_split],
ignore_index=True,
)
cleaned_streets = gpd.GeoDataFrame(
cleaned_streets, geometry="geometry", crs=crs
)
return cleaned_streets.reset_index(drop=True)
def _snap_n_split(e: shapely.LineString, s: shapely.Point, tol: float) -> np.ndarray:
"""Snap point to edge and return lines to split."""
snapped = shapely.snap(e, s, tolerance=tol)
_lines_split = shapely.get_parts(shapely.ops.split(snapped, s))
return _lines_split[~shapely.is_empty(_lines_split)]
def _status(x: pd.Series) -> str:
"""Determine the status of edge line(s)."""
if len(x) == 1:
return x.iloc[0]
return "changed"
def isolate_bowtie_nodes(edgelines: list | np.ndarray | gpd.GeoSeries) -> gpd.GeoSeries:
r"""
    Bowties are a rare edge case whereby a component has:
* 2 unique nodes
* 2 edges that are loops
* 4 edges that have only 3 unique coord-pairs *after* sorting.
|\ /‾‾\ /|
    | *    * |
|/ \__/ \|
Although extremely rare, these cases throw a wrench in the
efficient logic of ``get_components()``. See gh#214.
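    Examples
    --------
    A hypothetical bowtie – loops at nodes ``A`` and ``B`` plus two parallel
    connectors – should have both shared nodes flagged:
    >>> A, B = (0, 0), (2, 0)
    >>> bowtie = [
    ...     shapely.LineString([A, (-1, 1), (-1, -1), A]),
    ...     shapely.LineString([B, (3, 1), (3, -1), B]),
    ...     shapely.LineString([A, (1, 1), B]),
    ...     shapely.LineString([A, (1, -1), B]),
    ... ]
    >>> nodes_to_ignore = isolate_bowtie_nodes(bowtie)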
"""
ignore = []
mm_nx = momepy.gdf_to_nx(gpd.GeoDataFrame(geometry=edgelines))
mm_nx_cc = nx.connected_components(mm_nx)
potential_bowtie_cc_nodes = []
for cc in list(mm_nx_cc):
# potential bowties only have 2 unique nodes
if len(cc) == 2:
potential_bowtie_cc_nodes += [list(cc)]
if potential_bowtie_cc_nodes:
for potential_bowtie_cc in potential_bowtie_cc_nodes:
comp_edges = mm_nx.subgraph(potential_bowtie_cc).edges
# potential bowties have 2 edges that are loops
loops = [comp_edges[ce]["geometry"].is_closed for ce in comp_edges]
if sum(loops) == 2:
# -- failsafe
# ensure the 4 edges have only 3 unique coord-pairs after sorting
sorted_edge_coords = [sorted(ce[:2]) for ce in comp_edges]
unique_sorted_edge_coords = np.unique(sorted_edge_coords, axis=0)
if unique_sorted_edge_coords.shape[0] == 3:
ignore += potential_bowtie_cc
return gpd.GeoSeries([shapely.Point(i) for i in ignore]).sort_values(
ignore_index=True
)
def get_components(
edgelines: list | np.ndarray | gpd.GeoSeries,
*,
ignore: None | gpd.GeoSeries = None,
) -> np.ndarray:
"""Determine groups of chained edges ("components" in this function) that are
linked by degree-2 nodes. These edges are given component labels that are then
used to aggregate the chained edges into single, LineString geometries.
Parameters
----------
edgelines : list | np.ndarray | gpd.GeoSeries
Collection of line objects.
ignore : None | gpd.GeoSeries = None
Nodes to ignore when labeling components.
Returns
-------
    np.ndarray
        Component labels used to aggregate chains of edges linked by
        degree-2 nodes into single LineString geometries.
Notes
-----
    See https://github.com/uscuni/neatnet/issues/56 and
    https://github.com/uscuni/neatnet/pull/235 for a detailed explanation of the output.
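    Examples
    --------
    An illustrative sketch with made-up coordinates: two chained edges meeting a
    junction of degree 3. The first two edges share a degree-2 node and thus a
    label; the rest are labeled individually:
    >>> lines = [
    ...     shapely.LineString([(0, 0), (1, 0)]),
    ...     shapely.LineString([(1, 0), (2, 0)]),
    ...     shapely.LineString([(2, 0), (2, 1)]),
    ...     shapely.LineString([(2, 0), (3, 0)]),
    ... ]
    >>> get_components(lines)
    array([0., 0., 3., 4.])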
"""
# 1. tease out any "bowtie" cases - don't consider those nodes to merge
bowtie_nodes = isolate_bowtie_nodes(edgelines)
if not bowtie_nodes.empty:
if ignore is not None:
ignore = pd.concat([ignore, bowtie_nodes], ignore_index=True)
else:
ignore = bowtie_nodes
# 2. convert to numpy array for operations
edgelines = np.array(edgelines)
# 3. isolate starting and ending nodes of edges
start_points = shapely.get_point(edgelines, 0)
end_points = shapely.get_point(edgelines, -1)
# 4. consider only unique nodes
points = shapely.points(
np.unique(
shapely.get_coordinates(np.concatenate([start_points, end_points])), axis=0
)
)
# 5. filter out any pre-defined nodes to ignore when considering merging
if ignore is not None:
mask = np.isin(points, ignore)
points = points[~mask]
# 6. query nodes that intersect non-loop edges
inp, res = shapely.STRtree(shapely.boundary(edgelines)).query(
points, predicate="intersects"
)
# 7. filter nodes that intersect exactly 2 edges
unique, counts = np.unique(inp, return_counts=True)
mask = np.isin(inp, unique[counts == 2])
# node index to consider for merging
merge_inp = inp[mask]
# edge index to consider for merging
merge_res = res[mask]
# 8. generate loop index from input edgelines
closed = np.arange(len(edgelines))[shapely.is_closed(edgelines)]
# 9. update mask with loop edges
mask = np.isin(merge_inp, closed) | np.isin(merge_res, closed)
# 10. invert mask for final filter of nodes/edges to merge
merge_res = merge_res[~mask]
merge_inp = merge_inp[~mask]
# 11. generate network component topology for edges to merge
    # encode node indices as negatives (shifted by 1) so they cannot collide
    # with edge indices, then drop them when extracting the components
    g = nx.Graph(list(zip((merge_inp * -1) - 1, merge_res, strict=True)))
    components = {
        i: {v for v in k if v > -1} for i, k in enumerate(nx.connected_components(g))
    }
component_labels = {value: key for key in components for value in components[key]}
labels = pd.Series(component_labels, index=range(len(edgelines)))
max_label = len(edgelines) - 1 if pd.isna(labels.max()) else int(labels.max())
filling = pd.Series(range(max_label + 1, max_label + len(edgelines) + 1))
labels = labels.fillna(filling)
return labels.values
def weld_edges(
edgelines: list | np.ndarray | gpd.GeoSeries,
*,
ignore: None | gpd.GeoSeries = None,
) -> list | np.ndarray | gpd.GeoSeries:
"""Combine lines sharing an endpoint (if only 2 lines share that point).
Lightweight version of ``remove_interstitial_nodes()``.
Parameters
----------
edgelines : list | np.ndarray | gpd.GeoSeries
Collection of line objects.
ignore : None | gpd.GeoSeries = None
Nodes to ignore when welding components.
Returns
-------
list | np.ndarray | gpd.GeoSeries
Resultant welded ``edgelines`` if more than 1 passed in, otherwise
the original ``edgelines`` object.
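    Examples
    --------
    A minimal sketch – two edges sharing a degree-2 endpoint weld into one:
    >>> lines = [
    ...     shapely.LineString([(0, 0), (1, 0)]),
    ...     shapely.LineString([(1, 0), (2, 0)]),
    ... ]
    >>> weld_edges(lines)
    [<LINESTRING (0 0, 1 0, 2 0)>]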
"""
if len(edgelines) < 2:
return edgelines
labels = get_components(edgelines, ignore=ignore)
return (
gpd.GeoSeries(edgelines)
.groupby(labels)
.agg(lambda x: shapely.line_merge(shapely.GeometryCollection(x.values)))
).tolist()
def induce_nodes(streets: gpd.GeoDataFrame, *, eps: float = 1e-4) -> gpd.GeoDataFrame:
"""Adding potentially missing nodes on intersections of individual LineString
endpoints with the remaining network. The idea behind is that if a line ends
on an intersection with another, there should be a node on both of them.
Parameters
----------
streets : geopandas.GeoDataFrame
Input LineString geometries.
eps : float = 1e-4
Tolerance epsilon for point snapping passed into ``nodes.split()``.
Returns
-------
geopandas.GeoDataFrame
Updated ``streets`` with (potentially) added nodes.
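    Examples
    --------
    An illustrative sketch with made-up coordinates – one street ends mid-segment
    on another, so the latter is split at that point:
    >>> streets = gpd.GeoDataFrame(
    ...     geometry=[
    ...         shapely.LineString([(0, 0), (2, 0)]),
    ...         shapely.LineString([(1, 0), (1, 1)]),
    ...     ],
    ...     crs="EPSG:3857",
    ... )
    >>> induce_nodes(streets).geometry.tolist()
    [<LINESTRING (1 0, 1 1)>, <LINESTRING (0 0, 1 0)>, <LINESTRING (1 0, 2 0)>]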
"""
sindex_kws = {"predicate": "dwithin", "distance": 1e-4}
# identify degree mismatch cases
nodes_degree_mismatch = _identify_degree_mismatch(streets, sindex_kws)
# ensure loop topology cases:
# - loop nodes intersecting non-loops
# - loop nodes intersecting other loops
nodes_off_loops, nodes_on_loops = _makes_loop_contact(streets, sindex_kws)
# all nodes to induce
nodes_to_induce = pd.concat(
[nodes_degree_mismatch, nodes_off_loops, nodes_on_loops]
)
return split(nodes_to_induce.geometry, streets, streets.crs, eps=eps)
def _identify_degree_mismatch(
edges: gpd.GeoDataFrame, sindex_kws: dict
) -> gpd.GeoSeries:
"""Helper to identify difference of observed vs. expected node degree."""
nodes = _nodes_degrees_from_edges(edges.geometry)
nodes = nodes.set_crs(edges.crs)
nix, eix = edges.sindex.query(nodes.geometry, **sindex_kws)
coo_vals = ([True] * len(nix), (nix, eix))
coo_shape = (len(nodes), len(edges))
intersects = sparse.coo_array(coo_vals, shape=coo_shape, dtype=np.bool_)
nodes["expected_degree"] = intersects.sum(axis=1)
return nodes[nodes["degree"] != nodes["expected_degree"]].geometry
def _nodes_from_edges(
edgelines: list | np.ndarray | gpd.GeoSeries,
return_degrees=False,
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
"""Helper to get network nodes from edges' geometries."""
edgelines = np.array(edgelines)
start_points = shapely.get_point(edgelines, 0)
end_points = shapely.get_point(edgelines, -1)
node_coords = np.unique(
shapely.get_coordinates(np.concatenate([start_points, end_points])),
axis=0,
return_counts=return_degrees,
)
if return_degrees:
node_coords, degrees = node_coords
node_points = shapely.points(node_coords)
if return_degrees:
return node_points, degrees
else:
return node_points
def _nodes_degrees_from_edges(
edgelines: list | np.ndarray | gpd.GeoSeries,
) -> gpd.GeoDataFrame:
"""Helper to get network nodes and their degrees from edges' geometries."""
node_points, degrees = _nodes_from_edges(edgelines, return_degrees=True)
nodes_gdf = gpd.GeoDataFrame({"degree": degrees, "geometry": node_points})
return nodes_gdf
def _makes_loop_contact(
edges: gpd.GeoDataFrame, sindex_kws: dict
) -> tuple[gpd.GeoSeries, gpd.GeoSeries]:
"""Helper to identify:
1. loop nodes intersecting non-loops
2. loop nodes intersecting other loops
"""
loops, not_loops = _loops_and_non_loops(edges)
loop_points = shapely.points(loops.get_coordinates().values)
loop_gdf = gpd.GeoDataFrame(geometry=loop_points, crs=edges.crs)
loop_point_geoms = loop_gdf.geometry
# loop points intersecting non-loops
nodes_from_non_loops_ix, _ = not_loops.sindex.query(loop_point_geoms, **sindex_kws)
# loop points intersecting other loops
nodes_from_loops_ix, _ = loops.sindex.query(loop_point_geoms, **sindex_kws)
loop_x_loop, n_loop_x_loop = np.unique(nodes_from_loops_ix, return_counts=True)
nodes_from_loops_ix = loop_x_loop[n_loop_x_loop > 1]
# tease out both varieties
nodes_non_loops = loop_gdf.loc[nodes_from_non_loops_ix]
nodes_loops = loop_gdf.loc[nodes_from_loops_ix]
return nodes_non_loops.geometry, nodes_loops.geometry
def _loops_and_non_loops(
edges: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""Bifurcate edge gdf into loops and non-loops."""
loop_mask = edges.is_ring
not_loops = edges[~loop_mask]
loops = edges[loop_mask]
return loops, not_loops
def remove_interstitial_nodes(
gdf: gpd.GeoSeries | gpd.GeoDataFrame, *, aggfunc: str | dict = "first", **kwargs
) -> gpd.GeoSeries | gpd.GeoDataFrame:
"""Clean topology of existing LineString geometry by removal of nodes of degree 2.
Returns the original gdf if there’s no node of degree 2.
Parameters
----------
gdf : geopandas.GeoSeries | geopandas.GeoDataFrame
        Input edgelines to process. If any edges are ``MultiLineString`` they
will be exploded into constituent ``LineString`` components.
aggfunc : str | dict = 'first'
Aggregate function for processing non-spatial component.
**kwargs
Keyword arguments for ``aggfunc``.
Returns
-------
geopandas.GeoSeries | geopandas.GeoDataFrame
The original input ``gdf`` if only 1 edgeline, otherwise the processed
edgeline without interstitial nodes.
Notes
-----
Any 3D geometries are (potentially) downcast in loops.
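    Examples
    --------
    A minimal sketch – two attributed edges joined at a degree-2 node collapse
    into a single LineString (hypothetical data):
    >>> gdf = gpd.GeoDataFrame(
    ...     {"type": ["a", "b"]},
    ...     geometry=[
    ...         shapely.LineString([(0, 0), (1, 0)]),
    ...         shapely.LineString([(1, 0), (2, 0)]),
    ...     ],
    ... )
    >>> remove_interstitial_nodes(gdf).geometry.item()
    <LINESTRING (0 0, 1 0, 2 0)>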
"""
def merge_geometries(block: gpd.GeoSeries) -> shapely.LineString:
"""Helper in processing the spatial component."""
return shapely.line_merge(shapely.GeometryCollection(block.values))
if len(gdf) < 2:
return gdf
if isinstance(gdf, gpd.GeoSeries):
gdf = gdf.to_frame("geometry")
gdf = gdf.explode(ignore_index=True)
labels = get_components(gdf.geometry)
# Process non-spatial component
data = gdf.drop(labels=gdf.geometry.name, axis=1)
aggregated_data = data.groupby(by=labels).agg(aggfunc, **kwargs)
aggregated_data.columns = aggregated_data.columns.to_flat_index()
# Process spatial component
g = gdf.groupby(group_keys=False, by=labels)[gdf.geometry.name].agg(
merge_geometries
)
aggregated_geometry = gpd.GeoDataFrame(g, geometry=gdf.geometry.name, crs=gdf.crs)
# Recombine
aggregated = aggregated_geometry.join(aggregated_data)
# Derive nodes
nodes = _nodes_from_edges(aggregated.geometry)
# Bifurcate edges into loops and non-loops
loops, not_loops = _loops_and_non_loops(aggregated)
# Ensure:
# - all loops have exactly 1 endpoint; and
# - that endpoint shares a node with an intersecting line
fixed_loops = []
fixed_index = []
node_ix, loop_ix = loops.sindex.query(nodes, predicate="intersects")
for ix in np.unique(loop_ix):
loop_geom = loops.geometry.iloc[ix]
target_nodes = nodes[node_ix[loop_ix == ix]]
if len(target_nodes) == 2:
new_sequence = _rotate_loop_coords(loop_geom, not_loops)
fixed_loops.append(shapely.LineString(new_sequence))
fixed_index.append(ix)
aggregated.loc[loops.index[fixed_index], aggregated.geometry.name] = fixed_loops
return aggregated.reset_index(drop=True)
def _rotate_loop_coords(
loop_geom: shapely.LineString, not_loops: gpd.GeoDataFrame
) -> np.ndarray:
"""Rotate loop node coordinates if needed to ensure topology."""
loop_coords = shapely.get_coordinates(loop_geom)
loop_points = gpd.GeoDataFrame(geometry=shapely.points(loop_coords))
loop_points_ix, _ = not_loops.sindex.query(
loop_points.geometry, predicate="dwithin", distance=1e-4
)
mode = loop_points.loc[loop_points_ix].geometry.mode()
# if there is a non-planar intersection, we may have multiple points. Check with
# entrypoints only in that case
if mode.shape[0] > 1:
        # query only the entry points (boundaries) of the non-loop edges
        loop_points_ix, _ = not_loops.boundary.sindex.query(
            loop_points.geometry, predicate="dwithin", distance=1e-4
        )
        new_mode = loop_points.loc[loop_points_ix].geometry.mode()
        # if that did not help, just pick one to avoid failure and hope for the best
        if new_mode.empty or new_mode.shape[0] > 1:
            mode = mode.iloc[[0]]
        else:
            mode = new_mode
new_start = mode.get_coordinates().values
_coords_match = (loop_coords == new_start).all(axis=1)
new_start_idx = np.where(_coords_match)[0].squeeze()
rolled_coords = np.roll(loop_coords[:-1], -new_start_idx, axis=0)
new_sequence = np.append(rolled_coords, rolled_coords[[0]], axis=0)
return new_sequence
def fix_topology(
streets: gpd.GeoDataFrame,
*,
eps: float = 1e-4,
**kwargs,
) -> gpd.GeoDataFrame:
"""Fix street network topology. This ensures correct topology of the network by:
1. Adding potentially missing nodes...
on intersections of individual LineString endpoints
        with the remaining network. The idea is that
if a line ends on an intersection with another, there
should be a node on both of them.
2. Removing nodes of degree 2...
that have no meaning in the network used within our framework.
3. Removing duplicated geometries (irrespective of orientation).
Parameters
----------
streets : geopandas.GeoDataFrame
Input LineString geometries.
eps : float = 1e-4
Tolerance epsilon for point snapping passed into ``nodes.split()``.
    **kwargs : dict
        Keyword arguments passed into ``remove_interstitial_nodes()``.
Returns
-------
gpd.GeoDataFrame
The input streets that now have fixed topology and are ready
to proceed through the simplification algorithm.
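    Examples
    --------
    An illustrative sketch with made-up coordinates – a reversed duplicate is
    dropped, a missing node is induced, and the result is re-welded:
    >>> streets = gpd.GeoDataFrame(
    ...     geometry=[
    ...         shapely.LineString([(0, 0), (2, 0)]),
    ...         shapely.LineString([(2, 0), (0, 0)]),
    ...         shapely.LineString([(1, 0), (1, 1)]),
    ...     ],
    ...     crs="EPSG:3857",
    ... )
    >>> fix_topology(streets).shape[0]
    3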
"""
streets = streets[~streets.geometry.normalize().duplicated()].copy()
streets_w_nodes = induce_nodes(streets, eps=eps)
return remove_interstitial_nodes(streets_w_nodes, **kwargs)
def consolidate_nodes(
gdf: np.ndarray | gpd.GeoSeries | gpd.GeoDataFrame,
*,
tolerance: float = 2.0,
preserve_ends: bool = False,
) -> gpd.GeoSeries:
"""Return geometry with consolidated nodes.
    Replace clusters of nodes with a single node (the weighted centroid
    of a cluster) and snap linestring geometry to it. A cluster is
    defined using hierarchical clustering with average linkage
    on coordinates, cut at a cophenetic distance equal to ``tolerance``.
    The use of hierarchical clustering avoids the chaining effect of a sequence
    of intersections within ``tolerance`` of each other that would happen with
    DBSCAN and similar solutions.
Parameters
----------
gdf : numpy.ndarray | geopandas.GeoSeries | geopandas.GeoDataFrame
LineString geometries (usually representing street network).
tolerance : float = 2.0
The maximum distance between two nodes for one to be considered
as in the neighborhood of the other. Nodes within tolerance are
considered a part of a single cluster and will be consolidated.
    preserve_ends : bool = False
        If ``True``, nodes of degree 1 will be excluded from the consolidation.
Returns
-------
geopandas.GeoSeries
Updated input ``gdf`` of LineStrings with consolidated nodes.
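    Examples
    --------
    An illustrative sketch with made-up coordinates – two crossings 1 unit apart
    collapse into a single node at their weighted centroid:
    >>> streets = gpd.GeoSeries(
    ...     [
    ...         shapely.LineString([(-10, 0), (-0.5, 0)]),
    ...         shapely.LineString([(-0.5, 0), (-0.5, 10)]),
    ...         shapely.LineString([(-0.5, 0), (0.5, 0)]),
    ...         shapely.LineString([(0.5, 0), (10, 0)]),
    ...         shapely.LineString([(0.5, 0), (0.5, 12)]),
    ...     ]
    ... )
    >>> consolidate_nodes(streets, tolerance=2).shape[0]
    4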
"""
def _get_labels(nodes: gpd.GeoDataFrame) -> np.ndarray:
"""Generate node cluster labels that avoids a chaining effect."""
linkage = hierarchy.linkage(shapely.get_coordinates(nodes), method="average")
labels = (
hierarchy.fcluster(linkage, tolerance, criterion="distance").astype(str)
+ f"_{nodes.name}"
)
return labels
if not isinstance(gdf, gpd.GeoDataFrame):
gdf = gpd.GeoDataFrame(geometry=gdf)
nodes = _nodes_degrees_from_edges(gdf.geometry)
if preserve_ends:
# keep at least one meter of original geometry around each end
ends = nodes[nodes["degree"] == 1].buffer(1)
nodes = nodes[nodes["degree"] > 1].copy()
# if all we have are ends, return the original
# - this is generally when called from within ``geometry._consolidate()``
if nodes.shape[0] < 2:
gdf["_status"] = "original"
return gdf
# Get node cluster to be consolidated. First, get components of possible clusters
# then do the linkage itself. Otherwise, it's dead slow and needs a ton of memory.
db = DBSCAN(eps=tolerance, min_samples=2).fit(nodes.get_coordinates())
comp_labels = db.labels_
mask = comp_labels > -1
components = comp_labels[mask]
nodes_to_merge = nodes[mask]
    # get grouped node cluster labels to determine which nodes must change
grouped = (
pd.Series(nodes_to_merge.geometry).groupby(components).transform(_get_labels)
)
nodes["lab"] = grouped
unique, counts = np.unique(nodes["lab"].dropna(), return_counts=True)
actual_clusters = unique[counts > 1]
change = nodes[nodes["lab"].isin(actual_clusters)]
# no change needed, return the original
if change.empty:
gdf["_status"] = "original"
return gdf
gdf = gdf.copy()
geom = gdf.geometry.copy()
status = pd.Series("original", index=geom.index)
# loop over clusters, cut out geometry within tolerance / 2 and replace it
# with spider-like geometry to the weighted centroid of a cluster
spiders = []
midpoints = []
clusters = change.dissolve(change["lab"])
    # TODO: not optimal; avoids some MultiLineStrings, but not all
cookies = clusters.buffer(tolerance / 2).convex_hull
if preserve_ends:
cookies = cookies.to_frame().overlay(ends.to_frame(), how="difference")
for cluster, cookie in zip(clusters.geometry, cookies.geometry, strict=True):
inds = geom.sindex.query(cookie, predicate="intersects")
pts = shapely.get_coordinates(geom.iloc[inds].intersection(cookie.boundary))
if pts.shape[0] > 0:
            # TODO: this may result in a MultiLineString - we need to avoid that;
            # TODO: it is temporarily fixed by the explode in the return statement
geom.iloc[inds] = geom.iloc[inds].difference(cookie)
status.iloc[inds] = "changed"
midpoint = np.mean(shapely.get_coordinates(cluster), axis=0)
midpoints.append(midpoint)
mids = np.array([midpoint] * len(pts))
spider = shapely.linestrings(
np.array([pts[:, 0], mids[:, 0]]).T,
y=np.array([pts[:, 1], mids[:, 1]]).T,
)
spiders.append(spider)
gdf = gdf.set_geometry(geom)
gdf["_status"] = status
if spiders:
# combine geometries
geoms = np.hstack(spiders)
gdf = pd.concat([gdf, gpd.GeoDataFrame(geometry=geoms, crs=geom.crs)])
agg: dict[str, str | typing.Callable] = {"_status": _status}
for c in gdf.columns.drop(gdf.active_geometry_name):
if c != "_status":
agg[c] = "first"
return remove_interstitial_nodes(
gdf[~gdf.geometry.is_empty].explode(),
# NOTE: this aggfunc needs to be able to process all the columns
aggfunc=agg,
)