1 !
2 !Authors: Katarzyna Boronska, Jean-Luc Guermond, Copyrights 2007
3 !
5
6  IMPLICIT NONE
10  PRIVATE
11
12 CONTAINS
13  SUBROUTINE fft_par_real(communicator, V1_in, V_out, opt_nb_plane)
14  USE my_util
15  IMPLICIT NONE
16  include 'fftw3.f'
17  ! Format: V_1in(1:np,1:6,1:m_max_c)
18  ! INPUT ARE COSINE AND SINE COEFFICIENTS
19  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
20  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in
21  REAL(KIND=8), DIMENSION(:,:,:), ALLOCATABLE :: v_out
22 ! FL-CN possible faute ??? 19/03/2013
23  !REAL(KIND=8), DIMENSION(:,:,:), POINTER :: V_out
24  INTEGER, OPTIONAL :: opt_nb_plane
25  INTEGER :: np, bloc_size, nb_field, &
26  m_max, m_max_c, rang, nb_procs, mpid, m_max_pad, n_r_pad
27  INTEGER(KIND=8) :: fftw_plan_multi_c2r
28  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: cu
29  INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code
30  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: dist_field, combined_field
31  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
32  INTEGER, DIMENSION(1) :: dim, inembed, onembed
33  ! Recall complexes must be rescaled
34  ! End FFTW parameters
35 #include "petsc/finclude/petsc.h"
36  mpi_comm :: communicator
37
38  CALL mpi_comm_size(communicator, nb_procs, code)
39  CALL mpi_comm_rank(communicator, rang, code)
40
41  np = SIZE(v1_in,1)
42  nb_field = SIZE(v1_in,2) ! Number of fields
43  m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point
44  m_max = m_max_c*nb_procs ! Number of comlex coefficients per point per processor
45  IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN
46  CALL error_petsc(.OR.'Bug in FFT_PAR_REAL: MOD(nb_field,2)/=0 m_max_c==0')
47  END IF
48
49  !===Bloc_size is the number of points that are handled by one processor
50  !===once the Fourier modes are all collected on the processor
51  IF (modulo(np,nb_procs)==0) THEN
52  bloc_size = np/nb_procs
53  ELSE
54  CALL error_petsc('Bug in FFT_PAR_REAL: np is not a multiple of nb_procs')
55  END IF
56
57  IF (present(opt_nb_plane)) THEN
58  IF (opt_nb_plane> 2*m_max-1) THEN
59  m_max_pad = (opt_nb_plane+1)/2
60  ELSE
61  m_max_pad = m_max
62  END IF
63  ELSE
64  m_max_pad = m_max
65  END IF
67
69  ALLOCATE(dist_field(m_max_c,nb_field,np))
70  ALLOCATE(combined_field(m_max_c,nb_field,np))
71
72  DO i = 1, m_max_c
73  dist_field(i,:,:) = transpose(v1_in(:,:,i))
74  END DO
75
76  longueur_tranche=bloc_size*m_max_c*nb_field
77
78  mpid=mpi_double_precision
79  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
80  mpid, communicator, code)
81
82  cu = 0.d0
83  DO n = 1, bloc_size
84  DO nb = 1, nb_procs
85  shiftc = (nb-1)*bloc_size
86  shiftl = (nb-1)*m_max_c
87  jindex = n + shiftc
88  DO nf = 1, nb_field/2
89  !===Put real and imaginary parts in a complex
90  !===nf=1,2,3 => V1_in
91  !===INPUT ARE COSINE AND SINE COEFFICIENTS
92  !===THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
93  cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),&
94  -combined_field(:,2*nf,jindex),kind=8)/2
95  END DO
96  END DO
97  END DO
98  cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8)
99  !===Padding is done by initialization of cu: cu = 0
100  !===This is equivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
101
102  !===Set the parameters for dfftw
103  fft_dim = 1
104  istride = 1
105  ostride = 1
106  idist = n_r_pad
108  dim(1) = n_r_pad
109  odist = m_max_pad
111  howmany = bloc_size*nb_field/2
112
113 ! FL-CN possible faute ??? 19/03/2013
114 ! IF (ASSOCIATED(V_out)) NULLIFY(V_out)
115 ! IF (ASSOCIATED(V_out)) DEALLOCATE(V_out)
116 ! pb sur la ligne suivante
117  IF (ALLOCATED(v_out)) DEALLOCATE(v_out)
119
120  CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
121  onembed, ostride, odist, v_out, inembed, istride, idist, fftw_estimate)
122  CALL dfftw_execute(fftw_plan_multi_c2r)
123
124  DEALLOCATE(cu, dist_field, combined_field)
125
126  END SUBROUTINE fft_par_real
127
128  SUBROUTINE fft_par_cross_prod_bug(communicator,V1_in, V2_in, V_out, nb_procs, bloc_size, m_max_pad, temps)
129  !This a de-aliased version of the code, FEB 4, 2011, JLG
130  IMPLICIT NONE
131  include 'fftw3.f'
132  ! Format: V_1in(1:np,1:6,1:m_max_c)
133  ! INPUT ARE COSINE AND SINE COEFFICIENTS
134  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
135  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in
136  REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out
137  REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps
138  INTEGER, INTENT(IN) :: nb_procs, bloc_size, m_max_pad
139
140  INTEGER :: np, np_tot, nb_field, &
141  m_max, m_max_c, mpid, n_r_pad
142  INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c
143  COMPLEX(KIND=8), DIMENSION(m_max_pad,SIZE(V1_in,2),bloc_size) :: cu
144  COMPLEX(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2)/2,bloc_size) :: prod_cu
145  REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2),bloc_size) :: ru
146  REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2)/2,bloc_size) :: prod_ru
147  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size) :: intermediate
148  REAL(KIND=8), DIMENSION(SIZE(V1_in,3),2*SIZE(V1_in,2),bloc_size*nb_procs) :: dist_field, combined_field
149  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu, dist_prod_cu
150  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,3),bloc_size*nb_procs,SIZE(V1_in,2)/2) :: out_prod_cu
151
152  INTEGER :: i_field
153  INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code
154  REAL(KIND=8) :: t
155
156  ! FFTW parameters
157  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
158  INTEGER, DIMENSION(1) :: dim, inembed, onembed
159  ! Recall complexes must be rescaled
160  ! End FFTW parameters
161
162  !Temps(1) = Temps de communication
163  !Temps(2) = Temps de calcul
164  !Temps(3) = Temps de changement de format
165
166  !EXTERNAL hostnm
167  !EXTERNAL gethostname
168 #include "petsc/finclude/petsc.h"
169  mpi_comm :: communicator
170
171  IF (present(temps)) temps = 0.d0
172
173  nb_field= SIZE(v1_in,2)
174  m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point
175  m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor
176  np_tot = nb_procs*bloc_size
177  np = SIZE(v1_in,1)
179
180  IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN
181  WRITE(*,*) ' BUG '
182  stop
183  END IF
184
185  ! Packing all 3 complex components of both v1 and v2 input fields
186  ! into dist_field, where the dimension indexing the nodal points varies the least rapidly,
187  ! so that after distributing the data to the processes, each one will obtain a part
188  ! on nodal points
189  ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT
190  t = mpi_wtime()
191
192  DO i = 1, m_max_c
193  dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i))
194  dist_field(i,nb_field+1:2*nb_field,1:np) = transpose(v2_in(:,:,i))
195  END DO
196  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
197
198  IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100
199
200  longueur_tranche=bloc_size*m_max_c*nb_field*2
201
202  t = mpi_wtime()
203  mpid=mpi_double_precision
204  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
205  mpid, communicator, code)
206  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
207
208  t = mpi_wtime()
209  !JLG, FEB 4, 2011
210  cu = 0.d0
211  !JLG, FEB 4, 2011
212  DO n = 1, bloc_size
213  DO nb = 1, nb_procs
214  shiftc = (nb-1)*bloc_size
215  shiftl = (nb-1)*m_max_c
216  jindex = n + shiftc
217  DO nf = 1, nb_field
218  ! Put real and imaginary parts in a complex
219  ! nf=1,2,3 => V1_in
220  ! nf=4,5,6 => V2_in
221  ! INPUT ARE COSINE AND SINE COEFFICIENTS
222  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
223  cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),&
224  -combined_field(:,2*nf,jindex),kind=8)/2
225  END DO
226  END DO
227  END DO
228  cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8)
229  !JLG, FEB 4, 2011
230  !Padding is done by initialization of cu: cu = 0
231  !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
232  !JLG, FEB 4, 2011
233
234  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
235
236  ! Set the parameters for dfftw
237  fft_dim=1; istride=1; ostride=1;
238  !JLG, FEB 4, 2011
239 !!$idist=N_r; inembed(1)=N_r; DIM(1)=N_r 240 !!$ odist=m_max; onembed(1)=m_max
243  !JLG, FEB 4, 2011
244
245  howmany=bloc_size*nb_field
246
247  t = mpi_wtime()
248  CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
249  onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate)
250  !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r
251  CALL dfftw_execute(fftw_plan_multi_c2r)
252
253  ! CROSS PRODDUCT
254  IF (nb_field==6) THEN
255  prod_ru(:,1,:) = ru(:,2,:)*ru(:,6,:) - ru(:,3,:)*ru(:,5,:)
256  prod_ru(:,2,:) = ru(:,3,:)*ru(:,4,:) - ru(:,1,:)*ru(:,6,:)
257  prod_ru(:,3,:) = ru(:,1,:)*ru(:,5,:) - ru(:,2,:)*ru(:,4,:)
258  END IF
259  ! CROSS PRODUCT
260
261  howmany = howmany/2
262  CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
263  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate)
264  !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c
265  CALL dfftw_execute(fftw_plan_multi_r2c)
266  !JLG, FEB 4, 2011
267 !!$prod_cu = prod_cu/N_r !Scaling 268 prod_cu = prod_cu/n_r_pad !Scaling 269 !JLG, FEB 4, 2011 270 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 271 272 !Now we need to redistribute the Fourier coefficients on each processor 273 274 t = mpi_wtime() 275 combined_prod_cu(:,:,1)=prod_cu(1,:,:) 276 DO n=2, m_max 277 !combined_prod_cu(:,:,n)=prod_cu(n,:,:) 278 combined_prod_cu(:,:,n)=2*conjg(prod_cu(n,:,:)) 279 END DO 280 281 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 282 283 t = mpi_wtime() 284 longueur_tranche=bloc_size*m_max_c*nb_field 285 mpid=mpi_double_precision 286 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 287 mpid,communicator,code) 288 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 289 ! dimensions: 290 t = mpi_wtime() 291 DO i = 1, m_max_c 292 DO nb = 1, nb_procs 293 shiftc = (nb-1)*bloc_size 294 shiftl = (nb-1)*m_max_c 295 intermediate = dist_prod_cu(:,:,shiftl+i) 296 DO n = 1, bloc_size 297 IF (n+shiftc > np ) cycle 298 DO i_field = 1, nb_field/2 299 v_out(n+shiftc, i_field*2-1, i) = REAL (intermediate(i_field,n),kind=8) 300 v_out(n+shiftc, i_field*2 , i) = aimag(intermediate(i_field,n)) 301 END DO 302 END DO 303 END DO 304 END DO 305 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 306 307 308 END SUBROUTINE fft_par_cross_prod_bug 309 310 SUBROUTINE fft_par_cross_prod_dcl(communicator,V1_in, V2_in, V_out, nb_procs, bloc_size, m_max_pad, temps, padding) 311 !This a de-aliased version of the code, FEB 4, 2011, JLG 312 IMPLICIT NONE 313 include 'fftw3.f' 314 ! Format: V_1in(1:np,1:6,1:m_max_c) 315 ! INPUT ARE COSINE AND SINE COEFFICIENTS 316 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 317 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in 318 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out 319 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 320 LOGICAL, OPTIONAL, INTENT(IN) :: padding 321 INTEGER, INTENT(IN) :: bloc_size, m_max_pad, nb_procs 322 INTEGER :: np, np_tot, nb_field, m_max, m_max_c, mpid, n_r_pad 323 INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 324 325 COMPLEX(KIND=8), DIMENSION(m_max_pad, SIZE(V1_in,2), bloc_size) :: cu 326 REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2),bloc_size) :: ru 327 COMPLEX(KIND=8), DIMENSION(m_max_pad,SIZE(V1_in,2)/2,bloc_size) :: prod_cu 328 REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2)/2,bloc_size) :: prod_ru 329 COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size) :: intermediate 330 REAL(KIND=8), DIMENSION(SIZE(V1_in,3),2*SIZE(V1_in,2),bloc_size*nb_procs) :: dist_field, combined_field 331 COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu 332 COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size,SIZE(V1_in,3)*nb_procs) :: dist_prod_cu 333 334 INTEGER :: i_field 335 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 336 REAL(KIND=8) :: t 337 338 ! FFTW parameters 339 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 340 INTEGER, DIMENSION(1) :: dim, inembed, onembed 341 ! Recall complexes must be rescaled 342 ! End FFTW parameters 343 #include "petsc/finclude/petsc.h" 344 mpi_comm :: communicator 345 346 IF (present(temps)) temps = 0.d0 347 348 np = SIZE(v1_in,1) 349 nb_field= SIZE(v1_in,2) ! Number of fields 350 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 351 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 352 n_r_pad=2*m_max_pad-1 353 np_tot = nb_procs*bloc_size 354 355 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 356 WRITE(*,*) ' BUG ' 357 stop 358 END IF 359 360 ! Bloc_size is the number of points that are handled by one processor 361 ! once the Fourier modes are all collected 362 ! Computation of bloc_size and np_tot 363 ! fin de la repartition des points 364 365 ! Packing all 3 complex components of both v1 and v2 input fields 366 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 367 ! so that after distributing the data to the processes, each one will obtain a part 368 ! on nodal points 369 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 370 t = mpi_wtime() 371 372 DO i = 1, m_max_c 373 dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i)) 374 dist_field(i,nb_field+1:2*nb_field,1:np) = transpose(v2_in(:,:,i)) 375 END DO 376 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 377 378 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 379 380 longueur_tranche=bloc_size*m_max_c*nb_field*2 381 382 t = mpi_wtime() 383 mpid=mpi_double_precision 384 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 385 mpid, communicator, code) 386 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 387 388 t = mpi_wtime() 389 !JLG, FEB 4, 2011 390 cu = 0.d0 391 !JLG, FEB 4, 2011 392 DO n = 1, bloc_size 393 DO nb = 1, nb_procs 394 shiftc = (nb-1)*bloc_size 395 shiftl = (nb-1)*m_max_c 396 jindex = n + shiftc 397 DO nf = 1, nb_field 398 ! Put real and imaginary parts in a complex 399 ! nf=1,2,3 => V1_in 400 ! nf=4,5,6 => V2_in 401 ! INPUT ARE COSINE AND SINE COEFFICIENTS 402 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 403 cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),& 404 -combined_field(:,2*nf,jindex),kind=8)/2 405 END DO 406 END DO 407 END DO 408 cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 409 !JLG, FEB 4, 2011 410 !Padding is done by initialization of cu: cu = 0 411 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 412 !JLG, FEB 4, 2011 413 414 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 415 416 ! Set the parameters for dfftw 417 fft_dim=1; istride=1; ostride=1; 418 !JLG, FEB 4, 2011 419 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
420 !!$odist=m_max; onembed(1)=m_max 421 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 422 odist=m_max_pad; onembed(1)=m_max_pad 423 !JLG, FEB 4, 2011 424 425 howmany=bloc_size*nb_field 426 427 428 t = mpi_wtime() 429 CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 430 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 431 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r 432 CALL dfftw_execute(fftw_plan_multi_c2r) 433 434 ! CROSS PRODDUCT 435 IF (nb_field==6) THEN 436 prod_ru(:,1,:) = ru(:,2,:)*ru(:,6,:) - ru(:,3,:)*ru(:,5,:) 437 prod_ru(:,2,:) = ru(:,3,:)*ru(:,4,:) - ru(:,1,:)*ru(:,6,:) 438 prod_ru(:,3,:) = ru(:,1,:)*ru(:,5,:) - ru(:,2,:)*ru(:,4,:) 439 END IF 440 ! CROSS PRODUCT 441 442 howmany = howmany/2 443 CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 444 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate) 445 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c 446 CALL dfftw_execute(fftw_plan_multi_r2c) 447 !JLG, FEB 4, 2011 448 !!$ prod_cu = prod_cu/N_r !Scaling
449  prod_cu = prod_cu/n_r_pad !Scaling
450  !JLG, FEB 4, 2011
451  IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t
452
453  !Now we need to redistribute the Fourier coefficients on each processor
454
455  t = mpi_wtime()
456  combined_prod_cu(:,:,1)=prod_cu(1,:,:)
457  DO n=2, m_max
458  !combined_prod_cu(:,:,n)=prod_cu(n,:,:)
459  combined_prod_cu(:,:,n)=2*conjg(prod_cu(n,:,:))
460  END DO
461
462  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
463
464  t = mpi_wtime()
465  longueur_tranche=bloc_size*m_max_c*nb_field
466  mpid=mpi_double_precision
467  CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, &
468  mpid,communicator,code)
469  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
470  ! dimensions:
471  t = mpi_wtime()
472  DO i = 1, m_max_c
473  DO nb = 1, nb_procs
474  shiftc = (nb-1)*bloc_size
475  shiftl = (nb-1)*m_max_c
476  intermediate = dist_prod_cu(:,:,shiftl+i)
477  DO n = 1, bloc_size
478  IF (n+shiftc > np ) cycle
479  DO i_field = 1, nb_field/2
480  v_out(n+shiftc, i_field*2-1, i) = REAL (intermediate(i_field,n),kind=8)
481  v_out(n+shiftc, i_field*2 , i) = aimag(intermediate(i_field,n))
482  END DO
483  END DO
484  END DO
485  END DO
486  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
487
488
489  END SUBROUTINE fft_par_cross_prod_dcl
490
491  SUBROUTINE fft_par_compressive_visc_dcl(communicator,V1_in, V2_in, V_out, pb, nb_procs, &
492  bloc_size, m_max_pad,l_g, opt_norm_out, opt_m_vel, opt_norm, temps, padding)
493  !This a de-aliased version of the code, FEB 4, 2011, JLG
494  USE my_util
495  IMPLICIT NONE
496  include 'fftw3.f'
497  ! Format: V_1in(1:np,1:6,1:m_max_c)
498  ! INPUT ARE COSINE AND SINE COEFFICIENTS
499  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
500  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in
501  REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out
502  REAL(KIND=8), DIMENSION(:,:), OPTIONAL, INTENT(OUT) :: opt_norm_out
503  REAL(KIND=8), DIMENSION(:,:), OPTIONAL, INTENT(IN) :: opt_norm
504  REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps
505  LOGICAL, OPTIONAL, INTENT(IN) :: padding
506  INTEGER, INTENT(IN) :: bloc_size, m_max_pad, nb_procs
507  REAL(KIND=8), OPTIONAL, INTENT(IN) :: opt_m_vel
508  INTEGER, INTENT(IN) :: l_g
509  INTEGER, INTENT(IN) :: pb
510  INTEGER :: np, np_tot, nb_field1, nb_field2, m_max, m_max_c, mpid, n_r_pad
511  INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c
512
513  COMPLEX(KIND=8), DIMENSION(m_max_pad, (SIZE(V1_in,2)+SIZE(V2_in,2))/2, bloc_size) :: cu
514  REAL(KIND=8), DIMENSION(2*m_max_pad-1,(SIZE(V1_in,2)+SIZE(V2_in,2))/2,bloc_size) :: ru
515  COMPLEX(KIND=8), DIMENSION(m_max_pad,SIZE(V1_in,2)/2,bloc_size) :: prod_cu
516  REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2)/2,bloc_size) :: prod_ru
517  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2, bloc_size) :: intermediate
519  REAL(KIND=8), DIMENSION(2*m_max_pad-1, bloc_size) :: norm_vel
520  REAL(KIND=8), DIMENSION(2*m_max_pad-1, bloc_size/l_G) :: norm_vel_int
521
522  REAL(KIND=8), DIMENSION(SIZE(V1_in,3),SIZE(V1_in,2)+SIZE(V2_in,2),bloc_size*nb_procs) :: dist_field, combined_field
523  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu
524  COMPLEX(KIND=8), DIMENSION(SIZE(V1_in,2)/2,bloc_size,SIZE(V1_in,3)*nb_procs) :: dist_prod_cu
525
526  INTEGER :: i_field
527  INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code, l
528  REAL(KIND=8) :: t, x
529
530  ! FFTW parameters
531  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
532  INTEGER, DIMENSION(1) :: dim, inembed, onembed
533  ! Recall complexes must be rescaled
534  ! End FFTW parameters
535 #include "petsc/finclude/petsc.h"
536  mpi_comm :: communicator
537
538  IF (present(temps)) temps = 0.d0
539
540  np = SIZE(v1_in,1)
541  nb_field1= SIZE(v1_in,2) ! Number of fields
542  nb_field2= SIZE(v2_in,2) ! Number of fields
543  m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point
544  m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor
546  np_tot = nb_procs*bloc_size
547
548  IF (mod(nb_field1,2)/=0 .OR. mod(nb_field2,2)/=0 .OR. m_max_c==0) THEN
549  WRITE(*,*) ' BUG '
550  stop
551  END IF
552
553  ! Bloc_size is the number of points that are handled by one processor
554  ! once the Fourier modes are all collected
555  ! Computation of bloc_size and np_tot
556  ! fin de la repartition des points
557
558  ! Packing all 3 complex components of both v1 and v2 input fields
559  ! into dist_field, where the dimension indexing the nodal points varies the least rapidly,
560  ! so that after distributing the data to the processes, each one will obtain a part
561  ! on nodal points
562  ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT
563  t = mpi_wtime()
564
565  DO i = 1, m_max_c
566  dist_field(i,1:nb_field1,1:np) = transpose(v1_in(:,:,i))
567  dist_field(i,nb_field1+1:nb_field1+nb_field2,1:np) = transpose(v2_in(:,:,i))
568  END DO
569  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
570
571  IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100
572
573  longueur_tranche=bloc_size*m_max_c*(nb_field1+nb_field2)
574
575  t = mpi_wtime()
576  mpid=mpi_double_precision
577  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
578  mpid, communicator, code)
579  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
580
581  t = mpi_wtime()
582  !JLG, FEB 4, 2011
583  cu = 0.d0
584  !JLG, FEB 4, 2011
585  DO n = 1, bloc_size
586  DO nb = 1, nb_procs
587  shiftc = (nb-1)*bloc_size
588  shiftl = (nb-1)*m_max_c
589  jindex = n + shiftc
590  DO nf = 1, (nb_field1+nb_field2)/2
591  ! Put real and imaginary parts in a complex
592  ! nf=1,2,3 => V1_in
593  ! nf=4 => V2_in
594  ! INPUT ARE COSINE AND SINE COEFFICIENTS
595  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
596  cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),&
597  -combined_field(:,2*nf,jindex),kind=8)/2
598  END DO
599  END DO
600  END DO
601  cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8)
602  !JLG, FEB 4, 2011
603  !Padding is done by initialization of cu: cu = 0
604  !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
605  !JLG, FEB 4, 2011
606
607  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
608
609  ! Set the parameters for dfftw
610  fft_dim=1; istride=1; ostride=1;
611  !JLG, FEB 4, 2011
612 !!$idist=N_r; inembed(1)=N_r; DIM(1)=N_r 613 !!$ odist=m_max; onembed(1)=m_max
616  !JLG, FEB 4, 2011
617
618  howmany=bloc_size*(nb_field1+nb_field2)/2
619
620  t = mpi_wtime()
621  CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
622  onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate)
623  !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r
624  CALL dfftw_execute(fftw_plan_multi_c2r)
625
626  IF (pb==1) THEN
628  IF (nb_field1 == 6 .AND. nb_field2 == 6) THEN
629  norm_vel(:,:) = sqrt(ru(:,4,:)**2 + ru(:,5,:)**2 + ru(:,6,:)**2)
630  DO i = 1, 2*m_max_pad - 1
631  DO l = 1, bloc_size/l_g
632  x = maxval(norm_vel(i,(l-1)*l_g+1:l*l_g))
633  norm_vel_int(i,l) = x
634  END DO
635  END DO
636  DO l = 1, bloc_size/l_g
637  DO i = 2, 2*m_max_pad - 2
638  norm_vel(i,(l-1)*l_g+1:l*l_g) = maxval(norm_vel_int(i-1:i+1,l))
639  END DO
640  norm_vel(1,(l-1)*l_g+1:l*l_g) = max(norm_vel_int(1,l),norm_vel_int(2,l),norm_vel_int(2*m_max_pad - 1,l))
641  norm_vel(2*m_max_pad - 1,(l-1)*l_g+1:l*l_g) = &
643  END DO
644  prod_ru(:,1,:) = (opt_m_vel - norm_vel(:,:))*ru(:,1,:)
645  prod_ru(:,2,:) = (opt_m_vel - norm_vel(:,:))*ru(:,2,:)
646  prod_ru(:,3,:) = (opt_m_vel - norm_vel(:,:))*ru(:,3,:)
647  END IF
648  opt_norm_out = norm_vel
649  ELSE IF (pb==2) THEN
651  IF (nb_field1 == 6 .AND. nb_field2 == 2) THEN
652  norm_grad_phi(:,:) = sqrt(ru(:,1,:)**2 + ru(:,2,:)**2 + ru(:,3,:)**2) + 1.d-14
653  prod_ru(:,1,:) = opt_norm(:,:)*ru(:,4,:)*(1.d0 - ru(:,4,:))*ru(:,1,:)/norm_grad_phi(:,:)
654  prod_ru(:,2,:) = opt_norm(:,:)*ru(:,4,:)*(1.d0 - ru(:,4,:))*ru(:,2,:)/norm_grad_phi(:,:)
655  prod_ru(:,3,:) = opt_norm(:,:)*ru(:,4,:)*(1.d0 - ru(:,4,:))*ru(:,3,:)/norm_grad_phi(:,:)
656  END IF
657  ELSE
658  CALL error_petsc('error in problem type while calling FFT_PAR_COMPRESSIVE_VISC_DCL ')
659  END IF
660
661  howmany = bloc_size*nb_field1/2
662
663  CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
664  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate)
665  !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c
666  CALL dfftw_execute(fftw_plan_multi_r2c)
667  !JLG, FEB 4, 2011
668 !!$prod_cu = prod_cu/N_r !Scaling 669 prod_cu = prod_cu/n_r_pad !Scaling 670 !JLG, FEB 4, 2011 671 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 672 673 !Now we need to redistribute the Fourier coefficients on each processor 674 675 t = mpi_wtime() 676 combined_prod_cu(:,:,1)=prod_cu(1,:,:) 677 DO n=2, m_max 678 !combined_prod_cu(:,:,n)=prod_cu(n,:,:) 679 combined_prod_cu(:,:,n)=2*conjg(prod_cu(n,:,:)) 680 END DO 681 682 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 683 684 longueur_tranche=bloc_size*m_max_c*nb_field1 685 686 t = mpi_wtime() 687 mpid=mpi_double_precision 688 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 689 mpid,communicator,code) 690 691 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 692 ! dimensions: 693 t = mpi_wtime() 694 695 DO i = 1, m_max_c 696 DO nb = 1, nb_procs 697 shiftc = (nb-1)*bloc_size 698 shiftl = (nb-1)*m_max_c 699 intermediate = dist_prod_cu(:,:,shiftl+i) 700 DO n = 1, bloc_size 701 IF (n+shiftc > np ) cycle 702 DO i_field = 1, nb_field1/2 703 v_out(n+shiftc, i_field*2-1, i) = REAL (intermediate(i_field,n),kind=8) 704 v_out(n+shiftc, i_field*2 , i) = aimag(intermediate(i_field,n)) 705 END DO 706 END DO 707 END DO 708 END DO 709 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 710 711 END SUBROUTINE fft_par_compressive_visc_dcl 712 713 SUBROUTINE fft_par_dot_prod_dcl(communicator,V1_in, V2_in, c_out, nb_procs, bloc_size, m_max_pad, temps) 714 !FFT (FFT(-1) V1 . FFT(-1) V2) = c_out 715 !This a de-aliased version of the code, FEB 4, 2011, JLG 716 IMPLICIT NONE 717 include 'fftw3.f' 718 ! Format: V_1in(1:np,1:6,1:m_max_c) 719 ! INPUT ARE COSINE AND SINE COEFFICIENTS 720 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 721 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in 722 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: c_out 723 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 724 INTEGER, INTENT(IN) :: nb_procs, bloc_size, m_max_pad 725 COMPLEX(KIND=8), DIMENSION(m_max_pad, SIZE(V1_in,2), bloc_size) :: cu 726 REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2),bloc_size) :: ru 727 COMPLEX(KIND=8), DIMENSION(m_max_pad,bloc_size) :: prod_cu 728 REAL(KIND=8), DIMENSION(2*m_max_pad-1,bloc_size) :: prod_ru 729 COMPLEX(KIND=8), DIMENSION(bloc_size) :: intermediate 730 REAL(KIND=8), DIMENSION(SIZE(V1_in,3),2*SIZE(V1_in,2),bloc_size*nb_procs) :: dist_field, combined_field 731 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu 732 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: dist_prod_cu 733 734 INTEGER :: np, np_tot, nb_field, m_max, m_max_c, mpid, n_r_pad 735 INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 736 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 737 REAL(KIND=8) :: t 738 ! FFTW parameters 739 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 740 INTEGER, DIMENSION(1) :: dim, inembed, onembed 741 ! Recall complexes must be rescaled 742 ! End FFTW parameters 743 #include "petsc/finclude/petsc.h" 744 mpi_comm :: communicator 745 746 IF (present(temps)) temps = 0.d0 747 748 np = SIZE(v1_in,1) 749 nb_field= SIZE(v1_in,2) ! Number of fields 750 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 751 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 752 np_tot = nb_procs*bloc_size 753 n_r_pad=2*m_max_pad-1 754 755 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 756 WRITE(*,*) ' BUG ' 757 stop 758 END IF 759 760 ! Packing all 3 complex components of both v1 and v2 input fields 761 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 762 ! so that after distributing the data to the processes, each one will obtain a part 763 ! on nodal points 764 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 765 t = mpi_wtime() 766 767 DO i = 1, m_max_c 768 dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i)) 769 dist_field(i,nb_field+1:2*nb_field,1:np) = transpose(v2_in(:,:,i)) 770 END DO 771 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 772 773 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 774 775 longueur_tranche=bloc_size*m_max_c*nb_field*2 776 777 t = mpi_wtime() 778 mpid=mpi_double_precision 779 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 780 mpid, communicator, code) 781 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 782 783 t = mpi_wtime() 784 !JLG, FEB 4, 2011 785 cu = 0.d0 786 !JLG, FEB 4, 2011 787 DO n = 1, bloc_size 788 DO nb = 1, nb_procs 789 shiftc = (nb-1)*bloc_size 790 shiftl = (nb-1)*m_max_c 791 jindex = n + shiftc 792 DO nf = 1, nb_field 793 ! Put real and imaginary parts in a complex 794 ! nf=1,2,3 => V1_in 795 ! nf=4,5,6 => V2_in 796 ! INPUT ARE COSINE AND SINE COEFFICIENTS 797 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 798 cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),& 799 -combined_field(:,2*nf,jindex),kind=8)/2 800 END DO 801 END DO 802 END DO 803 cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 804 !JLG, FEB 4, 2011 805 !Padding is done by initialization of cu: cu = 0 806 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 807 !JLG, FEB 4, 2011 808 809 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 810 811 ! Set the parameters for dfftw 812 fft_dim=1; istride=1; ostride=1; 813 !JLG, FEB 4, 2011 814 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
815 !!$odist=m_max; onembed(1)=m_max 816 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 817 odist=m_max_pad; onembed(1)=m_max_pad 818 !JLG, FEB 4, 2011 819 820 howmany=bloc_size*nb_field 821 822 823 t = mpi_wtime() 824 CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 825 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 826 !write(*,*) ' FFT_PAR_DOT_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r 827 CALL dfftw_execute(fftw_plan_multi_c2r) 828 829 ! DOT PRODDUCT 830 IF (nb_field==6) THEN 831 prod_ru(:,:) = ru(:,1,:)*ru(:,4,:) + ru(:,2,:)*ru(:,5,:) + ru(:,3,:)*ru(:,6,:) 832 END IF 833 ! DOT PRODUCT 834 835 howmany = bloc_size*1 836 CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 837 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate) 838 !write(*,*) ' FFT_PAR_DOT_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c 839 CALL dfftw_execute(fftw_plan_multi_r2c) 840 !JLG, FEB 4, 2011 841 !!$ prod_cu = prod_cu/N_r !Scaling
842  prod_cu = prod_cu/n_r_pad !Scaling
843  !JLG, FEB 4, 2011
844  IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t
845
846  !Now we need to redistribute the Fourier coefficients on each processor
847  t = mpi_wtime()
848  combined_prod_cu(:,1)=prod_cu(1,:)
849  DO n=2, m_max
850  combined_prod_cu(:,n)=2*conjg(prod_cu(n,:))
851  END DO
852
853  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
854
855  t = mpi_wtime()
856  longueur_tranche=bloc_size*m_max_c*2
857  mpid=mpi_double_precision
858  CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, &
859  mpid,communicator,code)
860  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
861
862  t = mpi_wtime()
863  DO i = 1, m_max_c
864  DO nb = 1, nb_procs
865  shiftc = (nb-1)*bloc_size
866  shiftl = (nb-1)*m_max_c
867  intermediate = dist_prod_cu(:,shiftl+i)
868  DO n = 1, bloc_size
869  IF (n+shiftc > np ) cycle
870  c_out(n+shiftc, 1, i) = REAL (intermediate(n),kind=8)
871  c_out(n+shiftc, 2 , i) = aimag(intermediate(n))
872  END DO
873  END DO
874  END DO
875  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
876
877  END SUBROUTINE fft_par_dot_prod_dcl
878
879  SUBROUTINE fft_par_prod_dcl(communicator, c1_in, c2_in, c_out, nb_procs, bloc_size, m_max_pad, temps)
880  !FFT (FFT(-1) V1 . FFT(-1) V2) = c_out
881  !This a de-aliased version of the code, FEB 4, 2011, JLG
882  USE my_util
883  IMPLICIT NONE
884  include 'fftw3.f'
885  ! Format: c_1in(1:np,1:2,1:m_max_c)
886  ! INPUT ARE COSINE AND SINE COEFFICIENTS
887  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
888  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: c1_in, c2_in
889  REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: c_out
890  REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps
891  INTEGER, INTENT(IN) :: nb_procs, bloc_size, m_max_pad
892
893  COMPLEX(KIND=8), DIMENSION(m_max_pad, 2, bloc_size) :: cu
894  REAL(KIND=8), DIMENSION(2*m_max_pad-1, 2, bloc_size) :: ru
895  COMPLEX(KIND=8), DIMENSION(m_max_pad, bloc_size) :: prod_cu
896  REAL(KIND=8), DIMENSION(2*m_max_pad-1,bloc_size) :: prod_ru
897  REAL(KIND=8), DIMENSION(SIZE(c1_in,3),4, bloc_size*nb_procs) :: dist_field, combined_field
898  COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(c1_in,3)*nb_procs) :: dist_prod_cu, combined_prod_cu
899  COMPLEX(KIND=8), DIMENSION(bloc_size) :: intermediate
900
901  INTEGER :: np, np_tot, m_max, m_max_c, mpid, n_r_pad
902  INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c
903  INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code
904  REAL(KIND=8) :: t
905  ! FFTW parameters
906  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
907  INTEGER, DIMENSION(1) :: dim, inembed, onembed
908  ! Recall complexes must be rescaled
909  ! End FFTW parameters
910 #include "petsc/finclude/petsc.h"
911  mpi_comm :: communicator
912
913  IF (present(temps)) temps = 0.d0
914
915  np = SIZE(c1_in,1)
916  m_max_c = SIZE(c1_in,3) ! Number of complex (cosines + sines) coefficients per point
917  m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor
918  np_tot = nb_procs*bloc_size
920
921  IF (m_max_c==0) THEN
922  WRITE(*,*) ' BUG '
923  stop
924  END IF
925
926  ! Packing all 3 complex components of both v1 and v2 input fields
927  ! into dist_field, where the dimension indexing the nodal points varies the least rapidly,
928  ! so that after distributing the data to the processes, each one will obtain a part
929  ! on nodal points
930  ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT
931  t = mpi_wtime()
932  DO i = 1, m_max_c
933  dist_field(i,1:2,1:np) = transpose(c1_in(:,:,i))
934  dist_field(i,3:4,1:np) = transpose(c2_in(:,:,i))
935  END DO
936  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
937
938  IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100
939
940  longueur_tranche=bloc_size*m_max_c*4
941
942  t = mpi_wtime()
943  mpid=mpi_double_precision
944  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
945  mpid, communicator, code)
946  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
947
948  t = mpi_wtime()
949  !JLG, FEB 4, 2011
950  cu = 0.d0
951  !JLG, FEB 4, 2011
952  DO n = 1, bloc_size
953  DO nb = 1, nb_procs
954  shiftc = (nb-1)*bloc_size
955  shiftl = (nb-1)*m_max_c
956  jindex = n + shiftc
957  DO nf = 1, 2
958  ! Put real and imaginary parts in a complex
959  ! nf=1 => c1_in
960  ! nf=2 => c2_in
961  ! INPUT ARE COSINE AND SINE COEFFICIENTS
962  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
963  cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),&
964  -combined_field(:,2*nf,jindex),kind=8)/2
965  END DO
966  END DO
967  END DO
968  cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8)
969  !JLG, FEB 4, 2011
970  !Padding is done by initialization of cu: cu = 0
971  !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
972  !JLG, FEB 4, 2011
973
974  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
975
976  ! Set the parameters for dfftw
977  fft_dim=1; istride=1; ostride=1;
978  !JLG, FEB 4, 2011
979 !!$idist=N_r; inembed(1)=N_r; DIM(1)=N_r 980 !!$ odist=m_max; onembed(1)=m_max
983  !JLG, FEB 4, 2011
984
985  howmany=bloc_size*2
986
987  t = mpi_wtime()
988  CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
989  onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate)
990  CALL dfftw_execute(fftw_plan_multi_c2r)
991
992  !PRODDUCT
993  prod_ru(:,:) = ru(:,1,:)*ru(:,2,:)
994  !PRODUCT
995
996  howmany = bloc_size*1
997  CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
998  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate)
999  CALL dfftw_execute(fftw_plan_multi_r2c)
1000  !JLG, FEB 4, 2011
1001 !!$prod_cu = prod_cu/N_r !Scaling 1002 prod_cu = prod_cu/n_r_pad !Scaling 1003 !JLG, FEB 4, 2011 1004 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 1005 1006 !Now we need to redistribute the Fourier coefficients on each processor 1007 t = mpi_wtime() 1008 combined_prod_cu(:,1)=prod_cu(1,:) 1009 DO n=2, m_max 1010 combined_prod_cu(:,n)=2*conjg(prod_cu(n,:)) 1011 END DO 1012 1013 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1014 1015 t = mpi_wtime() 1016 longueur_tranche=bloc_size*m_max_c*2 1017 mpid=mpi_double_precision 1018 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 1019 mpid,communicator,code) 1020 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1021 1022 t = mpi_wtime() 1023 DO i = 1, m_max_c 1024 DO nb = 1, nb_procs 1025 shiftc = (nb-1)*bloc_size 1026 shiftl = (nb-1)*m_max_c 1027 intermediate = dist_prod_cu(:,shiftl+i) 1028 DO n = 1, bloc_size 1029 IF (n+shiftc > np ) cycle 1030 c_out(n+shiftc, 1, i) = REAL (intermediate(n),kind=8) 1031 c_out(n+shiftc, 2 , i) = aimag(intermediate(n)) 1032 END DO 1033 END DO 1034 END DO 1035 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1036 1037 END SUBROUTINE fft_par_prod_dcl 1038 1039 SUBROUTINE fft_par_dot_prod_bis(communicator,V1_in, V2_in, V_out, nb_procs, bloc_size, m_max_pad, temps, padding) 1040 !This a de-aliased version of the code, FEB 4, 2011, JLG 1041 USE my_util 1042 IMPLICIT NONE 1043 include 'fftw3.f' 1044 ! Format: V_1in(1:np,1:6,1:m_max_c) 1045 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1046 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1047 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in 1048 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out 1049 INTEGER, INTENT(IN) :: bloc_size, m_max_pad, nb_procs 1050 COMPLEX(KIND=8), DIMENSION(m_max_pad, SIZE(V1_in,2), bloc_size) :: cu 1051 REAL(KIND=8), DIMENSION(2*m_max_pad-1,SIZE(V1_in,2),bloc_size) :: ru 1052 COMPLEX(KIND=8), DIMENSION(m_max_pad,bloc_size) :: prod_cu 1053 REAL(KIND=8), DIMENSION(2*m_max_pad-1,bloc_size) :: prod_ru 1054 COMPLEX(KIND=8), DIMENSION(bloc_size) :: intermediate 1055 REAL(KIND=8), DIMENSION(SIZE(V1_in,3),2*SIZE(V1_in,2),bloc_size*nb_procs) :: dist_field, combined_field 1056 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu 1057 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: dist_prod_cu 1058 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 1059 LOGICAL, OPTIONAL, INTENT(IN) :: padding 1060 INTEGER :: np, np_tot, nb_field, m_max, m_max_c, mpid, n_r_pad 1061 INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 1062 INTEGER :: i_field 1063 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 1064 REAL(KIND=8) :: t 1065 1066 ! FFTW parameters 1067 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 1068 INTEGER, DIMENSION(1) :: dim, inembed, onembed 1069 ! Recall complexes must be rescaled 1070 ! End FFTW parameters 1071 #include "petsc/finclude/petsc.h" 1072 mpi_comm :: communicator 1073 1074 IF (present(temps)) temps = 0.d0 1075 1076 np = SIZE(v1_in,1) 1077 nb_field= SIZE(v1_in,2) ! Number of fields 1078 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 1079 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 1080 n_r_pad=2*m_max_pad-1 1081 np_tot = nb_procs*bloc_size 1082 1083 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 1084 WRITE(*,*) ' BUG ' 1085 stop 1086 END IF 1087 1088 ! Bloc_size is the number of points that are handled by one processor 1089 ! once the Fourier modes are all collected 1090 ! Computation of bloc_size and np_tot 1091 ! fin de la repartition des points 1092 1093 ! Packing all 3 complex components of both v1 and v2 input fields 1094 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 1095 ! so that after distributing the data to the processes, each one will obtain a part 1096 ! on nodal points 1097 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 1098 t = mpi_wtime() 1099 1100 DO i = 1, m_max_c 1101 dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i)) 1102 dist_field(i,nb_field+1:2*nb_field,1:np) = transpose(v2_in(:,:,i)) 1103 END DO 1104 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1105 1106 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 1107 1108 longueur_tranche=bloc_size*m_max_c*nb_field*2 1109 1110 t = mpi_wtime() 1111 mpid=mpi_double_precision 1112 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 1113 mpid, communicator, code) 1114 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1115 1116 t = mpi_wtime() 1117 !JLG, FEB 4, 2011 1118 cu = 0.d0 1119 !JLG, FEB 4, 2011 1120 DO n = 1, bloc_size 1121 DO nb = 1, nb_procs 1122 shiftc = (nb-1)*bloc_size 1123 shiftl = (nb-1)*m_max_c 1124 jindex = n + shiftc 1125 DO nf = 1, nb_field 1126 ! Put real and imaginary parts in a complex 1127 ! nf=1,2,3 => V1_in 1128 ! nf=4,5,6 => V2_in 1129 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1130 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1131 cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),& 1132 -combined_field(:,2*nf,jindex),kind=8)/2 1133 END DO 1134 END DO 1135 END DO 1136 cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 1137 !JLG, FEB 4, 2011 1138 !Padding is done by initialization of cu: cu = 0 1139 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 1140 !JLG, FEB 4, 2011 1141 1142 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1143 1144 ! Set the parameters for dfftw 1145 fft_dim=1; istride=1; ostride=1; 1146 !JLG, FEB 4, 2011 1147 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
1148 !!$odist=m_max; onembed(1)=m_max 1149 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 1150 odist=m_max_pad; onembed(1)=m_max_pad 1151 !JLG, FEB 4, 2011 1152 1153 howmany=bloc_size*nb_field 1154 1155 1156 t = mpi_wtime() 1157 CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 1158 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 1159 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r 1160 CALL dfftw_execute(fftw_plan_multi_c2r) 1161 1162 ! DOT PRODDUCT 1163 IF (nb_field==6) THEN 1164 prod_ru(:,:) = ru(:,1,:)*ru(:,4,:) + ru(:,2,:)*ru(:,5,:) + ru(:,3,:)*ru(:,6,:) 1165 END IF 1166 ! DOT PRODUCT 1167 1168 howmany = bloc_size 1169 CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 1170 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate) 1171 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c 1172 CALL dfftw_execute(fftw_plan_multi_r2c) 1173 !JLG, FEB 4, 2011 1174 ! prod_cu = prod_cu/N_r !Scaling 1175 prod_cu = prod_cu/n_r_pad !Scaling 1176 !JLG, FEB 4, 2011 1177 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 1178 1179 !Now we need to redistribute the Fourier coefficients on each processor 1180 1181 t = mpi_wtime() 1182 combined_prod_cu(:,1)=prod_cu(1,:) 1183 DO n=2, m_max 1184 !combined_prod_cu(:,:,n)=prod_cu(n,:,:) 1185 combined_prod_cu(:,n)=2*conjg(prod_cu(n,:)) 1186 END DO 1187 1188 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1189 1190 t = mpi_wtime() 1191 longueur_tranche=bloc_size*m_max_c*2 1192 mpid=mpi_double_precision 1193 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 1194 mpid,communicator,code) 1195 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1196 ! dimensions: 1197 t = mpi_wtime() 1198 DO i = 1, m_max_c 1199 DO nb = 1, nb_procs 1200 shiftc = (nb-1)*bloc_size 1201 shiftl = (nb-1)*m_max_c 1202 intermediate = dist_prod_cu(:,shiftl+i) 1203 DO n = 1, bloc_size 1204 IF (n+shiftc > np ) cycle 1205 v_out(n+shiftc, 1, i) = REAL (intermediate(n),kind=8) 1206 v_out(n+shiftc, 2, i) = aimag(intermediate(n)) 1207 END DO 1208 END DO 1209 END DO 1210 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1211 1212 1213 END SUBROUTINE fft_par_dot_prod_bis 1214 1215 1216 1217 SUBROUTINE fft_par_cross_prod(communicator,V1_in, V2_in, V_out, temps, padding) 1218 !This a de-aliased version of the code, FEB 4, 2011, JLG 1219 IMPLICIT NONE 1220 include 'fftw3.f' 1221 ! Format: V_1in(1:np,1:6,1:m_max_c) 1222 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1223 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1224 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in 1225 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out 1226 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 1227 LOGICAL, OPTIONAL, INTENT(IN) :: padding 1228 ! Saved variables 1229 LOGICAL, SAVE :: once=.true. 1230 INTEGER, SAVE :: np_ref, np, np_tot, bloc_size, nb_field, & 1231 m_max, m_max_c, n_r, rang, nb_procs, mpid, m_max_pad, n_r_pad 1232 INTEGER(KIND=8), SAVE :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 1233 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: cu, prod_cu 1234 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: ru, prod_ru 1235 ! End saved variables 1236 1237 INTEGER :: i_field 1238 INTEGER :: np_glob, np_loc, reste, np_alloc, nn, nb_bloc, n_sup, n_inf, n_up, n_cache 1239 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 1240 REAL(KIND=8) :: t 1241 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: dist_field, combined_field 1242 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: combined_prod_cu, dist_prod_cu, out_prod_cu 1243 1244 !Vectors to speed up the format changes 1245 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: intermediate 1246 !COMPLEX, ALLOCATABLE, DIMENSION(:,:) :: intermediate !There was a bug !15/09/2010 1247 !Vectors to speed up the format changes 1248 1249 ! FFTW parameters 1250 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 1251 INTEGER, DIMENSION(1) :: dim, inembed, onembed 1252 ! Recall complexes must be rescaled 1253 ! End FFTW parameters 1254 1255 !Temps(1) = Temps de communication 1256 !Temps(2) = Temps de calcul 1257 !Temps(3) = Temps de changement de format 1258 1259 !EXTERNAL hostnm 1260 !EXTERNAL gethostname 1261 #include "petsc/finclude/petsc.h" 1262 mpi_comm :: communicator 1263 1264 CALL mpi_comm_size(communicator,nb_procs,code) 1265 CALL mpi_comm_rank(communicator,rang,code) 1266 1267 IF (present(temps)) temps = 0.d0 1268 1269 IF (.NOT.once) THEN 1270 IF (SIZE(v1_in,1).NE.np_ref) THEN 1271 once = .true. !Something wrong happened, reinitialize 1272 np_ref = SIZE(v1_in,1) 1273 END IF 1274 END IF 1275 1276 once = .true. 1277 n_cache = 150000 1278 1279 np_glob = SIZE(v1_in,1) 1280 m_max_c = SIZE(v1_in,3) 1281 np_loc = n_cache/(12*m_max_c) 1282 nb_bloc = max(np_glob/np_loc,1) 1283 1284 100 np_loc = np_glob/nb_bloc 1285 reste = np_glob - np_loc*nb_bloc 1286 np_alloc = np_loc + reste 1287 reste = np_alloc*nb_bloc - np_glob 1288 IF (reste>np_alloc) THEN 1289 nb_bloc = nb_bloc - 1 1290 go to 100 1291 END IF 1292 ! nb_bloc = nbre de blocs qui decoupe le plan meridien 1293 ! np_alloc = nbre de points ds 1 bloc 1294 1295 n_sup = 0 1296 DO nn= 1, nb_bloc 1297 IF (once) THEN 1298 nb_field= SIZE(v1_in,2) ! Number of fields 1299 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 1300 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 1301 n_r=2*m_max-1 ! Number of Real coefficients per point 1302 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 1303 WRITE(*,*) ' BUG ' 1304 stop 1305 END IF 1306 1307 ! Bloc_size is the number of points that are handled by one processor 1308 ! once the Fourier modes are all collected 1309 ! Computation of bloc_size and np_tot 1310 np = np_alloc 1311 IF (modulo(np,nb_procs)==0) THEN 1312 bloc_size = np/nb_procs 1313 ELSE 1314 bloc_size = np/nb_procs + 1 1315 END IF 1316 np_tot = nb_procs*bloc_size 1317 ! fin de la repartition des points 1318 1319 1320 !JLG, FEB 4, 2011 1321 !Only ru, cu, prod_ru, prod_cu are modified 1322 IF (present(padding)) THEN 1323 IF (padding) THEN 1324 m_max_pad = 3*m_max/2 1325 ELSE 1326 WRITE(*,*) ' NO PADDING ' 1327 m_max_pad = m_max 1328 END IF 1329 ELSE 1330 m_max_pad = 3*m_max/2 1331 END IF 1332 n_r_pad=2*m_max_pad-1 1333 1334 IF (ALLOCATED(ru)) DEALLOCATE(ru,cu,prod_ru,prod_cu) 1335 ALLOCATE(cu(m_max_pad,nb_field,bloc_size)) 1336 ALLOCATE(ru(n_r_pad, nb_field,bloc_size)) 1337 ALLOCATE(prod_cu(m_max_pad,nb_field/2,bloc_size)) 1338 ALLOCATE(prod_ru(n_r_pad, nb_field/2,bloc_size)) 1339 !JLG, FEB 4, 2001 1340 ALLOCATE(intermediate(nb_field/2,bloc_size)) 1341 ALLOCATE( dist_field(m_max_c,2*nb_field,np_tot)) 1342 ALLOCATE(combined_field(m_max_c,2*nb_field,np_tot)) 1343 ALLOCATE(dist_prod_cu(nb_field/2,bloc_size,m_max)) 1344 ALLOCATE(combined_prod_cu(nb_field/2,bloc_size,m_max)) 1345 ALLOCATE(out_prod_cu(m_max_c,np_tot,nb_field/2)) 1346 END IF 1347 1348 1349 ! Packing all 3 complex components of both v1 and v2 input fields 1350 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 1351 ! so that after distributing the data to the processes, each one will obtain a part 1352 ! on nodal points 1353 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 1354 t = mpi_wtime() 1355 n_inf = n_sup + 1 1356 IF (nn == nb_bloc) THEN 1357 n_up = np_glob - n_inf + 1 1358 ELSE 1359 n_up = np_alloc 1360 END IF 1361 n_sup = n_inf + n_up - 1 1362 1363 DO i = 1, m_max_c 1364 dist_field(i,1:nb_field,1:n_up) = transpose(v1_in(n_inf:n_sup,:,i)) 1365 dist_field(i,nb_field+1:2*nb_field,1:n_up) = transpose(v2_in(n_inf:n_sup,:,i)) 1366 END DO 1367 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1368 1369 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 1370 1371 longueur_tranche=bloc_size*m_max_c*nb_field*2 1372 1373 t = mpi_wtime() 1374 mpid=mpi_double_precision 1375 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 1376 mpid, communicator, code) 1377 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1378 1379 t = mpi_wtime() 1380 !JLG, FEB 4, 2011 1381 cu = 0.d0 1382 !JLG, FEB 4, 2011 1383 DO n = 1, bloc_size 1384 DO nb = 1, nb_procs 1385 shiftc = (nb-1)*bloc_size 1386 shiftl = (nb-1)*m_max_c 1387 jindex = n + shiftc 1388 DO nf = 1, nb_field 1389 ! Put real and imaginary parts in a complex 1390 ! nf=1,2,3 => V1_in 1391 ! nf=4,5,6 => V2_in 1392 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1393 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1394 cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),& 1395 -combined_field(:,2*nf,jindex),kind=8)/2 1396 END DO 1397 END DO 1398 END DO 1399 cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 1400 !JLG, FEB 4, 2011 1401 !Padding is done by initialization of cu: cu = 0 1402 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 1403 !JLG, FEB 4, 2011 1404 1405 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1406 1407 ! Set the parameters for dfftw 1408 fft_dim=1; istride=1; ostride=1; 1409 !JLG, FEB 4, 2011 1410 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
1411 !!$odist=m_max; onembed(1)=m_max 1412 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 1413 odist=m_max_pad; onembed(1)=m_max_pad 1414 !JLG, FEB 4, 2011 1415 1416 howmany=bloc_size*nb_field 1417 1418 1419 t = mpi_wtime() 1420 IF (once) CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 1421 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 1422 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r 1423 CALL dfftw_execute(fftw_plan_multi_c2r) 1424 1425 ! CROSS PRODDUCT 1426 IF (nb_field==6) THEN 1427 prod_ru(:,1,:) = ru(:,2,:)*ru(:,6,:) - ru(:,3,:)*ru(:,5,:) 1428 prod_ru(:,2,:) = ru(:,3,:)*ru(:,4,:) - ru(:,1,:)*ru(:,6,:) 1429 prod_ru(:,3,:) = ru(:,1,:)*ru(:,5,:) - ru(:,2,:)*ru(:,4,:) 1430 END IF 1431 ! CROSS PRODUCT 1432 1433 howmany = howmany/2 1434 IF (once) CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 1435 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate) 1436 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c 1437 CALL dfftw_execute(fftw_plan_multi_r2c) 1438 !JLG, FEB 4, 2011 1439 !!$ prod_cu = prod_cu/N_r !Scaling
1440  prod_cu = prod_cu/n_r_pad !Scaling
1441  !JLG, FEB 4, 2011
1442  IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t
1443
1444  !Now we need to redistribute the Fourier coefficients on each processor
1445
1446  t = mpi_wtime()
1447  combined_prod_cu(:,:,1)=prod_cu(1,:,:)
1448  DO n=2, m_max
1449  !combined_prod_cu(:,:,n)=prod_cu(n,:,:)
1450  combined_prod_cu(:,:,n)=2*conjg(prod_cu(n,:,:))
1451  END DO
1452
1453  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1454
1455  t = mpi_wtime()
1456  longueur_tranche=bloc_size*m_max_c*nb_field
1457  mpid=mpi_double_precision
1458  CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, &
1459  mpid,communicator,code)
1460  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
1461  ! dimensions:
1462  t = mpi_wtime()
1463  DO i = 1, m_max_c
1464  DO nb = 1, nb_procs
1465  shiftc = (nb-1)*bloc_size
1466  shiftl = (nb-1)*m_max_c
1467  intermediate = dist_prod_cu(:,:,shiftl+i)
1468  DO n = 1, bloc_size
1469  IF (n_inf-1+n+shiftc > np_glob ) cycle
1470  DO i_field = 1, nb_field/2
1471  v_out(n_inf-1+n+shiftc, i_field*2-1, i) = REAL (intermediate(i_field,n),kind=8)
1472  v_out(n_inf-1+n+shiftc, i_field*2 , i) = aimag(intermediate(i_field,n))
1473  END DO
1474  END DO
1475  END DO
1476  END DO
1477  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1478
1479  once = .false.
1480
1481  END DO
1482
1483  DEALLOCATE(dist_field, combined_field, dist_prod_cu, combined_prod_cu, intermediate, out_prod_cu)
1484
1485  END SUBROUTINE fft_par_cross_prod
1486
1487  SUBROUTINE fft_par_dot_prod(communicator,V1_in, V2_in, c_out, temps, padding)
1488  !FFT (FFT(-1) V1 . FFT(-1) V2) = c_out
1489  !This a de-aliased version of the code, FEB 4, 2011, JLG
1490  IMPLICIT NONE
1491  include 'fftw3.f'
1492  ! Format: V_1in(1:np,1:6,1:m_max_c)
1493  ! INPUT ARE COSINE AND SINE COEFFICIENTS
1494  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
1495  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in
1496  REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: c_out
1497  REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps
1498  LOGICAL, OPTIONAL, INTENT(IN) :: padding
1499  ! Saved variables
1500  LOGICAL, SAVE :: once=.true.
1501  INTEGER, SAVE :: np_ref, np, np_tot, bloc_size, nb_field, &
1502  m_max, m_max_c, n_r, rang, nb_procs, mpid, m_max_pad, n_r_pad
1503  INTEGER(KIND=8), SAVE :: fftw_plan_multi_c2r, fftw_plan_multi_r2c
1504  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: cu
1505  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: ru
1506  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_cu
1507  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_ru
1508  ! End saved variables
1509
1510  INTEGER :: np_glob, np_loc, reste, np_alloc, nn, nb_bloc, n_sup, n_inf, n_up, n_cache
1511  INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code
1512  REAL(KIND=8) :: t
1513  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: dist_field, combined_field
1514  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: combined_prod_cu, dist_prod_cu, out_prod_cu
1515
1516  !Vectors to speed up the format changes
1517  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:) :: intermediate
1518  !Vectors to speed up the format changes
1519
1520  ! FFTW parameters
1521  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
1522  INTEGER, DIMENSION(1) :: dim, inembed, onembed
1523  ! Recall complexes must be rescaled
1524  ! End FFTW parameters
1525
1526  !Temps(1) = Temps de communication
1527  !Temps(2) = Temps de calcul
1528  !Temps(3) = Temps de changement de format
1529
1530  !EXTERNAL hostnm
1531  !EXTERNAL gethostname
1532 #include "petsc/finclude/petsc.h"
1533  mpi_comm :: communicator
1534
1535  CALL mpi_comm_size(communicator,nb_procs,code)
1536  CALL mpi_comm_rank(communicator,rang,code)
1537
1538  IF (present(temps)) temps = 0.d0
1539
1540  IF (.NOT.once) THEN
1541  IF (SIZE(v1_in,1).NE.np_ref) THEN
1542  once = .true. !Something wrong happened, reinitialize
1543  np_ref = SIZE(v1_in,1)
1544  END IF
1545  END IF
1546
1547  once = .true.
1548  n_cache = 150000
1549  np_glob = SIZE(v1_in,1)
1550  m_max_c = SIZE(v1_in,3)
1551  np_loc = n_cache/(12*m_max_c)
1552  nb_bloc = max(np_glob/np_loc,1)
1553
1554 100 np_loc = np_glob/nb_bloc
1555  reste = np_glob - np_loc*nb_bloc
1556  np_alloc = np_loc + reste
1557  reste = np_alloc*nb_bloc - np_glob
1558  IF (reste>np_alloc) THEN
1559  nb_bloc = nb_bloc - 1
1560  go to 100
1561  END IF
1562  ! nb_bloc = nbre de blocs qui decoupe le plan meridien
1563  ! np_alloc = nbre de points ds 1 bloc
1564
1565  n_sup = 0
1566  DO nn= 1, nb_bloc
1567  IF (once) THEN
1568  nb_field= SIZE(v1_in,2) ! Number of fields
1569  m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point
1570  m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor
1571  n_r=2*m_max-1 ! Number of Real coefficients per point
1572  IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN
1573  WRITE(*,*) ' BUG '
1574  stop
1575  END IF
1576
1577  ! Bloc_size is the number of points that are handled by one processor
1578  ! once the Fourier modes are all collected
1579  ! Computation of bloc_size and np_tot
1580  np = np_alloc
1581  IF (modulo(np,nb_procs)==0) THEN
1582  bloc_size = np/nb_procs
1583  ELSE
1584  bloc_size = np/nb_procs + 1
1585  END IF
1586  np_tot = nb_procs*bloc_size
1587  ! fin de la repartition des points
1588
1589
1590  !JLG, FEB 4, 2011
1591  !Only ru, cu, prod_ru, prod_cu are modified
1592  IF (present(padding)) THEN
1593  IF (padding) THEN
1594  m_max_pad = 3*m_max/2
1595  ELSE
1596  WRITE(*,*) ' NO PADDING '
1597  m_max_pad = m_max
1598  END IF
1599  ELSE
1600  m_max_pad = 3*m_max/2
1601  END IF
1603
1604  IF (ALLOCATED(ru)) DEALLOCATE(ru,cu,prod_ru,prod_cu)
1609  !JLG, FEB 4, 2001
1610  ALLOCATE(intermediate(bloc_size))
1611  ALLOCATE( dist_field(m_max_c,2*nb_field,np_tot))
1612  ALLOCATE(combined_field(m_max_c,2*nb_field,np_tot))
1613  ALLOCATE(dist_prod_cu(bloc_size,m_max))
1614  ALLOCATE(combined_prod_cu(bloc_size,m_max))
1615  ALLOCATE(out_prod_cu(m_max_c,np_tot))
1616  END IF
1617
1618  ! Packing all 3 complex components of both v1 and v2 input fields
1619  ! into dist_field, where the dimension indexing the nodal points varies the least rapidly,
1620  ! so that after distributing the data to the processes, each one will obtain a part
1621  ! on nodal points
1622  ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT
1623  t = mpi_wtime()
1624  n_inf = n_sup + 1
1625  IF (nn == nb_bloc) THEN
1626  n_up = np_glob - n_inf + 1
1627  ELSE
1628  n_up = np_alloc
1629  END IF
1630  n_sup = n_inf + n_up - 1
1631
1632  DO i = 1, m_max_c
1633  dist_field(i,1:nb_field,1:n_up) = transpose(v1_in(n_inf:n_sup,:,i))
1634  dist_field(i,nb_field+1:2*nb_field,1:n_up) = transpose(v2_in(n_inf:n_sup,:,i))
1635  END DO
1636  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1637
1638  IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100
1639
1640  longueur_tranche=bloc_size*m_max_c*nb_field*2
1641
1642  t = mpi_wtime()
1643  mpid=mpi_double_precision
1644  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
1645  mpid, communicator, code)
1646  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
1647
1648  t = mpi_wtime()
1649  !JLG, FEB 4, 2011
1650  cu = 0.d0
1651  !JLG, FEB 4, 2011
1652  DO n = 1, bloc_size
1653  DO nb = 1, nb_procs
1654  shiftc = (nb-1)*bloc_size
1655  shiftl = (nb-1)*m_max_c
1656  jindex = n + shiftc
1657  DO nf = 1, nb_field
1658  ! Put real and imaginary parts in a complex
1659  ! nf=1,2,3 => V1_in
1660  ! nf=4,5,6 => V2_in
1661  ! INPUT ARE COSINE AND SINE COEFFICIENTS
1662  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
1663  cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),&
1664  -combined_field(:,2*nf,jindex),kind=8)/2
1665  END DO
1666  END DO
1667  END DO
1668  cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8)
1669  !JLG, FEB 4, 2011
1670  !Padding is done by initialization of cu: cu = 0
1671  !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
1672  !JLG, FEB 4, 2011
1673
1674  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1675
1676  ! Set the parameters for dfftw
1677  fft_dim=1; istride=1; ostride=1;
1678  !JLG, FEB 4, 2011
1679 !!$idist=N_r; inembed(1)=N_r; DIM(1)=N_r 1680 !!$ odist=m_max; onembed(1)=m_max
1683  !JLG, FEB 4, 2011
1684
1685  howmany=bloc_size*nb_field
1686
1687
1688  t = mpi_wtime()
1689  IF (once) CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
1690  onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate)
1691 !write(*,*) ' FFT_PAR_DOT_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r
1692  CALL dfftw_execute(fftw_plan_multi_c2r)
1693
1694  ! DOT PRODDUCT
1695  IF (nb_field==6) THEN
1696  prod_ru(:,:) = ru(:,1,:)*ru(:,4,:) + ru(:,2,:)*ru(:,5,:) + ru(:,3,:)*ru(:,6,:)
1697  END IF
1698  ! DOT PRODUCT
1699
1700  howmany = bloc_size*1
1701  IF (once) CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
1702  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate)
1703 !write(*,*) ' FFT_PAR_DOT_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c
1704  CALL dfftw_execute(fftw_plan_multi_r2c)
1705  !JLG, FEB 4, 2011
1706 !!$prod_cu = prod_cu/N_r !Scaling 1707 prod_cu = prod_cu/n_r_pad !Scaling 1708 !JLG, FEB 4, 2011 1709 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 1710 1711 !Now we need to redistribute the Fourier coefficients on each processor 1712 t = mpi_wtime() 1713 combined_prod_cu(:,1)=prod_cu(1,:) 1714 DO n=2, m_max 1715 combined_prod_cu(:,n)=2*conjg(prod_cu(n,:)) 1716 END DO 1717 1718 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1719 1720 t = mpi_wtime() 1721 longueur_tranche=bloc_size*m_max_c*2 1722 mpid=mpi_double_precision 1723 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 1724 mpid,communicator,code) 1725 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1726 ! dimensions: 1727 1728 t = mpi_wtime() 1729 DO i = 1, m_max_c 1730 DO nb = 1, nb_procs 1731 shiftc = (nb-1)*bloc_size 1732 shiftl = (nb-1)*m_max_c 1733 intermediate = dist_prod_cu(:,shiftl+i) 1734 DO n = 1, bloc_size 1735 IF (n_inf-1+n+shiftc > np_glob ) cycle 1736 c_out(n_inf-1+n+shiftc, 1, i) = REAL (intermediate(n),kind=8) 1737 c_out(n_inf-1+n+shiftc, 2 , i) = aimag(intermediate(n)) 1738 END DO 1739 END DO 1740 END DO 1741 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1742 1743 once = .false. 1744 1745 END DO 1746 1747 DEALLOCATE(dist_field, combined_field, dist_prod_cu, combined_prod_cu, intermediate, out_prod_cu) 1748 1749 END SUBROUTINE fft_par_dot_prod 1750 1751 SUBROUTINE fft_par_prod(communicator, c1_in, c2_in, c_out, temps, padding) 1752 !FFT (FFT(-1) V1 . FFT(-1) V2) = c_out 1753 !This a de-aliased version of the code, FEB 4, 2011, JLG 1754 IMPLICIT NONE 1755 include 'fftw3.f' 1756 ! Format: c_1in(1:np,1:2,1:m_max_c) 1757 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1758 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1759 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: c1_in, c2_in 1760 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: c_out 1761 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 1762 LOGICAL, OPTIONAL, INTENT(IN) :: padding 1763 ! Saved variables 1764 LOGICAL, SAVE :: once=.true. 1765 INTEGER, SAVE :: np_ref, np, np_tot, bloc_size, & 1766 m_max, m_max_c, n_r, rang, nb_procs, mpid, m_max_pad, n_r_pad 1767 INTEGER(KIND=8), SAVE :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 1768 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: cu 1769 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: ru 1770 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_cu 1771 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_ru 1772 ! End saved variables 1773 1774 INTEGER :: np_glob, np_loc, reste, np_alloc, nn, nb_bloc, n_sup, n_inf, n_up, n_cache 1775 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 1776 REAL(KIND=8) :: t 1777 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: dist_field, combined_field 1778 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: combined_prod_cu, dist_prod_cu, out_prod_cu 1779 1780 !Vectors to speed up the format changes 1781 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:) :: intermediate 1782 !Vectors to speed up the format changes 1783 1784 ! FFTW parameters 1785 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 1786 INTEGER, DIMENSION(1) :: dim, inembed, onembed 1787 ! Recall complexes must be rescaled 1788 ! End FFTW parameters 1789 1790 !Temps(1) = Temps de communication 1791 !Temps(2) = Temps de calcul 1792 !Temps(3) = Temps de changement de format 1793 1794 !EXTERNAL hostnm 1795 !EXTERNAL gethostname 1796 #include "petsc/finclude/petsc.h" 1797 mpi_comm :: communicator 1798 1799 CALL mpi_comm_size(communicator,nb_procs,code) 1800 CALL mpi_comm_rank(communicator,rang,code) 1801 1802 IF (present(temps)) temps = 0.d0 1803 1804 IF (.NOT.once) THEN 1805 IF (SIZE(c1_in,1).NE.np_ref) THEN 1806 once = .true. !Something wrong happened, reinitialize 1807 np_ref = SIZE(c1_in,1) 1808 END IF 1809 END IF 1810 1811 once = .true. 1812 n_cache = 150000 1813 np_glob = SIZE(c1_in,1) 1814 m_max_c = SIZE(c1_in,3) 1815 np_loc = n_cache/(12*m_max_c) 1816 nb_bloc = max(np_glob/np_loc,1) 1817 1818 100 np_loc = np_glob/nb_bloc 1819 reste = np_glob - np_loc*nb_bloc 1820 np_alloc = np_loc + reste 1821 reste = np_alloc*nb_bloc - np_glob 1822 IF (reste>np_alloc) THEN 1823 nb_bloc = nb_bloc - 1 1824 go to 100 1825 END IF 1826 ! nb_bloc = nbre de blocs qui decoupe le plan meridien 1827 ! np_alloc = nbre de points ds 1 bloc 1828 1829 n_sup = 0 1830 DO nn= 1, nb_bloc 1831 IF (once) THEN 1832 m_max_c = SIZE(c1_in,3) ! Number of complex (cosines + sines) coefficients per point 1833 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 1834 n_r=2*m_max-1 ! Number of Real coefficients per point 1835 IF (m_max_c==0) THEN 1836 WRITE(*,*) ' BUG ' 1837 stop 1838 END IF 1839 1840 ! Bloc_size is the number of points that are handled by one processor 1841 ! once the Fourier modes are all collected 1842 ! Computation of bloc_size and np_tot 1843 np = np_alloc 1844 IF (modulo(np,nb_procs)==0) THEN 1845 bloc_size = np/nb_procs 1846 ELSE 1847 bloc_size = np/nb_procs + 1 1848 END IF 1849 np_tot = nb_procs*bloc_size 1850 ! fin de la repartition des points 1851 1852 1853 !JLG, FEB 4, 2011 1854 !Only ru, cu, prod_ru, prod_cu are modified 1855 IF (present(padding)) THEN 1856 IF (padding) THEN 1857 m_max_pad = 3*m_max/2 1858 ELSE 1859 WRITE(*,*) ' NO PADDING ' 1860 m_max_pad = m_max 1861 END IF 1862 ELSE 1863 m_max_pad = 3*m_max/2 1864 END IF 1865 n_r_pad=2*m_max_pad-1 1866 1867 IF (ALLOCATED(ru)) DEALLOCATE(ru,cu,prod_ru,prod_cu) 1868 ALLOCATE(cu(m_max_pad,2,bloc_size)) 1869 ALLOCATE(ru(n_r_pad, 2,bloc_size)) 1870 ALLOCATE(prod_cu(m_max_pad,bloc_size)) 1871 ALLOCATE(prod_ru(n_r_pad, bloc_size)) 1872 !JLG, FEB 4, 2001 1873 ALLOCATE(intermediate(bloc_size)) 1874 ALLOCATE( dist_field(m_max_c,4,np_tot)) 1875 ALLOCATE(combined_field(m_max_c,4,np_tot)) 1876 ALLOCATE(dist_prod_cu(bloc_size,m_max)) 1877 ALLOCATE(combined_prod_cu(bloc_size,m_max)) 1878 ALLOCATE(out_prod_cu(m_max_c,np_tot)) 1879 END IF 1880 1881 ! Packing all 3 complex components of both v1 and v2 input fields 1882 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 1883 ! so that after distributing the data to the processes, each one will obtain a part 1884 ! on nodal points 1885 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 1886 t = mpi_wtime() 1887 n_inf = n_sup + 1 1888 IF (nn == nb_bloc) THEN 1889 n_up = np_glob - n_inf + 1 1890 ELSE 1891 n_up = np_alloc 1892 END IF 1893 n_sup = n_inf + n_up - 1 1894 1895 DO i = 1, m_max_c 1896 dist_field(i,1:2,1:n_up) = transpose(c1_in(n_inf:n_sup,:,i)) 1897 dist_field(i,3:4,1:n_up) = transpose(c2_in(n_inf:n_sup,:,i)) 1898 END DO 1899 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1900 1901 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 1902 1903 longueur_tranche=bloc_size*m_max_c*4 1904 1905 t = mpi_wtime() 1906 mpid=mpi_double_precision 1907 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 1908 mpid, communicator, code) 1909 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 1910 1911 t = mpi_wtime() 1912 !JLG, FEB 4, 2011 1913 cu = 0.d0 1914 !JLG, FEB 4, 2011 1915 DO n = 1, bloc_size 1916 DO nb = 1, nb_procs 1917 shiftc = (nb-1)*bloc_size 1918 shiftl = (nb-1)*m_max_c 1919 jindex = n + shiftc 1920 DO nf = 1, 2 1921 ! Put real and imaginary parts in a complex 1922 ! nf=1 => c1_in 1923 ! nf=2 => c2_in 1924 ! INPUT ARE COSINE AND SINE COEFFICIENTS 1925 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 1926 cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),& 1927 -combined_field(:,2*nf,jindex),kind=8)/2 1928 END DO 1929 END DO 1930 END DO 1931 cu(1,:,:) = 2*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 1932 !JLG, FEB 4, 2011 1933 !Padding is done by initialization of cu: cu = 0 1934 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 1935 !JLG, FEB 4, 2011 1936 1937 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 1938 1939 ! Set the parameters for dfftw 1940 fft_dim=1; istride=1; ostride=1; 1941 !JLG, FEB 4, 2011 1942 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
1943 !!$odist=m_max; onembed(1)=m_max 1944 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 1945 odist=m_max_pad; onembed(1)=m_max_pad 1946 !JLG, FEB 4, 2011 1947 1948 howmany=bloc_size*2 1949 1950 t = mpi_wtime() 1951 IF (once) CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 1952 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 1953 CALL dfftw_execute(fftw_plan_multi_c2r) 1954 1955 !PRODDUCT 1956 prod_ru(:,:) = ru(:,1,:)*ru(:,2,:) 1957 !PRODUCT 1958 1959 howmany = bloc_size*1 1960 IF (once) CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 1961 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate) 1962 CALL dfftw_execute(fftw_plan_multi_r2c) 1963 !JLG, FEB 4, 2011 1964 !!$ prod_cu = prod_cu/N_r !Scaling
1965  prod_cu = prod_cu/n_r_pad !Scaling
1966  !JLG, FEB 4, 2011
1967  IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t
1968
1969  !Now we need to redistribute the Fourier coefficients on each processor
1970  t = mpi_wtime()
1971  combined_prod_cu(:,1)=prod_cu(1,:)
1972  DO n=2, m_max
1973  combined_prod_cu(:,n)=2*conjg(prod_cu(n,:))
1974  END DO
1975
1976  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1977
1978  t = mpi_wtime()
1979  longueur_tranche=bloc_size*m_max_c*2
1980  mpid=mpi_double_precision
1981  CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, &
1982  mpid,communicator,code)
1983  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
1984  ! dimensions:
1985  t = mpi_wtime()
1986  DO i = 1, m_max_c
1987  DO nb = 1, nb_procs
1988  shiftc = (nb-1)*bloc_size
1989  shiftl = (nb-1)*m_max_c
1990  intermediate = dist_prod_cu(:,shiftl+i)
1991  DO n = 1, bloc_size
1992  IF (n_inf-1+n+shiftc > np_glob ) cycle
1993  c_out(n_inf-1+n+shiftc, 1, i) = REAL (intermediate(n),kind=8)
1994  c_out(n_inf-1+n+shiftc, 2 , i) = aimag(intermediate(n))
1995  END DO
1996  END DO
1997  END DO
1998  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
1999
2000  once = .false.
2001
2002  END DO
2003
2004  DEALLOCATE(dist_field, combined_field, dist_prod_cu, combined_prod_cu, intermediate, out_prod_cu)
2005
2006  END SUBROUTINE fft_par_prod
2007
2008  SUBROUTINE fft_par_allen_cahn(communicator, c_in, c_out, temps, padding)
2009  !This a de-aliased version of the code, FEB 4, 2011, JLG
2010  IMPLICIT NONE
2011  include 'fftw3.f'
2012  ! Format: V_1in(1:np,1:6,1:m_max_c)
2013  ! INPUT ARE COSINE AND SINE COEFFICIENTS
2014  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
2015  REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: c_in
2016  REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: c_out
2017  REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps
2018  LOGICAL, OPTIONAL, INTENT(IN) :: padding
2019  ! Saved variables
2020  LOGICAL, SAVE :: once=.true.
2021  INTEGER, SAVE :: np_ref, np, np_tot, bloc_size, &
2022  m_max, m_max_c, n_r, rang, nb_procs, mpid, m_max_pad, n_r_pad
2023  INTEGER(KIND=8), SAVE :: fftw_plan_multi_c2r, fftw_plan_multi_r2c
2024  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: cu
2025  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: ru
2026  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_cu
2027  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:), SAVE :: prod_ru
2028  ! End saved variables
2029
2030  INTEGER :: np_glob, np_loc, reste, np_alloc, nn, nb_bloc, n_sup, n_inf, n_up, n_cache
2031  INTEGER :: nb, shiftc, shiftl, jindex, longueur_tranche, i, n, code
2032  REAL(KIND=8) :: t
2033  REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:) :: dist_field, combined_field
2034  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: combined_prod_cu, dist_prod_cu
2035
2036  !Vectors to speed up the format changes
2037  COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:) :: intermediate
2038  !Vectors to speed up the format changes
2039
2040  ! FFTW parameters
2041  INTEGER :: fft_dim, howmany, istride, ostride, idist, odist
2042  INTEGER, DIMENSION(1) :: dim, inembed, onembed
2043  ! Recall complexes must be rescaled
2044  ! End FFTW parameters
2045
2046  !Temps(1) = Temps de communication
2047  !Temps(2) = Temps de calcul
2048  !Temps(3) = Temps de changement de format
2049
2050  !EXTERNAL hostnm
2051  !EXTERNAL gethostname
2052 #include "petsc/finclude/petsc.h"
2053  mpi_comm :: communicator
2054
2055  CALL mpi_comm_size(communicator,nb_procs,code)
2056  CALL mpi_comm_rank(communicator,rang,code)
2057
2058  IF (present(temps)) temps = 0.d0
2059
2060  IF (.NOT.once) THEN
2061  IF (SIZE(c_in,1).NE.np_ref) THEN
2062  once = .true. !Something wrong happened, reinitialize
2063  np_ref = SIZE(c_in,1)
2064  END IF
2065  END IF
2066
2067  once = .true.
2068  n_cache = 150000
2069  np_glob = SIZE(c_in,1)
2070  m_max_c = SIZE(c_in,3)
2071  np_loc = n_cache/(12*m_max_c)
2072  nb_bloc = max(np_glob/np_loc,1)
2073
2074 100 np_loc = np_glob/nb_bloc
2075  reste = np_glob - np_loc*nb_bloc
2076  np_alloc = np_loc + reste
2077  reste = np_alloc*nb_bloc - np_glob
2078  IF (reste>np_alloc) THEN
2079  nb_bloc = nb_bloc - 1
2080  go to 100
2081  END IF
2082  ! nb_bloc = nbre de blocs qui decoupe le plan meridien
2083  ! np_alloc = nbre de points ds 1 bloc
2084
2085  n_sup = 0
2086  DO nn= 1, nb_bloc
2087  IF (once) THEN
2088  m_max_c = SIZE(c_in,3) ! Number of complex (cosines + sines) coefficients per point
2089  m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor
2090  n_r=2*m_max-1 ! Number of Real coefficients per point
2091  IF (m_max_c==0) THEN
2092  WRITE(*,*) ' BUG '
2093  stop
2094  END IF
2095
2096  ! Bloc_size is the number of points that are handled by one processor
2097  ! once the Fourier modes are all collected
2098  ! Computation of bloc_size and np_tot
2099  np = np_alloc
2100  IF (modulo(np,nb_procs)==0) THEN
2101  bloc_size = np/nb_procs
2102  ELSE
2103  bloc_size = np/nb_procs + 1
2104  END IF
2105  np_tot = nb_procs*bloc_size
2106  ! fin de la repartition des points
2107
2108
2109  !JLG, FEB 4, 2011
2110  !Only ru, cu, prod_ru, prod_cu are modified
2111  IF (present(padding)) THEN
2112  IF (padding) THEN
2113  m_max_pad = 3*m_max/2
2114  ELSE
2115  WRITE(*,*) ' NO PADDING '
2116  m_max_pad = m_max
2117  END IF
2118  ELSE
2119  m_max_pad = 3*m_max/2
2120  END IF
2122
2123  IF (ALLOCATED(ru)) DEALLOCATE(ru,cu,prod_ru,prod_cu)
2128  !JLG, FEB 4, 2001
2129  ALLOCATE(intermediate(bloc_size))
2130  ALLOCATE( dist_field(m_max_c,2,np_tot))
2131  ALLOCATE(combined_field(m_max_c,2,np_tot))
2132  ALLOCATE(dist_prod_cu(bloc_size,m_max))
2133  ALLOCATE(combined_prod_cu(bloc_size,m_max))
2134  END IF
2135
2136
2137  ! Packing all complex components of input field
2138  ! into dist_field, where the dimension indexing the nodal points varies the least rapidly,
2139  ! so that after distributing the data to the processes, each one will obtain a part
2140  ! on nodal points
2141  ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT
2142  t = mpi_wtime()
2143  n_inf = n_sup + 1
2144  IF (nn == nb_bloc) THEN
2145  n_up = np_glob - n_inf + 1
2146  ELSE
2147  n_up = np_alloc
2148  END IF
2149  n_sup = n_inf + n_up - 1
2150
2151  DO i = 1, m_max_c
2152  dist_field(i,:,1:n_up) = transpose(c_in(n_inf:n_sup,:,i))
2153  END DO
2154  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
2155
2156  IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100
2157
2158  longueur_tranche=bloc_size*m_max_c*2
2159
2160  t = mpi_wtime()
2161  mpid=mpi_double_precision
2162  CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, &
2163  mpid, communicator, code)
2164  IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t
2165
2166  t = mpi_wtime()
2167  !JLG, FEB 4, 2011
2168  cu = 0.d0
2169  !JLG, FEB 4, 2011
2170  DO n = 1, bloc_size
2171  DO nb = 1, nb_procs
2172  shiftc = (nb-1)*bloc_size
2173  shiftl = (nb-1)*m_max_c
2174  jindex = n + shiftc
2175  ! Put real and imaginary parts in a complex
2176  ! INPUT ARE COSINE AND SINE COEFFICIENTS
2177  ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2
2178  cu(shiftl+1:shiftl+m_max_c,n) = cmplx(combined_field(:,1,jindex),&
2179  -combined_field(:,2,jindex),kind=8)/2
2180  END DO
2181  END DO
2182  cu(1,:) = 2*cmplx(REAL(cu(1,:),KIND=8),0.d0,kind=8)
2183  !JLG, FEB 4, 2011
2184  !Padding is done by initialization of cu: cu = 0
2185  !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0
2186  !JLG, FEB 4, 2011
2187
2188  IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t
2189
2190  ! Set the parameters for dfftw
2191  fft_dim=1; istride=1; ostride=1;
2192  !JLG, FEB 4, 2011
2193 !!$idist=N_r; inembed(1)=N_r; DIM(1)=N_r 2194 !!$ odist=m_max; onembed(1)=m_max
2197  !JLG, FEB 4, 2011
2198
2199  howmany=bloc_size
2200  t = mpi_wtime()
2201  IF (once) CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, &
2202  onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate);
2203  CALL dfftw_execute(fftw_plan_multi_c2r)
2204
2205  ! ALLEN CAHN FUNCTION
2206  prod_ru = ru*(ru**2-1)
2207  ! ALLEN CAHN FUNCTION
2208
2209  howmany = bloc_size
2210  IF (once) CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
2211  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate);
2212  CALL dfftw_execute(fftw_plan_multi_r2c)
2213  !JLG, FEB 4, 2011
2214 !!$prod_cu = prod_cu/N_r !Scaling 2215 prod_cu = prod_cu/n_r_pad !Scaling 2216 !JLG, FEB 4, 2011 2217 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 2218 2219 !Now we need to redistribute the Fourier coefficients on each processor 2220 t = mpi_wtime() 2221 combined_prod_cu(:,1)=prod_cu(1,:) 2222 DO n=2, m_max 2223 combined_prod_cu(:,n)=2*conjg(prod_cu(n,:)) 2224 END DO 2225 2226 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2227 2228 t = mpi_wtime() 2229 longueur_tranche=bloc_size*m_max_c*2 2230 mpid=mpi_double_precision 2231 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 2232 mpid,communicator,code) 2233 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 2234 ! dimensions: 2235 t = mpi_wtime() 2236 DO i = 1, m_max_c 2237 DO nb = 1, nb_procs 2238 shiftc = (nb-1)*bloc_size 2239 shiftl = (nb-1)*m_max_c 2240 intermediate = dist_prod_cu(:,shiftl+i) 2241 DO n = 1, bloc_size 2242 IF (n_inf-1+n+shiftc > np_glob ) cycle 2243 c_out(n_inf-1+n+shiftc, 1, i) = REAL (intermediate(n),kind=8) 2244 c_out(n_inf-1+n+shiftc, 2, i) = aimag(intermediate(n)) 2245 END DO 2246 END DO 2247 END DO 2248 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2249 2250 once = .false. 2251 2252 END DO 2253 2254 DEALLOCATE(dist_field, combined_field, dist_prod_cu, combined_prod_cu, intermediate) 2255 2256 END SUBROUTINE fft_par_allen_cahn 2257 2258 2259 SUBROUTINE ref(communicator,V1_in, V2_in, V_out, temps) 2260 IMPLICIT NONE 2261 2262 include 'fftw3.f' 2263 !INCLUDE 'mpif.h' 2264 ! Format: V_1in(1:np,1:6,1:m_max_c) 2265 ! INPUT ARE COSINE AND SINE COEFFICIENTS 2266 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 2267 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in, v2_in 2268 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out 2269 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 2270 2271 ! Saved variables 2272 LOGICAL, SAVE :: once=.true. 2273 INTEGER, SAVE :: np, np_tot, bloc_size, nb_field, m_max, m_max_c, n_r, rang, nb_procs, mpid 2274 INTEGER(KIND=8), SAVE :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 2275 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: cu, prod_cu 2276 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: ru, prod_ru 2277 ! End saved variables 2278 2279 INTEGER :: i_field 2280 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 2281 REAL(KIND=8) :: t 2282 REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: dist_field, combined_field 2283 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:,:), SAVE :: combined_prod_cu, dist_prod_cu, out_prod_cu 2284 2285 !Vectors to speed up the format changes 2286 COMPLEX(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: intermediate 2287 !REAL(KIND=8), ALLOCATABLE, DIMENSION(:,:) :: V_int 2288 !Vectors to speed up the format changes 2289 2290 ! FFTW parameters 2291 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 2292 INTEGER, DIMENSION(1) :: dim, inembed, onembed 2293 ! Recall complexes must be rescaled 2294 ! End FFTW parameters 2295 #include "petsc/finclude/petsc.h" 2296 mpi_comm :: communicator 2297 2298 !Temps(1) = Temps de communication 2299 !Temps(2) = Temps de calcul 2300 !Temps(3) = Temps de changement de format 2301 IF (present(temps)) temps = 0.d0 2302 2303 IF (.NOT.once) THEN 2304 IF ((SIZE(v1_in,1).NE.np) .OR. (SIZE(v1_in,2).NE.nb_field) .OR. (SIZE(v1_in,3).NE.m_max_c)) THEN 2305 once = .true. !Something wrong happened, reinitialize 2306 END IF 2307 END IF 2308 2309 IF (once) THEN 2310 CALL mpi_comm_size(communicator,nb_procs,code) 2311 CALL mpi_comm_rank(communicator,rang,code) 2312 2313 np = SIZE(v1_in,1) ! Number of points in the meridian plane 2314 nb_field= SIZE(v1_in,2) ! Number of fields 2315 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 2316 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 2317 n_r=2*m_max-1 ! Number of Real coefficients per point 2318 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 2319 WRITE(*,*) ' BUG ' 2320 stop 2321 END IF 2322 2323 ! Bloc_size is the number of points that are handled by one processor 2324 ! once the Fourier modes are all collected 2325 ! Computation of bloc_size and np_tot 2326 IF (modulo(np,nb_procs)==0) THEN 2327 bloc_size = np/nb_procs 2328 ELSE 2329 bloc_size = np/nb_procs + 1 2330 END IF 2331 np_tot = nb_procs*bloc_size 2332 2333 IF (ALLOCATED(ru)) DEALLOCATE(ru,cu,prod_ru,prod_cu) 2334 ALLOCATE(cu(m_max,nb_field,bloc_size)) 2335 ALLOCATE(ru(n_r, nb_field,bloc_size)) 2336 ALLOCATE(prod_cu(m_max,nb_field/2,bloc_size)) 2337 ALLOCATE(prod_ru(n_r, nb_field/2,bloc_size)) 2338 !ALLOCATE(V_int(nb_field,np)) 2339 !ALLOCATE(V_int(np,nb_field)) 2340 END IF 2341 2342 ALLOCATE(intermediate(nb_field/2,bloc_size)) 2343 ALLOCATE( dist_field(m_max_c,2*nb_field,np_tot)) 2344 ALLOCATE(combined_field(m_max_c,2*nb_field,np_tot)) 2345 ALLOCATE(dist_prod_cu(nb_field/2,bloc_size,m_max), combined_prod_cu(nb_field/2,bloc_size,m_max)) 2346 ALLOCATE(out_prod_cu(m_max_c,np_tot,nb_field/2)) 2347 2348 ! Packing all 3 complex components of both v1 and v2 input fields 2349 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 2350 ! so that after distributing the data to the processes, each one will obtain a part 2351 ! on nodal points 2352 t = mpi_wtime() 2353 DO i = 1, m_max_c 2354 !V_int = V1_in(:,:,i) 2355 !dist_field(i,1:nb_field,:) = transpose(V_int) 2356 !V_int = V2_in(:,:,i) 2357 !dist_field(i,nb_field+1:2*nb_field,:) = transpose(V_int) 2358 dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i)) 2359 dist_field(i,nb_field+1:2*nb_field,1:np) = transpose(v2_in(:,:,i)) 2360 !DO nf = 1, nb_field/2 2361 ! dist_field(i,2*nf-1,1:np) = V1_in(:,2*nf-1,i) 2362 ! dist_field(i,2*nf ,1:np) = V1_in(:,2*nf ,i) 2363 ! dist_field(i,nb_field+2*nf-1,1:np) = V2_in(:,2*nf-1,i) 2364 ! dist_field(i,nb_field+2*nf ,1:np) = V2_in(:,2*nf,i) 2365 !END DO 2366 END DO 2367 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2368 2369 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 2370 2371 longueur_tranche=bloc_size*m_max_c*nb_field*2 2372 2373 t = mpi_wtime() 2374 mpid=mpi_double_precision 2375 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 2376 mpid, communicator, code) 2377 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 2378 2379 t = mpi_wtime() 2380 2381 DO n = 1, bloc_size 2382 DO nb = 1, nb_procs 2383 shiftc = (nb-1)*bloc_size 2384 shiftl = (nb-1)*m_max_c 2385 jindex = n + shiftc 2386 DO nf = 1, nb_field 2387 ! Put real and imaginary parts in a complex 2388 ! for each field, nf=1,2,3,4,5,6 2389 ! nf=1,2,3 => V1_in 2390 ! nf=4,5,6 => V2_in 2391 ! cu(shiftl+1:shiftl+m_max_c,nf,n) = cmplx(combined_field(:,2*nf-1,jindex),combined_field(:,2*nf,jindex)) 2392 ! INPUT ARE COSINE AND SINE COEFFICIENTS 2393 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 2394 cu(shiftl+1:shiftl+m_max_c,nf,n) = 0.5d0*cmplx(combined_field(:,2*nf-1,jindex),-combined_field(:,2*nf,jindex),kind=8) 2395 END DO 2396 END DO 2397 END DO 2398 cu(1,:,:) = 2.d0*cmplx(REAL(cu(1,:,:),KIND=8),0.d0,kind=8) 2399 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2400 2401 ! Set the parameters for dfftw 2402 fft_dim=1; istride=1; ostride=1; 2403 idist=n_r; inembed(1)=n_r; dim(1)=n_r 2404 odist=m_max; onembed(1)=m_max 2405 IF (rang==(nb_procs-1)) THEN 2406 howmany= (np - bloc_size*(nb_procs-1))*nb_field 2407 ELSE 2408 howmany=bloc_size*nb_field 2409 END IF 2410 2411 t = mpi_wtime() 2412 IF (once) CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 2413 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate); 2414 CALL dfftw_execute(fftw_plan_multi_c2r) 2415 2416 ! SIMPLE PRODUCT 2417 !DO nf = 1, nb_field/2 2418 ! ! ru(1:3) contains V1_in and ru(4:6) contains V2_in 2419 ! prod_ru(:,nf,:) = ru(:,nf,:)*ru(:,nb_field/2+nf,:) 2420 !END DO 2421 ! END SIMPLE PRODUCT 2422 2423 ! CROSS PRODDUCT 2424 IF (nb_field==6) THEN 2425 prod_ru(:,1,:) = ru(:,2,:)*ru(:,6,:) - ru(:,3,:)*ru(:,5,:) 2426 prod_ru(:,2,:) = ru(:,3,:)*ru(:,4,:) - ru(:,1,:)*ru(:,6,:) 2427 prod_ru(:,3,:) = ru(:,1,:)*ru(:,5,:) - ru(:,2,:)*ru(:,4,:) 2428 END IF 2429 ! CROSS PRODUCT 2430 2431 howmany = howmany/2 2432 IF (once) CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, & 2433 inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate); 2434 CALL dfftw_execute(fftw_plan_multi_r2c) 2435 2436 prod_cu = prod_cu/n_r !Scaling 2437 ! prod_cu = prod_cu/REAL(N_r,KIND=8) !Scaling 2438 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 2439 2440 !Now we need to redistribute the Fourier coefficients on each processor 2441 2442 t = mpi_wtime() 2443 combined_prod_cu(:,:,1)=prod_cu(1,:,:) 2444 DO n=2, m_max 2445 !combined_prod_cu(:,:,n)=prod_cu(n,:,:) 2446 combined_prod_cu(:,:,n)=2.d0*conjg(prod_cu(n,:,:)) 2447 END DO 2448 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2449 2450 t = mpi_wtime() 2451 longueur_tranche=bloc_size*m_max_c*nb_field 2452 mpid=mpi_double_precision 2453 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 2454 mpid,communicator,code) 2455 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 2456 2457 ! dimensions: 2458 ! v_out(np, nb_field, m_max_c) 2459 t = mpi_wtime() 2460 2461 IF (.false.) THEN 2462 DO i_field = 1, nb_field/2 2463 DO n = 1, bloc_size 2464 DO nb = 1, nb_procs 2465 shiftc = (nb-1)*bloc_size 2466 shiftl = (nb-1)*m_max_c 2467 out_prod_cu(:,n+shiftc,i_field) = dist_prod_cu(i_field,n,shiftl+1:shiftl+m_max_c) 2468 END DO 2469 END DO 2470 END DO 2471 2472 DO i_field = 1, nb_field/2 2473 DO i = 1, m_max_c 2474 v_out(:, i_field*2-1, i) = REAL(out_prod_cu(i, 1:np, i_field),kind=8) 2475 v_out(:, i_field*2, i) = aimag(out_prod_cu(i, 1:np, i_field)) 2476 END DO 2477 END DO 2478 ELSE 2479 DO i = 1, m_max_c 2480 DO nb = 1, nb_procs 2481 shiftc = (nb-1)*bloc_size 2482 shiftl = (nb-1)*m_max_c 2483 intermediate = dist_prod_cu(:,:,shiftl+i) 2484 DO n = 1, bloc_size 2485 IF (n+shiftc > np ) cycle 2486 DO i_field = 1, nb_field/2 2487 v_out(n+shiftc, i_field*2-1, i) = REAL (intermediate(i_field,n),kind=8) 2488 v_out(n+shiftc, i_field*2 , i) = aimag(intermediate(i_field,n)) 2489 !v_out(n+shiftc, i_field*2-1, i) = REAL(dist_prod_cu(i_field,n,shiftl+i),KIND=8) 2490 !v_out(n+shiftc, i_field*2 , i) = AIMAG(dist_prod_cu(i_field,n,shiftl+i)) 2491 END DO 2492 END DO 2493 END DO 2494 END DO 2495 END IF 2496 2497 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2498 2499 DEALLOCATE(dist_field, combined_field, dist_prod_cu, combined_prod_cu, intermediate, out_prod_cu) 2500 IF (once) once = .false. 2501 END SUBROUTINE ref 2502 2503 SUBROUTINE fft_heaviside_dcl(communicator,V1_in, V_out, nb_procs, bloc_size, m_max_pad, temps, padding) 2504 !This a de-aliased version of the code, FEB 4, 2011, JLG 2505 IMPLICIT NONE 2506 include 'fftw3.f' 2507 ! Format: V_1in(1:np,1:6,1:m_max_c) 2508 ! INPUT ARE COSINE AND SINE COEFFICIENTS 2509 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 2510 REAL(KIND=8), DIMENSION(:,:,:), INTENT(IN) :: v1_in 2511 REAL(KIND=8), DIMENSION(:,:,:), INTENT(OUT) :: v_out 2512 REAL(KIND=8), DIMENSION(:), OPTIONAL, INTENT(INOUT) :: temps 2513 LOGICAL, OPTIONAL, INTENT(IN) :: padding 2514 INTEGER, INTENT(IN) :: bloc_size, m_max_pad, nb_procs 2515 INTEGER :: np, np_tot, nb_field, m_max, m_max_c, mpid, n_r_pad 2516 INTEGER(KIND=8) :: fftw_plan_multi_c2r, fftw_plan_multi_r2c 2517 2518 !COMPLEX(KIND=8), DIMENSION(m_max_pad, SIZE(V1_in,2), bloc_size) :: cu 2519 COMPLEX(KIND=8), DIMENSION(m_max_pad, bloc_size) :: cu 2520 REAL(KIND=8), DIMENSION(2*m_max_pad-1, bloc_size) :: ru 2521 COMPLEX(KIND=8), DIMENSION(m_max_pad,bloc_size) :: prod_cu 2522 REAL(KIND=8), DIMENSION(2*m_max_pad-1,bloc_size) :: prod_ru 2523 COMPLEX(KIND=8), DIMENSION(bloc_size) :: intermediate 2524 REAL(KIND=8), DIMENSION(SIZE(V1_in,3),SIZE(V1_in,2),bloc_size*nb_procs) :: dist_field, combined_field 2525 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: combined_prod_cu 2526 COMPLEX(KIND=8), DIMENSION(bloc_size,SIZE(V1_in,3)*nb_procs) :: dist_prod_cu 2527 2528 INTEGER :: n1, n2, rank 2529 INTEGER :: nb, nf, shiftc, shiftl, jindex, longueur_tranche, i, n, code 2530 REAL(KIND=8) :: t 2531 2532 ! FFTW parameters 2533 INTEGER :: fft_dim, howmany, istride, ostride, idist, odist 2534 INTEGER, DIMENSION(1) :: dim, inembed, onembed 2535 ! Recall complexes must be rescaled 2536 ! End FFTW parameters 2537 #include "petsc/finclude/petsc.h" 2538 mpi_comm :: communicator 2539 petscerrorcode :: ierr 2540 2541 IF (present(temps)) temps = 0.d0 2542 2543 np = SIZE(v1_in,1) 2544 nb_field= SIZE(v1_in,2) ! Number of fields 2545 m_max_c = SIZE(v1_in,3) ! Number of complex (cosines + sines) coefficients per point 2546 m_max = m_max_c*nb_procs! Number of comlex coefficients per point per processor 2547 n_r_pad=2*m_max_pad-1 2548 np_tot = nb_procs*bloc_size 2549 2550 IF (mod(nb_field,2)/=0 .OR. m_max_c==0) THEN 2551 WRITE(*,*) ' BUG ' 2552 stop 2553 END IF 2554 2555 ! Bloc_size is the number of points that are handled by one processor 2556 ! once the Fourier modes are all collected 2557 ! Computation of bloc_size and np_tot 2558 ! fin de la repartition des points 2559 2560 ! Packing all 3 complex components of both v1 and v2 input fields 2561 ! into dist_field, where the dimension indexing the nodal points varies the least rapidly, 2562 ! so that after distributing the data to the processes, each one will obtain a part 2563 ! on nodal points 2564 ! TRANSPOSE pr que la variable i associee aux modes soit la 1ere sur laquelle on va faire la FFT 2565 t = mpi_wtime() 2566 2567 DO i = 1, m_max_c 2568 dist_field(i,1:nb_field,1:np) = transpose(v1_in(:,:,i)) 2569 END DO 2570 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2571 2572 IF (np/=np_tot) dist_field(:,:,np+1:np_tot) = 1.d100 2573 2574 longueur_tranche=bloc_size*m_max_c*nb_field 2575 2576 t = mpi_wtime() 2577 mpid=mpi_double_precision 2578 CALL mpi_alltoall(dist_field,longueur_tranche, mpid, combined_field, longueur_tranche, & 2579 mpid, communicator, code) 2580 2581 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 2582 2583 t = mpi_wtime() 2584 !JLG, FEB 4, 2011 2585 cu = 0.d0 2586 !JLG, FEB 4, 2011 2587 DO n = 1, bloc_size 2588 DO nb = 1, nb_procs 2589 shiftc = (nb-1)*bloc_size 2590 shiftl = (nb-1)*m_max_c 2591 jindex = n + shiftc 2592 ! Put real and imaginary parts in a complex 2593 ! nf=1,2,3 => V1_in 2594 ! nf=4,5,6 => V2_in 2595 ! INPUT ARE COSINE AND SINE COEFFICIENTS 2596 ! THEY ARE PUT IN COMPLEX FORMAT: c_0 = a_0 + i*0 and c_n = (a_n-i*b_n)/2 2597 cu(shiftl+1:shiftl+m_max_c,n) = cmplx(combined_field(:,1,jindex),& 2598 -combined_field(:,2,jindex),kind=8)/2 2599 END DO 2600 END DO 2601 cu(1,:) = 2*cmplx(REAL(cu(1,:),KIND=8),0.d0,kind=8) 2602 !JLG, FEB 4, 2011 2603 !Padding is done by initialization of cu: cu = 0 2604 !This is eequivalent to cu(m_max+1:m_max_pad,:,:) = 0.d0 2605 !JLG, FEB 4, 2011 2606 2607 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2608 2609 ! Set the parameters for dfftw 2610 fft_dim=1; istride=1; ostride=1; 2611 !JLG, FEB 4, 2011 2612 !!$ idist=N_r; inembed(1)=N_r; DIM(1)=N_r
2613 !!$odist=m_max; onembed(1)=m_max 2614 idist=n_r_pad; inembed(1)=n_r_pad; dim(1)=n_r_pad 2615 odist=m_max_pad; onembed(1)=m_max_pad 2616 !JLG, FEB 4, 2011 2617 2618 howmany=bloc_size*nb_field/2 2619 2620 2621 t = mpi_wtime() 2622 CALL dfftw_plan_many_dft_c2r(fftw_plan_multi_c2r, fft_dim, dim, howmany, cu, & 2623 onembed, ostride, odist, ru, inembed, istride, idist, fftw_estimate) 2624 !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_c2r', fftw_plan_multi_c2r 2625 CALL dfftw_execute(fftw_plan_multi_c2r) 2626 ! CROSS PRODDUCT 2627 IF (nb_field==2) THEN 2628 DO n1 = 1, 2*m_max_pad-1 2629 DO n2 = 1, bloc_size 2630 !!$ prod_ru(n1,n2) = (1+tanh((ru(n1,n2)-0.5d0)/.01))/2
2631  IF (ru(n1,n2) > 0.5) THEN
2632  prod_ru(n1,n2) = 1.d0
2633  ELSE
2634  prod_ru(n1,n2) = 0.d0
2635  END IF
2636  END DO
2637  END DO
2638  END IF
2639  ! CROSS PRODUCT
2640
2641  howmany = howmany
2642  CALL dfftw_plan_many_dft_r2c(fftw_plan_multi_r2c, fft_dim, dim, howmany, prod_ru, &
2643  inembed, istride, idist, prod_cu, onembed, ostride, odist, fftw_estimate)
2644  !write(*,*) ' FFT_PAR_CROSS_PROD: fftw_plan_multi_r2c', fftw_plan_multi_r2c
2645  CALL dfftw_execute(fftw_plan_multi_r2c)
2646  !JLG, FEB 4, 2011
2647 !!$prod_cu = prod_cu/N_r !Scaling 2648 prod_cu = prod_cu/n_r_pad !Scaling 2649 !JLG, FEB 4, 2011 2650 IF (present(temps)) temps(2) = temps(2) + mpi_wtime() -t 2651 !Now we need to redistribute the Fourier coefficients on each processor 2652 2653 t = mpi_wtime() 2654 combined_prod_cu(:,1)=prod_cu(1,:) 2655 DO n=2, m_max 2656 !combined_prod_cu(:,:,n)=prod_cu(n,:,:) 2657 combined_prod_cu(:,n)=2*conjg(prod_cu(n,:)) 2658 END DO 2659 2660 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2661 2662 t = mpi_wtime() 2663 longueur_tranche=bloc_size*m_max_c*nb_field 2664 mpid=mpi_double_precision 2665 CALL mpi_alltoall(combined_prod_cu,longueur_tranche,mpid, dist_prod_cu,longueur_tranche, & 2666 mpid,communicator,code) 2667 IF (present(temps)) temps(1) = temps(1) + mpi_wtime() -t 2668 ! dimensions: 2669 t = mpi_wtime() 2670 DO i = 1, m_max_c 2671 DO nb = 1, nb_procs 2672 shiftc = (nb-1)*bloc_size 2673 shiftl = (nb-1)*m_max_c 2674 intermediate = dist_prod_cu(:,shiftl+i) 2675 DO n = 1, bloc_size 2676 IF (n+shiftc > np ) cycle 2677 v_out(n+shiftc, 1, i) = REAL (intermediate(n),kind=8) 2678 v_out(n+shiftc, 2, i) = aimag(intermediate(n)) 2679 END DO 2680 END DO 2681 END DO 2682 IF (present(temps)) temps(3) = temps(3) + mpi_wtime() -t 2683 2684 END SUBROUTINE fft_heaviside_dcl 2685 2686 END MODULE sft_parallele_obsolete section doc_intro_frame_work_num_app Numerical approximation subsection doc_intro_fram_work_num_app_Fourier_FEM Fourier Finite element representation The SFEMaNS code uses a hybrid Fourier Finite element formulation The Fourier decomposition allows to approximate the problemâ€™s solutions for each Fourier mode modulo nonlinear terms that are made explicit The variables are then approximated on a meridian section of the domain with a finite element method The numerical approximation of a function f$f f is written in the following generic t
