Commit 56ba00ac authored by Rick Gruber-Riemer's avatar Rick Gruber-Riemer

New implementation of CityBlocks incl. plotting

parent 8366af64
......@@ -6,10 +6,10 @@ import logging
import math
import os.path
import time
from typing import List, Tuple
from typing import Dict, List, Tuple
import shapely.affinity as saf
from shapely.geometry import MultiPolygon, Point, Polygon
from shapely.geometry import MultiPolygon, Point, Polygon, CAP_STYLE, JOIN_STYLE
from shapely.ops import unary_union
import parameters
......@@ -264,17 +264,65 @@ def _process_btg_building_zones(transformer: Transformation) -> Tuple[List[m.BTG
return btg_zones, btg_reader.faces[btg.WATER_PROXY]
def split_to_city_blocks() -> None:
"""Splits all land-use into (city) blocks, i.e. areas surrounded by streets.
Creates a 'virtual' street at land-use boundary to also have blocks at outside of land-use zones.
See https://networkx.github.io/documentation/stable/reference/algorithms/generated/networkx.algorithms.cycles.cycle_basis.html#networkx.algorithms.cycles.cycle_basis
https://stackoverflow.com/questions/24021840/find-edges-in-a-cycle-networkx-python
def _link_area_with_highways(area: Polygon, highways_dict: Dict[int, m.Highway]) -> List[m.Highway]:
"""Link highways to an area to prepare for building generation.
https://stackoverflow.com/questions/12367801/finding-all-cycles-in-undirected-graphs#18388696
Highways_dict gets reduced by those highways, which were within, such that searching in other
areas gets quicker due to reduced volume.
"""
linked_highways = list()
to_be_removed = list()
for my_highway in highways_dict.values():
if not my_highway.populate_buildings_along():
continue
if not disjoint_bounds(my_highway.geometry.bounds, area.bounds): # a bit speed up
if my_highway.geometry.within(area):
linked_highways.append(my_highway)
to_be_removed.append(my_highway.osm_id)
elif my_highway.geometry.intersects(area):
linked_highways.append(my_highway)
return
for key in to_be_removed:
del highways_dict[key]
return linked_highways
def _assign_city_blocks(building_zones: List[m.BuildingZone], highways_dict: Dict[int, m.Highway]) -> None:
"""Splits all land-use into (city) blocks, i.e. areas surrounded by streets.
Brute force by buffering all highways, then take the geometry difference, which splits the zone into
multiple polygons. Some of the polygons will be real city blocks, others will be border areas.
Could also be done by using e.g. networkx.algorithms.cycles.cycle_basis.html. However is a bit more complicated
logic and programming wise, but might be faster.
"""
highways_dict_copy1 = highways_dict.copy() # otherwise when using highways_dict in plotting it will be "used"
for building_zone in building_zones:
polygons = list()
linked_highways = _link_area_with_highways(building_zone.geometry, highways_dict_copy1)
if linked_highways:
buffers = list()
for highway in linked_highways:
buffers.append(highway.geometry.buffer(2, cap_style=CAP_STYLE.square,
join_style=JOIN_STYLE.bevel))
geometry_difference = building_zone.geometry.difference(unary_union(buffers))
if isinstance(geometry_difference, Polygon) and geometry_difference.is_valid and \
geometry_difference >= parameters.OWBB_MIN_CITY_BLOCK_AREA:
polygons.append(geometry_difference)
elif isinstance(geometry_difference, MultiPolygon):
my_polygons = geometry_difference.geoms
for my_poly in my_polygons:
if isinstance(my_poly, Polygon) and my_poly.is_valid and \
my_poly.area >= parameters.OWBB_MIN_CITY_BLOCK_AREA:
polygons.append(my_poly)
logging.debug('Found %i city blocks in building zone osm_ID=%i', len(polygons), building_zone.osm_id)
for polygon in polygons:
my_city_block = m.CityBlock(op.get_next_pseudo_osm_id(op.OSMFeatureType.landuse), polygon, None)
building_zone.add_city_block(my_city_block)
def _process_landuse_for_lighting(building_zones: List[m.BuildingZone]) -> List[Polygon]:
......@@ -308,6 +356,57 @@ def _process_landuse_for_lighting(building_zones: List[m.BuildingZone]) -> List[
return lit_areas
def _split_generated_building_zones_by_major_lines(before_list: List[m.BuildingZone],
highways: Dict[int, m.Highway],
railways: Dict[int, m.RailwayLine],
waterways: Dict[int, m.Waterway]) -> List[m.BuildingZone]:
"""Splits generated building zones into several sub-zones along major transport lines and waterways.
Major transport lines are motorways, trunks as well as certain railway lines.
Using buffers (= polygons) instead of lines because Shapely cannot do it natively.
Using buffers directly instead of trying to entangle line strings of highways/railways due to
easiness plus performance wise most probably same or better."""
# create buffers around major transport
line_buffers = list()
for highway in highways.values():
if highway.type_ in [m.HighwayType.motorway, m.HighwayType.trunk] and not highway.is_tunnel:
line_buffers.append(highway.geometry.buffer(highway.get_width()/2))
for railway in railways.values():
if railway.type_ in [m.RailwayLineType.rail, m.RailwayLineType.light_rail, m.RailwayLineType.subway,
m.RailwayLineType.narrow_gauge] and not (railway.is_tunnel or railway.is_service_spur):
line_buffers.append(railway.geometry.buffer(railway.get_width() / 2))
for waterway in waterways.values():
if waterway.type_ is m.WaterwayType.large:
line_buffers.append(waterway.geometry.buffer(10))
merged_buffers = _merge_buffers(line_buffers)
if len(merged_buffers) == 0:
return before_list
# walk through all buffers and where intersecting get the difference with zone geometry as new zone polygon(s).
unhandled_list = before_list[:]
after_list = None
for buffer in merged_buffers:
after_list = list()
while len(unhandled_list) > 0:
zone = unhandled_list.pop()
if isinstance(zone, m.GeneratedBuildingZone) and zone.geometry.intersects(buffer):
zone.geometry = zone.geometry.difference(buffer)
if isinstance(zone.geometry, MultiPolygon):
after_list.extend(_split_multipolygon_generated_building_zone(zone))
# it could be that the returned list is empty because all parts are below size criteria for
# generated zones, which is ok
else:
after_list.append(zone)
else: # just keep as is if not GeneratedBuildingZone or not intersecting
after_list.append(zone)
unhandled_list = after_list
return after_list
def _merge_buffers(original_list: List[Polygon]) -> List[Polygon]:
"""Attempts to merge as many polygon buffers with each other as possible to return a reduced list."""
multi_polygon = unary_union(original_list)
......@@ -328,7 +427,7 @@ def process(transformer: Transformation) -> None:
building_zones = m.process_osm_building_zone_refs(transformer)
places = m.process_osm_place_refs(transformer)
osm_buildings = m.process_osm_building_refs(transformer)
highways_dict = m.process_osm_highway_refs(transformer)
highways_dict, nodes_dict = m.process_osm_highway_refs(transformer)
railways_dict = m.process_osm_railway_refs(transformer)
waterways_dict = m.process_osm_waterway_refs(transformer)
......@@ -363,13 +462,20 @@ def process(transformer: Transformation) -> None:
del buildings_outside
last_time = time_logging("Time used in seconds for generating building zones", last_time)
# =========== CREATE POLYGONS FOR LIGTHING OF STREETS ================================
# =========== CREATE POLYGONS FOR LIGHTING OF STREETS ================================
# Needs to be before finding city blocks as we need the boundary
lit_areas = _process_landuse_for_lighting(building_zones)
last_time = time_logging("Time used in seconds for finding lit areas", last_time)
# =========== FIND CITY BLOCKS ==========
# using an algorithm in a undirected graph finding simple cycles
# TODO
# =========== MAKE SURE GENERATED LAND-USE DOES NOT CROSS MAJOR LINEAR OBJECTS =======
if parameters.OWBB_SPLIT_MADE_UP_LANDUSE_BY_MAJOR_LINES:
# finally split generated zones by major transport lines
building_zones = _split_generated_building_zones_by_major_lines(building_zones, highways_dict,
railways_dict, waterways_dict)
# =========== FIND CITY BLOCKS AND ASSIGN TO BUILDING_ZONES ==========================
_assign_city_blocks(building_zones, highways_dict)
last_time = time_logging('Time used in seconds for splitting into city blocks', last_time)
# ============finally guess the land-use type ========================================
for my_zone in building_zones:
......@@ -380,5 +486,6 @@ def process(transformer: Transformation) -> None:
# =========== FINALIZE PROCESSING ====================================================
if parameters.DEBUG_PLOT:
bounds = m.Bounds.create_from_parameters(transformer)
plotting.draw_zones(highways_dict, osm_buildings, building_zones, btg_building_zones, lit_areas, bounds)
plotting.draw_zones(highways_dict, osm_buildings, building_zones, btg_building_zones,
lit_areas, bounds)
time_logging("Time used in seconds for plotting", last_time)
......@@ -9,7 +9,7 @@ import abc
from enum import IntEnum, unique
import logging
import math
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Tuple, Union
from shapely.geometry import box, Point, LineString, Polygon, MultiPolygon
import shapely.affinity as saf
......@@ -314,6 +314,14 @@ class BuildingZoneType(IntEnum): # element names must match OSM values apart fr
btg_port = 223
class CityBlock():
"""A special land-use derived from many kinds of land-use info and enclosed by streets (kind of)."""
def __init__(self, osm_id: int, geometry: Polygon, feature_type: Union[None, BuildingZoneType]) -> None:
self.osm_id = osm_id
self.geometry = geometry
self.building_zone_type = feature_type
class BuildingZone(OSMFeatureArea):
""" A 'Landuse' OSM map feature
A Landuse can either be of element type Node -> Point geometry or Way -> Area -> LinearString.
......@@ -332,6 +340,7 @@ class BuildingZone(OSMFeatureArea):
self.osm_buildings = list() # List of already existing osm buildings
self.generated_buildings = list() # List og GenBuilding objects for generated non-osm buildings
self.linked_genways = list() # List of Highways that are available for generating buildings
self.linked_city_blocks = list()
@classmethod
def create_from_way(cls, way: op.Way, nodes_dict: Dict[int, op.Node],
......@@ -364,6 +373,8 @@ class BuildingZone(OSMFeatureArea):
self.linked_blocked_areas.extend(temp_buildings.generated_blocked_areas)
self.generated_buildings.extend(temp_buildings.generated_buildings)
def add_city_block(self, city_block: CityBlock) -> None:
self.linked_city_blocks.append(city_block)
class BTGBuildingZone(object):
"""A land-use from materials in FlightGear read from BTG-files"""
......@@ -668,15 +679,16 @@ class HighwayType(IntEnum):
class Highway(OSMFeatureLinearWithTunnel):
def __init__(self, osm_id: int, geometry: LineString, tags_dict: KeyValueDict) -> None:
def __init__(self, osm_id: int, geometry: LineString, tags_dict: KeyValueDict, refs: List[int]) -> None:
super().__init__(osm_id, geometry, tags_dict)
self.is_roundabout = self._parse_tags_roundabout(tags_dict)
self.is_oneway = self._parse_tags_oneway(tags_dict)
self.lanes = self._parse_tags_lanes(tags_dict)
self.refs = refs
@classmethod
def create_from_scratch(cls, pseudo_id: int, existing_highway: 'Highway', linear: LineString) -> 'Highway':
obj = Highway(pseudo_id, linear, dict())
obj = Highway(pseudo_id, linear, dict(), existing_highway.refs)
obj.type_ = existing_highway.type_
obj.is_roundabout = existing_highway.is_roundabout
obj.is_oneway = existing_highway.is_oneway
......@@ -776,7 +788,7 @@ class Highway(OSMFeatureLinearWithTunnel):
def create_from_way(cls, way: op.Way, nodes_dict: Dict[int, op.Node],
transformer: co.Transformation) -> 'Highway':
my_geometry = cls.create_line_string_from_way(way, nodes_dict, transformer)
obj = Highway(way.osm_id, my_geometry, way.tags)
obj = Highway(way.osm_id, my_geometry, way.tags, way.refs)
return obj
......@@ -1157,7 +1169,7 @@ def process_osm_railway_refs(transformer: co.Transformation) -> Dict[int, Railwa
return my_ways
def process_osm_highway_refs(transformer: co.Transformation) -> Dict[int, Highway]:
def process_osm_highway_refs(transformer: co.Transformation) -> Tuple[Dict[int, Highway], Dict[int, op.Node]]:
osm_result = op.fetch_osm_db_data_ways_keys([HIGHWAY_KEY])
my_ways = dict()
for way in list(osm_result.ways_dict.values()):
......@@ -1165,7 +1177,7 @@ def process_osm_highway_refs(transformer: co.Transformation) -> Dict[int, Highwa
if my_way.is_valid():
my_ways[my_way.osm_id] = my_way
logging.info("OSM highways found: %s", len(my_ways))
return my_ways
return my_ways, osm_result.nodes_dict
def process_osm_waterway_refs(transformer: co.Transformation) -> Dict[int, Waterway]:
......
......@@ -56,7 +56,7 @@ def _set_ax_limits(ax: maxs.Axes, x_min: float, x_max: float, y_min: float, y_m
def _draw_highways(highways_dict, ax: maxs.Axes) -> None:
for my_highway in highways_dict.values():
if my_highway.is_sideway():
_plot_line(ax, my_highway.geometry, "black", 1)
_plot_line(ax, my_highway.geometry, "green", 1)
else:
_plot_line(ax, my_highway.geometry, "lime", 1)
......@@ -68,7 +68,7 @@ def _draw_buildings(buildings, ax: maxs.Axes) -> None:
ax.add_patch(patch)
def _draw_osm_zones(building_zones, ax: maxs.Axes) -> None:
def _draw_osm_zones(building_zones: List[m.BuildingZone], ax: maxs.Axes) -> None:
edge_color = "red"
for building_zone in building_zones:
if not isinstance(building_zone, m.GeneratedBuildingZone):
......@@ -141,6 +141,17 @@ def _draw_lit_areas(lit_areas: List[Polygon], ax: maxs.Axes) -> None:
ax.add_patch(patch)
def _draw_city_blocks(building_zones: List[m.BuildingZone], ax: maxs.Axes) -> None:
for zone in building_zones:
city_blocks = zone.linked_city_blocks
for item in city_blocks:
red = random.random()
green = random.random()
blue = random.random()
patch = PolygonPatch(item.geometry, facecolor=(red, green, blue), edgecolor="black")
ax.add_patch(patch)
def _draw_background_zones(building_zones, ax: maxs.Axes) -> None:
for building_zone in building_zones:
my_color = "lightgray"
......@@ -218,8 +229,8 @@ def draw_buildings(buildings, building_zones, bounds) -> None:
def draw_zones(highways_dict: Dict[int, m.Highway], buildings: List[m.Building],
building_zones: List[m.BuildingZone], btg_building_zones, lit_areas: List[Polygon],
bounds: m.Bounds) -> None:
building_zones: List[m.BuildingZone], btg_building_zones,
lit_areas: List[Polygon], bounds: m.Bounds) -> None:
pdf_pages = _create_pdf_pages("landuse")
# OSM building zones original
......@@ -269,6 +280,16 @@ def draw_zones(highways_dict: Dict[int, m.Highway], buildings: List[m.Building],
_set_ax_limits_from_bounds(ax, bounds)
pdf_pages.savefig(my_figure)
# City blocks
my_figure = _create_a3_landscape_figure()
my_figure.suptitle("City blocks")
ax = my_figure.add_subplot(111)
ax.grid(True, linewidth=1, linestyle="--", color="silver")
_draw_city_blocks(building_zones, ax)
_draw_highways(highways_dict, ax)
_set_ax_limits_from_bounds(ax, bounds)
pdf_pages.savefig(my_figure)
pdf_pages.close()
......
......@@ -31,7 +31,7 @@ def _prepare_building_zone_for_building_generation(building_zone, open_spaces_di
As soon as an element is entirely within a building zone, it is removed
from the lists in order to speedup by reducing the number of stuff to compare in the next
building zone (the lists area shared / passed by reference.
In order to make these removals fast the data structures must be dics instead of lists.
In order to make these removals fast the data structures must be dicts instead of lists.
Another possibility would be parallel processing, but that would lead to big
memory consumption due to copies of data structures in each process
......@@ -283,7 +283,7 @@ def process(transformer: co.Transformation) -> List[building_lib.Building]:
osm_buildings = m.process_osm_building_refs(transformer)
building_zones = m.process_osm_building_zone_refs(transformer)
open_spaces_dict = m.process_osm_open_space_refs(transformer)
highways_dict = m.process_osm_highway_refs(transformer)
highways_dict, nodes_dict = m.process_osm_highway_refs(transformer)
railways_dict = m.process_osm_railway_refs(transformer)
waterways_dict = m.process_osm_waterway_refs(transformer)
last_time = time_logging("Time used in seconds for parsing OSM data", last_time)
......
......@@ -316,6 +316,7 @@ OWBB_SPLIT_MADE_UP_LANDUSE_WATERWAY_BUFFER = 10 # meters
OWBB_GENERATE_BUILDINGS = False
OWBB_STEP_DISTANCE = 2 # in meters
OWBB_MIN_STREET_LENGTH = 10 # in meters
OWBB_MIN_CITY_BLOCK_AREA = 200 # square meters
OWBB_RESIDENTIAL_HIGHWAY_MIN_GEN_SHARE = 0.3
OWBB_INDUSTRIAL_HIGHWAY_MIN_GEN_SHARE = 0.3 # FIXME: not yet used
......
......@@ -113,14 +113,14 @@ class Graph(nx.Graph):
ref1 = way.refs[-1]
try:
junction0 = self.junction(ref0)
junction0.append_way(way, is_first = True)
junction0.append_way(way, is_first=True)
except KeyError:
junction0 = Junction(way, is_first=True) # IS_FIRST
junction0 = Junction(way, is_first=True) # IS_FIRST
super().add_node(ref0, obj=junction0)
try:
junction1 = self.junction(ref1)
junction1.append_way(way, is_first = False)
junction1.append_way(way, is_first=False)
except KeyError:
junction1 = Junction(way, is_first=False)
super().add_node(ref1, obj=junction1)
......@@ -129,10 +129,3 @@ class Graph(nx.Graph):
way.junction0 = junction0
way.junction1 = junction1
if __name__ == '__main__':
my_graph = nx.Graph()
my_graph.add_edges_from([(1, 2), (1, 3), (1, 4), (2, 3), (3, 4), (2, 6), (4, 6), (8, 7), (8, 9), (9, 7)])
cycles = nx.cycle_basis(my_graph) # -> list of list of nodes
foo = 1
......@@ -150,6 +150,3 @@ def _process_osm_building_refs(my_coord_transformator: Transformation) -> List[s
if my_polygon.is_valid and not my_polygon.is_empty:
my_buildings.append(my_polygon.convex_hull)
return my_buildings
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment