{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "if \"pyodide\" in sys.modules:\n", " import piplite\n", " await piplite.install('networkx')\n", " await piplite.install('pyb2d-jupyterlite-backend>=0.4.2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import b2d\n", "from b2d.testbed import TestbedBase\n", "import math\n", "import random\n", "import numpy\n", "from functools import partial\n", "import networkx\n", "from pyb2d_jupyterlite_backend.async_jupyter_gui import JupyterAsyncGui\n", "\n", "def best_pairwise_distance(data, f, distance):\n", " n = len(data)\n", " best = (None, None, float(\"inf\"))\n", " for i in range(n - 1):\n", " da = f(data[i])\n", " for j in range(i + 1, n):\n", " db = f(data[j])\n", "\n", " d = distance(da, db)\n", " if d < best[2]:\n", " best = (i, j, d)\n", " return best\n", "\n", "class Level(object):\n", " def __init__(self, testbed):\n", " self.testbed = testbed\n", " self.world = testbed.world\n", "\n", " self.gap_size = 15\n", " self.kill_sensors_height = 0.5\n", " self.usable_size = 20\n", " self.h = 10\n", " self.end_zone_height = 3\n", "\n", " self.outline_verts = [\n", " (0, self.h),\n", " (0, 2 * self.h),\n", " (0, self.h),\n", " (self.usable_size, self.h),\n", " (self.usable_size, 0),\n", " (self.usable_size + self.gap_size, 0),\n", " (self.usable_size + self.gap_size, self.h),\n", " (2 * self.usable_size + self.gap_size, self.h),\n", " (2 * self.usable_size + self.gap_size, 2 * self.h),\n", " ]\n", "\n", " # outline of the level\n", " shape = b2d.chain_shape(vertices=numpy.flip(self.outline_verts, axis=0))\n", " self.outline = self.world.create_static_body(position=(0, 0), shape=shape)\n", "\n", " # kill sensors\n", " self.kill_sensor_pos = (\n", " self.usable_size + self.gap_size / 2,\n", " self.kill_sensors_height / 2,\n", " )\n", "\n", " shape = b2d.polygon_shape(box=(self.gap_size / 2, self.kill_sensors_height / 2))\n", " self._kill_sensor = self.world.create_static_body(\n", " position=self.kill_sensor_pos,\n", " fixtures=b2d.fixture_def(shape=shape, is_sensor=True),\n", " )\n", " self._kill_sensor.user_data = \"destroyer\"\n", "\n", " # end sensor\n", " shape = b2d.polygon_shape(box=(self.usable_size / 2, self.end_zone_height / 2))\n", " self._end_sensor = self.world.create_static_body(\n", " position=(\n", " 1.5 * self.usable_size + self.gap_size,\n", " self.h + self.end_zone_height / 2,\n", " ),\n", " fixtures=b2d.fixture_def(shape=shape, is_sensor=True),\n", " )\n", " self._end_sensor.user_data = \"goal\"\n", "\n", " goo_radius = 1\n", " a = self.testbed.insert_goo(\n", " pos=(self.usable_size / 3, self.h + goo_radius), static=True\n", " )\n", " b = self.testbed.insert_goo(\n", " pos=(self.usable_size * 2 / 3, self.h + goo_radius), static=True\n", " )\n", " c = self.testbed.insert_goo(\n", " pos=(self.usable_size * 1 / 2, self.h + goo_radius + 4), static=False\n", " )\n", "\n", " self.testbed.connect_goos(a, b)\n", " self.testbed.connect_goos(a, c)\n", " self.testbed.connect_goos(b, c)\n", "\n", " def draw(self, debug_draw):\n", "\n", " # draw outline\n", " for i in range(len(self.outline_verts) - 1):\n", " debug_draw.draw_segment(\n", " self.outline_verts[i],\n", " self.outline_verts[i + 1],\n", " color=(1, 1, 0),\n", " line_width=0.3,\n", " )\n", "\n", " left = list(self.kill_sensor_pos)\n", " left[0] -= self.gap_size / 2\n", " left[1] += self.kill_sensors_height / 2\n", "\n", " right = list(self.kill_sensor_pos)\n", " right[0] += self.gap_size / 2\n", " right[1] += self.kill_sensors_height / 2\n", " debug_draw.draw_segment(left, right, (1, 0, 0), line_width=0.4)\n", "\n", "\n", "class FindGoos(b2d.QueryCallback):\n", " def __init__(self):\n", " super(FindGoos, self).__init__()\n", " self.goos = []\n", "\n", " def report_fixture(self, fixture):\n", " body = fixture.body\n", " if body.user_data == \"goo\":\n", " self.goos.append(body)\n", " return True\n", "\n", "\n", "class Goo(TestbedBase):\n", "\n", " name = \"Goo\"\n", "\n", " def __init__(self, settings=None):\n", " super(Goo, self).__init__(settings=settings)\n", "\n", " self.goo_graph = networkx.Graph()\n", " self.level = Level(testbed=self)\n", "\n", " # mouse related\n", " self.last_mouse_pos = None\n", " self.is_mouse_down = False\n", " self.could_place_goo_when_mouse_was_down = False\n", "\n", " # callback to draw tentative placement\n", " self.draw_callback = None\n", "\n", " # goos marked for destruction\n", " self.goo_to_destroy = []\n", "\n", " # joints marked for destruction\n", " self.joints_to_destroy = []\n", " self.gamma = 0.003\n", " self.break_threshold = 0.5\n", "\n", " # time point when goo can be inserted\n", " self.insert_time_point = 0\n", " self.insert_delay = 1.0\n", "\n", " # handle finishing of level\n", " self.with_goal_contact = dict()\n", "\n", " # amount of seconds one has to be in the finishing zone\n", " self.win_delay = 3.0\n", "\n", " # particle system will be defined an used on win!\n", " # this is then used for some kind of fireworks\n", " self.psystem = None\n", " self.emitter = None\n", " self.emitter_stop_time = None\n", " self.emitter_start_time = None\n", "\n", " # trigger some fireworks on win\n", " def on_win(self, win_body):\n", "\n", " if self.psystem is None:\n", " # particle system\n", " pdef = b2d.particle_system_def(\n", " viscous_strength=0.9,\n", " spring_strength=0.0,\n", " damping_strength=100.5,\n", " pressure_strength=1.0,\n", " color_mixing_strength=0.05,\n", " density=0.1,\n", " )\n", "\n", " self.psystem = self.world.create_particle_system(pdef)\n", " self.psystem.radius = 0.1\n", " self.psystem.damping = 0.5\n", "\n", " emitter_def = b2d.RandomizedRadialEmitterDef()\n", " emitter_def.emite_rate = 2000\n", " emitter_def.lifetime = 0.9\n", " emitter_def.enabled = True\n", " emitter_def.inner_radius = 0.0\n", " emitter_def.outer_radius = 0.1\n", " emitter_def.velocity_magnitude = 1000.0\n", " emitter_def.start_angle = 0\n", " emitter_def.stop_angle = 2 * math.pi\n", " emitter_def.transform = b2d.Transform(\n", " win_body.position + b2d.vec2(0, 20), b2d.Rot(0)\n", " )\n", " self.emitter = b2d.RandomizedRadialEmitter(self.psystem, emitter_def)\n", " self.emitter_stop_time = self.elapsed_time + 0.2\n", "\n", " def draw_goo(self, pos, angle, body=None):\n", " self.debug_draw.draw_solid_circle(pos, 1, axis=None, color=(1, 0, 1))\n", " self.debug_draw.draw_circle(pos, 1.1, (1, 1, 1), line_width=0.1)\n", "\n", " if body is not None:\n", " centers = [\n", " body.get_world_point((-0.3, 0.2)),\n", " body.get_world_point((0.3, 0.2)),\n", " ]\n", " for center in centers:\n", " self.debug_draw.draw_solid_circle(\n", " center, 0.4, axis=None, color=(1, 1, 1)\n", " )\n", " self.debug_draw.draw_solid_circle(\n", " center, 0.2, axis=None, color=(0, 0, 0)\n", " )\n", "\n", " def draw_edge(self, pos_a, pos_b, stress):\n", " no_stress = numpy.array([1, 1, 1])\n", " has_stress = numpy.array([1, 0, 0])\n", " color = (1.0 - stress) * no_stress + stress * has_stress\n", " color = tuple([float(c) for c in color])\n", " self.debug_draw.draw_segment(pos_a, pos_b, color=color, line_width=0.4)\n", "\n", " def insert_goo(self, pos, static=False):\n", " if static:\n", " f = self.world.create_static_body\n", " else:\n", " f = self.world.create_dynamic_body\n", "\n", " goo = f(\n", " position=pos,\n", " fixtures=b2d.fixture_def(shape=b2d.circle_shape(radius=1), density=1),\n", " user_data=\"goo\",\n", " )\n", " self.goo_graph.add_node(goo)\n", " return goo\n", "\n", " def connect_goos(self, goo_a, goo_b):\n", " length = (goo_a.position - goo_b.position).length\n", " joint = self.world.create_distance_joint(\n", " goo_a,\n", " goo_b,\n", " stiffness=500,\n", " damping=0.1,\n", " length=length,\n", " user_data=dict(length=length, stress=0),\n", " )\n", " self.goo_graph.add_edge(goo_a, goo_b, joint=joint)\n", "\n", " def query_placement(self, pos):\n", "\n", " radius = 8\n", "\n", " # find all goos in around pos\n", " pos = b2d.vec2(pos)\n", " box = b2d.aabb(\n", " lower_bound=pos - b2d.vec2(radius, radius),\n", " upper_bound=pos + b2d.vec2(radius, radius),\n", " )\n", " query = FindGoos()\n", " self.world.query_aabb(query, box)\n", " goos = query.goos\n", " n_goos = len(goos)\n", "\n", " if n_goos >= 2:\n", "\n", " # try to insert to goo as edge between\n", " # 2 existing goos\n", " def distance(a, b, p):\n", " if self.goo_graph.has_edge(a[0], b[0]):\n", " return float(\"inf\")\n", " return numpy.linalg.norm((a[1] + b[1]) / 2 - p)\n", "\n", " i, j, best_dist = best_pairwise_distance(\n", " goos,\n", " f=lambda goo: (goo, numpy.array(goo.position)),\n", " distance=partial(distance, p=pos),\n", " )\n", "\n", " if best_dist < 0.8:\n", "\n", " def draw_callback():\n", " self.draw_edge(goos[i].position, goos[j].position, stress=0)\n", "\n", " def insert_callack():\n", " self.connect_goos(goos[i], goos[j])\n", "\n", " return True, draw_callback, insert_callack\n", "\n", " # try to insert the goo as brand new\n", " # goo and connect it with 2 existing goos\n", " f = lambda goo: (goo, (goo.position - b2d.vec2(pos)).length)\n", "\n", " def distance(a, b):\n", " if not self.goo_graph.has_edge(a[0], b[0]):\n", " return float(\"inf\")\n", " return a[1] + b[1]\n", "\n", " i, j, best_dist = best_pairwise_distance(goos, f=f, distance=distance)\n", " if best_dist < float(\"inf\"):\n", "\n", " def draw_callback():\n", "\n", " self.draw_edge(pos, goos[i].position, stress=0)\n", " self.draw_edge(pos, goos[j].position, stress=0)\n", " self.draw_goo(pos, angle=None)\n", "\n", " def insert_callack():\n", " goo = self.insert_goo(pos=pos)\n", " self.connect_goos(goo, goos[i])\n", " self.connect_goos(goo, goos[j])\n", "\n", " return True, draw_callback, insert_callack\n", "\n", " return False, None, None\n", "\n", " def on_mouse_down(self, pos):\n", " self.last_mouse_pos = pos\n", " self.is_mouse_down = True\n", " can_be_placed, draw_callback, insert_callback = self.query_placement(pos)\n", " self.could_place_goo_when_mouse_was_down = can_be_placed\n", " if can_be_placed:\n", " if self.elapsed_time < self.insert_time_point:\n", " return True\n", " self.draw_callback = draw_callback\n", " return True\n", " return False\n", "\n", " def on_mouse_move(self, pos):\n", " self.last_mouse_pos = pos\n", " if self.is_mouse_down:\n", " can_be_placed, draw_callback, insert_callback = self.query_placement(pos)\n", " if can_be_placed:\n", " if self.elapsed_time < self.insert_time_point:\n", " return True\n", " self.draw_callback = draw_callback\n", " return True\n", " else:\n", " self.draw_callback = None\n", " return self.could_place_goo_when_mouse_was_down\n", "\n", " def on_mouse_up(self, pos):\n", " self.last_mouse_pos = pos\n", " self.is_mouse_down = False\n", " self.draw_callback = None\n", " can_be_placed, draw_callback, insert_callback = self.query_placement(pos)\n", " if can_be_placed:\n", " if self.elapsed_time < self.insert_time_point:\n", " return True\n", " # self.draw_callback = draw_callback\n", " insert_callback()\n", " self.insert_time_point = self.elapsed_time + self.insert_delay\n", " return True\n", " return False\n", "\n", " def begin_contact(self, contact):\n", " body_a = contact.body_a\n", " body_b = contact.body_b\n", " if body_b.user_data == \"goo\":\n", " body_a, body_b = body_b, body_a\n", "\n", " user_data_a = body_a.user_data\n", " user_data_b = body_b.user_data\n", " if body_a.user_data == \"goo\":\n", " if user_data_b == \"destroyer\":\n", " self.goo_to_destroy.append(body_a)\n", " elif user_data_b == \"goal\":\n", " self.with_goal_contact[body_a] = self.elapsed_time + self.win_delay\n", "\n", " def end_contact(self, contact):\n", " body_a = contact.body_a\n", " body_b = contact.body_b\n", " if body_b.user_data == \"goo\":\n", " body_a, body_b = body_b, body_a\n", "\n", " user_data_a = body_a.user_data\n", " user_data_b = body_b.user_data\n", " if body_a.user_data == \"goo\":\n", " if user_data_b == \"goal\":\n", " if body_a in self.with_goal_contact:\n", " del self.with_goal_contact[body_a]\n", "\n", " def pre_step(self, dt):\n", "\n", " # query if goo can be inserted\n", " if (\n", " self.is_mouse_down\n", " and self.last_mouse_pos is not None\n", " and self.draw_callback is None\n", " ):\n", " can_be_placed, draw_callback, insert_callback = self.query_placement(\n", " self.last_mouse_pos\n", " )\n", " if can_be_placed and self.elapsed_time >= self.insert_time_point:\n", " self.draw_callback = draw_callback\n", "\n", " # compute joint stress\n", " for goo_a, goo_b, joint in self.goo_graph.edges(data=\"joint\"):\n", " jd = joint.user_data\n", "\n", " # distance based stress\n", " insert_length = jd[\"length\"]\n", " length = (goo_a.position - goo_b.position).length\n", "\n", " d = length - insert_length\n", " if d > 0:\n", "\n", " # reaction force based stress\n", " rf = joint.get_reaction_force(30).length\n", "\n", " normalized_rf = 1.0 - math.exp(-rf * self.gamma)\n", "\n", " jd[\"stress\"] = normalized_rf / self.break_threshold\n", " if normalized_rf > self.break_threshold:\n", " self.joints_to_destroy.append((goo_a, goo_b, joint))\n", "\n", " else:\n", " jd[\"stress\"] = 0\n", "\n", " for goo_a, goo_b, joint in self.joints_to_destroy:\n", " self.goo_graph.remove_edge(u=goo_a, v=goo_b)\n", " self.world.destroy_joint(joint)\n", " self.joints_to_destroy = []\n", "\n", " # destroy goos\n", " for goo in self.goo_to_destroy:\n", " self.goo_graph.remove_node(goo)\n", " self.world.destroy_body(goo)\n", "\n", " # destroy all with wrong degree\n", " while True:\n", " destroyed_any = False\n", " to_remove = []\n", " for goo in self.goo_graph.nodes:\n", " if self.goo_graph.degree(goo) < 2:\n", " destroyed_any = True\n", " to_remove.append(goo)\n", " if not destroyed_any:\n", " break\n", " for goo in to_remove:\n", " self.goo_graph.remove_node(goo)\n", " self.world.destroy_body(goo)\n", " self.goo_to_destroy = []\n", "\n", " # check if we are done\n", " for goo, finish_time in self.with_goal_contact.items():\n", " if finish_time <= self.elapsed_time:\n", " self.on_win(goo)\n", "\n", " if self.emitter is not None:\n", " if self.emitter_stop_time is not None:\n", " if self.elapsed_time > self.emitter_stop_time:\n", " self.emitter.enabled = False\n", " self.emitter_start_time = self.elapsed_time + 0.4\n", " self.emitter_stop_time = None\n", " p = list(self.emitter.position)\n", " p[0] += (random.random() - 0.5) * 10.0\n", " p[1] += (random.random() - 0.5) * 2.0\n", " self.emitter.position = p\n", " if self.emitter_start_time is not None:\n", " if self.elapsed_time > self.emitter_start_time:\n", " self.emitter.enabled = True\n", " self.emitter_start_time = None\n", " self.emitter_stop_time = self.elapsed_time + 0.2\n", " self.emitter.step(dt)\n", "\n", " def post_debug_draw(self):\n", "\n", " self.level.draw(self.debug_draw)\n", "\n", " # draw mouse when mouse is down\n", " if (\n", " self.is_mouse_down\n", " and self.last_mouse_pos is not None\n", " and self.draw_callback is None\n", " ):\n", " d = (self.insert_time_point - self.elapsed_time) / self.insert_delay\n", " if d > 0:\n", " d = d * math.pi * 2\n", " x = math.sin(d)\n", " y = math.cos(d)\n", " p = self.last_mouse_pos[0] + x, self.last_mouse_pos[1] + y\n", " self.debug_draw.draw_segment(\n", " p, self.last_mouse_pos, color=(1, 0, 0), line_width=0.2\n", " )\n", " self.debug_draw.draw_circle(\n", " self.last_mouse_pos, 1, (1, 0, 0), line_width=0.2\n", " )\n", "\n", " # draw the tentative placement\n", " if self.draw_callback is not None:\n", " self.draw_callback()\n", "\n", " for goo_a, goo_b, joint in self.goo_graph.edges(data=\"joint\"):\n", " self.draw_edge(\n", " goo_a.position, goo_b.position, stress=joint.user_data[\"stress\"]\n", " )\n", "\n", " for goo in self.goo_graph:\n", " self.draw_goo(goo.position, goo.angle, body=goo)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Controlls\n", "* To play this game, click and drag next to the existing \"goos\"\n", "* try to bridge the tiny gap\n", "* Use the mouse-wheel to zoom in/out, a\n", "* Click and drag in the empty space to translate the view." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "s = JupyterAsyncGui.Settings()\n", "s.resolution = [1000,500]\n", "s.scale = 8\n", "tb = b2d.testbed.run(Goo, backend=JupyterAsyncGui, gui_settings=s);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.4" } }, "nbformat": 4, "nbformat_minor": 4 }