import numpy as np
from astropy.io import fits
from astropy import time, coordinates as coord, units as u
from matplotlib import pyplot as plt
import sep
import os
import datetime
# 禁止下载地球自转数据
from astropy.utils import iers
iers.conf.auto_download = False
# 数据处理的根目录,实际工作中一般会分为原始数据目录和处理结果目录
rawdir = "PiA2021raw/"
reddir = "PiA2021red/"
# 调用操作系统命令生成列表,这里使用了多行命令(用\在行末换行),一句命令写多条,循环等shell编程技巧
# 可以直接在终端窗口一条条写
# 另外,这里用一个!开头,实际上新开了一个进程,不会影响实际当前路径
! cd PiA2021raw; \
ls bias_*.fit > bias.lst; \
for b in U B V R I; do ls flat_${b}_*.fit > flat_${b}.lst; done; \
for b in B V R; do ls V398UMa_${b}_*.fit > V398UMa_${b}.lst; done;
# 一个小函数,生成当前UTC时间的字符串形式,用于在header中加注数据处理时间
def utcnowstr():
return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
使用了列表生成器等语法,该语法也可以生成元组Tuple、字典Dict
# 本底
bias_lst = open(rawdir + "bias.lst").read().split("\n")
bias_lst
# 平场。注意平场比数据的波段多,如果不想处理数据未用到的平场也是可以的
flat_band = "UBVRI"
# flat_lst = {b: open(rawdir + "flat_"+b+".lst").read().split() for b in flat_band}
b = "B"
flat_lst = open(rawdir + "flat_"+b+".lst").read().split("\n")
flat_lst
# 科学数据。这里因为只有一个目标,实际上有些步骤是多余的
obj_band = "BVR"
objs = ["V398UMa"]
obj_bands = [(o, b) for b in obj_band for o in objs]
obj_bands
# 本格可以忽略
o, b = "V398UMa", "B"
obj_lst = open(rawdir + o+"_"+b+".lst").read().split("\n")
obj_lst
nx = fits.getval(rawdir + bias_lst[0], "NAXIS1")
ny = fits.getval(rawdir + bias_lst[0], "NAXIS2")
nx, ny
先读入数据,然后进行合并,合并使用中值。最后生成新文件
nbias = len(bias_lst)
# 创建空数组,准备接收数据,注意下标顺序是ny nx,数据类型是16位无符号整数
# 此外注意这里用到是empty,而不是ones或者zeros,不会对数组进行初始化,节约时间
bias_cube = np.empty((nbias, ny, nx), np.uint16)
# 读入数据。对三维数组,只给出第一个下标,表示后面是全部都用到,相当于 [f,:,:]
for f in range(nbias):
bias_cube[f] = fits.getdata(rawdir + bias_lst[f])
# 求中值,注意axis=0,表示按照0维度进行求中值
# 即对于[y,x],计算方式是求median([:, y, x])
# 还有就是,需要把数据类型转换成float32,默认是64位,精度太高,浪费空间
bias_data = np.float32(np.median(bias_cube, axis=0))
bias_data.dtype, bias_data.shape
# 读第0个bias文件的头,适当改造作为最终的头
hdr = fits.getheader(rawdir + bias_lst[0])
# 在头里加上处理时间
# 其他字段这里没有处理。数据类型会被自动根据实际情况更正,不需要干预
hdr.append(("PROCTIME", utcnowstr(), "DateTime of processing"))
hdr
# 创建一个新文件,每个fits文件在astropy中表现为一个列表,哪怕是单图像,也是单元素的列表
# 每个单元是一个HDU(Header-Data-Unit),包括header头部和data数据部
hdul_new = fits.HDUList([
fits.PrimaryHDU(header=hdr, data=bias_data)
])
# 保存文件。如果文件已经存在,需要加上overwrite=True
hdul_new.writeto(reddir + "BIAS.fits")
bias_med = np.median(bias_data)
bias_std = np.std(bias_data)
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.imshow(bias_data, vmin=bias_med-3*bias_std, vmax=bias_med+3*bias_std, cmap="rainbow")
# 注意,如果是像课程示例中这样连续处理,这一步是多余的
# 但是实践中一般本底合并、平场合并、科学数据改正等各步骤是独立的,所以必须有读入过程
bias_data = fits.getdata(reddir + "BIAS.fits")
# 这里不做循环,只演示一个波段,所以看到的是直接读取波段字符串的某一个
# 如果要全处理,那么接下来的几个步骤要放到一个for里,并且在一个格子内完成
# 对于测试程序而言,这样逐个波段比较方便
# for b in flat_band:
b = "B"
# 基本上和bias合并相同,但是注意这里,是float32,因为待会要减去本底,而合并后的本底是float32
nflat = len(flat_lst[b])
flat_cube = np.empty((nflat, ny, nx), np.float32)
for f in range(nflat):
# 读平场数据,注意这里的数组形式,[b]是针对字典的,而[f]是针对列表的
dd = fits.getdata(rawdir + flat_lst[b][f])
# 减本底
dd = dd - bias_data
# 计算中值,并利用中值进行归一化
dmed = np.median(dd)
dd /= dmed
# 保存到最终数组中
flat_cube[f] = dd
flat_data = np.median(flat_cube, axis=0)
flat_data.dtype, flat_data.shape
hdr = fits.getheader(rawdir + flat_lst[b][0])
hdr.append(("PROCTIME", utcnowstr(), "DateTime of processing"))
hdul_new = fits.HDUList([
fits.PrimaryHDU(header=hdr, data=flat_data)
])
# 这里尽可能用字符串格式化的方式去拼接文件名的各部分,因为这个方式系统开销较小
hdul_new.writeto(reddir + "FLAT_{b}.fits".format(b=b))
flat_med = np.median(flat_data)
flat_std = np.std(flat_data)
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.imshow(flat_data, vmin=flat_med-3*flat_std, vmax=flat_med+3*flat_std, cmap="hot")
和平场合并类似,但是这次是逐个处理,不需要把多幅叠加起来。此外,公共数据只需要读一次就行
# 老规矩。但是这里注意,是放在选择波段和目标之前的,也就是不管处理几个波段,只读入一次
bias_data = fits.getdata(reddir + "BIAS.fits")
# 同样的,从一组obj_bands中选一个来做演示
o, b = obj_bands[0]
# 读该波段的平场数据。注意这里是在科学数据的循环之前的,也就是对每个波段也是只读入一次
flat_data = fits.getdata(reddir + "FLAT_{b}.fits".format(b=b))
# 对该波段的目标数据,逐个进行循环
# for f in range(len()):
f = 0
# 这里加这么一步,不是必须的,但是这样后续做处理的时候省点事,因为还得写输出文件,每次写这么复杂很累
objf = obj_lst[f]
# 打开文件
obj_hdu = fits.open(rawdir + objf)
# 读出原始数据,注意后面改正之后变量名不同
ori_data = obj_hdu[0].data
# 读出数据头
obj_hdr = obj_hdu[0].header
# 这里指定变量只是为了后续编程省事,不这么做也行
# 生成观测站地理坐标对象
site = coord.EarthLocation(lat=40.4 * u.deg, lon=117.6 * u.deg, height=900 * u.m)
# 生成观测目标天球对象
obj_coord = coord.SkyCoord(obj_hdr["RA"], obj_hdr["DEC"], unit=(u.hour, u.deg), frame="icrs")
# 减本底,除平场
obj_data = (ori_data - bias_data) / flat_data
# 找出头中的观测日期时间
obs_dt = obj_hdr["DATE-OBS"] + "T" + obj_hdr["TIME-OBS"]
# 计算JD
obs_jd = time.Time(obs_dt, format='isot', scale='utc', location=site)
# 计算地球和太阳系质心之间的光程差
ltt_bary = obs_jd.light_travel_time(obj_coord)
# 计算BJD
obs_bjd = (obs_jd.tdb + ltt_bary).jd
obs_dt, obs_jd.jd, ltt_bary, obs_bjd
# 使用update去更新header,一次性多个,如果已经存在则更新,不存在就添加
obj_hdr.update({
"PROCTIME": (utcnowstr(), "DateTime of processing"),
'JD': (obs_jd.jd, "JD of observation"),
'BJD': (obs_bjd, "Barycentric Julian Day")
})
obj_hdr
hdul_new = fits.HDUList([
fits.PrimaryHDU(header=obj_hdr, data=obj_data)
])
# 在原始文件名基础上加上后缀,表示经过本底(bias)平场(flat)改正的结果
hdul_new.writeto(reddir + os.path.splitext(objf)[0] + "_bf.fits")
注意和绘制平场、本底时的差异
obj_med, obj_std = np.median(obj_data), np.std(obj_data)
obj_med, obj_std
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.imshow(obj_data, vmin=obj_med-1*50, vmax=obj_med+5*50, cmap="cool")
这里就不写循环,以及读取改正后数据等步骤,直接开始处理
# 计算背景
bkg = sep.Background(obj_data)
# 背景的全局中值和标准差,注意和图像的标准差的区别
# 图像的标准差受到星像的影响,而这里实际上是剔除了星像之后的
bkg.globalback, bkg.globalrms
# 背景及其误差
bkg_image = bkg.back()
bkg_rms = bkg.rms()
fig, ax = plt.subplots(1, 2, figsize=(12, 6))
ax[0].imshow(bkg_image)
ax[1].imshow(bkg_rms);
# 图像扣减背景
data_sub = obj_data - bkg
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.imshow(data_sub, vmin=-1*30, vmax=+5*30, cmap="cool")
# 找源
stars = sep.extract(data_sub, 1.5, err=bkg.globalrms)
stars.shape, stars.dtype
from matplotlib.patches import Ellipse
# 画观测图像
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
im = ax.imshow(data_sub, interpolation='nearest', cmap='gray',
vmin=-1*30, vmax=+5*30, origin='lower')
# 挨个目标画椭圆
for i in range(len(stars)):
e = Ellipse(xy=(stars['x'][i], stars['y'][i]),
width=15*stars['a'][i],
height=15*stars['b'][i],
angle=np.deg2rad(stars['theta'][i]),)
e.set_facecolor('none')
e.set_edgecolor('red')
ax.add_artist(e)
stars["a"], stars["b"]
# 简单孔径测光,孔径=5.0像素
flux1, fluxerr1, flag1 = sep.sum_circle(data_sub, stars['x'], stars['y'],
5.0, err=bkg.globalrms, gain=1.0)
# 流量转星等,误差分析公式略
mag1 = 25.0 - 2.5 * np.log10(flux1)
magerr1 = 2.5 / np.log(10) * fluxerr1 / flux1
# 不用全局统一的误差,而是用每个像素的局部误差
flux2, fluxerr2, flag2 = sep.sum_circle(data_sub, stars['x'], stars['y'], 5.0,
err=bkg_rms, gain=1.0)
# 流量转星等,误差分析公式略
mag2 = 25.0 - 2.5 * np.log10(flux2)
magerr2 = 2.5 / np.log(10) * fluxerr2 / flux2
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(mag1, magerr1, "x")
ax.set_xlabel("Mag $_1$")
ax.set_ylabel("Error $_1$")
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(mag2, magerr2, "x")
ax.set_xlabel("Mag $_2$")
ax.set_ylabel("Error $_2$")
# 计算Source-Extractor的AUTO流量/星等
kronrad, krflag = sep.kron_radius(data_sub, stars["x"], stars["y"],
stars["a"], stars["b"], stars["theta"], 6.0)
fluxa, fluxerra, flaga = sep.sum_ellipse(data_sub, stars["x"], stars["y"],
stars["a"], stars["b"], stars["theta"], 2.5*kronrad,
err=bkg_rms, subpix=1)
flaga |= krflag
maga = 25.0 - 2.5 * np.log10(fluxa)
magerra = 2.5 / np.log(10) * fluxerra / fluxa
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(maga, magerra, "x")
ax.set_xlabel("Mag $_{auto}$")
ax.set_ylabel("Error $_{auto}$")
# 构建星表
dt_cat = [
("x", float),
("y", float),
("a", float),
("b", float),
("theta", float),
("flux_aper", float),
("fluxerr_aper", float),
("mag_aper", float),
("magerr_aper", float),
("flag_aper", int),
("flux_auto", float),
("fluxerr_auto", float),
("mag_auto", float),
("magerr_auto", float),
("flag_auto", int),
]
# 创建空数组,xxx_like表示数组的维度大小和参考数组一致
cat = np.empty_like(stars, dt_cat)
# 逐个字段填充数据
cat["x"] = stars["x"]
cat["y"] = stars["y"]
cat["a"] = stars["a"]
cat["b"] = stars["b"]
cat["theta"] = stars["theta"]
cat["flux_aper"] = flux2
cat["fluxerr_aper"] = fluxerr2
cat["mag_aper"] = mag2
cat["magerr_aper"] = magerr2
cat["flag_aper"] = flag2
cat["flux_auto"] = flaga
cat["fluxerr_auto"] = fluxerra
cat["mag_auto"] = maga
cat["magerr_auto"] = magerra
cat["flag_auto"] = flaga
# 创建一个带表格的fits文件,这时候,[0]的HDU必须是空的,只能有heaader,不能有数据
hdul_new = fits.HDUList([
fits.PrimaryHDU(header=obj_hdr, ),
fits.BinTableHDU(data=cat)
])
# 在原始文件名基础上加上后缀,表示这是星表
hdul_new.writeto(reddir + os.path.splitext(objf)[0] + "_cat.fits")
# 文本格式的数据输出略,网上有很多案例