run_simulated_annealing module

Simulated annealing algorithm used to match the simulation of the Antennae to the observations by comparing binarized images.

Source code
"""Simulated annealing algorithm used to match the simulation of the
Antennae to the observations by comparing binarized images."""

import numpy as np
import scipy
import pickle
from scipy import ndimage
from fast_histogram import histogram2d
from scipy.signal import convolve2d
from numba import jit
from matplotlib.image import imread
import datetime

from simulation import Simulation, Galaxy

T = .25 # Initial tempreature
STEPS = 0
DECAY = .998 # Exponential decay factor

def simToBitmap(sim, theta, phi, view, scale, x, y, galaxy):
    """Obtain a bitmap of one galaxy as viewed from a given direction.
       The binning has been chosen so that the scale and the offset (x,y)
       are expected to be approximately 1 and (0, 0) respectively.

        Parameters:
            sim (Simulation): Simulation to project.
            theta (float): polar angle of viewing direction.
            phi (float): azimuthal angle of viewing direction.
            view (float): rotation angle along viewing direction.
            scale (float): scaling factor.
            x (float): x offset
            y (float): y offset
            galaxy (int): galaxy to plot. Either 0, or 1. They are assumed    
                to have the same number of particles.
    """
    # Obtain components in new x',y' plane
    r_vec = (sim.project(theta, phi, view) - [[x+.12,y+1.3]]) * scale
    # Select a single galaxy. We match them separately in the algorithm.
    if galaxy==0: r_vec = r_vec[2:len(r_vec)//2-1] #omit central masses
    if galaxy==1: r_vec = r_vec[len(r_vec)//2-1:]
    # Use a fast histogram, much faster than numpy ()
    H = histogram2d(r_vec[:,0], r_vec[:,1], 
        range=[[-5,5], [-5,5]], bins=(50, 50))
    im = np.zeros((50, 50))
    H = convolve2d(H, np.ones((2,2)), mode='same') # Smooth the image
    im[H>=1] = 1 # Binary the image
    
    return im

@jit(nopython=True) # Numba annotation
def bitmapScoreAlgo(im1, im2):
    """Computes the f1 score betwen two binarized images

    Parameters
        im1 (arr): nxm ground truth image
        im2 (arr): nxm candidate image
    
    Returns
        f1 score
    """
    TP = np.sum((im1==1.0) & (im2==1.0))
    TN = np.sum((im1==0.0) & (im2==0.0))
    FP = np.sum((im1==0.0) & (im2==1.0))
    FN = np.sum((im1==1.0) & (im2==0.0))
    if TP==0: return 0
    precision = TP/(TP+FP) 
    recall = TP/(TP+FN)
    return 2*precision*recall / (precision + recall)

# The matching algorithm attempts to improve the match by shifting the
# image by one pixel in each direction. If none improve the score the
# f1-score of said local maximum is retuned. To make this highly efficient
# (as this is run millions of time) we explicitely write functions to 
# shift an image by 1 pixel in each direction, as these can then be compiled
# using Numba (jit annotation) to low-level code.
# Performance is crucial here and must sadly be prioritized over conciseness
@jit(nopython=True)
def shiftBottom(im, im2):
    """Shifts an image by one pixel downwards.
    
    Parameters:
        im (arr): the nxm image to shift by one pixel.
        im2 (arr): an nxm array where the new image will be placed.

    Returns:
        A reference to im2
    """
    im2[1:] = im[:-1]
    im2[0] = 0
    return im2

@jit(nopython=True)
def shiftTop(im, im2):
    """Shifts an image by one pixel upwards."""
    im2[:-1] = im[1:]
    im2[-1] = 0
    return im2

@jit(nopython=True)
def shiftLeft(im, im2):
    """Shifts an image by one pixel to the left."""
    im2[:,1:] = im[:,:-1]
    im2[:,0] = 0
    return im2

@jit(nopython=True)
def shiftRight(im, im2):
    """Shifts an image by one pixel to the right."""
    im2[:,:-1] = im[:,1:]
    im2[:,-1] = 0
    return im2

@jit
def bitmapScore(im1, im2, _prev=None, _bestscore=None, _zeros=None):
    """Computes the bitmap score between two images. This is the f1-score
       but we allow the algorithm to attempt to improve the score by
       shifting the images. The algorithm is implemented recursively.

    Parameters:
        im1 (array): The ground truth nxm image.
        im2 (array): The candidate nxm imgae.
        _prev: Used internally for recursion.
        _bestscore: Used internally for recursion.
        _zeros: Used internally for recursion.
    
    Returns:
        f1-score for the two images.
    """
    # When the function is called externally, initialize an array of zeros
    # and compute the score for no shifting. The zeros array is used for
    # performance to only create a new array once.
    if _bestscore is None:
        _bestscore = bitmapScoreAlgo(im1, im2)
        _zeros = np.zeros_like(im2)
    # Attempt to improve the score by shifting the image in a direction
    # Keeping track of _prev allows to not 'go back' and undo and attempt
    # to undo a shift needlessly. 
    if _prev is not 0: # try left
        shifted = shiftLeft(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=1, _bestscore=score, _zeros=_zeros)
    if _prev is not 1: # try right
        shifted = shiftRight(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=0, _bestscore=score, _zeros=_zeros)
    if _prev is not 2: # try top
        shifted = shiftTop(im2, _zeros) 
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=3, _bestscore=score, _zeros=_zeros)
    if _prev is not 3: # try bottom
        shifted = shiftBottom(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=2, _bestscore=score, _zeros=_zeros)
    # Return _bestscore if shifting did not improve (local maximum).
    return _bestscore


def attemptSimulation(theta1, phi1, theta2, phi2, rmin=1, e=.5, R=2, 
        disk1=.75, disk2=.65, mu=1, plot=False, steps=2000):
    """Runs a simulation with the given parameters and compares it 
    to observations of the antennae to return a score out of 2.

    Parameters:
        theta1 (float): polar angle for the spin of the first galaxy.
        phi1 (float): azimuthal angle for the spin of the first galaxy.
        theta2 (float): polar angle for the spin of the second galaxy.
        phi2 (float): azimuthal angle for the spin of the second galaxy.
        rmin (float): closest distance of approach of orbit.
        e (float): eccentricity of the orbit.
        R (float): initial separation
        disk1 (float): radius of the uniform disk of the first galaxy
        disk2 (float): radius of the uniform disk of the first galaxy
        mu (float): ratio of masses of the two galaxies
        plot (float): if true the simulation will be saved to data/annealing/
        steps (float): number of times the f1 score will be computed, along
            random viewing directions per 100 timesteps.

    Returns:
        f1-score: score obtained by the simulation. 
    """

    # Initialize the simulation
    sim = Simulation(dt=1E-3, soft=0.1, verbose=False, CONFIG=None, method='bruteForceNumbaOptimized')
    galaxy1 = Galaxy(orientation = (theta1, phi1), centralMass=2/(1+mu),
        sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0},
        disk={'model':'uniform', 'particles':2000, 'l':disk1, 'totalMass':0},
        halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0})
    galaxy2 = Galaxy(orientation = (theta2, phi2), centralMass=2*mu/(1+mu),
        sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0},
        disk={'model':'uniform', 'particles':2000, 'l':disk2, 'totalMass':0},
        halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0})
    sim.setOrbit(galaxy1, galaxy2, e=e, R0=R, rmin=rmin) # define the orbit

    # Run the simulation manually
    # Initialize the parameters consistently with the velocities
    sim.rprev_vec = sim.r_vec - sim.v_vec * sim.dt
    # Keep track of the best score
    bestScore = 0
    # and its corresponding binarized image and parameters
    bestImage, bestParams = 0, 0
    hasReachedPericenter = False

    # Run until tmax = 25
    for i in range(25001):
        sim.step()
        if i%100==0: # Every $\Delta t = 0.1$
            # Check if we are close to pericenter
            centers = sim.r_vec[sim.type[:,0] == 'center']
            if np.linalg.norm(centers[0] - centers[1]) < 1.3*rmin: 
                hasReachedPericenter = True
            # Do not evaluate the f1-score until pericenter.
            if not hasReachedPericenter: continue

            # Check multiple (steps) viewing directions at random
            localBestScore = 0
            localBestImage, localBestParams = 0, 0
            for j in range(steps):
                # The viewing directions are isotropically distributed
                theta = np.arccos(np.random.uniform(-1, 1))
                phi = np.random.uniform(0, 2*np.pi)
                # Rotation along line of sight
                view = np.random.uniform(0, 2*np.pi)
                scale = np.random.uniform(0.6, 1.0)
                x = np.random.uniform(-1.0, 1.0) # Offsets
                y = np.random.uniform(-1.0, 1.0)
                # Get images for each galaxy and compute their score separately
                im1 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=0)
                im2 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=1)
                score = bitmapScore(GT1, im1) + bitmapScore(GT2, im2)
                if score > localBestScore: 
                    localBestScore = score
                    localBestImage = [im1,im2]
                    localBestParams = [i, theta, phi, view, scale, x, y]

            if bestScore < localBestScore: 
                bestScore = localBestScore
                bestImage = localBestImage
                bestParams = localBestParams
            if plot:
                sim.save('annealing', type='2D')
    
    print('Best score for this attempt', bestScore)
    print('using viewing parameters', bestParams)
    
    return bestScore


################################################################################
################################################################################

# Generate a (50, 50) bitmap for each galaxy
# They are stored globally in GT1 and GT2 (Ground Truth)
im = imread('literature/figs/g1c.tif')
im = np.mean(im, axis=-1)
im = scipy.misc.imresize(im, (50,50))
GT1 = np.zeros((50, 50))
GT1[im > 50] = 1

im = imread('literature/figs/g2c.tif')
im = np.mean(im, axis=-1)
im = scipy.misc.imresize(im, (50,50))
GT2 = np.zeros((50, 50))

# Define the limits and relate scale of the variations in each parameter
# In the same order as attemptSimulation
# phi1, theta1, phi2, theta2, rmin (fixed), e, R (fixed), disk1, disk2
LIMITS = np.array([[np.pi, 2 * np.pi], [-np.pi, np.pi],
                   [0, np.pi], [-np.pi, np.pi],
                   [1,1], [.5,1.0], [2.2,2.2], [.5,.8], [.5,.8]])
VARIATIONS = np.array([.08, .15, .08, .15, 0, .01, 0, .01, .01])

# Choose a random starting point and evaluate it
bestparams = [np.random.uniform(*l) for l in LIMITS]
log = []
bestscore = attemptSimulation(*bestparams, steps=500)
print('Starting with score', bestscore, 'with parameters', bestparams)
log.append([bestscore, bestparams, True])

for i in range(STEPS):
    T = T * DECAY #exponential decay
    # Perturb the parameters
    params = bestparams + np.random.normal(scale=VARIATIONS) * 2 * T / 0.04
    # Allow the angles from $-\pi$ to $\pi$ to wrap around
    for j in [1,3]:
        params[j] = np.mod(params[j] - LIMITS[j][0], LIMITS[j][1] - LIMITS[j][0])
        params[j] += LIMITS[j][0]
    # Clip parameters outside their allowed range
    params = np.clip(params, LIMITS[:, 0], LIMITS[:, 1])
    # Evaluate the score for this attempt, use more steps as i progresses
    # so as to reduce the noise in the evaluation of the score
    score = attemptSimulation(*params, steps=500 + i)
    # Perform simulated annealing with a typical exponential rule
    if score > bestscore or np.exp(-(bestscore-score)/T) > np.random.rand():
        # Flip to this new point
        print('NEW BEST ____', i, T, score, params)
        bestscore = score
        bestparams = params
        log.append([score, params, True])
    else: # Remain in the old point
        log.append([score, params, False])
    # Save the progress for future plotting
    pickle.dump(log, open('data/logs/logb.pickle', "wb" ) )   

Functions

def attemptSimulation(theta1, phi1, theta2, phi2, rmin=1, e=0.5, R=2, disk1=0.75, disk2=0.65, mu=1, plot=False, steps=2000)

Runs a simulation with the given parameters and compares it to observations of the antennae to return a score out of 2.

Parameters

theta1 : float
polar angle for the spin of the first galaxy.
phi1 : float
azimuthal angle for the spin of the first galaxy.
theta2 : float
polar angle for the spin of the second galaxy.
phi2 : float
azimuthal angle for the spin of the second galaxy.
rmin : float
closest distance of approach of orbit.
e : float
eccentricity of the orbit.
R : float
initial separation
disk1 : float
radius of the uniform disk of the first galaxy
disk2 : float
radius of the uniform disk of the first galaxy
mu : float
ratio of masses of the two galaxies
plot : float
if true the simulation will be saved to data/annealing/
steps : float
number of times the f1 score will be computed, along random viewing directions per 100 timesteps.

Returns

f1-score: score obtained by the simulation.

Source code
def attemptSimulation(theta1, phi1, theta2, phi2, rmin=1, e=.5, R=2, 
        disk1=.75, disk2=.65, mu=1, plot=False, steps=2000):
    """Runs a simulation with the given parameters and compares it 
    to observations of the antennae to return a score out of 2.

    Parameters:
        theta1 (float): polar angle for the spin of the first galaxy.
        phi1 (float): azimuthal angle for the spin of the first galaxy.
        theta2 (float): polar angle for the spin of the second galaxy.
        phi2 (float): azimuthal angle for the spin of the second galaxy.
        rmin (float): closest distance of approach of orbit.
        e (float): eccentricity of the orbit.
        R (float): initial separation
        disk1 (float): radius of the uniform disk of the first galaxy
        disk2 (float): radius of the uniform disk of the first galaxy
        mu (float): ratio of masses of the two galaxies
        plot (float): if true the simulation will be saved to data/annealing/
        steps (float): number of times the f1 score will be computed, along
            random viewing directions per 100 timesteps.

    Returns:
        f1-score: score obtained by the simulation. 
    """

    # Initialize the simulation
    sim = Simulation(dt=1E-3, soft=0.1, verbose=False, CONFIG=None, method='bruteForceNumbaOptimized')
    galaxy1 = Galaxy(orientation = (theta1, phi1), centralMass=2/(1+mu),
        sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0},
        disk={'model':'uniform', 'particles':2000, 'l':disk1, 'totalMass':0},
        halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0})
    galaxy2 = Galaxy(orientation = (theta2, phi2), centralMass=2*mu/(1+mu),
        sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0},
        disk={'model':'uniform', 'particles':2000, 'l':disk2, 'totalMass':0},
        halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0})
    sim.setOrbit(galaxy1, galaxy2, e=e, R0=R, rmin=rmin) # define the orbit

    # Run the simulation manually
    # Initialize the parameters consistently with the velocities
    sim.rprev_vec = sim.r_vec - sim.v_vec * sim.dt
    # Keep track of the best score
    bestScore = 0
    # and its corresponding binarized image and parameters
    bestImage, bestParams = 0, 0
    hasReachedPericenter = False

    # Run until tmax = 25
    for i in range(25001):
        sim.step()
        if i%100==0: # Every $\Delta t = 0.1$
            # Check if we are close to pericenter
            centers = sim.r_vec[sim.type[:,0] == 'center']
            if np.linalg.norm(centers[0] - centers[1]) < 1.3*rmin: 
                hasReachedPericenter = True
            # Do not evaluate the f1-score until pericenter.
            if not hasReachedPericenter: continue

            # Check multiple (steps) viewing directions at random
            localBestScore = 0
            localBestImage, localBestParams = 0, 0
            for j in range(steps):
                # The viewing directions are isotropically distributed
                theta = np.arccos(np.random.uniform(-1, 1))
                phi = np.random.uniform(0, 2*np.pi)
                # Rotation along line of sight
                view = np.random.uniform(0, 2*np.pi)
                scale = np.random.uniform(0.6, 1.0)
                x = np.random.uniform(-1.0, 1.0) # Offsets
                y = np.random.uniform(-1.0, 1.0)
                # Get images for each galaxy and compute their score separately
                im1 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=0)
                im2 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=1)
                score = bitmapScore(GT1, im1) + bitmapScore(GT2, im2)
                if score > localBestScore: 
                    localBestScore = score
                    localBestImage = [im1,im2]
                    localBestParams = [i, theta, phi, view, scale, x, y]

            if bestScore < localBestScore: 
                bestScore = localBestScore
                bestImage = localBestImage
                bestParams = localBestParams
            if plot:
                sim.save('annealing', type='2D')
    
    print('Best score for this attempt', bestScore)
    print('using viewing parameters', bestParams)
    
    return bestScore
def bitmapScore(im1, im2)

Computes the bitmap score between two images. This is the f1-score but we allow the algorithm to attempt to improve the score by shifting the images. The algorithm is implemented recursively.

Parameters

im1 : array
The ground truth nxm image.
im2 : array
The candidate nxm imgae.
_prev
Used internally for recursion.
_bestscore
Used internally for recursion.
_zeros
Used internally for recursion.

Returns

f1-score for the two images.

Source code
@jit
def bitmapScore(im1, im2, _prev=None, _bestscore=None, _zeros=None):
    """Computes the bitmap score between two images. This is the f1-score
       but we allow the algorithm to attempt to improve the score by
       shifting the images. The algorithm is implemented recursively.

    Parameters:
        im1 (array): The ground truth nxm image.
        im2 (array): The candidate nxm imgae.
        _prev: Used internally for recursion.
        _bestscore: Used internally for recursion.
        _zeros: Used internally for recursion.
    
    Returns:
        f1-score for the two images.
    """
    # When the function is called externally, initialize an array of zeros
    # and compute the score for no shifting. The zeros array is used for
    # performance to only create a new array once.
    if _bestscore is None:
        _bestscore = bitmapScoreAlgo(im1, im2)
        _zeros = np.zeros_like(im2)
    # Attempt to improve the score by shifting the image in a direction
    # Keeping track of _prev allows to not 'go back' and undo and attempt
    # to undo a shift needlessly. 
    if _prev is not 0: # try left
        shifted = shiftLeft(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=1, _bestscore=score, _zeros=_zeros)
    if _prev is not 1: # try right
        shifted = shiftRight(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=0, _bestscore=score, _zeros=_zeros)
    if _prev is not 2: # try top
        shifted = shiftTop(im2, _zeros) 
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=3, _bestscore=score, _zeros=_zeros)
    if _prev is not 3: # try bottom
        shifted = shiftBottom(im2, _zeros)
        score = bitmapScoreAlgo(im1, shifted)
        if score > _bestscore: return bitmapScore(im1, shifted, 
            _prev=2, _bestscore=score, _zeros=_zeros)
    # Return _bestscore if shifting did not improve (local maximum).
    return _bestscore
def bitmapScoreAlgo(im1, im2)

Computes the f1 score betwen two binarized images

Parameters
im1 (arr): nxm ground truth image im2 (arr): nxm candidate image
Returns
f1 score
Source code
@jit(nopython=True) # Numba annotation
def bitmapScoreAlgo(im1, im2):
    """Computes the f1 score betwen two binarized images

    Parameters
        im1 (arr): nxm ground truth image
        im2 (arr): nxm candidate image
    
    Returns
        f1 score
    """
    TP = np.sum((im1==1.0) & (im2==1.0))
    TN = np.sum((im1==0.0) & (im2==0.0))
    FP = np.sum((im1==0.0) & (im2==1.0))
    FN = np.sum((im1==1.0) & (im2==0.0))
    if TP==0: return 0
    precision = TP/(TP+FP) 
    recall = TP/(TP+FN)
    return 2*precision*recall / (precision + recall)
def shiftBottom(im, im2)

Shifts an image by one pixel downwards.

Parameters

im : arr
the nxm image to shift by one pixel.
im2 : arr
an nxm array where the new image will be placed.

Returns

A reference to im2

Source code
@jit(nopython=True)
def shiftBottom(im, im2):
    """Shifts an image by one pixel downwards.
    
    Parameters:
        im (arr): the nxm image to shift by one pixel.
        im2 (arr): an nxm array where the new image will be placed.

    Returns:
        A reference to im2
    """
    im2[1:] = im[:-1]
    im2[0] = 0
    return im2
def shiftLeft(im, im2)

Shifts an image by one pixel to the left.

Source code
@jit(nopython=True)
def shiftLeft(im, im2):
    """Shifts an image by one pixel to the left."""
    im2[:,1:] = im[:,:-1]
    im2[:,0] = 0
    return im2
def shiftRight(im, im2)

Shifts an image by one pixel to the right.

Source code
@jit(nopython=True)
def shiftRight(im, im2):
    """Shifts an image by one pixel to the right."""
    im2[:,:-1] = im[:,1:]
    im2[:,-1] = 0
    return im2
def shiftTop(im, im2)

Shifts an image by one pixel upwards.

Source code
@jit(nopython=True)
def shiftTop(im, im2):
    """Shifts an image by one pixel upwards."""
    im2[:-1] = im[1:]
    im2[-1] = 0
    return im2
def simToBitmap(sim, theta, phi, view, scale, x, y, galaxy)

Obtain a bitmap of one galaxy as viewed from a given direction. The binning has been chosen so that the scale and the offset (x,y) are expected to be approximately 1 and (0, 0) respectively.

Parameters: sim (Simulation): Simulation to project. theta (float): polar angle of viewing direction. phi (float): azimuthal angle of viewing direction. view (float): rotation angle along viewing direction. scale (float): scaling factor. x (float): x offset y (float): y offset galaxy (int): galaxy to plot. Either 0, or 1. They are assumed
to have the same number of particles.

Source code
def simToBitmap(sim, theta, phi, view, scale, x, y, galaxy):
    """Obtain a bitmap of one galaxy as viewed from a given direction.
       The binning has been chosen so that the scale and the offset (x,y)
       are expected to be approximately 1 and (0, 0) respectively.

        Parameters:
            sim (Simulation): Simulation to project.
            theta (float): polar angle of viewing direction.
            phi (float): azimuthal angle of viewing direction.
            view (float): rotation angle along viewing direction.
            scale (float): scaling factor.
            x (float): x offset
            y (float): y offset
            galaxy (int): galaxy to plot. Either 0, or 1. They are assumed    
                to have the same number of particles.
    """
    # Obtain components in new x',y' plane
    r_vec = (sim.project(theta, phi, view) - [[x+.12,y+1.3]]) * scale
    # Select a single galaxy. We match them separately in the algorithm.
    if galaxy==0: r_vec = r_vec[2:len(r_vec)//2-1] #omit central masses
    if galaxy==1: r_vec = r_vec[len(r_vec)//2-1:]
    # Use a fast histogram, much faster than numpy ()
    H = histogram2d(r_vec[:,0], r_vec[:,1], 
        range=[[-5,5], [-5,5]], bins=(50, 50))
    im = np.zeros((50, 50))
    H = convolve2d(H, np.ones((2,2)), mode='same') # Smooth the image
    im[H>=1] = 1 # Binary the image
    
    return im