Collective effects

Tracking with collective elements

A collective beam element is an element that needs access to the entire particle set (in read and/or write mode). The following example shows how to handle such elements in Xsuite.

Example

A typical example of collective element is a space-charge interaction. We can create a space-charge beam element as follows:

import xobjects as xo
import xfields as xf

context = xo.ContextCpu()

spcharge = xf.SpaceChargeBiGaussian(_context=context,
    update_on_track = ['sigma_x', 'sigma_y'], length=2,
    longitudinal_profile=xf.LongitudinalProfileQGaussian(
        _context=context, number_of_particles=1e11, sigma_z=0.2))

This creates a space-charge element where the transverse beam sizes are updated based on the particle set at each interaction. Such an element can be included in an xtrack tracker similarly to single-particle elements.

import xtrack as xt

## Generate a simple beam line including the spacecharge element
myqf = xt.Multipole(knl=[0, 1.])
myqd = xt.Multipole(knl=[0, -1.])
mydrift = xt.Drift(length=1.)
line = xt.Line(
    elements = [myqf, mydrift, myqd, mydrift,
                spcharge,
                myqf, mydrift, myqd, mydrift,],
    element_names = ['qf1', 'drift1', 'qd1', 'drift2',
                        'spcharge'
                        'qf2', 'drift3', 'qd2', 'drift4'])

## Transfer lattice on context and compile tracking code
line.build_tracker(_context=context)

## Build particle object on context
n_part = 200
particles = xt.Particles(_context=context,
                        p0c=6500e9,
                        x=np.random.uniform(-1e-3, 1e-3, n_part),
                        px=np.random.uniform(-1e-5, 1e-5, n_part),
                        y=np.random.uniform(-2e-3, 2e-3, n_part),
                        py=np.random.uniform(-3e-5, 3e-5, n_part),
                        zeta=np.random.uniform(-1e-2, 1e-2, n_part),
                        delta=np.random.uniform(-1e-4, 1e-4, n_part),
                        )

## Track (saving turn-by-turn data)
n_turns = 100
line.track(particles, num_turns=n_turns,
            turn_by_turn_monitor=True)

How does it work?

To decide whether or not an element needs to be treated as collective, the tracker inspects its iscollective attribute. In our example:

print(qf.iscollective)
# Gives "False"

print(spcharge.iscollective)
# Gives "True"

Based in this information the line is divided in parts that are either collective elements or xtrack trackers simulating groups of consecutive non-collective elements.

We can visualize this in our example:

print(line.tracker._parts)
# Gives:
# [<xtrack.tracker.Tracker object at 0x7f5ba8ce7760>,
#  <xfields.beam_elements.spacecharge.SpaceChargeBiGaussian object at 0x7f5ba8e1bd30>,
#  <xtrack.tracker.Tracker object at 0x7f5ba8ce7610>]

where the first part tracks the particles through to the first potion of the machine up to the space-charge element, the second part simulates the space-charge interaction, the third part tracks the particles from the space-charge element to the end of the line.

As all xsuite and xsuite-compatible beam elements need to expose a .track method the instruction:

line.track(particles)

is equivalent to the loop:

for pp in line.tracker._parts:
    pp.track(particles)

Any python object exposing a ‘.track’ method can be used as beam_element. If the attribute iscollective is not present the element is handled as collective.

Space charge

Xsuite can be used to perform simulations including space-charge effects. Three modes can be used to simulate the space-charge forces:

  • Frozen: A fixed Gaussian bunch distribution is used to compute the space-charge forces.

  • Quasi-frozen: Forces are computed assuming a Gaussian distribution. The properties of the distribution (transverse r.m.s. sizes,transverse positions) are updated at each interaction based on the particle distribution.

  • PIC: The Particle In Cell method is used to compute the forces acting among particles. No assumption is made on the bunch shape.

The last two options constitute collective interactions. As discussed in the dedicated section, the Xtrack Line works such that particles are tracked asynchronously by separate threads in the non-collective sections of the sequence and are regrouped at each collective element (in PIC or quasi-frozen space-charge lenses).

The following example illustrates how to configure and run a space-charge simulation. The variable mode is used to switch between frozen, quasi-frozen and PIC.

See also: xfields.install_spacecharge_frozen(), xfields.replace_spacecharge_with_quasi_frozen(), xfields.replace_spacecharge_with_PIC().

Example

import json
import numpy as np

import xobjects as xo
import xpart as xp
import xtrack as xt
import xfields as xf

############
# Settings #
############

fname_line = ('../../test_data/sps_w_spacecharge/'
                  'line_no_spacecharge_and_particle.json')

bunch_intensity = 1e11/3 # Need short bunch to avoid bucket non-linearity
                         # to compare frozen/quasi-frozen and PIC
sigma_z = 22.5e-2/3
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e6)
num_turns=32

num_spacecharge_interactions = 540

# Available modes: frozen/quasi-frozen/pic
# mode = 'pic'
mode = 'quasi-frozen'

# Choose solver between `FFTSolver2p5DAveraged` and `FFTSolver2p5D`
pic_solver = 'FFTSolver2p5DAveraged'

####################
# Choose a context #
####################

context = xo.ContextCpu()
#context = xo.ContextCupy()
#context = xo.ContextPyopencl('0.0')

print(context)

#############
# Load line #
#############

with open(fname_line, 'r') as fid:
     input_data = json.load(fid)
line = xt.Line.from_dict(input_data['line'])
line.particle_ref = xt.Particles.from_dict(input_data['particle'])


line.build_tracker(_context=context, compile=False)

#############################################
# Install spacecharge interactions (frozen) #
#############################################

lprofile = xf.LongitudinalProfileQGaussian(
        number_of_particles=bunch_intensity,
        sigma_z=sigma_z,
        z0=0.,
        q_parameter=1.)

xf.install_spacecharge_frozen(line=line,
                   longitudinal_profile=lprofile,
                   nemitt_x=nemitt_x, nemitt_y=nemitt_y,
                   sigma_z=sigma_z,
                   num_spacecharge_interactions=num_spacecharge_interactions,
                   )

#################################
# Switch to PIC or quasi-frozen #
#################################

if mode == 'frozen':
    pass # Already configured in line
elif mode == 'quasi-frozen':
    xf.replace_spacecharge_with_quasi_frozen(
                                    line,
                                    update_mean_x_on_track=True,
                                    update_mean_y_on_track=True)
elif mode == 'pic':
    pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
        line=line,
        n_sigmas_range_pic_x=8,
        n_sigmas_range_pic_y=8,
        nx_grid=256, ny_grid=256, nz_grid=100,
        n_lims_x=7, n_lims_y=3,
        z_range=(-3*sigma_z, 3*sigma_z),
        solver=pic_solver)
else:
    raise ValueError(f'Invalid mode: {mode}')

#################
# Build Tracker #
#################

line.build_tracker(_context=context)

######################
# Generate particles #
######################

# Build a line without spacecharge (recycling the track kernel)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')
line_sc_off.build_tracker(track_kernel=line.tracker.track_kernel)

# (we choose to match the distribution without accounting for spacecharge)
particles = xp.generate_matched_gaussian_bunch(line=line_sc_off,
         num_particles=n_part, total_intensity_particles=bunch_intensity,
         nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z)

####################################################
# Check that everything is on the selected context #
####################################################

assert particles._context == context
assert len(set([ee._buffer for ee in line.elements])) == 1 # All on same context
assert line._context == context

#########
# Track #
#########
line.track(particles, num_turns=num_turns, with_progress=1)

# Complete source: xtrack/examples/spacecharge/000_spacecharge_example.py

Beam-beam

Weak-strong 2D

The example below shows how to introduce a 2D beam-beam element in a line and perform few studies based on tracking. The 2D beam-beam element provides a kick based on the Basseti-Erskine formula neglecting any longitudinal varitations of the beam-beam force.

import numpy as np
from matplotlib import pyplot as plt
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

# Machine and beam parameters
n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
nemitt_x = 2E-6
nemitt_y = 2E-6
p0c = 7e12
gamma = p0c/xp.PROTON_MASS_EV
physemit_x = nemitt_x/gamma
physemit_y = nemitt_y/gamma
beta_x = 1.0
beta_y = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.31
Qy = 0.32
Qs = 2.1E-3

##############################
# Plot the beam-beam force   #
##############################

# Particles algined on the x axis
particles = xp.Particles(_context=context,
    p0c=p0c,
    x=np.sqrt(physemit_x*beta_x)*np.linspace(-6,6,n_macroparticles),
    px=np.zeros(n_macroparticles),
    y=np.zeros(n_macroparticles),
    py=np.zeros(n_macroparticles),
    zeta=np.zeros(n_macroparticles),
    delta=np.zeros(n_macroparticles),
)
# Definition of the beam-beam force based on the other beam's
# properties (here assumed identical to the tracked beam)
# Note that the arguments 'Sigma' are sigma**2 
bbeam = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles.q0,
            other_beam_beta0 = particles.beta0[0],
            other_beam_num_particles = bunch_intensity,
            other_beam_Sigma_11 = physemit_x*beta_x,
            other_beam_Sigma_33 = physemit_y*beta_y)

# Build line and tracker with only the beam-beam element
elements = [bbeam]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# track on turn
line.track(particles,num_turns=1)
#plot the resulting change of px
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px)
plt.xlabel(r'x [$\sigma_x$]')
plt.ylabel(r'$\Delta p_x$')

##############################
# Tune footprint             #
##############################

# Build linear lattice
arc = xt.LineSegmentMap(
        betx = beta_x,bety = beta_y,
        qx = Qx, qy = Qy,bets = beta_s, qs=Qs)
# Build line and tracker with a beam-beam element and linear lattice
elements = [bbeam,arc]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# plot footprint
plt.figure()
fp0 = line.get_footprint(nemitt_x=nemitt_x, nemitt_y=nemitt_y)
fp0.plot(color='k')

##############################
# Phase space distorion      #
##############################

# Initialise particles randomly in 6D phase space
particles = xp.Particles(_context=context,
    p0c=p0c,
    x=np.sqrt(physemit_x*beta_x)*(np.random.randn(n_macroparticles)),
    px=np.sqrt(physemit_x/beta_x)*np.random.randn(n_macroparticles),
    y=np.sqrt(physemit_y*beta_y)*(np.random.randn(n_macroparticles)),
    py=np.sqrt(physemit_y/beta_y)*np.random.randn(n_macroparticles),
    zeta=sigma_z*np.random.randn(n_macroparticles),
    delta=sigma_delta*np.random.randn(n_macroparticles),
)

# Change the tune to better visualise the 4th order resonance
arc.qx=0.255
# Track
line.track(particles,num_turns=10000)
# Plot phase space
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px/np.sqrt(physemit_x/beta_x),'.')
plt.xlabel('$x$ [$\sigma_x$]')
plt.ylabel('$p_x$ [$\sigma_{px}$]')
plt.show()

# Complete source: xfields/examples/002_beambeam/010_beambeam2d_weakstrong.py
_images/footprint_HO2D.png
_images/beambeamkick.png
_images/phasespacedistortion.png

Weak-strong 3D

The 3D beam-beam element can be used similarly, replacing the instanciation of the beam-beam element as in the example below. This element takes into account longitudinal variations of the beam-beam force (hourglass, crossing angle) based on a longitudinal slicing of the beam (Hirata’s method) handled by the xfields.beam_elements.TempSlicer.

# Number of longitudinal slices
n_slices = 21
# Slicer used to determine the position and charge of the slices
# based on an optimised algorithm by D. Shatilov. (other options are also possible)
slicer = xf.TempSlicer(n_slices=n_slices, sigma_z=sigma_z, mode="shatilov")
bbeam = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            # Full crossing angle at the IP in rad
            phi = 500.0E-2,
            # Roll angle of the crossing angle in rad (0 -> horizontal, pi/2 -> vertical)
            alpha = 0.0,
            # charge in each slice
            slices_other_beam_num_particles = slicer.bin_weights * bunch_intensity,
            # longitudinal position of the slice
            slices_other_beam_zeta_center = slicer.bin_centers,
            # transverse sizes of the slices at the IP (squarred)
            slices_other_beam_Sigma_11 = np.zeros(n_slices,dtype=float)+physemit_x*beta_x,
            slices_other_beam_Sigma_12 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_13 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_14 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_22 = np.zeros(n_slices,dtype=float)+physemit_x/beta_x,
            slices_other_beam_Sigma_23 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_24 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_33 = np.zeros(n_slices,dtype=float)+physemit_y*beta_y,
            slices_other_beam_Sigma_34 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_44 = np.zeros(n_slices,dtype=float)+physemit_y/beta_y)

Strong-strong (soft-Gaussian)

Strong-strong simulations can be performed using the Pipeline for multibunch simulations, as in the example below.

import time

import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

####################
# Pipeline manager #
####################

# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank   = comm.Get_rank()

# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
    # Add information about the Particles instance that require
    # communication through the pipeline manager.
    # Each Particles instance must be identifiable by a unique name
    # The second argument is the MPI rank in which the Particles object
    # lives.
    pipeline_manager.add_particles('B1b1',0)
    pipeline_manager.add_particles('B2b1',1)
    # Add information about the elements that require communication
    # through the pipeline manager.
    # Each Element instance must be identifiable by a unique name
    # All Elements are instanciated in all ranks
    pipeline_manager.add_element('IP1')
    pipeline_manager.add_element('IP2')
else:
    print('Need at least 2 MPI processes for this test')
    exit()

#################################
# Generate particles            #
#################################

n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3

#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode 
mean_x_init = 0.1
mean_y_init = 0.1

p0c = 7000e9

print('Initialising particles')
if my_rank == 0:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)+mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B1b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B2b1'
elif my_rank == 1:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)-mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B2b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B1b1'
else:
    # Stop the execution of ranks that are not needed
    print('No need for rank',my_rank)
    exit()

#############
# Beam-beam #
#############

nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP1', # The element name must be unique and match the
                       # one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       # Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP2', # The element name must be unique and match the
                       #  one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       #Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )



print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=0.0,
            config_for_update = config_for_update_IP1)

bbeamIP2 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=np.pi/2,
            config_for_update = config_for_update_IP2)

#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################

arc12 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

arc21 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

#################################################################
# Tracker                                                       #
#################################################################

# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])

#################################################################
# Tracking  (Same as usual)                                     #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')


#################################################################
# Post-processing: raw data and spectrum                        #
#################################################################

# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
    positions_x_b1 = np.average(line.record_last_track.x,axis=0)
    positions_y_b1 = np.average(line.record_last_track.y,axis=0)
    plt.figure(1)
    freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
    mask = freqs > 0
    myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
    myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
    plt.xlabel('Frequency [$f_{rev}$]')
    plt.ylabel('Amplitude')
    plt.legend(loc=0)
    plt.show()

# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
_images/beambeam_sigmapi.png

In collisions featuring a low disruption (i.e. the beam moments do not vary significantly during the interaction), the quasi-strong-strong (aka frozen-strong-strong) model may be enabled by setting the argument ‘quasistrongstrong = True’ in xfields.beam_elements.ConfigForUpdate*. In this configuration, the beam moments are computed once at the start of the collison and kept constant throught the collison, thus reducing the computing load. The argument ‘update_every’ allows to further reduce the computing load by keeping the moments for the given amount of turns. This model is suitable for effects that build up over may turns. (more details in https://accelconf.web.cern.ch/eefact2022/papers/wezat0102.pdf)

For a 2D beam-beam interactions, the beam-beam element and the xfields.beam_elements.ConfigForUpdate* have to be redifined as in the example below.

config_for_update_b1_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
   pipeline_manager=pipeline_manager,
   element_name='IP1',
   partner_particles_name = 'B2b1',
   update_every=1
   )
config_for_update_b2_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
   pipeline_manager=pipeline_manager,
   element_name='IP1',
   partner_particles_name = 'B1b1',
   update_every=1
   )
bbeamIP1_b1 = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles_b2.q0,
            other_beam_beta0 = particles_b2.beta0[0],
            config_for_update = config_for_update_b1_IP1)
bbeamIP1_b2 = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles_b1.q0,
            other_beam_beta0 = particles_b1.beta0[0],
            config_for_update = config_for_update_b2_IP1)

Poisson Solver

Particle-in-cell simulations using a Poisson solver for the beam-beam interaction is currently not implemented in xfields

Beam-beam in a real lattice

Identically to the examples above, beam-beam elements can be introduced into the lattice of a full machine. Several tools exist to ease the setup of beam-beam interactions in a collider lattice: Xmask

Interface to PyHEADTAIL

PyHEADTAIL elements cannot be natively used by an Xsuite line due to different naming conventions for the particles coordinates. A specific interface has been introduced in Xsuite which indtroduces additional properties in the Particles objects in order to make them compatible with PyHEADTAIL beam elements. The interface can be enabled by calling the function enable_pyheadtail_interface right after importing xtrack, as illustrated in the following example.

import pathlib
import json
import numpy as np

import xobjects as xo
import xtrack as xt
xt.enable_pyheadtail_interface()

fname_line = '../../test_data/lhc_no_bb/line_and_particle.json'
num_turns = int(100)
n_part = 200

####################
# Choose a context #
####################

context = xo.ContextCpu()

##############
# Get a line #
##############

with open(fname_line, 'r') as fid:
    input_data = json.load(fid)
line = xt.Line.from_dict(input_data['line'])

#########################
# Add PyHEADTAIL damper #
#########################

from PyHEADTAIL.feedback.transverse_damper import TransverseDamper
damper = TransverseDamper(dampingrate_x=10., dampingrate_y=15.)
line.append_element(damper, 'Damper')

##################
# Build TrackJob #
##################

line.build_tracker(_context=context)

######################
# Get some particles #
######################

particles = xt.Particles(_context=context,
                        p0c=6500e9,
                        x=np.random.uniform(-1e-3, 1e-3, n_part)+1e-3,
                        px=np.random.uniform(-1e-7, 1e-7, n_part),
                        y=np.random.uniform(-0.5e-3, 0.5e-3, n_part)-1.2e-3,
                        py=np.random.uniform(-1e-7, 1e-7, n_part),
                        zeta=0*np.random.uniform(-1e-2, 1e-2, n_part),
                        delta=0.*np.random.uniform(-1e-5, 1e-5, n_part))

#########
# Track #
#########

line.track(particles, num_turns=num_turns, turn_by_turn_monitor=True)

########
# Plot #
########

res = line.record_last_track

import matplotlib.pyplot as plt
plt.close('all')
fig1 = plt.figure(1)
sp1 = fig1.add_subplot(2,1,1)
sp2 = fig1.add_subplot(2,1,2)
sp1.plot(np.mean(res.x, axis=0))
sp2.plot(np.mean(res.y, axis=0))
plt.show()

Combined CPU - GPU simulations

When performing simulations on GPU contexts, it is possible to execute on CPU the computation for specific beam elements, for example PyHEADTAIL elements that do not support the Xsuite GPU contexts. For that purpose, one needs to set a `needs_cpu` flag on the concerned elements before inserting them in an Xtrack line. This instructs the Xtrack tracker to handle the required data transfers.

Presently PyHEADTAIL elements are not able to skip lost particles in the Xtrack Particles objects. This can be handled by setting the flag `needs_hidden_lost_particles`, which instructs the Xtrack tracker to temporarily hide the lost particles for the concerned elements.

The following example shows a simulation including the SPS lattice elements, space-charge elements and wakefields. The lattice tracking and the spacecharge calculations are performed on GPU with Xtrack/Xfields while the wakefield computation is performed on CPU with PyHEADTAIL.

import numpy as np

from cpymad.madx import Madx

import xpart as xp
import xobjects as xo
import xtrack as xt
import xfields as xf

###############################
# Enable pyheadtail interface #
###############################

xt.enable_pyheadtail_interface()

############
# Settings #
############

gpu_device = 0
seq_name = "sps"
qx0,qy0 = 20.13, 20.18
p0c = 26e9
cavity_name = "actcse.31632"
cavity_lag = 180
frequency = 200e6
rf_voltage = 4e6
use_wakes = True
n_slices_wakes = 100
limit_z = 0.7
bunch_intensity = 1e11
sigma_z = 22.5e-2
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e4)
num_turns=2
num_spacecharge_interactions = 540

# mode = 'frozen'
mode = 'pic'

#######################################################
# Use cpymad to load the sequence and match the tunes #
#######################################################

mad = Madx()
mad.call("./madx_sps/sps.seq")
mad.call("./madx_sps/lhc_q20.str")
mad.call("./madx_sps/macro.madx")
mad.beam()
mad.use(seq_name)

mad.twiss()
tw_thick = mad.table.twiss.dframe()
summ_thick = mad.table.summ.dframe()

mad.input("""select, flag=makethin, slice=1, thick=false;
            makethin, sequence=sps, style=teapot, makedipedge=false;""")
mad.use(seq_name)
mad.exec(f"sps_match_tunes({qx0},{qy0});")

##################
# Switch context #
##################

if gpu_device is None:
   context = xo.ContextCpu()
else:
   context = xo.ContextCupy(device=gpu_device)

#################################
# Build line from MAD-X lattice #
#################################

line = xt.Line.from_madx_sequence(sequence=mad.sequence[seq_name],
           deferred_expressions=True, install_apertures=True,
           apply_madx_errors=False)
# Define reference particle
line.particle_ref = xt.Particles(p0c=p0c,mass0=xt.PROTON_MASS_EV)

################
# Switch on RF #
################

line[cavity_name].voltage = rf_voltage
line[cavity_name].lag = cavity_lag
line[cavity_name].frequency = frequency

############################################
# Twiss (check that the lattice is stable) #
############################################

line.build_tracker()
tw = line.twiss()

# We unfreeze the line so that we can still insert elements in the
# original one (would be prevented by the existence of a tracker linked to the
# line).
line.unfreeze()

#####################################
# Install spacecharge interactions) #
#####################################

# Install frozen space-charge lenses

lprofile = xf.LongitudinalProfileQGaussian(
       number_of_particles=bunch_intensity,
       sigma_z=sigma_z,
       z0=0.,
       q_parameter=1.)

xf.install_spacecharge_frozen(line=line,
                  particle_ref=line.particle_ref,
                  longitudinal_profile=lprofile,
                  nemitt_x=nemitt_x, nemitt_y=nemitt_y,
                  sigma_z=sigma_z,
                  num_spacecharge_interactions=num_spacecharge_interactions,
                  )

# Switch to PIC or quasi-frozen
if mode == 'frozen':
   pass # Already configured in line
elif mode == 'quasi-frozen':
   xf.replace_spacecharge_with_quasi_frozen(
                                   line,
                                   update_mean_x_on_track=True,
                                   update_mean_y_on_track=True)
elif mode == 'pic':
   pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
       _context=context, line=line,
       n_sigmas_range_pic_x=8,
       n_sigmas_range_pic_y=8,
       nx_grid=256, ny_grid=256, nz_grid=100,
       n_lims_x=7, n_lims_y=3,
       z_range=(-3*sigma_z, 3*sigma_z))
else:
   raise ValueError(f'Invalid mode: {mode}')

######################
# Install wakefields #
######################

if use_wakes:

   # We rescale the waketable to take into account for the difference between
   # the average beta functions (for which the table is calculated) and the beta
   # functions at the location in the lattice at which the wake element is
   # installed.
   wakefields = np.genfromtxt('wakes/kickerSPSwake_2020_oldMKP.wake')
   wakefields[:,1] *= 54.65/tw['betx'][0]
   wakefields[:,2] *= 54.51/tw['bety'][0]
   wakefields[:,3] *= 54.65/tw['betx'][0]
   wakefields[:,4] *= 54.65/tw['betx'][0]
   wakefile = 'wakes/wakefields.wake'
   np.savetxt(wakefile,wakefields,delimiter='\t')

   # Build PyHEADTAIL wakefield element
   from PyHEADTAIL.particles.slicing import UniformBinSlicer
   from PyHEADTAIL.impedances.wakes import WakeTable, WakeField
   n_slices_wakes = 500
   slicer_for_wakefields = UniformBinSlicer(n_slices_wakes, z_cuts=(-limit_z, limit_z))
   waketable = WakeTable(wakefile, ['time', 'dipole_x', 'dipole_y',
                  'quadrupole_x', 'quadrupole_y'])
   wakefield = WakeField(slicer_for_wakefields, waketable)

   # Specity that the wakefield element needs to run on CPU and that lost
   # particles need to be hidden for this element (required by PyHEADTAIL)
   wakefield.needs_cpu = True
   wakefield.needs_hidden_lost_particles = True

   # Insert element in the line
   line.insert_element(element=wakefield,name="wakefield", at_s=0)

#################
# Build Tracker #
#################

line.build_tracker(_context=context)

######################
# Generate particles #
######################

# (we choose to match the distribution without accounting for spacecharge)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')

particles = xp.generate_matched_gaussian_bunch(_context=context,
        num_particles=n_part, total_intensity_particles=bunch_intensity,
        nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z,
        particle_ref=line.particle_ref, line=line_sc_off)
particles.circumference = line.get_length() # Needed by pyheadtail

###########################################################
# We use a phase monitor to measure the tune turn by turn #
###########################################################

phasem = xp.PhaseMonitor(line,
                 num_particles=n_part, twiss=line_sc_off.twiss())

#########
# Track #
#########

for turn in range(num_turns):
   phasem.measure(particles)
   #import pdb; pdb.set_trace()
   line.track(particles)

##################
# Plot footprint #
##################

import matplotlib.pyplot as plt
plt.close('all')
f,ax = plt.subplots()
ax.plot(phasem.qx, phasem.qy,'b.',ms=1)
ax.set_xlim(0, 0.5)
ax.set_ylim(0, 0.5)
plt.show()

# Complete source: xtrack/examples/pyheadtail_interface/004_imped_spacech_cpu_gpu.py

Pipeline for multibunch simulations

Xsuite can be used to simulate multiple bunches interacting through collective forces, such as beam-beam interactions or wakefields. The xtrack.pipeline.MultiTracker handles the tracking of multiple xpart.particles.Particles objects through their own xtrack.Line (e.g. each of the two rings in a circular collider). Each Particles instance and its Line make a xtrack.pipeline.Branch. The MultiTracker will iteratively track its branches until an xtrack.beam_elements.Element which requires communication is reached (e.g. a strong-strong beam-beam collision, where information about the other beam’s charge distribution is required). At this point the MultiTracker will use the xtrack.pipeline.PipelineManager to establish communication between the branches that need to exchange information. If the communication cannot take place, e.g. because one of the two branches is not ready, the Multitracker resumes tracking with another branch.

The PipelineManager is aware of the communication required by the different Elements and the different Particles instances (e.g. in a collider with many bunches and many interactions points, the PipelineManager knows which bunch collides with which and where.) By default the PipelineManager uses a dummy communicator which does not require MPI, but an MPI communicator can be provided instead thus enabling tracking on multiple CPUs.

It is important that the Particles instances as well as the Element instances in the line that require communication through the pipeline are attributed a uniquely defined name.

The pipeline algorithm is detailed in https://doi.org/10.1016/j.cpc.2019.06.006.

The following example illustrates how to configure and run a simulation with two bunches colliding at two interaction points using MPI. It must be run like a MPI application

mpirun -np 2 python example.py

Example

import time

import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

####################
# Pipeline manager #
####################

# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank   = comm.Get_rank()

# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
    # Add information about the Particles instance that require
    # communication through the pipeline manager.
    # Each Particles instance must be identifiable by a unique name
    # The second argument is the MPI rank in which the Particles object
    # lives.
    pipeline_manager.add_particles('B1b1',0)
    pipeline_manager.add_particles('B2b1',1)
    # Add information about the elements that require communication
    # through the pipeline manager.
    # Each Element instance must be identifiable by a unique name
    # All Elements are instanciated in all ranks
    pipeline_manager.add_element('IP1')
    pipeline_manager.add_element('IP2')
else:
    print('Need at least 2 MPI processes for this test')
    exit()

#################################
# Generate particles            #
#################################

n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3

#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode 
mean_x_init = 0.1
mean_y_init = 0.1

p0c = 7000e9

print('Initialising particles')
if my_rank == 0:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)+mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B1b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B2b1'
elif my_rank == 1:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)-mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B2b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B1b1'
else:
    # Stop the execution of ranks that are not needed
    print('No need for rank',my_rank)
    exit()

#############
# Beam-beam #
#############

nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP1', # The element name must be unique and match the
                       # one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       # Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP2', # The element name must be unique and match the
                       #  one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       #Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )



print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=0.0,
            config_for_update = config_for_update_IP1)

bbeamIP2 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=np.pi/2,
            config_for_update = config_for_update_IP2)

#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################

arc12 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

arc21 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

#################################################################
# Tracker                                                       #
#################################################################

# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])

#################################################################
# Tracking  (Same as usual)                                     #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')


#################################################################
# Post-processing: raw data and spectrum                        #
#################################################################

# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
    positions_x_b1 = np.average(line.record_last_track.x,axis=0)
    positions_y_b1 = np.average(line.record_last_track.y,axis=0)
    plt.figure(1)
    freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
    mask = freqs > 0
    myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
    myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
    plt.xlabel('Frequency [$f_{rev}$]')
    plt.ylabel('Amplitude')
    plt.legend(loc=0)
    plt.show()

# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
_images/beambeam_sigmapi.png

If the communicator is not specified when instantiating the PipelineManager, it will not use MPI:

pipeline_manager = xt.PipelineManager()