"""Simulated annealing algorithm used to match the simulation of the Antennae to the observations by comparing binarized images.""" import numpy as np import scipy import pickle from scipy import ndimage from fast_histogram import histogram2d from scipy.signal import convolve2d from numba import jit from matplotlib.image import imread import datetime from simulation import Simulation, Galaxy T = .25 # Initial tempreature STEPS = 1500 DECAY = .998 # Exponential decay factor def simToBitmap(sim, theta, phi, view, scale, x, y, galaxy): """Obtain a bitmap of one galaxy as viewed from a given direction. The binning has been chosen so that the scale and the offset (x,y) are expected to be approximately 1 and (0, 0) respectively. Parameters: sim (Simulation): Simulation to project. theta (float): polar angle of viewing direction. phi (float): azimuthal angle of viewing direction. view (float): rotation angle along viewing direction. scale (float): scaling factor. x (float): x offset y (float): y offset galaxy (int): galaxy to plot. Either 0, or 1. They are assumed to have the same number of particles. """ # Obtain components in new x',y' plane r_vec = (sim.project(theta, phi, view) - [[x+.12,y+1.3]]) * scale # Select a single galaxy. We match them separately in the algorithm. if galaxy==0: r_vec = r_vec[2:len(r_vec)//2-1] #omit central masses if galaxy==1: r_vec = r_vec[len(r_vec)//2-1:] # Use a fast histogram, much faster than numpy () H = histogram2d(r_vec[:,0], r_vec[:,1], range=[[-5,5], [-5,5]], bins=(50, 50)) im = np.zeros((50, 50)) H = convolve2d(H, np.ones((2,2)), mode='same') # Smooth the image im[H>=1] = 1 # Binary the image return im @jit(nopython=True) # Numba annotation def bitmapScoreAlgo(im1, im2): """Computes the f1 score betwen two binarized images Parameters im1 (arr): nxm ground truth image im2 (arr): nxm candidate image Returns f1 score """ TP = np.sum((im1==1.0) & (im2==1.0)) TN = np.sum((im1==0.0) & (im2==0.0)) FP = np.sum((im1==0.0) & (im2==1.0)) FN = np.sum((im1==1.0) & (im2==0.0)) if TP==0: return 0 precision = TP/(TP+FP) recall = TP/(TP+FN) return 2*precision*recall / (precision + recall) # The matching algorithm attempts to improve the match by shifting the # image by one pixel in each direction. If none improve the score the # f1-score of said local maximum is retuned. To make this highly efficient # (as this is run millions of time) we explicitely write functions to # shift an image by 1 pixel in each direction, as these can then be compiled # using Numba (jit annotation) to low-level code. # Performance is crucial here and must sadly be prioritized over conciseness @jit(nopython=True) def shiftBottom(im, im2): """Shifts an image by one pixel downwards. Parameters: im (arr): the nxm image to shift by one pixel. im2 (arr): an nxm array where the new image will be placed. Returns: A reference to im2 """ im2[1:] = im[:-1] im2[0] = 0 return im2 @jit(nopython=True) def shiftTop(im, im2): """Shifts an image by one pixel upwards.""" im2[:-1] = im[1:] im2[-1] = 0 return im2 @jit(nopython=True) def shiftLeft(im, im2): """Shifts an image by one pixel to the left.""" im2[:,1:] = im[:,:-1] im2[:,0] = 0 return im2 @jit(nopython=True) def shiftRight(im, im2): """Shifts an image by one pixel to the right.""" im2[:,:-1] = im[:,1:] im2[:,-1] = 0 return im2 @jit def bitmapScore(im1, im2, _prev=None, _bestscore=None, _zeros=None): """Computes the bitmap score between two images. This is the f1-score but we allow the algorithm to attempt to improve the score by shifting the images. The algorithm is implemented recursively. Parameters: im1 (array): The ground truth nxm image. im2 (array): The candidate nxm imgae. _prev: Used internally for recursion. _bestscore: Used internally for recursion. _zeros: Used internally for recursion. Returns: f1-score for the two images. """ # When the function is called externally, initialize an array of zeros # and compute the score for no shifting. The zeros array is used for # performance to only create a new array once. if _bestscore is None: _bestscore = bitmapScoreAlgo(im1, im2) _zeros = np.zeros_like(im2) # Attempt to improve the score by shifting the image in a direction # Keeping track of _prev allows to not 'go back' and undo and attempt # to undo a shift needlessly. if _prev is not 0: # try left shifted = shiftLeft(im2, _zeros) score = bitmapScoreAlgo(im1, shifted) if score > _bestscore: return bitmapScore(im1, shifted, _prev=1, _bestscore=score, _zeros=_zeros) if _prev is not 1: # try right shifted = shiftRight(im2, _zeros) score = bitmapScoreAlgo(im1, shifted) if score > _bestscore: return bitmapScore(im1, shifted, _prev=0, _bestscore=score, _zeros=_zeros) if _prev is not 2: # try top shifted = shiftTop(im2, _zeros) score = bitmapScoreAlgo(im1, shifted) if score > _bestscore: return bitmapScore(im1, shifted, _prev=3, _bestscore=score, _zeros=_zeros) if _prev is not 3: # try bottom shifted = shiftBottom(im2, _zeros) score = bitmapScoreAlgo(im1, shifted) if score > _bestscore: return bitmapScore(im1, shifted, _prev=2, _bestscore=score, _zeros=_zeros) # Return _bestscore if shifting did not improve (local maximum). return _bestscore def attemptSimulation(theta1, phi1, theta2, phi2, rmin=1, e=.5, R=2, disk1=.75, disk2=.65, mu=1, plot=False, steps=2000): """Runs a simulation with the given parameters and compares it to observations of the antennae to return a score out of 2. Parameters: theta1 (float): polar angle for the spin of the first galaxy. phi1 (float): azimuthal angle for the spin of the first galaxy. theta2 (float): polar angle for the spin of the second galaxy. phi2 (float): azimuthal angle for the spin of the second galaxy. rmin (float): closest distance of approach of orbit. e (float): eccentricity of the orbit. R (float): initial separation disk1 (float): radius of the uniform disk of the first galaxy disk2 (float): radius of the uniform disk of the first galaxy mu (float): ratio of masses of the two galaxies plot (float): if true the simulation will be saved to data/annealing/ steps (float): number of times the f1 score will be computed, along random viewing directions per 100 timesteps. Returns: f1-score: score obtained by the simulation. """ # Initialize the simulation sim = Simulation(dt=1E-3, soft=0.1, verbose=False, CONFIG=None, method='bruteForceNumbaOptimized') galaxy1 = Galaxy(orientation = (theta1, phi1), centralMass=2/(1+mu), sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0}, disk={'model':'uniform', 'particles':2000, 'l':disk1, 'totalMass':0}, halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0}) galaxy2 = Galaxy(orientation = (theta2, phi2), centralMass=2*mu/(1+mu), sim=sim, bulge={'model':'plummer', 'particles':0, 'totalMass':0, 'l':0}, disk={'model':'uniform', 'particles':2000, 'l':disk2, 'totalMass':0}, halo={'model':'NFW', 'particles':0, 'rs':1, 'totalMass':0}) sim.setOrbit(galaxy1, galaxy2, e=e, R0=R, rmin=rmin) # define the orbit # Run the simulation manually # Initialize the parameters consistently with the velocities sim.rprev_vec = sim.r_vec - sim.v_vec * sim.dt # Keep track of the best score bestScore = 0 # and its corresponding binarized image and parameters bestImage, bestParams = 0, 0 hasReachedPericenter = False # Run until tmax = 25 for i in range(25001): sim.step() if i%100==0: # Every $\Delta t = 0.1$ # Check if we are close to pericenter centers = sim.r_vec[sim.type[:,0] == 'center'] if np.linalg.norm(centers[0] - centers[1]) < 1.3*rmin: hasReachedPericenter = True # Do not evaluate the f1-score until pericenter. if not hasReachedPericenter: continue # Check multiple (steps) viewing directions at random localBestScore = 0 localBestImage, localBestParams = 0, 0 for j in range(steps): # The viewing directions are isotropically distributed theta = np.arccos(np.random.uniform(-1, 1)) phi = np.random.uniform(0, 2*np.pi) # Rotation along line of sight view = np.random.uniform(0, 2*np.pi) scale = np.random.uniform(0.6, 1.0) x = np.random.uniform(-1.0, 1.0) # Offsets y = np.random.uniform(-1.0, 1.0) # Get images for each galaxy and compute their score separately im1 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=0) im2 = simToBitmap(sim, theta, phi, view, scale, x, y, galaxy=1) score = bitmapScore(GT1, im1) + bitmapScore(GT2, im2) if score > localBestScore: localBestScore = score localBestImage = [im1,im2] localBestParams = [i, theta, phi, view, scale, x, y] if bestScore < localBestScore: bestScore = localBestScore bestImage = localBestImage bestParams = localBestParams if plot: sim.save('annealing', type='2D') print('Best score for this attempt', bestScore) print('using viewing parameters', bestParams) return bestScore ################################################################################ ################################################################################ # Generate a (50, 50) bitmap for each galaxy # They are stored globally in GT1 and GT2 (Ground Truth) im = imread('literature/figs/g1c.tif') im = np.mean(im, axis=-1) im = scipy.misc.imresize(im, (50,50)) GT1 = np.zeros((50, 50)) GT1[im > 50] = 1 im = imread('literature/figs/g2c.tif') im = np.mean(im, axis=-1) im = scipy.misc.imresize(im, (50,50)) GT2 = np.zeros((50, 50)) # Define the limits and relate scale of the variations in each parameter # In the same order as attemptSimulation # phi1, theta1, phi2, theta2, rmin (fixed), e, R (fixed), disk1, disk2 LIMITS = np.array([[np.pi, 2 * np.pi], [-np.pi, np.pi], [0, np.pi], [-np.pi, np.pi], [1,1], [.5,1.0], [2.2,2.2], [.5,.8], [.5,.8]]) VARIATIONS = np.array([.08, .15, .08, .15, 0, .01, 0, .01, .01]) # Choose a random starting point and evaluate it bestparams = [np.random.uniform(*l) for l in LIMITS] log = [] bestscore = attemptSimulation(*bestparams, steps=500) print('Starting with score', bestscore, 'with parameters', bestparams) log.append([bestscore, bestparams, True]) for i in range(STEPS): T = T * DECAY #exponential decay # Perturb the parameters params = bestparams + np.random.normal(scale=VARIATIONS) * 2 * T / 0.04 # Allow the angles from $-\pi$ to $\pi$ to wrap around for j in [1,3]: params[j] = np.mod(params[j] - LIMITS[j][0], LIMITS[j][1] - LIMITS[j][0]) params[j] += LIMITS[j][0] # Clip parameters outside their allowed range params = np.clip(params, LIMITS[:, 0], LIMITS[:, 1]) # Evaluate the score for this attempt, use more steps as i progresses # so as to reduce the noise in the evaluation of the score score = attemptSimulation(*params, steps=500 + i) # Perform simulated annealing with a typical exponential rule if score > bestscore or np.exp(-(bestscore-score)/T) > np.random.rand(): # Flip to this new point print('NEW BEST ____', i, T, score, params) bestscore = score bestparams = params log.append([score, params, True]) else: # Remain in the old point log.append([score, params, False]) # Save the progress for future plotting pickle.dump(log, open('data/logs/logb.pickle', "wb" ) )