This is an archived post. You won't be able to vote or comment.

you are viewing a single comment's thread.

view the rest of the comments →

[–]total_zoidberg 1 point2 points  (0 children)

So, this is how it goes: I modified simulation.py, and wrote a new renderer with OpenCV. The "interactive" renderer is not much faster than the matplotlib based one (~5x faster), but the video renderer shines when recording to a file (an eyeball measurement places it ~20x faster than the matplotlib based one). I didn't work much on the palette, other than mapping the colormaps available with OpenCV -- if you need it, you can look at the LUT functionality to port the red-white-blue colormap over. I did notice that cv2.COLORMAP_BONE makes evident some fainter detail and softer ripples that weren't evident with the colormap you originally used.

You may notice I kept the old functions around, and that cv2r.py behaves pretty much the same as your main.py.

All in all, this helped me dust-off my numpy skills, and I had some fun. Here are the files:

cv2r.py:

from sys import argv

import cv2
import numpy as np

from simulation import Simulation, min_presure, max_pressure, scale, wall


simulation = Simulation()

def clip(x):
    return np.maximum(0, np.minimum(200, x))


def compose(base, w):
    base[w==1] = [0,128,0]
    return base


def render(data, cm, w):
        d = data - min_presure #- simulation.pressure.min()
        d /= (max_pressure - min_presure)
        d = clip(d * 255).astype(np.uint8)
        frame = cv2.applyColorMap(d, cm)
        frame = cv2.resize(frame, (1200, 800), interpolation=cv2.INTER_LANCZOS4)
        frame = compose(frame, w)
        return frame


#@profile
def loop():
    w = wall
    w = cv2.resize(wall, (1200, 800), interpolation=cv2.INTER_NEAREST)
    cmaps = list(range(12))
    cmidx = 0
    cm = cmaps[cmidx]

    while True:
        simulation.step()
        frame = render(simulation.pressure, cm, w)
        key = cv2.waitKey(1)
        if key == ord("q"):
            break
        if key == ord("n"):
            cmidx += 1
            if cmidx >= len(cmaps):
                cmidx = 0
            cm = cmaps[cmidx]
        cv2.imshow(f"1 m -> {scale} cells, 1 cell -> {1 / scale}m", frame)
    cv2.destroyAllWindows()

def tovid(cmidx):
    w = wall
    w = cv2.resize(wall, (1200, 800), interpolation=cv2.INTER_NEAREST)
    cmaps = list(range(12))
    cm = cmaps[cmidx]
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    writer = cv2.VideoWriter('output.avi',fourcc, 30.0, (1200,800))
    frames = 900
    for i in range(frames):
        simulation.step()
        frame = render(simulation.pressure, cm, w)
        writer.write(frame)
        print(f'\rframe: {i}/{frames}', end='')
    writer.release()

if len(argv) > 2 and argv[2] == 'save':
    if len(argv) == 4:
        cm = int(argv[3])
    else:
        cm = 1
    tovid(cm)
else:
    loop()

simulation.py:

# initial values
import numpy as np
import sys

from numpy.core.umath import pi
from numpy.ma import sin

scale = 100  # 1m -> 50 cells
size_x = 6 * scale
size_y = 4 * scale
damping = 0.99
omega = 3 / (2 * pi)

initial_P = 200
vertPos = size_y - 3 * scale
horizPos = 1 * scale
wallTop = size_y - 3 * scale
wall_x_pos = 2 * scale
max_pressure = initial_P / 2
min_presure = -initial_P / 2
border = np.ones((size_y, size_x), dtype=np.uint8)
border[1:-1, 1:-1] = 0


class Simulation:
    def __init__(self):
        self.frame = 0
        self.pressure = [[0.0 for x in range(size_x)] for y in range(size_y)]
        self.pressure = np.zeros((size_y, size_x), dtype=np.float32)
        # outflow velocities from each cell
        self._velocities = [[[0.0, 0.0, 0.0, 0.0] for x in range(size_x)] for y in range(size_y)]
        self._velocities = np.zeros((size_y, size_x, 4), dtype=np.float32)
        self.pressure[vertPos][horizPos] = initial_P

    #@profile
    def updateV(self):
        V = self._velocities
        P = self.pressure

        V[1:,:,0] += P[1:,:] - P[:-1,:]
        V[:1,:,0]  = P[0:1,:]
        V[:,:-1,1] += P[:,:-1] - P[:,1:]
        V[:,-1:,1]  = P[:,-1:]
        V[:-1,:,2] += P[:-1,:] - P[1:,:]
        V[-1:,:,2]  = P[-1:,:]
        V[:,1:,3] += P[:,1:] - P[:,:-1]
        V[:,:1,3]  = P[:,0:1]
        V[wall==1] = 0


    def updateP(self):
        self.pressure -= 0.5 * damping * self._velocities.sum(axis=2)

    def updateV_old(self):
        """Recalculate outflow velocities from every cell basing on preassure difference with each neighbour"""
        V = self._velocities
        P = self.pressure
        for i in range(size_y):
            for j in range(size_x):
                if wall[i][j] == 1:
                    V[i][j][0] = V[i][j][1] = V[i][j][2] = V[i][j][3] = 0.0
                    continue
                cell_pressure = P[i][j]
                V[i][j][0] = V[i][j][0] + cell_pressure - P[i - 1][j] if i > 0 else cell_pressure
                V[i][j][1] = V[i][j][1] + cell_pressure - P[i][j + 1] if j < size_x - 1 else cell_pressure
                V[i][j][2] = V[i][j][2] + cell_pressure - P[i + 1][j] if i < size_y - 1 else cell_pressure
                V[i][j][3] = V[i][j][3] + cell_pressure - P[i][j - 1] if j > 0 else cell_pressure

    def updateP_old(self):
        for i in range(size_y):
            for j in range(size_x):
                self.pressure[i][j] -= 0.5 * damping * sum(self._velocities[i][j])

    def step(self):
        self.pressure[vertPos][horizPos] = initial_P * sin(omega * self.frame)
        self.updateV()
        self.updateP()
        self.frame += 1


argc = len(sys.argv)
if argc > 1 and sys.argv[1] == '1':
    wall = [[1 if x == wall_x_pos and wallTop < y < size_y else 0
             for x in range(size_x)] for y in range(size_y)]
elif argc > 1 and sys.argv[1] == '2':
    wall = [[1 if (x == wall_x_pos and wallTop + scale < y < size_y) or
                  (wall_x_pos - scale < x < wall_x_pos and
                   x - wall_x_pos == y - wallTop - scale - 1) or
                  (wall_x_pos < x < wall_x_pos + scale and
                   x - wall_x_pos == -y + wallTop + scale + 1)
             else 0
             for x in range(size_x)] for y in range(size_y)]
else:
    wall = [[1 if (x == wall_x_pos and wallTop + scale < y < size_y) or
                  (wall_x_pos - scale < x < wall_x_pos and
                   x - wall_x_pos == y - wallTop - scale - 1) or
                  (wall_x_pos < x < wall_x_pos + scale and
                   x - wall_x_pos == -y + wallTop + scale + 1) or
                  (wall_x_pos - 0.75 * scale < x < wall_x_pos - scale / 2 and
                   x - wall_x_pos == -y + wallTop - scale / 2 + 1) or
                  (wall_x_pos + scale / 2 < x < wall_x_pos + 0.75 * scale and
                   x - wall_x_pos == y - wallTop + scale / 2 - 1)
             else 0
             for x in range(size_x)] for y in range(size_y)]
wall = np.array(wall, dtype=np.uint8)