Collective effects
Tracking with collective elements
A collective beam element is an element that needs access to the entire particle set (in read and/or write mode). The following example shows how to handle such elements in Xsuite.
Example
A typical example of collective element is a space-charge interaction. We can create a space-charge beam element as follows:
import xobjects as xo
import xfields as xf
context = xo.ContextCpu()
spcharge = xf.SpaceChargeBiGaussian(_context=context,
update_on_track = ['sigma_x', 'sigma_y'], length=2,
longitudinal_profile=xf.LongitudinalProfileQGaussian(
_context=context, number_of_particles=1e11, sigma_z=0.2))
This creates a space-charge element where the transverse beam sizes are updated based on the particle set at each interaction. Such an element can be included in an xtrack tracker similarly to single-particle elements.
import xtrack as xt
## Generate a simple beam line including the spacecharge element
myqf = xt.Multipole(knl=[0, 1.])
myqd = xt.Multipole(knl=[0, -1.])
mydrift = xt.Drift(length=1.)
line = xt.Line(
elements = [myqf, mydrift, myqd, mydrift,
spcharge,
myqf, mydrift, myqd, mydrift,],
element_names = ['qf1', 'drift1', 'qd1', 'drift2',
'spcharge'
'qf2', 'drift3', 'qd2', 'drift4'])
## Transfer lattice on context and compile tracking code
line.build_tracker(_context=context)
## Build particle object on context
n_part = 200
particles = xt.Particles(_context=context,
p0c=6500e9,
x=np.random.uniform(-1e-3, 1e-3, n_part),
px=np.random.uniform(-1e-5, 1e-5, n_part),
y=np.random.uniform(-2e-3, 2e-3, n_part),
py=np.random.uniform(-3e-5, 3e-5, n_part),
zeta=np.random.uniform(-1e-2, 1e-2, n_part),
delta=np.random.uniform(-1e-4, 1e-4, n_part),
)
## Track (saving turn-by-turn data)
n_turns = 100
line.track(particles, num_turns=n_turns,
turn_by_turn_monitor=True)
How does it work?
To decide whether or not an element needs to be treated as collective, the
tracker inspects its iscollective attribute. In our example:
print(qf.iscollective)
# Gives "False"
print(spcharge.iscollective)
# Gives "True"
Based in this information the line is divided in parts that are either collective elements or xtrack trackers simulating groups of consecutive non-collective elements.
We can visualize this in our example:
print(line.tracker._parts)
# Gives:
# [<xtrack.tracker.Tracker object at 0x7f5ba8ce7760>,
# <xfields.beam_elements.spacecharge.SpaceChargeBiGaussian object at 0x7f5ba8e1bd30>,
# <xtrack.tracker.Tracker object at 0x7f5ba8ce7610>]
where the first part tracks the particles through to the first potion of the machine up to the space-charge element, the second part simulates the space-charge interaction, the third part tracks the particles from the space-charge element to the end of the line.
As all xsuite and xsuite-compatible beam elements need to expose a .track
method the instruction:
line.track(particles)
is equivalent to the loop:
for pp in line.tracker._parts:
pp.track(particles)
Any python object exposing a ‘.track’ method can be used as beam_element. If the
attribute iscollective is not present the element is handled as collective.
Xwakes: wakefields and impedances
xwakes provides wakefield and impedance elements that plug into the
Xsuite tracking environment. It offers analytic resonator wakes, wakes read from
tables, multi-bunch/multi-turn support, transverse damping, monitoring, and
MPI pipeline wiring. The list of available wakefield elements is available in
the API reference.
Quick start: resonator wake on a single bunch
import numpy as np
import xtrack as xt
import xwakes as xw
# Build a wake as the sum of dipolar and quadrupolar resonators
wf_dip_1 = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=1e3)
wf_dip_2 = xw.WakeResonator(kind='dipolar_x', r=5e7, q=5e4, f_r=5e2)
wf_quad_1 = xw.WakeResonator(kind='quadrupolar_x', r=2e7, q=8e4, f_r=2e3)
wf_quad_2 = xw.WakeResonator(kind='quadrupolar_y', r=3e7, q=6e4, f_r=1.5e3)
wf = wf_dip_1 + wf_dip_2 + wf_quad_1 + wf_quad_2
# Configure for tracking: zeta range and number of slices
wf.configure_for_tracking(
zeta_range=(-0.1, 0.1), # meters
num_slices=100
)
# Simple accelerator lattice: one turn map plus wake
one_turn = xt.LineSegmentMap(length=26000, betx=50., bety=40., qx=62.28, qy=62.31,
longitudinal_mode='linear_fixed_qs', qs=1e-3, bets=100)
line = xt.Line(elements=[one_turn, wf], element_names=['one_turn', 'wake'])
line.set_particle_ref('proton', p0c=7e12)
# One particle per slice, give it an offset, track once and inspect the kick
particles = line.build_particles(
x=0, px=0, y=0, py=0,
zeta=wf.slicer.zeta_centers.flatten(),
)
particles.x += 1e-3 # mm-level offset
# Track one turn
line.track(particles, num_turns=1)
configure_for_tracking prepares the internal slicer and wake tracker
(zeta_range, num_slices) and must be called before tracking. Summing
wakes works naturally: wf_total = wf1 + wf2 + wf3; the combined wake is
configured once and used like a single element.
Wakefield definitions
Transverse wakefields are defined such that the transverse kicks are given by:
where \(\bar{x}(z)\) and \(\bar{y}(z)\) are the transverse centroids, and \(\lambda(z)\) is the line density. The exponents \((i,j)\) belong to the source moments, while \((k,l)\) apply to the test particle offsets.
Longitudinal kicks are defined so that the energy momentum deviation change is:
with the sign convention that a positive wake causes energy loss.
Each predefined kind maps to a plane and to a set of source exponents
\((i,j)\) and test exponents \((k,l)\), following \(W^{i,j,k,l}\) in
the formulas above. The available kinds are listed below:
kind |
plane |
source_exponents |
test_exponents |
meaning |
|---|---|---|---|---|
|
z |
(0, 0) |
(0, 0) |
longitudinal |
|
x |
(0, 0) |
(0, 0) |
constant x |
|
y |
(0, 0) |
(0, 0) |
constant y |
|
x |
(1, 0) |
(0, 0) |
dipolar / driving x |
|
y |
(0, 1) |
(0, 0) |
dipolar / driving y |
|
x |
(0, 1) |
(0, 0) |
dipolar / driving xy |
|
y |
(1, 0) |
(0, 0) |
dipolar / driving yx |
|
x |
(0, 0) |
(1, 0) |
quadrupolar / detuning x |
|
y |
(0, 0) |
(0, 1) |
quadrupolar / detuning y |
|
x |
(0, 0) |
(0, 1) |
quadrupolar / detuning xy |
|
y |
(0, 0) |
(1, 0) |
quadrupolar / detuning yx |
Wakefield objects can be initialized in different ways, as illustrated by the following examples.
- Single component
# Horizontal dipolar resonator w1 = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=1e9)
- Multiple components (list/tuple)
# Horizontal + vertical dipolar with same r/q/f_r w2 = xw.WakeResonator(kind=['dipolar_x', 'dipolar_y'], r=1e8, q=1e5, f_r=1e9)
- Weighted components (dict)
# Scale horizontal twice as strong as vertical w3 = xw.WakeResonator(kind={'dipolar_x': 2.0, 'dipolar_y': 1.0}, r=1e8, q=1e5, f_r=1e9)
- Using Yokoya factors
# Flat chamber (horizontal) yokoya factors expanded into the right components w4 = xw.WakeResonator(kind=xw.Yokoya('flat_horizontal'), r=1e8, q=1e5, f_r=1e9)
- Custom polynomial term
# Custom plane/exponents without a predefined kind entry w5 = xw.WakeResonator( plane='y', source_exponents=(1, 0), # x_source^1 y_source^0 test_exponents=(0, 2), # x_test^0 y_test^2 r=1e8, q=1e5, f_r=1e9)
- Combine and configure
w = w1 + w2 + w3.components[0] # mix whole wakes and single components w.configure_for_tracking(zeta_range=(-0.1, 0.1), num_slices=200)
Building wakes from tables
The class WakeFromTable builds wakefields from tabulated data in the time domain.
Columns must correspond to the wakefield kinds defined above. The function
xw.read_headtail_file can be used to read HEADTAIL-format files, as illustrated
in the following examples.
import pathlib
import xwakes as xw
test_data = pathlib.Path(__file__).parent / 'test_data' / 'HLLHC_wake.dat'
columns = ['time', 'longitudinal', 'dipolar_x', 'dipolar_y',
'quadrupolar_x', 'quadrupolar_y']
table = xw.read_headtail_file(test_data, columns)
# use only dipolar terms
wf = xw.WakeFromTable(table, columns=['dipolar_x', 'dipolar_y'])
# Configure for tracking
wf.configure_for_tracking(zeta_range=(-0.4, 0.4), num_slices=100)
# Track as usual in a line
# ...
Defining custom wakes
For forms not covered by the built-in components, you can build a Component directly
from a wake callable in the time domain. Provide the plane of the kick, and the polynomial
exponents applied to the source/test offsets (source_exponents for the source particle,
test_exponents for the particle being kicked). The wake callable receives time t in
seconds; enforce causality by zeroing t <= 0. Wrap one or more components in xw.Wake
and configure it like any other wake.
import numpy as np
import xwakes as xw
a, b, c = 1.0e9, 0.1e9, 2.0 # frequency, damping rate, amplitude
def wake_vs_t(t):
t = np.atleast_1d(t)
out = c * np.sin(a * t) * np.exp(-b * t)
out[t <= 0] = 0.0 # causal wake
return out
custom_component = xw.Component(
wake=wake_vs_t,
plane='y',
source_exponents=(2, 0),
test_exponents=(1, 1),
name="Example damped sine wake",
)
custom_wake = xw.Wake(components=[custom_component])
# Inspect the zeta-domain wake or combine with other components
zeta = np.linspace(-10, 10, 500)
values = custom_component.function_vs_zeta(zeta, beta0=0.7)
custom_wake.configure_for_tracking(zeta_range=(-0.1, 0.1), num_slices=200)
The snippet mirrors xwakes/examples/003_custom_wake.py; you can mix these components with
resonators or tables via custom_wake + other_wake.
Multi-bunch, multi-turn wakes
configure_for_tracking accepts multi-bunch/multi-turn options:
filling_scheme: array of 0/1 slots (length = number of RF buckets considered)bunch_spacing_zeta: spacing between buckets in metersnum_turnsandcircumference: enable multi-turn wake memory
Example (two bunches, one-turn memory):
import numpy as np
import xwakes as xw
import xpart as xp
import xtrack as xt
filling_scheme = np.zeros(3564, dtype=int)
filling_scheme[0] = filling_scheme[1] = 1
wf = xw.WakeResonator(kind='dipolar_x', r=1e8, q=1e5, f_r=600e6)
wf.configure_for_tracking(
zeta_range=(-0.2, 0.2),
num_slices=200,
filling_scheme=filling_scheme,
bunch_spacing_zeta=26658.8832 / 3564,
num_turns=1,
circumference=26658.8832,
)
# Simple one-turn map and line
one_turn = xt.LineSegmentMap(
length=26658.8832, betx=70., bety=80., qx=62.31, qy=60.32,
longitudinal_mode='nonlinear', qs=2e-3, bets=731.27)
line = xt.Line([one_turn, wf], element_names=['one_turn', 'wake'])
line.particle_ref = xt.Particles(p0c=7e12)
# Generate a matched two-bunch beam and track
particles = xp.generate_matched_gaussian_multibunch_beam(
line=line, filling_scheme=filling_scheme,
bunch_num_particles=100_000, bunch_intensity_particles=2.3e11,
nemitt_x=2e-6, nemitt_y=2e-6, sigma_z=0.08,
bucket_length=26658.8832 / 35640, bunch_spacing_buckets=10,
)
line.track(particles, num_turns=10)
Space charge
Xsuite can be used to perform simulations including space-charge effects. Three modes can be used to simulate the space-charge forces:
Frozen: A fixed Gaussian bunch distribution is used to compute the space-charge forces.
Quasi-frozen: Forces are computed assuming a Gaussian distribution. The properties of the distribution (transverse r.m.s. sizes,transverse positions) are updated at each interaction based on the particle distribution.
PIC: The Particle In Cell method is used to compute the forces acting among particles. No assumption is made on the bunch shape.
The last two options constitute collective interactions. As discussed in the dedicated section, the Xtrack Line works such that particles are tracked asynchronously by separate threads in the non-collective sections of the sequence and are regrouped at each collective element (in PIC or quasi-frozen space-charge lenses).
The following example illustrates how to configure and run a space-charge simulation.
The variable mode is used to switch between frozen, quasi-frozen and PIC.
See also: xfields.install_spacecharge_frozen(),
xfields.replace_spacecharge_with_quasi_frozen(),
xfields.replace_spacecharge_with_PIC().
Example
import json
import numpy as np
import xobjects as xo
import xpart as xp
import xtrack as xt
import xfields as xf
############
# Settings #
############
fname_line = ('../../test_data/sps_w_spacecharge/'
'line_no_spacecharge_and_particle.json')
bunch_intensity = 1e11/3 # Need short bunch to avoid bucket non-linearity
# to compare frozen/quasi-frozen and PIC
sigma_z = 22.5e-2/3
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e6)
num_turns=32
num_spacecharge_interactions = 540
# Available modes: frozen/quasi-frozen/pic
# mode = 'pic'
mode = 'quasi-frozen'
# Choose solver between `FFTSolver2p5DAveraged` and `FFTSolver2p5D`
pic_solver = 'FFTSolver2p5DAveraged'
####################
# Choose a context #
####################
context = xo.ContextCpu()
#context = xo.ContextCupy()
#context = xo.ContextPyopencl('0.0')
print(context)
#############
# Load line #
#############
with open(fname_line, 'r') as fid:
input_data = json.load(fid)
line = xt.Line.from_dict(input_data['line'])
line.particle_ref = xt.Particles.from_dict(input_data['particle'])
line.build_tracker(_context=context, compile=False)
#############################################
# Install spacecharge interactions (frozen) #
#############################################
lprofile = xf.LongitudinalProfileQGaussian(
number_of_particles=bunch_intensity,
sigma_z=sigma_z,
z0=0.,
q_parameter=1.)
xf.install_spacecharge_frozen(line=line,
longitudinal_profile=lprofile,
nemitt_x=nemitt_x, nemitt_y=nemitt_y,
sigma_z=sigma_z,
num_spacecharge_interactions=num_spacecharge_interactions,
)
#################################
# Switch to PIC or quasi-frozen #
#################################
if mode == 'frozen':
pass # Already configured in line
elif mode == 'quasi-frozen':
xf.replace_spacecharge_with_quasi_frozen(
line,
update_mean_x_on_track=True,
update_mean_y_on_track=True)
elif mode == 'pic':
pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
line=line,
n_sigmas_range_pic_x=8,
n_sigmas_range_pic_y=8,
nx_grid=256, ny_grid=256, nz_grid=100,
n_lims_x=7, n_lims_y=3,
z_range=(-3*sigma_z, 3*sigma_z),
solver=pic_solver)
else:
raise ValueError(f'Invalid mode: {mode}')
#################
# Build Tracker #
#################
line.build_tracker(_context=context)
######################
# Generate particles #
######################
# Build a line without spacecharge (recycling the track kernel)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')
line_sc_off.build_tracker(track_kernel=line.tracker.track_kernel)
# (we choose to match the distribution without accounting for spacecharge)
particles = xp.generate_matched_gaussian_bunch(line=line_sc_off,
num_particles=n_part, total_intensity_particles=bunch_intensity,
nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z)
####################################################
# Check that everything is on the selected context #
####################################################
assert particles._context == context
assert len(set([ee._buffer for ee in line.elements])) == 1 # All on same context
assert line._context == context
#########
# Track #
#########
line.track(particles, num_turns=num_turns, with_progress=1)
# Complete source: xtrack/examples/spacecharge/000_spacecharge_example.py
Beam-beam
Weak-strong 2D
The example below shows how to introduce a 2D beam-beam element in a line and perform few studies based on tracking. The 2D beam-beam element provides a kick based on the Basseti-Erskine formula neglecting any longitudinal varitations of the beam-beam force.
import numpy as np
from matplotlib import pyplot as plt
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp
context = xo.ContextCpu(omp_num_threads=0)
# Machine and beam parameters
n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
nemitt_x = 2E-6
nemitt_y = 2E-6
p0c = 7e12
gamma = p0c/xp.PROTON_MASS_EV
physemit_x = nemitt_x/gamma
physemit_y = nemitt_y/gamma
beta_x = 1.0
beta_y = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.31
Qy = 0.32
Qs = 2.1E-3
##############################
# Plot the beam-beam force #
##############################
# Particles algined on the x axis
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x)*np.linspace(-6,6,n_macroparticles),
px=np.zeros(n_macroparticles),
y=np.zeros(n_macroparticles),
py=np.zeros(n_macroparticles),
zeta=np.zeros(n_macroparticles),
delta=np.zeros(n_macroparticles),
)
# Definition of the beam-beam force based on the other beam's
# properties (here assumed identical to the tracked beam)
# Note that the arguments 'Sigma' are sigma**2
bbeam = xf.BeamBeamBiGaussian2D(
_context=context,
other_beam_q0 = particles.q0,
other_beam_beta0 = particles.beta0[0],
other_beam_num_particles = bunch_intensity,
other_beam_Sigma_11 = physemit_x*beta_x,
other_beam_Sigma_33 = physemit_y*beta_y)
# Build line and tracker with only the beam-beam element
elements = [bbeam]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# track on turn
line.track(particles,num_turns=1)
#plot the resulting change of px
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px)
plt.xlabel(r'x [$\sigma_x$]')
plt.ylabel(r'$\Delta p_x$')
##############################
# Tune footprint #
##############################
# Build linear lattice
arc = xt.LineSegmentMap(
betx = beta_x,bety = beta_y,
qx = Qx, qy = Qy,bets = beta_s, qs=Qs)
# Build line and tracker with a beam-beam element and linear lattice
elements = [bbeam,arc]
line = xt.Line(elements=elements)
line.particle_ref = xp.Particles(mass0=xp.PROTON_MASS_EV, p0c=7e12)
line.build_tracker()
# plot footprint
plt.figure()
fp0 = line.get_footprint(nemitt_x=nemitt_x, nemitt_y=nemitt_y)
fp0.plot(color='k')
##############################
# Phase space distorion #
##############################
# Initialise particles randomly in 6D phase space
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x)*(np.random.randn(n_macroparticles)),
px=np.sqrt(physemit_x/beta_x)*np.random.randn(n_macroparticles),
y=np.sqrt(physemit_y*beta_y)*(np.random.randn(n_macroparticles)),
py=np.sqrt(physemit_y/beta_y)*np.random.randn(n_macroparticles),
zeta=sigma_z*np.random.randn(n_macroparticles),
delta=sigma_delta*np.random.randn(n_macroparticles),
)
# Change the tune to better visualise the 4th order resonance
arc.qx=0.255
# Track
line.track(particles,num_turns=10000)
# Plot phase space
plt.figure()
plt.plot(particles.x/np.sqrt(physemit_x*beta_x),particles.px/np.sqrt(physemit_x/beta_x),'.')
plt.xlabel('$x$ [$\sigma_x$]')
plt.ylabel('$p_x$ [$\sigma_{px}$]')
plt.show()
# Complete source: xfields/examples/002_beambeam/010_beambeam2d_weakstrong.py
Weak-strong 3D
The 3D beam-beam element can be used similarly, replacing the instanciation of the beam-beam element as in the example below. This element takes into account longitudinal variations of the beam-beam force (hourglass, crossing angle) based on a longitudinal slicing of the beam (Hirata’s method) handled by the xfields.beam_elements.TempSlicer.
# Number of longitudinal slices
n_slices = 21
# Slicer used to determine the position and charge of the slices
# based on an optimised algorithm by D. Shatilov. (other options are also possible)
slicer = xf.TempSlicer(n_slices=n_slices, sigma_z=sigma_z, mode="shatilov")
bbeam = xf.BeamBeamBiGaussian3D(
_context=context,
other_beam_q0 = particles.q0,
# Full crossing angle at the IP in rad
phi = 500.0E-2,
# Roll angle of the crossing angle in rad (0 -> horizontal, pi/2 -> vertical)
alpha = 0.0,
# charge in each slice
slices_other_beam_num_particles = slicer.bin_weights * bunch_intensity,
# longitudinal position of the slice
slices_other_beam_zeta_center = slicer.bin_centers,
# transverse sizes of the slices at the IP (squarred)
slices_other_beam_Sigma_11 = np.zeros(n_slices,dtype=float)+physemit_x*beta_x,
slices_other_beam_Sigma_12 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_13 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_14 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_22 = np.zeros(n_slices,dtype=float)+physemit_x/beta_x,
slices_other_beam_Sigma_23 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_24 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_33 = np.zeros(n_slices,dtype=float)+physemit_y*beta_y,
slices_other_beam_Sigma_34 = np.zeros(n_slices,dtype=float),
slices_other_beam_Sigma_44 = np.zeros(n_slices,dtype=float)+physemit_y/beta_y)
Strong-strong (soft-Gaussian)
Strong-strong simulations can be performed using the Pipeline for multibunch simulations, as in the example below.
import time
import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp
context = xo.ContextCpu(omp_num_threads=0)
####################
# Pipeline manager #
####################
# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank = comm.Get_rank()
# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
# Add information about the Particles instance that require
# communication through the pipeline manager.
# Each Particles instance must be identifiable by a unique name
# The second argument is the MPI rank in which the Particles object
# lives.
pipeline_manager.add_particles('B1b1',0)
pipeline_manager.add_particles('B2b1',1)
# Add information about the elements that require communication
# through the pipeline manager.
# Each Element instance must be identifiable by a unique name
# All Elements are instanciated in all ranks
pipeline_manager.add_element('IP1')
pipeline_manager.add_element('IP2')
else:
print('Need at least 2 MPI processes for this test')
exit()
#################################
# Generate particles #
#################################
n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3
#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode
mean_x_init = 0.1
mean_y_init = 0.1
p0c = 7000e9
print('Initialising particles')
if my_rank == 0:
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x_IP1)
*(np.random.randn(n_macroparticles)+mean_x_init),
px=np.sqrt(physemit_x/beta_x_IP1)
*np.random.randn(n_macroparticles),
y=np.sqrt(physemit_y*beta_y_IP1)
*(np.random.randn(n_macroparticles)+mean_y_init),
py=np.sqrt(physemit_y/beta_y_IP1)
*np.random.randn(n_macroparticles),
zeta=sigma_z*np.random.randn(n_macroparticles),
delta=sigma_delta*np.random.randn(n_macroparticles),
weight=bunch_intensity/n_macroparticles
)
# Initialise the Particles object with its unique name
# (must match the info provided to the pipeline manager)
particles.init_pipeline('B1b1')
# Keep in memory the name of the Particles object with which this
# rank will communicate
# (must match the info provided to the pipeline manager)
partner_particles_name = 'B2b1'
elif my_rank == 1:
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x_IP1)
*(np.random.randn(n_macroparticles)+mean_x_init),
px=np.sqrt(physemit_x/beta_x_IP1)
*np.random.randn(n_macroparticles),
y=np.sqrt(physemit_y*beta_y_IP1)
*(np.random.randn(n_macroparticles)-mean_y_init),
py=np.sqrt(physemit_y/beta_y_IP1)
*np.random.randn(n_macroparticles),
zeta=sigma_z*np.random.randn(n_macroparticles),
delta=sigma_delta*np.random.randn(n_macroparticles),
weight=bunch_intensity/n_macroparticles
)
# Initialise the Particles object with its unique name
# (must match the info provided to the pipeline manager)
particles.init_pipeline('B2b1')
# Keep in memory the name of the Particles object with which this
# rank will communicate
# (must match the info provided to the pipeline manager)
partner_particles_name = 'B1b1'
else:
# Stop the execution of ranks that are not needed
print('No need for rank',my_rank)
exit()
#############
# Beam-beam #
#############
nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
pipeline_manager=pipeline_manager,
element_name='IP1', # The element name must be unique and match the
# one given to the pipeline manager
partner_particles_name = partner_particles_name, # the name of the
# Particles object with which to collide
slicer=slicer,
update_every=1 # Setup for strong-strong simulation
)
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
pipeline_manager=pipeline_manager,
element_name='IP2', # The element name must be unique and match the
# one given to the pipeline manager
partner_particles_name = partner_particles_name, # the name of the
#Particles object with which to collide
slicer=slicer,
update_every=1 # Setup for strong-strong simulation
)
print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
_context=context,
other_beam_q0 = particles.q0,
phi = 500E-6,alpha=0.0,
config_for_update = config_for_update_IP1)
bbeamIP2 = xf.BeamBeamBiGaussian3D(
_context=context,
other_beam_q0 = particles.q0,
phi = 500E-6,alpha=np.pi/2,
config_for_update = config_for_update_IP2)
#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################
arc12 = xt.LineSegmentMap(
betx = beta_x_IP1,bety = beta_y_IP1,
qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)
arc21 = xt.LineSegmentMap(
betx = beta_x_IP1,bety = beta_y_IP1,
qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)
#################################################################
# Tracker #
#################################################################
# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])
#################################################################
# Tracking (Same as usual) #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')
#################################################################
# Post-processing: raw data and spectrum #
#################################################################
# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
positions_x_b1 = np.average(line.record_last_track.x,axis=0)
positions_y_b1 = np.average(line.record_last_track.y,axis=0)
plt.figure(1)
freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
mask = freqs > 0
myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
plt.xlabel('Frequency [$f_{rev}$]')
plt.ylabel('Amplitude')
plt.legend(loc=0)
plt.show()
# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
In collisions featuring a low disruption (i.e. the beam moments do not vary significantly during the interaction), the quasi-strong-strong (aka frozen-strong-strong) model may be enabled by setting the argument ‘quasistrongstrong
= True’ in xfields.beam_elements.ConfigForUpdate*. In this configuration, the beam moments are computed once at the start of the collison and kept constant throught the collison, thus reducing the computing load. The argument ‘update_every’ allows to further reduce the computing load by keeping the moments for the given amount of turns. This model is suitable for effects that build up over may turns. (more details in https://accelconf.web.cern.ch/eefact2022/papers/wezat0102.pdf)
For a 2D beam-beam interactions, the beam-beam element and the xfields.beam_elements.ConfigForUpdate* have to be redifined as in the example below.
config_for_update_b1_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
pipeline_manager=pipeline_manager,
element_name='IP1',
partner_particles_name = 'B2b1',
update_every=1
)
config_for_update_b2_IP1=xf.ConfigForUpdateBeamBeamBiGaussian2D(
pipeline_manager=pipeline_manager,
element_name='IP1',
partner_particles_name = 'B1b1',
update_every=1
)
bbeamIP1_b1 = xf.BeamBeamBiGaussian2D(
_context=context,
other_beam_q0 = particles_b2.q0,
other_beam_beta0 = particles_b2.beta0[0],
config_for_update = config_for_update_b1_IP1)
bbeamIP1_b2 = xf.BeamBeamBiGaussian2D(
_context=context,
other_beam_q0 = particles_b1.q0,
other_beam_beta0 = particles_b1.beta0[0],
config_for_update = config_for_update_b2_IP1)
Poisson Solver
Particle-in-cell simulations using a Poisson solver for the beam-beam interaction is currently not implemented in xfields
Beam-beam in a real lattice
Identically to the examples above, beam-beam elements can be introduced into the lattice of a full machine. Several tools exist to ease the setup of beam-beam interactions in a collider lattice: Xmask. An example snippet is shown below where an xtrack lattice in json format is loaded and a weakstrong beam-beam element is inserted at the marker “ip.1”.
path = "/path/to/lattice"
seq_name = "lattice.json"
seq_path = os.path.join(path, seq_name)
with open(seq_path, 'r') as f:
line = xt.Line.from_dict(json.load(f))
# define slicer
slicer = xf.TempSlicer(n_slices=n_slices, sigma_z=sigma_z, mode=binning_mode)
# define weakstrong beambeam element (n*[x] yields a list with n elements each of which is x, e.g. 3*[5]=[5,5,5])
el_beambeam = xf.BeamBeamBiGaussian3D(
_context=context,
config_for_update = None,
other_beam_q0=other_beam_q0,
phi=phi, # half-crossing angle in radians
alpha=alpha, # crossing plane in radians
# slice intensity [num. real particles] number of slices inferred from the length of this list
slices_other_beam_num_particles = slicer.bin_weights * nb,
# unboosted opposing (strong) beam moments
slices_other_beam_zeta_center = slicer.bin_centers,
slices_other_beam_Sigma_11 = n_slices*[ sigma_x**2], # Beam sizes for the other beam, assuming the same value for all slices
slices_other_beam_Sigma_22 = n_slices*[sigma_px**2],
slices_other_beam_Sigma_33 = n_slices*[ sigma_y**2],
slices_other_beam_Sigma_44 = n_slices*[sigma_py**2],
# only if beamstrahlung on
slices_other_beam_zeta_bin_width_star_beamstrahlung = None if not beamstrahlung_on else slicer.bin_widths_beamstrahlung / np.cos(phi), # boosted dz
# has to be set (0 in most cases) if config_for_update = None
slices_other_beam_Sigma_12 = n_slices*[0],
slices_other_beam_Sigma_34 = n_slices*[0],
slices_other_beam_pzeta_center = n_slices*[pzeta], # important when tapering (FCC-ee specific)
)
element_name = "ip.1" # this is the marker name of an IP in the lattice where the beam-beam element will be inserted
element_line_index = line.element_names.index(element_name) # get the array index of the element "ip.1"
line.insert_element(index=element_line_index, element=el_beambeam, name='beambeam')
Combined CPU - GPU simulations
When performing simulations on GPU contexts, it is possible to execute on CPU
the computation for specific beam elements, for example PyHEADTAIL elements
that do not support the Xsuite GPU contexts. For that purpose, one needs to set
a `needs_cpu` flag on the concerned elements before inserting them in an Xtrack
line. This instructs the Xtrack tracker to handle the required data transfers.
Presently PyHEADTAIL elements are not able to skip lost particles in
the Xtrack Particles objects. This can be handled by setting the flag
`needs_hidden_lost_particles`, which instructs the Xtrack tracker to
temporarily hide the lost particles for the concerned elements.
The following example shows a simulation including the SPS lattice elements, space-charge elements and wakefields. The lattice tracking and the spacecharge calculations are performed on GPU with Xtrack/Xfields while the wakefield computation is performed on CPU with PyHEADTAIL.
import numpy as np
from cpymad.madx import Madx
import xpart as xp
import xobjects as xo
import xtrack as xt
import xfields as xf
###############################
# Enable pyheadtail interface #
###############################
xt.enable_pyheadtail_interface()
############
# Settings #
############
gpu_device = 0 # Set to None to use CPU, or to an integer to specify the GPU device to use (e.g. 0)
seq_name = "sps"
qx0,qy0 = 20.13, 20.18
p0c = 26e9
cavity_name = "actcse.31632"
cavity_phase = np.pi
frequency = 200e6
rf_voltage = 4e6
use_wakes = True
n_slices_wakes = 100
limit_z = 0.7
bunch_intensity = 1e11
sigma_z = 22.5e-2
nemitt_x=2.5e-6
nemitt_y=2.5e-6
n_part=int(1e4)
num_turns=2
num_spacecharge_interactions = 540
# mode = 'frozen'
mode = 'pic'
#######################################################
# Use cpymad to load the sequence and match the tunes #
#######################################################
mad = Madx()
mad.call("./madx_sps/sps.seq")
mad.call("./madx_sps/lhc_q20.str")
mad.call("./madx_sps/macro.madx")
mad.beam()
mad.use(seq_name)
mad.twiss()
tw_thick = mad.table.twiss.dframe()
summ_thick = mad.table.summ.dframe()
mad.input("""select, flag=makethin, slice=1, thick=false;
makethin, sequence=sps, style=teapot, makedipedge=false;""")
mad.use(seq_name)
mad.exec(f"sps_match_tunes({qx0},{qy0});")
##################
# Switch context #
##################
if gpu_device is None:
context = xo.ContextCpu()
else:
context = xo.ContextCupy(device=gpu_device)
#################################
# Build line from MAD-X lattice #
#################################
line = xt.Line.from_madx_sequence(sequence=mad.sequence[seq_name],
deferred_expressions=True, install_apertures=True,
apply_madx_errors=False)
# Define reference particle
line.particle_ref = xt.Particles(p0c=p0c,mass0=xt.PROTON_MASS_EV)
################
# Switch on RF #
################
line[cavity_name].voltage = rf_voltage
line[cavity_name].phase = cavity_phase
line[cavity_name].frequency = frequency
############################################
# Twiss (check that the lattice is stable) #
############################################
line.build_tracker()
tw = line.twiss()
#####################################
# Install spacecharge interactions) #
#####################################
# Install frozen space-charge lenses
lprofile = xf.LongitudinalProfileQGaussian(
number_of_particles=bunch_intensity,
sigma_z=sigma_z,
z0=0.,
q_parameter=1.)
xf.install_spacecharge_frozen(line=line,
particle_ref=line.particle_ref,
longitudinal_profile=lprofile,
nemitt_x=nemitt_x, nemitt_y=nemitt_y,
sigma_z=sigma_z,
num_spacecharge_interactions=num_spacecharge_interactions,
)
# Switch to PIC or quasi-frozen
if mode == 'frozen':
pass # Already configured in line
elif mode == 'quasi-frozen':
xf.replace_spacecharge_with_quasi_frozen(
line,
update_mean_x_on_track=True,
update_mean_y_on_track=True)
elif mode == 'pic':
pic_collection, all_pics = xf.replace_spacecharge_with_PIC(
_context=context, line=line,
n_sigmas_range_pic_x=8,
n_sigmas_range_pic_y=8,
nx_grid=256, ny_grid=256, nz_grid=100,
n_lims_x=7, n_lims_y=3,
z_range=(-3*sigma_z, 3*sigma_z))
else:
raise ValueError(f'Invalid mode: {mode}')
######################
# Install wakefields #
######################
if use_wakes:
# We rescale the waketable to take into account for the difference between
# the average beta functions (for which the table is calculated) and the beta
# functions at the location in the lattice at which the wake element is
# installed.
wakefields = np.genfromtxt('wakes/wakeforhdtl_PyZbase_Allthemachine_7000GeV_B1_2021_TeleIndex1_wake.dat')
wakefields[:,1] *= 54.65/tw['betx'][0]
wakefields[:,2] *= 54.51/tw['bety'][0]
wakefields[:,3] *= 54.65/tw['betx'][0]
wakefields[:,4] *= 54.65/tw['betx'][0]
wakefile = 'wakes/wakefields.wake'
np.savetxt(wakefile,wakefields,delimiter='\t')
# Build PyHEADTAIL wakefield element
from PyHEADTAIL.particles.slicing import UniformBinSlicer
from PyHEADTAIL.impedances.wakes import WakeTable, WakeField
n_slices_wakes = 500
slicer_for_wakefields = UniformBinSlicer(n_slices_wakes, z_cuts=(-limit_z, limit_z))
waketable = WakeTable(wakefile, ['time', 'dipole_x', 'dipole_y',
'quadrupole_x', 'quadrupole_y'])
wakefield = WakeField(slicer_for_wakefields, waketable)
# Specity that the wakefield element needs to run on CPU and that lost
# particles need to be hidden for this element (required by PyHEADTAIL)
wakefield.needs_cpu = True
wakefield.needs_hidden_lost_particles = True
# Insert element in the line
line.insert_element(element=wakefield,name="wakefield", at_s=0)
#################
# Build Tracker #
#################
line.build_tracker(_context=context)
######################
# Generate particles #
######################
# (we choose to match the distribution without accounting for spacecharge)
line_sc_off = line.filter_elements(exclude_types_starting_with='SpaceCh')
particles = xp.generate_matched_gaussian_bunch(_context=context,
num_particles=n_part, total_intensity_particles=bunch_intensity,
nemitt_x=nemitt_x, nemitt_y=nemitt_y, sigma_z=sigma_z,
particle_ref=line.particle_ref, line=line_sc_off)
particles.circumference = line.get_length() # Needed by pyheadtail
###########################################################
# We use a phase monitor to measure the tune turn by turn #
###########################################################
phasem = xp.PhaseMonitor(line,
num_particles=n_part, twiss=line_sc_off.twiss())
#########
# Track #
#########
for turn in range(num_turns):
phasem.measure(particles)
#import pdb; pdb.set_trace()
line.track(particles)
##################
# Plot footprint #
##################
import matplotlib.pyplot as plt
plt.close('all')
f,ax = plt.subplots()
ax.plot(phasem.qx, phasem.qy,'b.',ms=1)
ax.set_xlim(0, 0.5)
ax.set_ylim(0, 0.5)
plt.show()
# Complete source: xtrack/examples/pyheadtail_interface/004_imped_spacech_cpu_gpu.py
Pipeline for multibunch simulations
Xsuite can be used to simulate multiple bunches interacting through collective forces, such as beam-beam interactions or wakefields. The xtrack.pipeline.MultiTracker handles the tracking of multiple xpart.particles.Particles objects through their own xtrack.Line (e.g. each of the two rings in a circular collider). Each Particles instance and its Line make a xtrack.pipeline.Branch. The MultiTracker will iteratively track its branches until an xtrack.beam_elements.Element which requires communication is reached (e.g. a strong-strong beam-beam collision, where information about the other beam’s charge distribution is required). At this point the MultiTracker will use the xtrack.pipeline.PipelineManager to establish communication between the branches that need to exchange information. If the communication cannot take place, e.g. because one of the two branches is not ready, the Multitracker resumes tracking with another branch.
The PipelineManager is aware of the communication required by the different Elements and the different Particles instances (e.g. in a collider with many bunches and many interactions points, the PipelineManager knows which bunch collides with which and where.) By default the PipelineManager uses a dummy communicator which does not require MPI, but an MPI communicator can be provided instead thus enabling tracking on multiple CPUs.
It is important that the Particles instances as well as the Element instances in the line that require communication through the pipeline are attributed a uniquely defined name.
The pipeline algorithm is detailed in https://doi.org/10.1016/j.cpc.2019.06.006.
The following example illustrates how to configure and run a simulation with two bunches colliding at two interaction points using MPI. It must be run like a MPI application
mpirun -np 2 python example.py
Example
import time
import numpy as np
from matplotlib import pyplot as plt
from mpi4py import MPI
import xobjects as xo
import xtrack as xt
import xfields as xf
import xpart as xp
context = xo.ContextCpu(omp_num_threads=0)
####################
# Pipeline manager #
####################
# Retrieving MPI info
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
my_rank = comm.Get_rank()
# Building the pipeline manager based on a MPI communicator
pipeline_manager = xt.PipelineManager(comm)
if nprocs >= 2:
# Add information about the Particles instance that require
# communication through the pipeline manager.
# Each Particles instance must be identifiable by a unique name
# The second argument is the MPI rank in which the Particles object
# lives.
pipeline_manager.add_particles('B1b1',0)
pipeline_manager.add_particles('B2b1',1)
# Add information about the elements that require communication
# through the pipeline manager.
# Each Element instance must be identifiable by a unique name
# All Elements are instanciated in all ranks
pipeline_manager.add_element('IP1')
pipeline_manager.add_element('IP2')
else:
print('Need at least 2 MPI processes for this test')
exit()
#################################
# Generate particles #
#################################
n_macroparticles = int(1e4)
bunch_intensity = 2.3e11
physemit_x = 2E-6*0.938/7E3
physemit_y = 2E-6*0.938/7E3
beta_x_IP1 = 1.0
beta_y_IP1 = 1.0
beta_x_IP2 = 1.0
beta_y_IP2 = 1.0
sigma_z = 0.08
sigma_delta = 1E-4
beta_s = sigma_z/sigma_delta
Qx = 0.285
Qy = 0.32
Qs = 2.1E-3
#Offsets [sigma] the two beams with opposite direction
#to enhance oscillation of the pi-mode
mean_x_init = 0.1
mean_y_init = 0.1
p0c = 7000e9
print('Initialising particles')
if my_rank == 0:
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x_IP1)
*(np.random.randn(n_macroparticles)+mean_x_init),
px=np.sqrt(physemit_x/beta_x_IP1)
*np.random.randn(n_macroparticles),
y=np.sqrt(physemit_y*beta_y_IP1)
*(np.random.randn(n_macroparticles)+mean_y_init),
py=np.sqrt(physemit_y/beta_y_IP1)
*np.random.randn(n_macroparticles),
zeta=sigma_z*np.random.randn(n_macroparticles),
delta=sigma_delta*np.random.randn(n_macroparticles),
weight=bunch_intensity/n_macroparticles
)
# Initialise the Particles object with its unique name
# (must match the info provided to the pipeline manager)
particles.init_pipeline('B1b1')
# Keep in memory the name of the Particles object with which this
# rank will communicate
# (must match the info provided to the pipeline manager)
partner_particles_name = 'B2b1'
elif my_rank == 1:
particles = xp.Particles(_context=context,
p0c=p0c,
x=np.sqrt(physemit_x*beta_x_IP1)
*(np.random.randn(n_macroparticles)+mean_x_init),
px=np.sqrt(physemit_x/beta_x_IP1)
*np.random.randn(n_macroparticles),
y=np.sqrt(physemit_y*beta_y_IP1)
*(np.random.randn(n_macroparticles)-mean_y_init),
py=np.sqrt(physemit_y/beta_y_IP1)
*np.random.randn(n_macroparticles),
zeta=sigma_z*np.random.randn(n_macroparticles),
delta=sigma_delta*np.random.randn(n_macroparticles),
weight=bunch_intensity/n_macroparticles
)
# Initialise the Particles object with its unique name
# (must match the info provided to the pipeline manager)
particles.init_pipeline('B2b1')
# Keep in memory the name of the Particles object with which this
# rank will communicate
# (must match the info provided to the pipeline manager)
partner_particles_name = 'B1b1'
else:
# Stop the execution of ranks that are not needed
print('No need for rank',my_rank)
exit()
#############
# Beam-beam #
#############
nb_slice = 11
slicer = xf.TempSlicer(sigma_z=sigma_z, n_slices=nb_slice)
config_for_update_IP1=xf.ConfigForUpdateBeamBeamBiGaussian3D(
pipeline_manager=pipeline_manager,
element_name='IP1', # The element name must be unique and match the
# one given to the pipeline manager
partner_particles_name = partner_particles_name, # the name of the
# Particles object with which to collide
slicer=slicer,
update_every=1 # Setup for strong-strong simulation
)
config_for_update_IP2=xf.ConfigForUpdateBeamBeamBiGaussian3D(
pipeline_manager=pipeline_manager,
element_name='IP2', # The element name must be unique and match the
# one given to the pipeline manager
partner_particles_name = partner_particles_name, # the name of the
#Particles object with which to collide
slicer=slicer,
update_every=1 # Setup for strong-strong simulation
)
print('build bb elements...')
bbeamIP1 = xf.BeamBeamBiGaussian3D(
_context=context,
other_beam_q0 = particles.q0,
phi = 500E-6,alpha=0.0,
config_for_update = config_for_update_IP1)
bbeamIP2 = xf.BeamBeamBiGaussian3D(
_context=context,
other_beam_q0 = particles.q0,
phi = 500E-6,alpha=np.pi/2,
config_for_update = config_for_update_IP2)
#################################################################
# arcs (here they are all the same with half the phase advance) #
#################################################################
arc12 = xt.LineSegmentMap(
betx = beta_x_IP1,bety = beta_y_IP1,
qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)
arc21 = xt.LineSegmentMap(
betx = beta_x_IP1,bety = beta_y_IP1,
qx = Qx/2, qy = Qy/2,bets = beta_s, qs=Qs/2)
#################################################################
# Tracker #
#################################################################
# In this example there is one Particles object per rank
# We build its line and tracker as usual
elements = [bbeamIP1,arc12,bbeamIP2,arc21]
line = xt.Line(elements=elements)
line.build_tracker()
# A pipeline branch is a line and Particles object that will
# be tracked through it
branch = xt.PipelineBranch(line, particles)
# The multitracker can deal with a set of branches (here only one)
multitracker = xt.PipelineMultiTracker(branches=[branch])
#################################################################
# Tracking (Same as usual) #
#################################################################
print('Tracking...')
time0 = time.time()
nTurn = 1024
multitracker.track(num_turns=nTurn,turn_by_turn_monitor=True)
print('Done with tracking.',(time.time()-time0)/1024,'[s/turn]')
#################################################################
# Post-processing: raw data and spectrum #
#################################################################
# Show beam oscillation spectrum (only one rank)
if my_rank == 0:
positions_x_b1 = np.average(line.record_last_track.x,axis=0)
positions_y_b1 = np.average(line.record_last_track.y,axis=0)
plt.figure(1)
freqs = np.fft.fftshift(np.fft.fftfreq(nTurn))
mask = freqs > 0
myFFT = np.fft.fftshift(np.fft.fft(positions_x_b1))
plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Horizontal')
myFFT = np.fft.fftshift(np.fft.fft(positions_y_b1))
plt.semilogy(freqs[mask], (np.abs(myFFT[mask])),label='Vertical')
plt.xlabel('Frequency [$f_{rev}$]')
plt.ylabel('Amplitude')
plt.legend(loc=0)
plt.show()
# Complete source: xfields/examples/002_beambeam/006_beambeam6d_strongstrong_pipeline_MPI2Procs.py
If the communicator is not specified when instantiating the PipelineManager, it will not use MPI:
pipeline_manager = xt.PipelineManager()