importtypingimportgeopandasasgpdimportmomepyimportnetworkxasnximportnumpyasnpimportpandasaspdimportpyprojimportshapelyfromscipyimportsparsefromsklearn.clusterimportDBSCANdefsplit(split_points:list|np.ndarray|gpd.GeoSeries,cleaned_roads:gpd.GeoDataFrame,crs:str|pyproj.CRS,*,eps:float=1e-4,)->gpd.GeoSeries|gpd.GeoDataFrame:"""Split lines on new nodes. Parameters ---------- split_points : list | numpy.ndarray Points to split the ``cleaned_roads``. cleaned_roads : geopandas.GeoDataFrame Line geometries to be split with ``split_points``. crs : str | pyproj.CRS Anything accepted by ``pyproj.CRS``. eps : float = 1e-4 Tolerance epsilon for point snapping. Returns ------- geopandas.GeoSeries | geopandas.GeoDataFrame Resultant split line geometries. """split_points=gpd.GeoSeries(split_points,crs=crs)forsplitinsplit_points.drop_duplicates():_,ix=cleaned_roads.sindex.nearest(split,max_distance=eps)row=cleaned_roads.iloc[ix]edge=row.geometryifedge.shape[0]==1:row=row.iloc[0]lines_split=_snap_n_split(edge.item(),split,eps)iflines_split.shape[0]>1:gdf_split=gpd.GeoDataFrame(geometry=lines_split,crs=crs)forcinrow.index.drop(["geometry","_status"],errors="ignore"):gdf_split[c]=row[c]gdf_split["_status"]="changed"cleaned_roads=pd.concat([cleaned_roads.drop(edge.index[0]),gdf_split],ignore_index=True,)elifedge.shape[0]>1:to_be_dropped=[]to_be_added=[]fori,einedge.items():lines_split=_snap_n_split(e,split,eps)iflines_split.shape[0]>1:to_be_dropped.append(i)to_be_added.append(lines_split)ifto_be_added:gdf_split=pd.DataFrame({"geometry":to_be_added,"_orig":to_be_dropped}).explode("geometry")gdf_split=pd.concat([gdf_split.drop(columns="_orig").reset_index(drop=True),row.drop(columns="geometry").loc[gdf_split["_orig"]].reset_index(drop=True),],axis=1,)gdf_split["_status"]="changed"cleaned_roads=pd.concat([cleaned_roads.drop(to_be_dropped),gdf_split],ignore_index=True,)cleaned_roads=gpd.GeoDataFrame(cleaned_roads,geometry="geometry",crs=crs)returncleaned_roads.reset_index(drop=True)def_snap_n_split(e:shapely.LineString,s:shapely.Point,tol:float)->np.ndarray:"""Snap point to edge and return lines to split."""snapped=shapely.snap(e,s,tolerance=tol)_lines_split=shapely.get_parts(shapely.ops.split(snapped,s))return_lines_split[~shapely.is_empty(_lines_split)]def_status(x:pd.Series)->str:"""Determine the status of edge line(s)."""iflen(x)==1:returnx.iloc[0]return"changed"defget_components(edgelines:list|np.ndarray|gpd.GeoSeries,*,ignore:None|gpd.GeoSeries=None,)->np.ndarray:"""Associate edges with connected component labels and return. Parameters ---------- edgelines : list | np.ndarray | gpd.GeoSeries Collection of line objects. ignore : None | gpd.GeoSeries = None Nodes to ignore when labeling components. Returns ------- np.ndarray Edge connected component labels. Notes ----- See [https://github.com/uscuni/neatnet/issues/56] for detailed explanation of output. """edgelines=np.array(edgelines)start_points=shapely.get_point(edgelines,0)end_points=shapely.get_point(edgelines,-1)points=shapely.points(np.unique(shapely.get_coordinates(np.concatenate([start_points,end_points])),axis=0))ifignoreisnotNone:mask=np.isin(points,ignore)points=points[~mask]# query LineString geometry to identify points intersecting 2 geometriesinp,res=shapely.STRtree(shapely.boundary(edgelines)).query(points,predicate="intersects")unique,counts=np.unique(inp,return_counts=True)mask=np.isin(inp,unique[counts==2])merge_res=res[mask]merge_inp=inp[mask]closed=np.arange(len(edgelines))[shapely.is_closed(edgelines)]mask=np.isin(merge_res,closed)|np.isin(merge_inp,closed)merge_res=merge_res[~mask]merge_inp=merge_inp[~mask]g=nx.Graph(list(zip((merge_inp*-1)-1,merge_res,strict=True)))components={i:{vforvinkifv>-1}fori,kinenumerate(nx.connected_components(g))}component_labels={value:keyforkeyincomponentsforvalueincomponents[key]}labels=pd.Series(component_labels,index=range(len(edgelines)))max_label=len(edgelines)-1ifpd.isna(labels.max())elseint(labels.max())filling=pd.Series(range(max_label+1,max_label+len(edgelines)+1))labels=labels.fillna(filling)returnlabels.valuesdefweld_edges(edgelines:list|np.ndarray|gpd.GeoSeries,*,ignore:None|gpd.GeoSeries=None,)->list|np.ndarray|gpd.GeoSeries:"""Combine lines sharing an endpoint (if only 2 lines share that point). Lightweight version of ``remove_interstitial_nodes()``. Parameters ---------- edgelines : list | np.ndarray | gpd.GeoSeries Collection of line objects. ignore : None | gpd.GeoSeries = None Nodes to ignore when welding components. Returns ------- list | np.ndarray | gpd.GeoSeries Resultant welded ``edgelines`` if more than 1 passed in, otherwise the original ``edgelines`` object. """iflen(edgelines)<2:returnedgelineslabels=get_components(edgelines,ignore=ignore)return(gpd.GeoSeries(edgelines).groupby(labels).agg(lambdax:shapely.line_merge(shapely.GeometryCollection(x.values)))).tolist()
[docs]definduce_nodes(roads:gpd.GeoDataFrame,*,eps:float=1e-4)->gpd.GeoDataFrame:"""Adding potentially missing nodes on intersections of individual LineString endpoints with the remaining network. The idea behind is that if a line ends on an intersection with another, there should be a node on both of them. Parameters ---------- roads : geopandas.GeoDataFrame Input LineString geometries. eps : float = 1e-4 Tolerance epsilon for point snapping passed into ``nodes.split()``. Returns ------- geopandas.GeoDataFrame Updated ``roads`` with (potentially) added nodes. """sindex_kws={"predicate":"dwithin","distance":1e-4}# identify degree mismatch casesnodes_degree_mismatch=_identify_degree_mismatch(roads,sindex_kws)# ensure loop topology cases:# - loop nodes intersecting non-loops# - loop nodes intersecting other loopsnodes_off_loops,nodes_on_loops=_makes_loop_contact(roads,sindex_kws)# all nodes to inducenodes_to_induce=pd.concat([nodes_degree_mismatch,nodes_off_loops,nodes_on_loops])returnsplit(nodes_to_induce.geometry,roads,roads.crs,eps=eps)
def_identify_degree_mismatch(edges:gpd.GeoDataFrame,sindex_kws:dict)->gpd.GeoSeries:"""Helper to identify difference of observed vs. expected node degree."""nodes=momepy.nx_to_gdf(momepy.node_degree(momepy.gdf_to_nx(edges)),lines=False)nix,eix=edges.sindex.query(nodes.geometry,**sindex_kws)coo_vals=([True]*len(nix),(nix,eix))coo_shape=(len(nodes),len(edges))intersects=sparse.coo_array(coo_vals,shape=coo_shape,dtype=np.bool_)nodes["expected_degree"]=intersects.sum(axis=1)returnnodes[nodes["degree"]!=nodes["expected_degree"]].geometrydef_makes_loop_contact(edges:gpd.GeoDataFrame,sindex_kws:dict)->tuple[gpd.GeoSeries,gpd.GeoSeries]:"""Helper to identify: 1. loop nodes intersecting non-loops 2. loop nodes intersecting other loops """loops,not_loops=_loops_and_non_loops(edges)loop_points=shapely.points(loops.get_coordinates().values)loop_gdf=gpd.GeoDataFrame(geometry=loop_points,crs=edges.crs)loop_point_geoms=loop_gdf.geometry# loop points intersecting non-loopsnodes_from_non_loops_ix,_=not_loops.sindex.query(loop_point_geoms,**sindex_kws)# loop points intersecting other loopsnodes_from_loops_ix,_=loops.sindex.query(loop_point_geoms,**sindex_kws)loop_x_loop,n_loop_x_loop=np.unique(nodes_from_loops_ix,return_counts=True)nodes_from_loops_ix=loop_x_loop[n_loop_x_loop>1]# tease out both varietiesnodes_non_loops=loop_gdf.loc[nodes_from_non_loops_ix]nodes_loops=loop_gdf.loc[nodes_from_loops_ix]returnnodes_non_loops.geometry,nodes_loops.geometrydef_loops_and_non_loops(edges:gpd.GeoDataFrame,)->tuple[gpd.GeoDataFrame,gpd.GeoDataFrame]:"""Bifurcate edge gdf into loops and non-loops."""loop_mask=edges.is_ringnot_loops=edges[~loop_mask]loops=edges[loop_mask]returnloops,not_loops
[docs]defremove_interstitial_nodes(gdf:gpd.GeoSeries|gpd.GeoDataFrame,*,aggfunc:str|dict="first",**kwargs)->gpd.GeoSeries|gpd.GeoDataFrame:"""Clean topology of existing LineString geometry by removal of nodes of degree 2. Returns the original gdf if there’s no node of degree 2. Parameters ---------- gdf : geopandas.GeoSeries | geopandas.GeoDataFrame Input edgelines process. If any edges are ``MultiLineString`` they will be exploded into constituent ``LineString`` components. aggfunc : str | dict = 'first' Aggregate function for processing non-spatial component. **kwargs Keyword arguments for ``aggfunc``. Returns ------- geopandas.GeoSeries | geopandas.GeoDataFrame The original input ``gdf`` if only 1 edgeline, otherwise the processed edgeline without interstitial nodes. Notes ----- Any 3D geometries are (potentially) downcast in loops. """defmerge_geometries(block:gpd.GeoSeries)->shapely.LineString:"""Helper in processing the spatial component."""returnshapely.line_merge(shapely.GeometryCollection(block.values))iflen(gdf)<2:returngdfifisinstance(gdf,gpd.GeoSeries):gdf=gdf.to_frame("geometry")gdf=gdf.explode(ignore_index=True)labels=get_components(gdf.geometry)# Process non-spatial componentdata=gdf.drop(labels=gdf.geometry.name,axis=1)aggregated_data=data.groupby(by=labels).agg(aggfunc,**kwargs)aggregated_data.columns=aggregated_data.columns.to_flat_index()# Process spatial componentg=gdf.groupby(group_keys=False,by=labels)[gdf.geometry.name].agg(merge_geometries)aggregated_geometry=gpd.GeoDataFrame(g,geometry=gdf.geometry.name,crs=gdf.crs)# Recombineaggregated=aggregated_geometry.join(aggregated_data)# Derive nodesnodes=momepy.nx_to_gdf(momepy.node_degree(momepy.gdf_to_nx(aggregated[[aggregated.geometry.name]])),lines=False,)# Bifurcate edges into loops and non-loopsloops,not_loops=_loops_and_non_loops(aggregated)# Ensure:# - all loops have exactly 1 endpoint; and# - that endpoint shares a node with an intersecting linefixed_loops=[]fixed_index=[]node_ix,loop_ix=loops.sindex.query(nodes.geometry,predicate="intersects")forixinnp.unique(loop_ix):loop_geom=loops.geometry.iloc[ix]target_nodes=nodes.geometry.iloc[node_ix[loop_ix==ix]]iflen(target_nodes)==2:new_sequence=_rotate_loop_coords(loop_geom,not_loops)fixed_loops.append(shapely.LineString(new_sequence))fixed_index.append(ix)aggregated.loc[loops.index[fixed_index],aggregated.geometry.name]=fixed_loopsreturnaggregated.reset_index(drop=True)
def_rotate_loop_coords(loop_geom:shapely.LineString,not_loops:gpd.GeoDataFrame)->np.ndarray:"""Rotate loop node coordinates if needed to ensure topology."""loop_coords=shapely.get_coordinates(loop_geom)loop_points=gpd.GeoDataFrame(geometry=shapely.points(loop_coords))loop_points_ix,_=not_loops.sindex.query(loop_points.geometry,predicate="dwithin",distance=1e-4)mode=loop_points.loc[loop_points_ix].geometry.mode()# if there is a non-planar intersection, we may have multiple points. Check with# entrypoints only in that caseifmode.shape[0]>1:loop_points_ix,_=not_loops.sindex.query(loop_points.geometry,predicate="dwithin",distance=1e-4)new_mode=loop_points.loc[loop_points_ix].geometry.mode()# if that did not help, just pick one to avoid failure and hope for the bestifnew_mode.empty|new_mode.shape[0]>1:mode=mode.iloc[[0]]new_start=mode.get_coordinates().values_coords_match=(loop_coords==new_start).all(axis=1)new_start_idx=np.where(_coords_match)[0].squeeze()rolled_coords=np.roll(loop_coords[:-1],-new_start_idx,axis=0)new_sequence=np.append(rolled_coords,rolled_coords[[0]],axis=0)returnnew_sequence
[docs]deffix_topology(roads:gpd.GeoDataFrame,*,eps:float=1e-4,**kwargs,)->gpd.GeoDataFrame:"""Fix road network topology. This ensures correct topology of the network by: 1. Adding potentially missing nodes... on intersections of individual LineString endpoints with the remaining network. The idea behind is that if a line ends on an intersection with another, there should be a node on both of them. 2. Removing nodes of degree 2... that have no meaning in the network used within our framework. 3. Removing duplicated geometries (irrespective of orientation). Parameters ---------- roads : geopandas.GeoDataFrame Input LineString geometries. eps : float = 1e-4 Tolerance epsilon for point snapping passed into ``nodes.split()``. **kwargs : dict Key word arguments passed into ``remove_interstitial_nodes()``. Returns ------- gpd.GeoDataFrame The input roads that now have fixed topology and are ready to proceed through the simplification algorithm. """roads=roads[~roads.geometry.normalize().duplicated()].copy()roads_w_nodes=induce_nodes(roads,eps=eps)returnremove_interstitial_nodes(roads_w_nodes,**kwargs)
[docs]defconsolidate_nodes(gdf:gpd.GeoDataFrame,*,tolerance:float=2.0,preserve_ends:bool=False,)->gpd.GeoSeries:"""Return geometry with consolidated nodes. Replace clusters of nodes with a single node (weighted centroid of a cluster) and snap linestring geometry to it. Cluster is defined using hierarchical clustering with average linkage on coordinates cut at a cophenetic distance equal to ``tolerance``. The use of hierachical clustering avoids the chaining effect of a sequence of intersections within ``tolerance`` from each other that would happen with DBSCAN and similar solutions. Parameters ---------- gdf : geopandas.GeoDataFrame GeoDataFrame with LineStrings (usually representing street network). tolerance : float = 2.0 The maximum distance between two nodes for one to be considered as in the neighborhood of the other. Nodes within tolerance are considered a part of a single cluster and will be consolidated. preserve_ends : bool = False If ``True``, nodes of a degree 1 will be excluded from the consolidation. Returns ------- geopandas.GeoSeries Updated input ``gdf`` of LineStrings with consolidated nodes. """fromscipy.clusterimporthierarchyifisinstance(gdf,gpd.GeoSeries):gdf=gdf.to_frame("geometry")elifisinstance(gdf,np.ndarray):gdf=gpd.GeoDataFrame(geometry=gdf)nodes=momepy.nx_to_gdf(momepy.node_degree(momepy.gdf_to_nx(gdf)),lines=False)ifpreserve_ends:# keep at least one meter of original geometry around each endends=nodes[nodes["degree"]==1].buffer(1)nodes=nodes[nodes["degree"]>1].copy()# if all we have are ends, return the original# - this is generally when called from within ``geometry._consolidate()``ifnodes.shape[0]<2:gdf["_status"]="original"returngdf# get clusters of nodes which should be consolidated# first get components of possible clusters to and then do the linkage itself# otherwise is dead slow and needs a ton of memorydb=DBSCAN(eps=tolerance,min_samples=2).fit(nodes.get_coordinates())comp_labels=db.labels_mask=comp_labels>-1components=comp_labels[mask]nodes_to_merge=nodes[mask]defget_labels(nodes):linkage=hierarchy.linkage(shapely.get_coordinates(nodes),method="average")labels=(hierarchy.fcluster(linkage,tolerance,criterion="distance").astype(str)+f"_{nodes.name}")returnlabelsgrouped=(pd.Series(nodes_to_merge.geometry).groupby(components).transform(get_labels))nodes["lab"]=groupedunique,counts=np.unique(nodes["lab"].dropna(),return_counts=True)actual_clusters=unique[counts>1]change=nodes[nodes["lab"].isin(actual_clusters)]# no change needed, return the originalifchange.empty:gdf["_status"]="original"returngdfgdf=gdf.copy()# get geometrygeom=gdf.geometry.copy()status=pd.Series("original",index=geom.index)# loop over clusters, cut out geometry within tolerance / 2 and replace it# with spider-like geometry to the weighted centroid of a clusterspiders=[]midpoints=[]clusters=change.dissolve(change["lab"])# TODO: not optimal but avoids some MultiLineStrings but not allcookies=clusters.buffer(tolerance/2).convex_hullifpreserve_ends:cookies=cookies.to_frame().overlay(ends.to_frame(),how="difference")forcluster,cookieinzip(clusters.geometry,cookies.geometry,strict=True):inds=geom.sindex.query(cookie,predicate="intersects")pts=shapely.get_coordinates(geom.iloc[inds].intersection(cookie.boundary))ifpts.shape[0]>0:# TODO: this may result in MultiLineString - we need to avoid that# TODO: It is temporarily fixed by that explode in returngeom.iloc[inds]=geom.iloc[inds].difference(cookie)status.iloc[inds]="changed"midpoint=np.mean(shapely.get_coordinates(cluster),axis=0)midpoints.append(midpoint)mids=np.array([midpoint]*len(pts))spider=shapely.linestrings(np.array([pts[:,0],mids[:,0]]).T,y=np.array([pts[:,1],mids[:,1]]).T,)spiders.append(spider)gdf=gdf.set_geometry(geom)gdf["_status"]=statusifspiders:# combine geometriesgeoms=np.hstack(spiders)gdf=pd.concat([gdf,gpd.GeoDataFrame(geometry=geoms,crs=geom.crs)])agg:dict[str,str|typing.Callable]={"_status":_status}forcingdf.columns.drop(gdf.active_geometry_name):ifc!="_status":agg[c]="first"returnremove_interstitial_nodes(gdf[~gdf.geometry.is_empty].explode(),# NOTE: this aggfunc needs to be able to process all the columnsaggfunc=agg,)