Collective effects

Tracking with collective elements

A collective beam element is an element that needs access to the entire particle set (in read and/or write mode). The following example shows how to handle such elements in Xsuite.

Example

A typical example of collective element is a space-charge interaction. We can create a space-charge beam element as follows:

import xobjects as xo
import xfields as xf

context = xo.ContextCpu()

spcharge = xf.SpaceChargeBiGaussian(_context=context,
    update_on_track = ['sigma_x', 'sigma_y'], length=2,
    longitudinal_profile=xf.LongitudinalProfileQGaussian(
        _context=context, number_of_particles=1e11, sigma_z=0.2))

This creates a space-charge element where the transverse beam sizes are updated based on the particle set at each interaction. Such an element can be included in an xtrack tracker similarly to single-particle elements.

import xtrack as xt

## Generate a simple beam line including the spacecharge element
myqf = xt.Multipole(knl=[0, 1.])
myqd = xt.Multipole(knl=[0, -1.])
mydrift = xt.Drift(length=1.)
line = xt.Line(
    elements = [myqf, mydrift, myqd, mydrift,
                spcharge,
                myqf, mydrift, myqd, mydrift,],
    element_names = ['qf1', 'drift1', 'qd1', 'drift2',
                        'spcharge'
                        'qf2', 'drift3', 'qd2', 'drift4'])

## Transfer lattice on context and compile tracking code
line.build_tracker(_context=context)

## Build particle object on context
n_part = 200
particles = xt.Particles(_context=context,
                        p0c=6500e9,
                        x=np.random.uniform(-1e-3, 1e-3, n_part),
                        px=np.random.uniform(-1e-5, 1e-5, n_part),
                        y=np.random.uniform(-2e-3, 2e-3, n_part),
                        py=np.random.uniform(-3e-5, 3e-5, n_part),
                        zeta=np.random.uniform(-1e-2, 1e-2, n_part),
                        delta=np.random.uniform(-1e-4, 1e-4, n_part),
                        )

## Track (saving turn-by-turn data)
n_turns = 100
line.track(particles, num_turns=n_turns,
            turn_by_turn_monitor=True)

How does it work?

To decide whether or not an element needs to be treated as collective, the tracker inspects its iscollective attribute. In our example:

print(qf.iscollective)
# Gives "False"

print(spcharge.iscollective)
# Gives "True"

Based in this information the line is divided in parts that are either collective elements or xtrack trackers simulating groups of consecutive non-collective elements.

We can visualize this in our example:

print(line.tracker._parts)
# Gives:
# [<xtrack.tracker.Tracker object at 0x7f5ba8ce7760>,
#  <xfields.beam_elements.spacecharge.SpaceChargeBiGaussian object at 0x7f5ba8e1bd30>,
#  <xtrack.tracker.Tracker object at 0x7f5ba8ce7610>]

where the first part tracks the particles through to the first potion of the machine up to the space-charge element, the second part simulates the space-charge interaction, the third part tracks the particles from the space-charge element to the end of the line.

As all xsuite and xsuite-compatible beam elements need to expose a .track method the instruction:

line.track(particles)

is equivalent to the loop:

for pp in line.tracker._parts:
    pp.track(particles)

Any python object exposing a ‘.track’ method can be used as beam_element. If the attribute iscollective is not present the element is handled as collective.

Xwakes: wakefields and impedances

xwakes provides wakefield and impedance elements that plug into the Xsuite tracking environment. It offers analytic resonator wakes, wakes read from tables, multi-bunch/multi-turn support, transverse damping, monitoring, and MPI pipeline wiring. The list of available wakefield elements is available in the API reference.

Quick start: resonator wake on a single bunch

import numpy as np
import xtrack as xt
import xwakes as xw

# Build a wake as the sum of dipolar and quadrupolar resonators
wf_dip_1 = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=1e3)
wf_dip_2 = xw.WakeResonator(kind='dipolar_x', r=5e7, q=5e4, f_r=5e2)
wf_quad_1 = xw.WakeResonator(kind='quadrupolar_x', r=2e7, q=8e4, f_r=2e3)
wf_quad_2 = xw.WakeResonator(kind='quadrupolar_y', r=3e7, q=6e4, f_r=1.5e3)
wf = wf_dip_1 + wf_dip_2 + wf_quad_1 + wf_quad_2

# Configure for tracking: zeta range and number of slices
wf.configure_for_tracking(
    zeta_range=(-0.1, 0.1),  # meters
    num_slices=100
)

# Simple accelerator lattice: one turn map plus wake
one_turn = xt.LineSegmentMap(length=26000, betx=50., bety=40., qx=62.28, qy=62.31,
                             longitudinal_mode='linear_fixed_qs', qs=1e-3, bets=100)
line = xt.Line(elements=[one_turn, wf], element_names=['one_turn', 'wake'])
line.set_particle_ref('proton', p0c=7e12)

# One particle per slice, give it an offset, track once and inspect the kick
particles = line.build_particles(
    x=0, px=0, y=0, py=0,
    zeta=wf.slicer.zeta_centers.flatten(),
)
particles.x += 1e-3  # mm-level offset

# Track one turn
line.track(particles, num_turns=1)

configure_for_tracking prepares the internal slicer and wake tracker (zeta_range, num_slices) and must be called before tracking. Summing wakes works naturally: wf_total = wf1 + wf2 + wf3; the combined wake is configured once and used like a single element.

Wakefield definitions

Transverse wakefields are defined such that the transverse kicks are given by:

\[\begin{split}\Delta p_x &= \frac{q^2 e^2}{m_0 \gamma \beta_0^2 c^2} \sum_{i,j,k,l \ge 0} x^k y^l \int_{-\infty}^{\infty} \bar{x}^i(z')\,\bar{y}^j(z')\,\lambda(z')\,W^{i,j,k,l}_{x}(z - z')\,dz' \\ \Delta p_y &= \frac{q^2 e^2}{m_0 \gamma \beta_0^2 c^2} \sum_{i,j,k,l \ge 0} x^k y^l \int_{-\infty}^{\infty} \bar{x}^i(z')\,\bar{y}^j(z')\,\lambda(z')\,W^{i,j,k,l}_{y}(z - z')\,dz'\end{split}\]

where \(\bar{x}(z)\) and \(\bar{y}(z)\) are the transverse centroids, and \(\lambda(z)\) is the line density. The exponents \((i,j)\) belong to the source moments, while \((k,l)\) apply to the test particle offsets.

Longitudinal kicks are defined so that the energy momentum deviation change is:

\[\Delta \delta = -\frac{q^2 e^2}{m_0 \gamma \beta_0^2 c^2} \int_{-\infty}^{\infty} \lambda(z')\, W_s(z - z')\,dz' ~,\]

with the sign convention that a positive wake causes energy loss.

Each predefined kind maps to a plane and to a set of source exponents \((i,j)\) and test exponents \((k,l)\), following \(W^{i,j,k,l}\) in the formulas above. The available kinds are listed below:

kind

plane

source_exponents

test_exponents

meaning

longitudinal

z

(0, 0)

(0, 0)

longitudinal

constant_x

x

(0, 0)

(0, 0)

constant x

constant_y

y

(0, 0)

(0, 0)

constant y

dipolar_x

x

(1, 0)

(0, 0)

dipolar / driving x

dipolar_y

y

(0, 1)

(0, 0)

dipolar / driving y

dipolar_xy

x

(0, 1)

(0, 0)

dipolar / driving xy

dipolar_yx

y

(1, 0)

(0, 0)

dipolar / driving yx

quadrupolar_x

x

(0, 0)

(1, 0)

quadrupolar / detuning x

quadrupolar_y

y

(0, 0)

(0, 1)

quadrupolar / detuning y

quadrupolar_xy

x

(0, 0)

(0, 1)

quadrupolar / detuning xy

quadrupolar_yx

y

(0, 0)

(1, 0)

quadrupolar / detuning yx

Wakefield objects can be initialized in different ways, as illustrated by the following examples.

Single component
# Horizontal dipolar resonator
w1 = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=1e9)
Multiple components (list/tuple)
# Horizontal + vertical dipolar with same r/q/f_r
w2 = xw.WakeResonator(kind=['dipolar_x', 'dipolar_y'],
                      r=1e8, q=1e5, f_r=1e9)
Weighted components (dict)
# Scale horizontal twice as strong as vertical
w3 = xw.WakeResonator(kind={'dipolar_x': 2.0, 'dipolar_y': 1.0},
                      r=1e8, q=1e5, f_r=1e9)
Using Yokoya factors
# Flat chamber (horizontal) yokoya factors expanded into the right components
w4 = xw.WakeResonator(kind=xw.Yokoya('flat_horizontal'),
                      r=1e8, q=1e5, f_r=1e9)
Custom polynomial term
# Custom plane/exponents without a predefined kind entry
w5 = xw.WakeResonator(
    plane='y',
    source_exponents=(1, 0),  # x_source^1 y_source^0
    test_exponents=(0, 2),    # x_test^0 y_test^2
    r=1e8, q=1e5, f_r=1e9)
Combine and configure
w = w1 + w2 + w3.components[0]  # mix whole wakes and single components
w.configure_for_tracking(zeta_range=(-0.1, 0.1), num_slices=200)

Building wakes from tables

The class WakeFromTable builds wakefields from tabulated data in the time domain. Columns must correspond to the wakefield kinds defined above. The function xw.read_headtail_file can be used to read HEADTAIL-format files, as illustrated in the following examples.

import pathlib
import xwakes as xw

test_data = pathlib.Path(__file__).parent / 'test_data' / 'HLLHC_wake.dat'
columns = ['time', 'longitudinal', 'dipolar_x', 'dipolar_y',
           'quadrupolar_x', 'quadrupolar_y']

table = xw.read_headtail_file(test_data, columns)

# use only dipolar terms
wf = xw.WakeFromTable(table, columns=['dipolar_x', 'dipolar_y'])

# Configure for tracking
wf.configure_for_tracking(zeta_range=(-0.4, 0.4), num_slices=100)

# Track as usual in a line
# ...

Defining custom wakes

For forms not covered by the built-in components, you can build a Component directly from a wake callable in the time domain. Provide the plane of the kick, and the polynomial exponents applied to the source/test offsets (source_exponents for the source particle, test_exponents for the particle being kicked). The wake callable receives time t in seconds; enforce causality by zeroing t <= 0. Wrap one or more components in xw.Wake and configure it like any other wake.

import numpy as np
import xwakes as xw

a, b, c = 1.0e9, 0.1e9, 2.0  # frequency, damping rate, amplitude

def wake_vs_t(t):
    t = np.atleast_1d(t)
    out = c * np.sin(a * t) * np.exp(-b * t)
    out[t <= 0] = 0.0  # causal wake
    return out

custom_component = xw.Component(
    wake=wake_vs_t,
    plane='y',
    source_exponents=(2, 0),
    test_exponents=(1, 1),
    name="Example damped sine wake",
)

custom_wake = xw.Wake(components=[custom_component])

# Inspect the zeta-domain wake or combine with other components
zeta = np.linspace(-10, 10, 500)
values = custom_component.function_vs_zeta(zeta, beta0=0.7)
custom_wake.configure_for_tracking(zeta_range=(-0.1, 0.1), num_slices=200)

The snippet mirrors xwakes/examples/003_custom_wake.py; you can mix these components with resonators or tables via custom_wake + other_wake.

Multi-bunch, multi-turn wakes

configure_for_tracking accepts multi-bunch/multi-turn options:

  • filling_scheme: array of 0/1 slots (length = number of RF buckets considered)

  • bunch_spacing_zeta: spacing between buckets in meters

  • num_turns and circumference: enable multi-turn wake memory

Example (two bunches, one-turn memory):

import numpy as np
import xwakes as xw
import xpart as xp
import xtrack as xt

filling_scheme = np.zeros(3564, dtype=int)
filling_scheme[0] = filling_scheme[1] = 1

wf = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=600e6)
wf.configure_for_tracking(
    zeta_range=(-0.2, 0.2),
    num_slices=200,
    filling_scheme=filling_scheme,
    bunch_spacing_zeta=26658.8832 / 3564,
    num_turns=1,
    circumference=26658.8832,
)

# Simple one-turn map and line
one_turn = xt.LineSegmentMap(
    length=26658.8832, betx=70., bety=80., qx=62.31, qy=60.32,
    longitudinal_mode='nonlinear', qs=2e-3, bets=731.27)
line = xt.Line([one_turn, wf], element_names=['one_turn', 'wake'])
line.particle_ref = xt.Particles(p0c=7e12)

# Generate a matched two-bunch beam and track
particles = xp.generate_matched_gaussian_multibunch_beam(
    line=line, filling_scheme=filling_scheme,
    bunch_num_particles=100_000, bunch_intensity_particles=2.3e11,
    nemitt_x=2e-6, nemitt_y=2e-6, sigma_z=0.08,
    bucket_length=26658.8832 / 35640, bunch_spacing_buckets=10,
)
line.track(particles, num_turns=10)

Space charge

Xsuite can be used to perform simulations including space-charge effects. Three modes can be used to simulate the space-charge forces:

  • Frozen: A fixed Gaussian bunch distribution is used to compute the space-charge forces.

  • Quasi-frozen: Forces are computed assuming a Gaussian distribution. The properties of the distribution (transverse r.m.s. sizes,transverse positions) are updated at each interaction based on the particle distribution.

  • PIC: The Particle In Cell method is used to compute the forces acting among particles. No assumption is made on the bunch shape.

The last two options constitute collective interactions. As discussed in the dedicated section, the Xtrack Line works such that particles are tracked asynchronously by separate threads in the non-collective sections of the sequence and are regrouped at each collective element (in PIC or quasi-frozen space-charge lenses).

The following example illustrates how to configure and run a space-charge simulation. The variable mode is used to switch between frozen, quasi-frozen and PIC.

See also: xfields.install_spacecharge_frozen(), xfields.replace_spacecharge_with_quasi_frozen(), xfields.replace_spacecharge_with_PIC().

Example

import json
import numpy as np

import xobjects as xo
import xpart as xp
import xtrack as xt
import xfields as xf

############
# Settings #
############

fname_line = ('../../test_data/sps_w_spacecharge/'
                  'line_no_spacecharge_and_particle.json')

bunch_intensity = 1e11/3 # Need short bunch to avoid bucket non-linearity
                         # to compare frozen/quasi-frozen and PIC
sigma_z = 22.5e-2/3
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e6)
num_turns=32

num_spacecharge_interactions = 540

# Available modes: frozen/quasi-frozen/pic
# mode = 'pic'
mode = 'quasi-frozen'

# Choose solver between `FFTSolver2p5DAveraged` and `FFTSolver2p5D`
pic_solver = 'FFTSolver2p5DAveraged'

####################
# Choose a context #
####################

context = xo.ContextCpu()
#context = xo.ContextCupy()
#context = xo.ContextPyopencl('0.0')

print(context)

#############
# Load line #
#############

with open(fname_line, 'r') as fid:
     input_data = json.load(fid)
line = xt.Line.from_dict(input_data['line'])
line.particle_ref = xt.Particles.from_dict(input_data['particle'])


line.build_tracker(_context=context, compile=False)

#############################################
# Install spacecharge interactions (frozen) #
#############################################

lprofile = xf.LongitudinalProfileQGaussian(
        number_of_particles=bunch_intensity,
        sigma_z=sigma_z,
        z0=0.,
        q_parameter=1.)

xf.install_spacecharge_frozen(line=line,
                   longitudinal_profile=lprofile,
                   nemitt_x=nemitt_x, nemitt_y=nemitt_y,
                   sigma_z=sigma_z,
                   num_spacecharge_interactions=num_spacecharge_interactions,
                   )

#################################
# Switch to PIC or quasi-frozen #
#################################

if mode == 'frozen':
    pass # Already configured in line
elif mode == 'quasi-frozen':
    xf.replace_spacecharge_with_quasi_frozen(
                                    line,
                                    update_mean_x_on_track=True,
                                    update_mean_y_on_track=True)
elif mode == 'pic':
    pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
        line=line,
        n_sigmas_range_pic_x=8,
        n_sigmas_range_pic_y=8,
        nx_grid=256, ny_grid=256, nz_grid=100,
        n_lims_x=7, n_lims_y=3,
        z_range=(-3*sigma_z, 3*sigma_z),
        solver=pic_solver)
else:
    raise ValueError(f'Invalid mode: {mode}')

#################
# Build Tracker #
#################

line.build_tracker(_context=context)

######################
# Generate particles #
######################

# Build a line without spacecharge (recycling the track kernel)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')
line_sc_off.build_tracker(track_kernel=line.tracker.track_kernel)

# (we choose to match the distribution without accounting for spacecharge)
particles = xp.generate_matched_gaussian_bunch(line=line_sc_off,
         num_particles=n_part, total_intensity_particles=bunch_intensity,
         nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z)

####################################################
# Check that everything is on the selected context #
####################################################

assert particles._context == context
assert len(set([ee._buffer for ee in line.elements])) == 1 # All on same context
assert line._context == context

#########
# Track #
#########
line.track(particles, num_turns=num_turns, with_progress=1)

# Complete source: xtrack/examples/spacecharge/000_spacecharge_example.py

Beam-beam

Weak-strong 2D

The example below shows how to introduce a 2D beam-beam element in a line and perform few studies based on tracking. The 2D beam-beam element provides a kick based on the Basseti-Erskine formula neglecting any longitudinal varitations of the beam-beam force.

import numpy as np
from matplotlib import pyplot as plt
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

# Machine and beam parameters
n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
nemitt_x = 2E-6
nemitt_y = 2E-6
p0c = 7e12
gamma = p0c/xp.PROTON_MASS_EV
physemit_x = nemitt_x/gamma
physemit_y = nemitt_y/gamma
beta_x = 1.0
beta_y = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.31
Qy = 0.32
Qs = 2.1E-3

##############################
# Plot the beam-beam force   #
##############################

# Particles algined on the x axis
particles = xp.Particles(_context=context,
    p0c=p0c,
    x=np.sqrt(physemit_x*beta_x)*np.linspace(-6,6,n_macroparticles),
    px=np.zeros(n_macroparticles),
    y=np.zeros(n_macroparticles),
    py=np.zeros(n_macroparticles),
    zeta=np.zeros(n_macroparticles),
    delta=np.zeros(n_macroparticles),
)
# Definition of the beam-beam force based on the other beam's
# properties (here assumed identical to the tracked beam)
# Note that the arguments 'Sigma' are sigma**2 
bbeam = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles.q0,
            other_beam_beta0 = particles.beta0[0],
            other_beam_num_particles = bunch_intensity,
            other_beam_Sigma_11 = physemit_x*beta_x,
            other_beam_Sigma_33 = physemit_y*beta_y)

# Build line and tracker with only the beam-beam element
elements = [bbeam]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# track on turn
line.track(particles,num_turns=1)
#plot the resulting change of px
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px)
plt.xlabel(r'x [$\sigma_x$]')
plt.ylabel(r'$\Delta p_x$')

##############################
# Tune footprint             #
##############################

# Build linear lattice
arc = xt.LineSegmentMap(
        betx = beta_x,bety = beta_y,
        qx = Qx, qy = Qy,bets = beta_s, qs=Qs)
# Build line and tracker with a beam-beam element and linear lattice
elements = [bbeam,arc]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# plot footprint
plt.figure()
fp0 = line.get_footprint(nemitt_x=nemitt_x, nemitt_y=nemitt_y)
fp0.plot(color='k')

##############################
# Phase space distorion      #
##############################

# Initialise particles randomly in 6D phase space
particles = xp.Particles(_context=context,
    p0c=p0c,
    x=np.sqrt(physemit_x*beta_x)*(np.random.randn(n_macroparticles)),
    px=np.sqrt(physemit_x/beta_x)*np.random.randn(n_macroparticles),
    y=np.sqrt(physemit_y*beta_y)*(np.random.randn(n_macroparticles)),
    py=np.sqrt(physemit_y/beta_y)*np.random.randn(n_macroparticles),
    zeta=sigma_z*np.random.randn(n_macroparticles),
    delta=sigma_delta*np.random.randn(n_macroparticles),
)

# Change the tune to better visualise the 4th order resonance
arc.qx=0.255
# Track
line.track(particles,num_turns=10000)
# Plot phase space
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px/np.sqrt(physemit_x/beta_x),'.')
plt.xlabel('$x$ [$\sigma_x$]')
plt.ylabel('$p_x$ [$\sigma_{px}$]')
plt.show()

# Complete source: xfields/examples/002_beambeam/010_beambeam2d_weakstrong.py
_images/footprint_HO2D.png
_images/beambeamkick.png
_images/phasespacedistortion.png

Weak-strong 3D

The 3D beam-beam element can be used similarly, replacing the instanciation of the beam-beam element as in the example below. This element takes into account longitudinal variations of the beam-beam force (hourglass, crossing angle) based on a longitudinal slicing of the beam (Hirata’s method) handled by the xfields.beam_elements.TempSlicer.

# Number of longitudinal slices
n_slices = 21
# Slicer used to determine the position and charge of the slices
# based on an optimised algorithm by D. Shatilov. (other options are also possible)
slicer = xf.TempSlicer(n_slices=n_slices, sigma_z=sigma_z, mode="shatilov")
bbeam = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            # Full crossing angle at the IP in rad
            phi = 500.0E-2,
            # Roll angle of the crossing angle in rad (0 -> horizontal, pi/2 -> vertical)
            alpha = 0.0,
            # charge in each slice
            slices_other_beam_num_particles = slicer.bin_weights * bunch_intensity,
            # longitudinal position of the slice
            slices_other_beam_zeta_center = slicer.bin_centers,
            # transverse sizes of the slices at the IP (squarred)
            slices_other_beam_Sigma_11 = np.zeros(n_slices,dtype=float)+physemit_x*beta_x,
            slices_other_beam_Sigma_12 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_13 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_14 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_22 = np.zeros(n_slices,dtype=float)+physemit_x/beta_x,
            slices_other_beam_Sigma_23 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_24 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_33 = np.zeros(n_slices,dtype=float)+physemit_y*beta_y,
            slices_other_beam_Sigma_34 = np.zeros(n_slices,dtype=float),
            slices_other_beam_Sigma_44 = np.zeros(n_slices,dtype=float)+physemit_y/beta_y)

Strong-strong (soft-Gaussian)

Strong-strong simulations can be performed using the Pipeline for multibunch simulations, as in the example below.

import time

import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

####################
# Pipeline manager #
####################

# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank   = comm.Get_rank()

# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
    # Add information about the Particles instance that require
    # communication through the pipeline manager.
    # Each Particles instance must be identifiable by a unique name
    # The second argument is the MPI rank in which the Particles object
    # lives.
    pipeline_manager.add_particles('B1b1',0)
    pipeline_manager.add_particles('B2b1',1)
    # Add information about the elements that require communication
    # through the pipeline manager.
    # Each Element instance must be identifiable by a unique name
    # All Elements are instanciated in all ranks
    pipeline_manager.add_element('IP1')
    pipeline_manager.add_element('IP2')
else:
    print('Need at least 2 MPI processes for this test')
    exit()

#################################
# Generate particles            #
#################################

n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3

#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode 
mean_x_init = 0.1
mean_y_init = 0.1

p0c = 7000e9

print('Initialising particles')
if my_rank == 0:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)+mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B1b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B2b1'
elif my_rank == 1:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)-mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B2b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B1b1'
else:
    # Stop the execution of ranks that are not needed
    print('No need for rank',my_rank)
    exit()

#############
# Beam-beam #
#############

nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP1', # The element name must be unique and match the
                       # one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       # Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP2', # The element name must be unique and match the
                       #  one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       #Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )



print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=0.0,
            config_for_update = config_for_update_IP1)

bbeamIP2 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=np.pi/2,
            config_for_update = config_for_update_IP2)

#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################

arc12 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

arc21 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

#################################################################
# Tracker                                                       #
#################################################################

# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])

#################################################################
# Tracking  (Same as usual)                                     #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')


#################################################################
# Post-processing: raw data and spectrum                        #
#################################################################

# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
    positions_x_b1 = np.average(line.record_last_track.x,axis=0)
    positions_y_b1 = np.average(line.record_last_track.y,axis=0)
    plt.figure(1)
    freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
    mask = freqs > 0
    myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
    myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
    plt.xlabel('Frequency [$f_{rev}$]')
    plt.ylabel('Amplitude')
    plt.legend(loc=0)
    plt.show()

# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
_images/beambeam_sigmapi.png

In collisions featuring a low disruption (i.e. the beam moments do not vary significantly during the interaction), the quasi-strong-strong (aka frozen-strong-strong) model may be enabled by setting the argument ‘quasistrongstrong = True’ in xfields.beam_elements.ConfigForUpdate*. In this configuration, the beam moments are computed once at the start of the collison and kept constant throught the collison, thus reducing the computing load. The argument ‘update_every’ allows to further reduce the computing load by keeping the moments for the given amount of turns. This model is suitable for effects that build up over may turns. (more details in https://accelconf.web.cern.ch/eefact2022/papers/wezat0102.pdf)

For a 2D beam-beam interactions, the beam-beam element and the xfields.beam_elements.ConfigForUpdate* have to be redifined as in the example below.

config_for_update_b1_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
   pipeline_manager=pipeline_manager,
   element_name='IP1',
   partner_particles_name = 'B2b1',
   update_every=1
   )
config_for_update_b2_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
   pipeline_manager=pipeline_manager,
   element_name='IP1',
   partner_particles_name = 'B1b1',
   update_every=1
   )
bbeamIP1_b1 = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles_b2.q0,
            other_beam_beta0 = particles_b2.beta0[0],
            config_for_update = config_for_update_b1_IP1)
bbeamIP1_b2 = xf.BeamBeamBiGaussian2D(
            _context=context,
            other_beam_q0 = particles_b1.q0,
            other_beam_beta0 = particles_b1.beta0[0],
            config_for_update = config_for_update_b2_IP1)

Poisson Solver

Particle-in-cell simulations using a Poisson solver for the beam-beam interaction is currently not implemented in xfields

Beam-beam in a real lattice

Identically to the examples above, beam-beam elements can be introduced into the lattice of a full machine. Several tools exist to ease the setup of beam-beam interactions in a collider lattice: Xmask. An example snippet is shown below where an xtrack lattice in json format is loaded and a weakstrong beam-beam element is inserted at the marker “ip.1”.

path     = "/path/to/lattice"
seq_name = "lattice.json"
seq_path = os.path.join(path, seq_name)

 with open(seq_path, 'r') as f:
     line = xt.Line.from_dict(json.load(f))

# define slicer
slicer = xf.TempSlicer(n_slices=n_slices, sigma_z=sigma_z, mode=binning_mode)

# define weakstrong beambeam element (n*[x] yields a list with n elements each of which is x, e.g. 3*[5]=[5,5,5])
el_beambeam = xf.BeamBeamBiGaussian3D(
         _context=context,
         config_for_update = None,
         other_beam_q0=other_beam_q0,
         phi=phi, # half-crossing angle in radians
         alpha=alpha, # crossing plane in radians
         # slice intensity [num. real particles] number of slices inferred from the length of this list
         slices_other_beam_num_particles = slicer.bin_weights * nb,
         # unboosted opposing (strong) beam moments
         slices_other_beam_zeta_center = slicer.bin_centers,
         slices_other_beam_Sigma_11    = n_slices*[ sigma_x**2], # Beam sizes for the other beam, assuming the same value for all slices
         slices_other_beam_Sigma_22    = n_slices*[sigma_px**2],
         slices_other_beam_Sigma_33    = n_slices*[ sigma_y**2],
         slices_other_beam_Sigma_44    = n_slices*[sigma_py**2],
         # only if beamstrahlung on
         slices_other_beam_zeta_bin_width_star_beamstrahlung = None if not beamstrahlung_on else slicer.bin_widths_beamstrahlung / np.cos(phi),  # boosted dz
         # has to be set (0 in most cases) if config_for_update = None
         slices_other_beam_Sigma_12     = n_slices*[0],
         slices_other_beam_Sigma_34     = n_slices*[0],
         slices_other_beam_pzeta_center = n_slices*[pzeta], # important when tapering (FCC-ee specific)
     )

element_name = "ip.1"  # this is the marker name of an IP in the lattice where the beam-beam element will be inserted
element_line_index = line.element_names.index(element_name)  # get the array index of the element "ip.1"
line.insert_element(index=element_line_index, element=el_beambeam, name='beambeam')

Combined CPU - GPU simulations

When performing simulations on GPU contexts, it is possible to execute on CPU the computation for specific beam elements, for example PyHEADTAIL elements that do not support the Xsuite GPU contexts. For that purpose, one needs to set a `needs_cpu` flag on the concerned elements before inserting them in an Xtrack line. This instructs the Xtrack tracker to handle the required data transfers.

Presently PyHEADTAIL elements are not able to skip lost particles in the Xtrack Particles objects. This can be handled by setting the flag `needs_hidden_lost_particles`, which instructs the Xtrack tracker to temporarily hide the lost particles for the concerned elements.

The following example shows a simulation including the SPS lattice elements, space-charge elements and wakefields. The lattice tracking and the spacecharge calculations are performed on GPU with Xtrack/Xfields while the wakefield computation is performed on CPU with PyHEADTAIL.

import numpy as np

from cpymad.madx import Madx

import xpart as xp
import xobjects as xo
import xtrack as xt
import xfields as xf

###############################
# Enable pyheadtail interface #
###############################

xt.enable_pyheadtail_interface()

############
# Settings #
############

gpu_device = 0 # Set to None to use CPU, or to an integer to specify the GPU device to use (e.g. 0)
seq_name = "sps"
qx0,qy0 = 20.13, 20.18
p0c = 26e9
cavity_name = "actcse.31632"
cavity_phase = np.pi
frequency = 200e6
rf_voltage = 4e6
use_wakes = True
n_slices_wakes = 100
limit_z = 0.7
bunch_intensity = 1e11
sigma_z = 22.5e-2
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e4)
num_turns=2
num_spacecharge_interactions = 540

# mode = 'frozen'
mode = 'pic'

#######################################################
# Use cpymad to load the sequence and match the tunes #
#######################################################

mad = Madx()
mad.call("./madx_sps/sps.seq")
mad.call("./madx_sps/lhc_q20.str")
mad.call("./madx_sps/macro.madx")
mad.beam()
mad.use(seq_name)

mad.twiss()
tw_thick = mad.table.twiss.dframe()
summ_thick = mad.table.summ.dframe()

mad.input("""select, flag=makethin, slice=1, thick=false;
            makethin, sequence=sps, style=teapot, makedipedge=false;""")
mad.use(seq_name)
mad.exec(f"sps_match_tunes({qx0},{qy0});")

##################
# Switch context #
##################

if gpu_device is None:
   context = xo.ContextCpu()
else:
   context = xo.ContextCupy(device=gpu_device)

#################################
# Build line from MAD-X lattice #
#################################

line = xt.Line.from_madx_sequence(sequence=mad.sequence[seq_name],
           deferred_expressions=True, install_apertures=True,
           apply_madx_errors=False)
# Define reference particle
line.particle_ref = xt.Particles(p0c=p0c,mass0=xt.PROTON_MASS_EV)

################
# Switch on RF #
################

line[cavity_name].voltage = rf_voltage
line[cavity_name].phase = cavity_phase
line[cavity_name].frequency = frequency

############################################
# Twiss (check that the lattice is stable) #
############################################

line.build_tracker()
tw = line.twiss()

#####################################
# Install spacecharge interactions) #
#####################################

# Install frozen space-charge lenses

lprofile = xf.LongitudinalProfileQGaussian(
       number_of_particles=bunch_intensity,
       sigma_z=sigma_z,
       z0=0.,
       q_parameter=1.)

xf.install_spacecharge_frozen(line=line,
                  particle_ref=line.particle_ref,
                  longitudinal_profile=lprofile,
                  nemitt_x=nemitt_x, nemitt_y=nemitt_y,
                  sigma_z=sigma_z,
                  num_spacecharge_interactions=num_spacecharge_interactions,
                  )

# Switch to PIC or quasi-frozen
if mode == 'frozen':
   pass # Already configured in line
elif mode == 'quasi-frozen':
   xf.replace_spacecharge_with_quasi_frozen(
                                   line,
                                   update_mean_x_on_track=True,
                                   update_mean_y_on_track=True)
elif mode == 'pic':
   pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
       _context=context, line=line,
       n_sigmas_range_pic_x=8,
       n_sigmas_range_pic_y=8,
       nx_grid=256, ny_grid=256, nz_grid=100,
       n_lims_x=7, n_lims_y=3,
       z_range=(-3*sigma_z, 3*sigma_z))
else:
   raise ValueError(f'Invalid mode: {mode}')

######################
# Install wakefields #
######################

if use_wakes:

   # We rescale the waketable to take into account for the difference between
   # the average beta functions (for which the table is calculated) and the beta
   # functions at the location in the lattice at which the wake element is
   # installed.
   wakefields = np.genfromtxt('wakes/wakeforhdtl_PyZbase_Allthemachine_7000GeV_B1_2021_TeleIndex1_wake.dat')
   wakefields[:,1] *= 54.65/tw['betx'][0]
   wakefields[:,2] *= 54.51/tw['bety'][0]
   wakefields[:,3] *= 54.65/tw['betx'][0]
   wakefields[:,4] *= 54.65/tw['betx'][0]
   wakefile = 'wakes/wakefields.wake'
   np.savetxt(wakefile,wakefields,delimiter='\t')

   # Build PyHEADTAIL wakefield element
   from PyHEADTAIL.particles.slicing import UniformBinSlicer
   from PyHEADTAIL.impedances.wakes import WakeTable, WakeField
   n_slices_wakes = 500
   slicer_for_wakefields = UniformBinSlicer(n_slices_wakes, z_cuts=(-limit_z, limit_z))
   waketable = WakeTable(wakefile, ['time', 'dipole_x', 'dipole_y',
                  'quadrupole_x', 'quadrupole_y'])
   wakefield = WakeField(slicer_for_wakefields, waketable)

   # Specity that the wakefield element needs to run on CPU and that lost
   # particles need to be hidden for this element (required by PyHEADTAIL)
   wakefield.needs_cpu = True
   wakefield.needs_hidden_lost_particles = True

   # Insert element in the line
   line.insert_element(element=wakefield,name="wakefield", at_s=0)

#################
# Build Tracker #
#################

line.build_tracker(_context=context)

######################
# Generate particles #
######################

# (we choose to match the distribution without accounting for spacecharge)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')

particles = xp.generate_matched_gaussian_bunch(_context=context,
        num_particles=n_part, total_intensity_particles=bunch_intensity,
        nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z,
        particle_ref=line.particle_ref, line=line_sc_off)
particles.circumference = line.get_length() # Needed by pyheadtail

###########################################################
# We use a phase monitor to measure the tune turn by turn #
###########################################################

phasem = xp.PhaseMonitor(line,
                 num_particles=n_part, twiss=line_sc_off.twiss())

#########
# Track #
#########

for turn in range(num_turns):
   phasem.measure(particles)
   #import pdb; pdb.set_trace()
   line.track(particles)

##################
# Plot footprint #
##################

import matplotlib.pyplot as plt
plt.close('all')
f,ax = plt.subplots()
ax.plot(phasem.qx, phasem.qy,'b.',ms=1)
ax.set_xlim(0, 0.5)
ax.set_ylim(0, 0.5)
plt.show()

# Complete source: xtrack/examples/pyheadtail_interface/004_imped_spacech_cpu_gpu.py

Pipeline for multibunch simulations

Xsuite can be used to simulate multiple bunches interacting through collective forces, such as beam-beam interactions or wakefields. The xtrack.pipeline.MultiTracker handles the tracking of multiple xpart.particles.Particles objects through their own xtrack.Line (e.g. each of the two rings in a circular collider). Each Particles instance and its Line make a xtrack.pipeline.Branch. The MultiTracker will iteratively track its branches until an xtrack.beam_elements.Element which requires communication is reached (e.g. a strong-strong beam-beam collision, where information about the other beam’s charge distribution is required). At this point the MultiTracker will use the xtrack.pipeline.PipelineManager to establish communication between the branches that need to exchange information. If the communication cannot take place, e.g. because one of the two branches is not ready, the Multitracker resumes tracking with another branch.

The PipelineManager is aware of the communication required by the different Elements and the different Particles instances (e.g. in a collider with many bunches and many interactions points, the PipelineManager knows which bunch collides with which and where.) By default the PipelineManager uses a dummy communicator which does not require MPI, but an MPI communicator can be provided instead thus enabling tracking on multiple CPUs.

It is important that the Particles instances as well as the Element instances in the line that require communication through the pipeline are attributed a uniquely defined name.

The pipeline algorithm is detailed in https://doi.org/10.1016/j.cpc.2019.06.006.

The following example illustrates how to configure and run a simulation with two bunches colliding at two interaction points using MPI. It must be run like a MPI application

mpirun -np 2 python example.py

Example

import time

import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp

context = xo.ContextCpu(omp_num_threads=0)

####################
# Pipeline manager #
####################

# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank   = comm.Get_rank()

# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
    # Add information about the Particles instance that require
    # communication through the pipeline manager.
    # Each Particles instance must be identifiable by a unique name
    # The second argument is the MPI rank in which the Particles object
    # lives.
    pipeline_manager.add_particles('B1b1',0)
    pipeline_manager.add_particles('B2b1',1)
    # Add information about the elements that require communication
    # through the pipeline manager.
    # Each Element instance must be identifiable by a unique name
    # All Elements are instanciated in all ranks
    pipeline_manager.add_element('IP1')
    pipeline_manager.add_element('IP2')
else:
    print('Need at least 2 MPI processes for this test')
    exit()

#################################
# Generate particles            #
#################################

n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3

#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode 
mean_x_init = 0.1
mean_y_init = 0.1

p0c = 7000e9

print('Initialising particles')
if my_rank == 0:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)+mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B1b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B2b1'
elif my_rank == 1:
    particles = xp.Particles(_context=context,
        p0c=p0c,
        x=np.sqrt(physemit_x*beta_x_IP1)
            *(np.random.randn(n_macroparticles)+mean_x_init),
        px=np.sqrt(physemit_x/beta_x_IP1)
            *np.random.randn(n_macroparticles),
        y=np.sqrt(physemit_y*beta_y_IP1)
            *(np.random.randn(n_macroparticles)-mean_y_init),
        py=np.sqrt(physemit_y/beta_y_IP1)
            *np.random.randn(n_macroparticles),
        zeta=sigma_z*np.random.randn(n_macroparticles),
        delta=sigma_delta*np.random.randn(n_macroparticles),
        weight=bunch_intensity/n_macroparticles
    )
    # Initialise the Particles object with its unique name
    # (must match the info provided to the pipeline manager)
    particles.init_pipeline('B2b1')
    # Keep in memory the name of the Particles object with which this
    # rank will communicate
    # (must match the info provided to the pipeline manager)
    partner_particles_name = 'B1b1'
else:
    # Stop the execution of ranks that are not needed
    print('No need for rank',my_rank)
    exit()

#############
# Beam-beam #
#############

nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP1', # The element name must be unique and match the
                       # one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       # Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
   pipeline_manager=pipeline_manager,
   element_name='IP2', # The element name must be unique and match the
                       #  one given to the pipeline manager
   partner_particles_name = partner_particles_name, # the name of the
                       #Particles object with which to collide
   slicer=slicer,
   update_every=1 # Setup for strong-strong simulation
   )



print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=0.0,
            config_for_update = config_for_update_IP1)

bbeamIP2 = xf.BeamBeamBiGaussian3D(
            _context=context,
            other_beam_q0 = particles.q0,
            phi = 500E-6,alpha=np.pi/2,
            config_for_update = config_for_update_IP2)

#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################

arc12 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

arc21 = xt.LineSegmentMap(
        betx = beta_x_IP1,bety = beta_y_IP1,
        qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)

#################################################################
# Tracker                                                       #
#################################################################

# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])

#################################################################
# Tracking  (Same as usual)                                     #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')


#################################################################
# Post-processing: raw data and spectrum                        #
#################################################################

# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
    positions_x_b1 = np.average(line.record_last_track.x,axis=0)
    positions_y_b1 = np.average(line.record_last_track.y,axis=0)
    plt.figure(1)
    freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
    mask = freqs > 0
    myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
    myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
    plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
    plt.xlabel('Frequency [$f_{rev}$]')
    plt.ylabel('Amplitude')
    plt.legend(loc=0)
    plt.show()

# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
_images/beambeam_sigmapi.png

If the communicator is not specified when instantiating the PipelineManager, it will not use MPI:

pipeline_manager = xt.PipelineManager()