Commit 31402cd0 authored by Rick Gruber-Riemer's avatar Rick Gruber-Riemer

Removing old landuse.py

parent 56ba00ac
......@@ -54,7 +54,6 @@ class Procedures(IntEnum):
roads = 3
pylons = 4
details = 5
owbb = 6 # experimental
def _parse_exec_for_procedure(exec_argument: str) -> Procedures:
......@@ -66,7 +65,8 @@ def _parse_exec_for_procedure(exec_argument: str) -> Procedures:
def configure_logging(log_level: str, log_to_file: bool) -> None:
"""Set the logging level and maybe write to file.
See also accepted answer to https://stackoverflow.com/questions/29015958/how-can-i-prevent-the-inheritance-of-python-loggers-and-handlers-during-multipro?noredirect=1&lq=1.
See also accepted answer to https://stackoverflow.com/questions/29015958/how-can-i-prevent-the-inheritance-
of-python-loggers-and-handlers-during-multipro?noredirect=1&lq=1.
And: https://docs.python.org/3.5/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
"""
logging_config = {
......@@ -121,9 +121,8 @@ def process_scenery_tile(scenery_tile: SceneryTile, params_file_name: str,
the_coords_transform = coordinates.Transformation(parameters.get_center_global())
if exec_argument is Procedures.owbb:
ol.process(the_coords_transform)
else:
lit_areas, building_zones = ol.process(the_coords_transform)
my_fg_elev = FGElev(the_coords_transform, scenery_tile.tile_index)
my_stg_entries = utils.stg_io2.read_stg_entries_in_boundary(True, the_coords_transform)
......@@ -141,15 +140,16 @@ def process_scenery_tile(scenery_tile: SceneryTile, params_file_name: str,
if exec_argument in [Procedures.buildings, Procedures.main, Procedures.all]:
generated_buildings = list()
if parameters.OWBB_GENERATE_BUILDINGS:
generated_buildings = ow.process(the_coords_transform)
generated_buildings = ow.process(the_coords_transform, building_zones)
buildings.process_buildings(the_coords_transform, my_fg_elev, my_blocked_areas, my_stg_entries,
generated_buildings, file_lock)
if exec_argument in [Procedures.roads, Procedures.main, Procedures.all]:
roads.process_roads(the_coords_transform, my_fg_elev, my_blocked_areas, my_stg_entries, file_lock)
roads.process_roads(the_coords_transform, my_fg_elev, my_blocked_areas, lit_areas,
my_stg_entries, file_lock)
if exec_argument in [Procedures.pylons, Procedures.main, Procedures.all]:
pylons.process_pylons(the_coords_transform, my_fg_elev, my_stg_entries, file_lock)
if exec_argument in [Procedures.details, Procedures.all]:
pylons.process_details(the_coords_transform, my_fg_elev, file_lock)
pylons.process_details(the_coords_transform, lit_areas, my_fg_elev, file_lock)
platforms.process_details(the_coords_transform, my_fg_elev, file_lock)
piers.process_details(the_coords_transform, my_fg_elev, file_lock)
......
......@@ -5,6 +5,7 @@
import logging
import math
import os.path
import pickle
import time
from typing import Dict, List, Tuple
......@@ -264,8 +265,8 @@ def _process_btg_building_zones(transformer: Transformation) -> Tuple[List[m.BTG
return btg_zones, btg_reader.faces[btg.WATER_PROXY]
def _link_area_with_highways(area: Polygon, highways_dict: Dict[int, m.Highway]) -> List[m.Highway]:
"""Link highways to an area to prepare for building generation.
def _test_highway_intersecting_area(area: Polygon, highways_dict: Dict[int, m.Highway]) -> List[m.Highway]:
"""Returns highways that are within an area or intersecting with an area.
Highways_dict gets reduced by those highways, which were within, such that searching in other
areas gets quicker due to reduced volume.
......@@ -301,10 +302,10 @@ def _assign_city_blocks(building_zones: List[m.BuildingZone], highways_dict: Dic
for building_zone in building_zones:
polygons = list()
linked_highways = _link_area_with_highways(building_zone.geometry, highways_dict_copy1)
if linked_highways:
intersecting_highways = _test_highway_intersecting_area(building_zone.geometry, highways_dict_copy1)
if intersecting_highways:
buffers = list()
for highway in linked_highways:
for highway in intersecting_highways:
buffers.append(highway.geometry.buffer(2, cap_style=CAP_STYLE.square,
join_style=JOIN_STYLE.bevel))
geometry_difference = building_zone.geometry.difference(unary_union(buffers))
......@@ -321,7 +322,8 @@ def _assign_city_blocks(building_zones: List[m.BuildingZone], highways_dict: Dic
logging.debug('Found %i city blocks in building zone osm_ID=%i', len(polygons), building_zone.osm_id)
for polygon in polygons:
my_city_block = m.CityBlock(op.get_next_pseudo_osm_id(op.OSMFeatureType.landuse), polygon, None)
my_city_block = m.CityBlock(op.get_next_pseudo_osm_id(op.OSMFeatureType.landuse), polygon,
building_zone.type_)
building_zone.add_city_block(my_city_block)
......@@ -420,9 +422,26 @@ def _merge_buffers(original_list: List[Polygon]) -> List[Polygon]:
return handled_list
def process(transformer: Transformation) -> None:
def process(transformer: Transformation) -> Tuple[List[Polygon], List[m.BuildingZone]]:
last_time = time.time()
# =========== TRY TO READ CACHED DATA FIRST =======
tile_index = parameters.get_tile_index()
cache_file_la = str(tile_index) + '_lit_areas.pkl'
cache_file_bz = str(tile_index) + '_building_zones.pkl'
if parameters.OWBB_LANDUSE_CACHE:
try:
with open(cache_file_la, 'rb') as file_pickle:
lit_areas = pickle.load(file_pickle)
logging.info('Successfully loaded %i objects from %s', len(lit_areas), cache_file_la)
with open(cache_file_bz, 'rb') as file_pickle:
building_zones = pickle.load(file_pickle)
logging.info('Successfully loaded %i objects from %s', len(building_zones), cache_file_bz)
return lit_areas, building_zones
except (IOError, EOFError) as reason:
logging.info("Loading of cache %s or %s failed (%s)", cache_file_la, cache_file_bz, reason)
# =========== READ OSM DATA =============
building_zones = m.process_osm_building_zone_refs(transformer)
places = m.process_osm_place_refs(transformer)
......@@ -477,6 +496,10 @@ def process(transformer: Transformation) -> None:
_assign_city_blocks(building_zones, highways_dict)
last_time = time_logging('Time used in seconds for splitting into city blocks', last_time)
# now assign the osm_buildings to the city blocks
for building_zone in building_zones:
building_zone.reassign_osm_buildings_to_city_blocks()
# ============finally guess the land-use type ========================================
for my_zone in building_zones:
if isinstance(my_zone, m.GeneratedBuildingZone):
......@@ -489,3 +512,19 @@ def process(transformer: Transformation) -> None:
plotting.draw_zones(highways_dict, osm_buildings, building_zones, btg_building_zones,
lit_areas, bounds)
time_logging("Time used in seconds for plotting", last_time)
# =========== WRITE TO CACHE AND RETURN
if parameters.OWBB_LANDUSE_CACHE:
try:
with open(cache_file_la, 'wb') as file_pickle:
pickle.dump(lit_areas, file_pickle)
logging.info('Successfully saved %i objects to %s', len(lit_areas), cache_file_la)
with open(cache_file_bz, 'wb') as file_pickle:
pickle.dump(building_zones, file_pickle)
logging.info('Successfully saved %i objects to %s', len(building_zones), cache_file_bz)
except (IOError, EOFError) as reason:
logging.info("Saving of cache %s or %s failed (%s)", cache_file_la, cache_file_bz, reason)
return lit_areas, building_zones
......@@ -320,6 +320,7 @@ class CityBlock():
self.osm_id = osm_id
self.geometry = geometry
self.building_zone_type = feature_type
self.osm_buildings = list() # List of already existing osm buildings
class BuildingZone(OSMFeatureArea):
......@@ -338,7 +339,7 @@ class BuildingZone(OSMFeatureArea):
# i.e. they are not directly based on OSM data
self.linked_blocked_areas = list() # List of BlockedArea objects for blocked areas.
self.osm_buildings = list() # List of already existing osm buildings
self.generated_buildings = list() # List og GenBuilding objects for generated non-osm buildings
self.generated_buildings = list() # List of GenBuilding objects for generated non-osm buildings
self.linked_genways = list() # List of Highways that are available for generating buildings
self.linked_city_blocks = list()
......@@ -376,6 +377,14 @@ class BuildingZone(OSMFeatureArea):
def add_city_block(self, city_block: CityBlock) -> None:
self.linked_city_blocks.append(city_block)
def reassign_osm_buildings_to_city_blocks(self) -> None:
for osm_building in self.osm_buildings:
for city_block in self.linked_city_blocks:
if osm_building.geometry.within(city_block.geometry) or osm_building.geometry.intersects(
city_block.geometry):
city_block.osm_buildings.append(osm_building)
class BTGBuildingZone(object):
"""A land-use from materials in FlightGear read from BTG-files"""
def __init__(self, external_id, type_, geometry) -> None:
......@@ -1135,7 +1144,7 @@ def process_osm_building_zone_refs(transformer: co.Transformation) -> List[Build
def process_osm_building_refs(transformer: co.Transformation) -> List[Building]:
osm_result = op.fetch_osm_db_data_ways_keys([BUILDING_KEY])
osm_result = op.fetch_osm_db_data_ways_keys([BUILDING_KEY, BUILDING_PART_KEY])
my_ways = list()
for way in list(osm_result.ways_dict.values()):
my_way = Building.create_from_way(way, osm_result.nodes_dict, transformer)
......
......@@ -61,8 +61,9 @@ def _draw_highways(highways_dict, ax: maxs.Axes) -> None:
_plot_line(ax, my_highway.geometry, "lime", 1)
def _draw_buildings(buildings, ax: maxs.Axes) -> None:
for building in buildings:
def _draw_buildings(building_zones, ax: maxs.Axes) -> None:
for building_zone in building_zones:
for building in building_zone.osm_buildings:
if isinstance(building.geometry, Polygon):
patch = PolygonPatch(building.geometry, facecolor="black", edgecolor="black")
ax.add_patch(patch)
......@@ -211,7 +212,7 @@ def _draw_nodes_to_change(nodes_to_change: List[bl.NodeInRectifyBuilding], ax: m
color='green', fill=False))
def draw_buildings(buildings, building_zones, bounds) -> None:
def draw_buildings(building_zones, bounds) -> None:
pdf_pages = _create_pdf_pages("would-be-buildings")
# Generated buildings
......@@ -221,7 +222,7 @@ def draw_buildings(buildings, building_zones, bounds) -> None:
ax = my_figure.add_subplot(111)
_draw_background_zones(building_zones, ax)
_draw_blocked_areas(building_zones, ax)
_draw_buildings(buildings, ax)
_draw_buildings(building_zones, ax)
_set_ax_limits_from_bounds(ax, bounds)
pdf_pages.savefig(my_figure)
......
......@@ -2,6 +2,7 @@
"""Module analyzing OSM data and generating buildings at plausible places."""
import logging
import pickle
import random
import time
from typing import List
......@@ -275,13 +276,23 @@ def _read_building_models_library() -> List[m.BuildingModel]:
return models
def process(transformer: co.Transformation) -> List[building_lib.Building]:
def process(transformer: co.Transformation, building_zones: List[m.BuildingZone]) -> List[building_lib.Building]:
last_time = time.time()
# =========== TRY TO READ CACHED DATA FIRST =======
tile_index = parameters.get_tile_index()
cache_file = str(tile_index) + '_generated_buildings.pkl'
if parameters.OWBB_GENERATED_BUILDINGS_CACHE:
try:
with open(cache_file, 'rb') as file_pickle:
generated_buildings = pickle.load(file_pickle)
logging.info('Successfully loaded %i objects from %s', len(generated_buildings), cache_file)
return generated_buildings
except (IOError, EOFError) as reason:
logging.info("Loading of cache %s failed (%s)", cache_file, reason)
# =========== READ OSM DATA =============
osm_buildings = m.process_osm_building_refs(transformer)
building_zones = m.process_osm_building_zone_refs(transformer)
open_spaces_dict = m.process_osm_open_space_refs(transformer)
highways_dict, nodes_dict = m.process_osm_highway_refs(transformer)
railways_dict = m.process_osm_railway_refs(transformer)
......@@ -305,14 +316,6 @@ def process(transformer: co.Transformation) -> List[building_lib.Building]:
last_time = time_logging("Time used in seconds for reading building model data", last_time)
# =========== ASSIGN OSM_BUILDINGS TO ZONES =============
for candidate in osm_buildings:
for b_zone in building_zones:
if candidate.geometry.within(b_zone.geometry) or candidate.geometry.intersects(b_zone.geometry):
b_zone.osm_buildings.append(candidate)
break
last_time = time_logging("Time used in seconds for assigning buildings to OSM zones", last_time)
# =========== SELECT ZONES FOR GENERATION OF BUILDINGS =============
not_used_zones = list() # not used for generation of new buildings
used_zones = list() # used for generation of new buildings
......@@ -354,24 +357,35 @@ def process(transformer: co.Transformation) -> List[building_lib.Building]:
last_time = time_logging("Time used in seconds for preparing building zones for building generation", last_time)
building_zones = list() # will be filled again with used_zones out of the parallel processes
generated_buildings = list()
preliminary_buildings = list()
for b_zone in used_zones:
_generate_extra_buildings(b_zone, shared_models_library, bounding_box)
building_zones.append(b_zone)
logging.debug("Generated %d buildings for building zone %d", len(b_zone.generated_buildings),
b_zone.osm_id)
generated_buildings.extend(b_zone.generated_buildings)
preliminary_buildings.extend(b_zone.generated_buildings)
last_time = time_logging("Time used in seconds for generating buildings", last_time)
logging.info("Total number of buildings generated: %d", len(generated_buildings))
logging.info("Total number of buildings generated: %d", len(preliminary_buildings))
building_zones.extend(not_used_zones) # lets add the not_used_zones again, so we have everything again
if parameters.DEBUG_PLOT:
plotting.draw_buildings(osm_buildings, building_zones, bounds)
plotting.draw_buildings(building_zones, bounds)
time_logging("Time used in seconds for plotting", last_time)
# ============== Create buildings for building_lib processing ==
building_buildings = list()
for gen_building in generated_buildings:
building_buildings.append(gen_building.create_building_lib_building())
return building_buildings
generated_buildings = list()
for pre_building in preliminary_buildings:
generated_buildings.append(pre_building.create_building_lib_building())
# =========== WRITE TO CACHE AND RETURN
if parameters.OWBB_GENERATED_BUILDINGS_CACHE:
try:
with open(cache_file, 'wb') as file_pickle:
pickle.dump(generated_buildings, file_pickle)
logging.info('Successfully saved %i objects to %s', len(generated_buildings), cache_file)
except (IOError, EOFError) as reason:
logging.info("Saving of cache %s failed (%s)", cache_file, reason)
return generated_buildings
......@@ -295,6 +295,7 @@ TEXTURES_EMPTY_LM_RGB_VALUE = 35
# ==================== BUILD-ZONES GENERATION ============
OWBB_LANDUSE_CACHE = False
OWBB_GENERATE_LANDUSE = False # from buildings outside of existing land-use zones
OWBB_GENERATE_LANDUSE_BUILDING_BUFFER_DISTANCE = 30
OWBB_GENERATE_LANDUSE_BUILDING_BUFFER_DISTANCE_MAX = 50
......@@ -313,6 +314,7 @@ OWBB_SPLIT_MADE_UP_LANDUSE_WATERWAY_BUFFER = 10 # meters
# ==================== BUILDING GENERATION ============
OWBB_GENERATED_BUILDINGS_CACHE = False
OWBB_GENERATE_BUILDINGS = False
OWBB_STEP_DISTANCE = 2 # in meters
OWBB_MIN_STREET_LENGTH = 10 # in meters
......
......@@ -30,7 +30,7 @@ import parameters
import roads
import shapely.geometry as shg
import utils.osmparser as op
from utils import vec2d, coordinates, stg_io2, utilities, landuse
from utils import vec2d, coordinates, stg_io2, utilities
OUR_MAGIC = "pylons" # Used in e.g. stg files to mark edits by osm2pylon
OUT_MAGIC_DETAILS = "pylonsDetails"
......@@ -1196,7 +1196,7 @@ def _process_osm_rail_overhead(nodes_dict, ways_dict, fg_elev: utilities.FGElev,
return my_railways
def _process_highways_for_streetlamps(my_highways, landuse_buffers) -> List[StreetlampWay]:
def _process_highways_for_streetlamps(my_highways, lit_areas: List[shg.Polygon]) -> List[StreetlampWay]:
"""
Test whether the highway is within appropriate land use or intersects with appropriate land use
No attempt to merge lines because most probably the lines are split at crossing.
......@@ -1210,12 +1210,12 @@ def _process_highways_for_streetlamps(my_highways, landuse_buffers) -> List[Stre
continue
is_within = False
intersections = []
for lu_buffer in landuse_buffers:
if my_highway.linear.within(lu_buffer):
for lit_area in lit_areas:
if my_highway.linear.within(lit_area):
is_within = True
break
elif my_highway.linear.intersects(lu_buffer):
intersections.append(my_highway.linear.intersection(lu_buffer))
elif my_highway.linear.intersects(lit_area):
intersections.append(my_highway.linear.intersection(lit_area))
if is_within:
my_streetlamps[my_highway.osm_id] = StreetlampWay(my_highway.osm_id, my_highway)
else:
......@@ -1249,26 +1249,6 @@ def _process_highways_for_streetlamps(my_highways, landuse_buffers) -> List[Stre
return list(my_streetlamps.values())
def _merge_streetlamp_buffers(landuse_refs):
"""Based on existing landuses applies extra buffer and then unions as many as possible"""
landuse_buffers = []
for landuse_ref in list(landuse_refs.values()):
streetlamp_buffer = landuse_ref.polygon.buffer(parameters.C2P_STREETLAMPS_MAX_DISTANCE_LANDUSE)
if 0 == len(landuse_buffers):
landuse_buffers.append(streetlamp_buffer)
else:
is_found = False
for i in range(len(landuse_buffers)):
merged_buffer = landuse_buffers[i]
if streetlamp_buffer.intersects(merged_buffer):
landuse_buffers[i] = merged_buffer.union(streetlamp_buffer)
is_found = True
break
if not is_found:
landuse_buffers.append(streetlamp_buffer)
return landuse_buffers
def _process_osm_power_aerialway(nodes_dict, ways_dict, fg_elev: utilities.FGElev, my_coord_transformator,
building_refs: List[shg.Polygon]) -> Tuple[List[WayLine], List[WayLine]]:
"""
......@@ -1861,8 +1841,8 @@ def process_pylons(coords_transform: coordinates.Transformation, fg_elev: utilit
stg_manager.write(file_lock)
def process_details(coords_transform: coordinates.Transformation, fg_elev: utilities.FGElev,
file_lock: mp.Lock=None) -> None:
def process_details(coords_transform: coordinates.Transformation, lit_areas: List[shg.Polygon],
fg_elev: utilities.FGElev, file_lock: mp.Lock=None) -> None:
# Transform to real objects
logging.info("Transforming OSM data to Line and Pylon objects -> details")
......@@ -1915,22 +1895,15 @@ def process_details(coords_transform: coordinates.Transformation, fg_elev: utili
# street lamps
streetlamp_ways = list()
if parameters.C2P_PROCESS_STREETLAMPS:
osm_way_result = op.fetch_osm_db_data_ways_keys(["landuse", "highway"])
osm_way_result = op.fetch_osm_db_data_ways_keys(["highway"])
osm_nodes_dict = osm_way_result.nodes_dict
osm_ways_dict = osm_way_result.ways_dict
landuse_refs = landuse.process_osm_landuse_refs(osm_nodes_dict, osm_ways_dict, coords_transform)
if parameters.LU_LANDUSE_GENERATE_LANDUSE:
landuse.generate_landuse_from_buildings(landuse_refs, building_refs)
logging.info('Number of landuse references: %s', len(landuse_refs))
streetlamp_buffers = _merge_streetlamp_buffers(landuse_refs)
logging.info('Number of streetlamp buffers: %s', len(streetlamp_buffers))
highways = _process_osm_highway(osm_nodes_dict, osm_ways_dict, coords_transform)
streetlamp_ways = _process_highways_for_streetlamps(highways, streetlamp_buffers)
streetlamp_ways = _process_highways_for_streetlamps(highways, lit_areas)
logging.info('Reduced number of streetlamp ways: %s', len(streetlamp_ways))
for highway in streetlamp_ways:
highway.calc_and_map(fg_elev, coords_transform)
del landuse_refs
# free some memory
del building_refs
......
......@@ -93,7 +93,7 @@ import linear_bridge
import parameters
import textures.road
import utils.osmparser as op
from utils import coordinates, ac3d, stg_io2, utilities, landuse, graph
from utils import coordinates, ac3d, stg_io2, utilities, graph
from utils.vec2d import Vec2d
OUR_MAGIC = "osm2roads" # Used in e.g. stg files to mark our edits
......@@ -409,7 +409,7 @@ class Roads(object):
len(self.railway_list), len(self.bridges_list))
def process(self, blocked_areas: List[shg.Polygon], stg_entries: List[stg_io2.STGEntry],
landuses_lit: List[landuse.Landuse], stats: utilities.Stats) -> None:
lit_areas: List[shg.Polygon], stats: utilities.Stats) -> None:
"""Processes the OSM data until data can be clusterized.
"""
self._remove_tunnels()
......@@ -417,7 +417,7 @@ class Roads(object):
self._check_ways_in_water()
self._check_against_blocked_areas(blocked_areas)
self._check_against_stg_entries(stg_entries)
self._check_lighting(landuses_lit)
self._check_lighting(lit_areas)
self._cleanup_topology()
self._check_points_on_line_distance()
......@@ -549,12 +549,12 @@ class Roads(object):
self.ways_list.extend(new_ways)
def _check_lighting(self, landuses_lit: List[landuse.Landuse]) -> None:
def _check_lighting(self, lit_areas: List[shg.Polygon]) -> None:
"""Checks ways for lighting and maybe splits at borders for built-up areas."""
way_lul_map = dict() # key: way, value: list(landuse_lit) from split -> prevent re-check of mini-residuals
new_ways_1 = self._check_lighting_inner(self.ways_list, landuses_lit, way_lul_map)
way_la_map = dict() # key: way, value: list(lit_Area) from split -> prevent re-check of mini-residuals
new_ways_1 = self._check_lighting_inner(self.ways_list, lit_areas, way_la_map)
self.ways_list.extend(new_ways_1)
new_ways_2 = self._check_lighting_inner(new_ways_1, landuses_lit, way_lul_map)
new_ways_2 = self._check_lighting_inner(new_ways_1, lit_areas, way_la_map)
self.ways_list.extend(new_ways_2)
# Looping again might get even better splits, but is quite costly for the gained extra effect.
# now replace 'gen' with 'yes'
......@@ -562,8 +562,8 @@ class Roads(object):
if LIT in way.tags and way.tags[LIT] == 'gen':
way.tags[LIT] = 'yes'
def _check_lighting_inner(self, ways_list: List[op.Way], landuses_lit: List[landuse.Landuse],
way_lul_map: Dict[op.Way, landuse.Landuse]) -> List[op.Way]:
def _check_lighting_inner(self, ways_list: List[op.Way], lit_areas: List[shg.Polygon],
way_la_map: Dict[op.Way, shg.Polygon]) -> List[op.Way]:
"""Inner method for _check_lighting doing the actual checking. New split ways are the outcome of the method.
However all ways by reference get updated tags. This method exists such that new ways can be checked again
against other built-up areas.
......@@ -581,34 +581,34 @@ class Roads(object):
orig_lit += 1
continue # nothing further to do with this way
if way in way_lul_map:
already_checked_luls = way_lul_map[way]
if way in way_la_map:
already_checked_luls = way_la_map[way]
else:
already_checked_luls = list()
way_lul_map[way] = already_checked_luls
way_la_map[way] = already_checked_luls
way_changed = True
my_line = None
my_line_bounds = None
for landuse_lit in landuses_lit:
for lit_area in lit_areas:
if way_changed:
my_line = self._line_string_from_way(way) # needs to be re-calculated because could change below
my_line_bounds = my_line.bounds
way_changed = False
if landuse_lit in already_checked_luls:
if lit_area in already_checked_luls:
continue
# do a fast cheap check on intersection working with static .bounds (ca. 200 times faster than calling
# every time)
if coordinates.disjoint_bounds(my_line_bounds, landuse_lit.bounds):
if coordinates.disjoint_bounds(my_line_bounds, lit_area.bounds):
continue
# do more narrow intersection checks
if my_line.within(landuse_lit.polygon):
if my_line.within(lit_area):
way.tags[LIT] = 'gen'
break # it cannot be in more than one built_up area at a time
intersection_points = list()
some_geometry = my_line.intersection(landuse_lit.polygon.exterior)
some_geometry = my_line.intersection(lit_area.exterior)
if isinstance(some_geometry, shg.LineString):
continue # it only touches
elif isinstance(some_geometry, shg.Point):
......@@ -635,11 +635,11 @@ class Roads(object):
is_new_way = False # the first item in the dict is the original way by convention
for cut_way, distance in cut_ways_dict.items():
my_point = my_line.interpolate(distance)
if my_point.within(landuse_lit.polygon):
if my_point.within(lit_area):
cut_way.tags[LIT] = 'gen'
else:
cut_way.tags[LIT] = 'no'
already_checked_luls.append(landuse_lit)
already_checked_luls.append(lit_area)
if is_new_way:
new_ways.append(cut_way)
else:
......@@ -1279,8 +1279,8 @@ def _process_clusters(clusters, replacement_prefix, fg_elev: utilities.FGElev, s
def process_roads(coords_transform: coordinates.Transformation, fg_elev: utilities.FGElev,
blocked_areas: List[shg.Polygon], stg_entries: List[stg_io2.STGEntry],
file_lock: mp.Lock=None) -> None:
blocked_areas: List[shg.Polygon], lit_areas: List[shg.Polygon],
stg_entries: List[stg_io2.STGEntry], file_lock: mp.Lock=None) -> None:
random.seed(42)
stats = utilities.Stats()
......@@ -1297,16 +1297,10 @@ def process_roads(coords_transform: coordinates.Transformation, fg_elev: utiliti
roads = Roads(filtered_osm_ways_list, osm_nodes_dict, coords_transform, fg_elev)
# land-use data for lighting
landuse_result = op.fetch_osm_db_data_ways_keys(['landuse'])
landuse_nodes_dict = landuse_result.nodes_dict
landuse_ways_dict = landuse_result.ways_dict
landuses_lit = landuse.process_osm_landuse_for_lighting(landuse_nodes_dict, landuse_ways_dict, coords_transform)
path_to_output = parameters.get_output_path()
logging.debug("before linear " + str(roads))
roads.process(blocked_areas, stg_entries, landuses_lit, stats) # does the heavy lifting incl. clustering
roads.process(blocked_areas, stg_entries, lit_areas, stats) # does the heavy lifting incl. clustering
replacement_prefix = parameters.get_repl_prefix()
stg_manager = stg_io2.STGManager(path_to_output, stg_io2.SceneryType.roads, OUR_MAGIC, replacement_prefix)
......
import logging
import math
from typing import Dict, List
import shapely.geometry as shg
import parameters
from utils.coordinates import Transformation
import utils.osmparser as osm
class Landuse(object):
TYPE_COMMERCIAL = 10
TYPE_INDUSTRIAL = 20
TYPE_RESIDENTIAL = 30
TYPE_RETAIL = 40
TYPE_NON_OSM = 50 # used for land-uses constructed with heuristics and not in original data from OSM
def __init__(self, osm_id):
self.osm_id = osm_id
self.type_ = 0
self._polygon = None # the polygon defining its outer boundary
self.number_of_buildings = 0 # only set for generated TYPE_NON_OSM land-uses during generation
self._bounds = None # is set when the polygon is set for performance reasons (.bounds makes a new calc)
@property
def bounds(self):
if self._bounds is None:
raise AttributeError('Trying to get bounds before valid geometry has been added')
return self._bounds
@property
def polygon(self):
return self._polygon
@polygon.setter
def polygon(self, polygon: shg.Polygon):
self._polygon = polygon
self._bounds = self._polygon.bounds
def process_osm_landuse_refs(nodes_dict: Dict[int, osm.Node], ways_dict: Dict[int, osm.Way],
my_coord_transformator: Transformation) -> Dict[int, Landuse]:
my_landuses = dict() # osm_id as key, Landuse as value
for way in list(ways_dict.values()):
my_landuse = Landuse(way.osm_id)
valid_landuse = False
for key in way.tags:
value = way.tags[key]
if "landuse" == key:
if value == "commercial":
my_landuse.type_ = Landuse.TYPE_COMMERCIAL
valid_landuse = True
elif value == "industrial":
my_landuse.type_ = Landuse.TYPE_INDUSTRIAL
valid_landuse = True
elif value == "residential":
my_landuse.type_ = Landuse.TYPE_RESIDENTIAL
valid_landuse = True
elif value == "retail":
my_landuse.type_ = Landuse.TYPE_RETAIL
valid_landuse = True
if valid_landuse:
my_polygon = way.polygon_from_osm_way(nodes_dict, my_coord_transformator)
if my_polygon is not None:
my_landuse.polygon = my_polygon
if my_landuse.polygon.is_valid and not my_landuse.polygon.is_empty:
my_landuses[my_landuse.osm_id] = my_landuse
logging.info("OSM land-uses found: %s", len(my_landuses))
return my_landuses
def process_osm_landuse_for_lighting(nodes_dict: Dict[int, osm.Node], ways_dict: Dict[int, osm.Way],
my_coord_transformator: Transformation) -> List[Landuse]:
"""Wrapper around process_osm_landuse_refs(...) to get landuses with BUILT_UP_AREA_LIT_BUFFER."""
landuse_refs = process_osm_landuse_refs(nodes_dict, ways_dict, my_coord_transformator)
if parameters.LU_LANDUSE_GENERATE_LANDUSE:
building_refs = _process_osm_building_refs(my_coord_transformator)
generate_landuse_from_buildings(landuse_refs, building_refs)
for key, landuse in landuse_refs.items():
landuse.polygon = landuse.polygon.buffer(parameters.BUILT_UP_AREA_LIT_BUFFER)
return list(landuse_refs.values())
def generate_landuse_from_buildings(osm_landuses: Dict[int, Landuse], building_refs: List[shg.Polygon]) -> None:
"""Generates 'missing' land-uses based on building clusters."""
my_landuse_candidates = dict()
index = 0
for my_building in building_refs:
# check whether the building already is in a land use
within_existing_landuse = False
for osm_landuse in list(osm_landuses.values()):
if my_building.intersects(osm_landuse.polygon):
within_existing_landuse = True
break
if not within_existing_landuse:
# create new clusters of land uses
buffer_distance = parameters.LU_LANDUSE_BUILDING_BUFFER_DISTANCE
if my_building.area > parameters.LU_LANDUSE_BUILDING_BUFFER_DISTANCE**2:
factor = math.sqrt(my_building.area / parameters.LU_LANDUSE_BUILDING_BUFFER_DISTANCE**2)
buffer_distance = min(factor*parameters.LU_LANDUSE_BUILDING_BUFFER_DISTANCE,
parameters.LU_LANDUSE_BUILDING_BUFFER_DISTANCE_MAX)
buffer_polygon = my_building.buffer(buffer_distance)
buffer_polygon = buffer_polygon
within_existing_landuse = False
for key, candidate in my_landuse_candidates.items():
if buffer_polygon.intersects(candidate.polygon):
candidate.polygon = candidate.polygon.union(buffer_polygon)
candidate.number_of_buildings += 1
within_existing_landuse = True
break
if not within_existing_landuse:
index -= 1
my_candidate = Landuse(index)
my_candidate.polygon = buffer_polygon
my_candidate.number_of_buildings = 1
my_candidate.type_ = Landuse.TYPE_NON_OSM
my_landuse_candidates[my_candidate.osm_id] = my_candidate
# add landuse candidates to landuses
logging.debug("Candidate land-uses found: %d", len(my_landuse_candidates))
added = 0
for key, candidate in my_landuse_candidates.items():
if candidate.polygon.area >= parameters.LU_LANDUSE_MIN_AREA:
osm_landuses[key] = candidate
added += 1
logging.info("Candidate land-uses with sufficient area added: %d", added)
def _process_osm_building_refs(my_coord_transformator: Transformation) -> List[shg.Polygon]:
"""Takes all buildings' convex hull (but not building:part) to be used for landuse processing.
Only valid if database is used.
"""
my_buildings = list()
osm_way_result = osm.fetch_osm_db_data_ways_keys(['building'])
osm_nodes_dict = osm_way_result.nodes_dict
osm_ways_dict = osm_way_result.ways_dict
for way in list(osm_ways_dict.values()):
for key in way.tags:
if "building" == key:
my_coordinates = list()
for ref in way.refs:
if ref in osm_nodes_dict:
my_node = osm_nodes_dict[ref]
my_coordinates.append(my_coord_transformator.to_local((my_node.lon, my_node.lat)))
if 2 < len(my_coordinates):
my_polygon = shg.Polygon(my_coordinates)
if my_polygon.is_valid and not my_polygon.is_empty:
my_buildings.append(my_polygon.convex_hull)
return my_buildings