raop_rtp fixes/cleanups to remove unneeded ntp use

This commit is contained in:
F. Duncanh
2025-01-27 03:53:01 -05:00
parent 518b72d9c5
commit bccc42e4e2
12 changed files with 143 additions and 167 deletions

View File

@@ -36,10 +36,9 @@
#define NO_FLUSH (-42)
#define SECOND_IN_NSECS 1000000000
#define RAOP_RTP_SYNC_DATA_COUNT 8
#define SEC SECOND_IN_NSECS
#define DELAY_AAC 0.275 //empirical, matches audio latency of about -0.25 sec after first clock sync event
#define DELAY_AAC 0.20 //empirical, matches audio latency of about -0.25 sec after first clock sync event
/* note: it is unclear what will happen in the unlikely event that this code is running at the time of the unix-time
* epoch event on 2038-01-19 at 3:14:08 UTC ! (but Apple will surely have removed AirPlay "legacy pairing" by then!) */
@@ -56,14 +55,16 @@ struct raop_rtp_s {
// Time and sync
raop_ntp_t *ntp;
double rtp_clock_rate;
int64_t rtp_sync_offset;
raop_rtp_sync_data_t sync_data[RAOP_RTP_SYNC_DATA_COUNT];
int sync_data_index;
uint64_t ntp_start_time;
uint64_t rtp_start_time;
uint64_t rtp_time;
bool rtp_clock_started;
uint32_t rtp_sync;
uint64_t client_ntp_sync;
bool initial_sync;
// Transmission Stats, could be used if a playout buffer is needed
// float interarrival_jitter; // As defined by RTP RFC 3550, Section 6.4.1
// unsigned int last_packet_transit_time;
@@ -161,12 +162,10 @@ raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, raop_ntp_t *ntp, co
raop_rtp->logger = logger;
raop_rtp->ntp = ntp;
raop_rtp->rtp_sync_offset = 0;
raop_rtp->sync_data_index = 0;
for (int i = 0; i < RAOP_RTP_SYNC_DATA_COUNT; ++i) {
raop_rtp->sync_data[i].ntp_time = 0;
raop_rtp->sync_data[i].rtp_time = 0;
}
raop_rtp->rtp_sync = 0;
raop_rtp->client_ntp_sync = 0;
raop_rtp->initial_sync = false;
raop_rtp->ntp_start_time = 0;
raop_rtp->rtp_start_time = 0;
raop_rtp->rtp_clock_started = false;
@@ -386,59 +385,22 @@ raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data)
return 0;
}
void raop_rtp_sync_clock(raop_rtp_t *raop_rtp, uint64_t *ntp_time, uint64_t *rtp_time) {
/* ntp_time = (uint64_t)(((int64_t)(raop_rtp->rtp_clock_rate * rtp_time)) + raop_rtp->rtp_sync_offset) */
int latest, valid_data_count = 0;
uint64_t ntp_sum = 0, rtp_sum = 0;
double offset = ((double) *ntp_time) - raop_rtp->rtp_clock_rate * *rtp_time;
int64_t correction = 0;
raop_rtp->sync_data_index = (raop_rtp->sync_data_index + 1) % RAOP_RTP_SYNC_DATA_COUNT;
latest = raop_rtp->sync_data_index;
raop_rtp->sync_data[latest].rtp_time = *rtp_time;
raop_rtp->sync_data[latest].ntp_time = *ntp_time;
for (int i = 0; i < RAOP_RTP_SYNC_DATA_COUNT; i++) {
if (raop_rtp->sync_data[i].ntp_time == 0) continue;
valid_data_count++;
if (i == latest) continue;
ntp_sum += *ntp_time - raop_rtp->sync_data[i].ntp_time;
rtp_sum += *rtp_time - raop_rtp->sync_data[i].rtp_time;
static uint64_t rtp_time_to_client_ntp(raop_rtp_t *raop_rtp, uint32_t *rtp32) {
if (!raop_rtp->initial_sync) {
return 0;
}
if (valid_data_count > 1) {
correction -= raop_rtp->rtp_sync_offset;
offset += (((double) ntp_sum) - raop_rtp->rtp_clock_rate * rtp_sum) / valid_data_count;
}
raop_rtp->rtp_sync_offset = (int64_t) offset;
correction += raop_rtp->rtp_sync_offset;
logger_log(raop_rtp->logger, LOGGER_DEBUG, "dataset %d raop_rtp sync correction=%lld, rtp_sync_offset = %lld ",
valid_data_count, correction, raop_rtp->rtp_sync_offset);
}
uint64_t rtp64_time (raop_rtp_t *raop_rtp, const uint32_t *rtp32) {
/* convert from 32-bit to 64-bit rtp time:
* the rtp_time 32-bit epoch at 44.1kHz has length of about 27 hours
* using 64-bit rtp time avoids any epoch issues.
* initial call sets epoch to 1; subsequent calls maintain consistent epoch.
* (assumes successive calls are close in time) */
if (raop_rtp->rtp_clock_started) {
uint32_t diff1 = *rtp32 - ((uint32_t) raop_rtp->rtp_time);
uint32_t diff2 = ((uint32_t) raop_rtp->rtp_time) - *rtp32;
if (diff1 <= diff2) {
raop_rtp->rtp_time += (uint64_t) diff1;
} else {
raop_rtp->rtp_time -= (uint64_t) diff2;
}
uint64_t client_ntp = raop_rtp->client_ntp_sync;
if (*rtp32 >= raop_rtp->rtp_sync) {
client_ntp += (uint64_t) raop_rtp->rtp_clock_rate * (*rtp32 - raop_rtp->rtp_sync);
} else {
raop_rtp->rtp_time = (0x01ULL << 32 ) + (uint64_t) *rtp32;
raop_rtp->rtp_start_time = raop_rtp->rtp_time;
raop_rtp->rtp_clock_started = true;
uint64_t ntp_shift = (uint64_t) raop_rtp->rtp_clock_rate * (raop_rtp->rtp_sync - *rtp32);
if (client_ntp > ntp_shift) {
client_ntp -= ntp_shift;
} else {
client_ntp = 0;
}
}
//assert(*rtp32 == (uint32_t) raop_rtp->rtp_time);
return raop_rtp->rtp_time;
return client_ntp;
}
static THREAD_RETVAL
@@ -450,22 +412,15 @@ raop_rtp_thread_udp(void *arg)
struct sockaddr_storage saddr;
socklen_t saddrlen;
bool got_remote_control_saddr = false;
uint64_t video_arrival_offset = 0;
/* for initial rtp to ntp conversions */
bool have_synced = false;
bool no_data_yet = true;
/* initial audio stream has no data */
unsigned char no_data_marker[] = {0x00, 0x68, 0x34, 0x00 };
int rtp_count = 0;
double sync_adjustment = 0;
unsigned short seqnum1 = 0, seqnum2 = 0;
assert(raop_rtp);
bool logger_debug = (logger_get_level(raop_rtp->logger) >= LOGGER_DEBUG);
raop_rtp->ntp_start_time = raop_ntp_get_local_time(raop_rtp->ntp);
raop_rtp->ntp_start_time = raop_ntp_get_local_time();
raop_rtp->rtp_clock_started = false;
for (int i = 0; i < RAOP_RTP_SYNC_DATA_COUNT; i++) {
raop_rtp->sync_data[i].ntp_time = 0;
}
int no_resend = (raop_rtp->control_rport == 0); /* true when control_rport is not set */
@@ -526,14 +481,8 @@ raop_rtp_thread_udp(void *arg)
unsigned int resent_packetlen = packetlen - 4;
unsigned short seqnum = byteutils_get_short_be(resent_packet, 2);
if (resent_packetlen >= 12) {
uint32_t timestamp = byteutils_get_int_be(resent_packet, 4);
uint64_t rtp_time = rtp64_time(raop_rtp, &timestamp);
uint64_t ntp_time = 0;
if (have_synced) {
ntp_time = (uint64_t) (raop_rtp->rtp_sync_offset + (int64_t) (raop_rtp->rtp_clock_rate * rtp_time));
}
logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp resent audio packet: seqnum=%u", seqnum);
int result = raop_buffer_enqueue(raop_rtp->buffer, resent_packet, resent_packetlen, &ntp_time, &rtp_time, 1);
int result = raop_buffer_enqueue(raop_rtp->buffer, resent_packet, resent_packetlen, 1);
assert(result >= 0);
} else if (logger_debug) {
/* type_c = 0x56 packets with length 8 have been reported */
@@ -553,24 +502,30 @@ raop_rtp_thread_udp(void *arg)
* next_rtp = sync_rtp + 77175 = 441 * 175 (1.75 sec) for ALAC */
// The unit for the rtp clock is 1 / sample rate = 1 / 44100
uint32_t sync_rtp = byteutils_get_int_be(packet, 4);
uint64_t sync_rtp64 = rtp64_time(raop_rtp, &sync_rtp);
if (have_synced == false) {
uint64_t client_ntp_sync_prev = 0;
uint64_t rtp_sync_prev = 0;
if (!raop_rtp->initial_sync) {
logger_log(raop_rtp->logger, LOGGER_DEBUG, "first audio rtp sync");
have_synced = true;
}
raop_rtp->initial_sync = true;
} else {
client_ntp_sync_prev = raop_rtp->client_ntp_sync;
rtp_sync_prev = raop_rtp->rtp_sync;
}
raop_rtp->rtp_sync = byteutils_get_int_be(packet, 4);
uint64_t sync_ntp_raw = byteutils_get_long_be(packet, 8);
uint64_t sync_ntp_remote = raop_remote_timestamp_to_nano_seconds(raop_rtp->ntp, sync_ntp_raw);
raop_rtp->client_ntp_sync = raop_remote_timestamp_to_nano_seconds(raop_rtp->ntp, sync_ntp_raw);
if (logger_debug) {
uint64_t sync_ntp_local = raop_ntp_convert_remote_time(raop_rtp->ntp, sync_ntp_remote);
double offset_change = ((double) raop_rtp->client_ntp_sync) - raop_rtp->rtp_clock_rate * raop_rtp->rtp_sync;
offset_change -= ((double) client_ntp_sync_prev) - raop_rtp->rtp_clock_rate * rtp_sync_prev;
uint64_t sync_ntp_local = raop_ntp_convert_remote_time(raop_rtp->ntp, raop_rtp->rtp_sync);
char *str = utils_data_to_string(packet, packetlen, 20);
logger_log(raop_rtp->logger, LOGGER_DEBUG,
"raop_rtp sync: client ntp=%8.6f, ntp = %8.6f, ntp_start_time %8.6f\nts_client = %8.6f sync_rtp=%u\n%s",
(double) sync_ntp_remote / SEC, (double) sync_ntp_local / SEC,
(double) raop_rtp->ntp_start_time / SEC, (double) sync_ntp_remote / SEC, sync_rtp, str);
"raop_rtp sync: ntp = %8.6f, ntp_start_time %8.6f\nts_client = %8.6f sync_rtp=%u offset change = %8.6f\n%s",
(double) sync_ntp_local / SEC, (double) raop_rtp->ntp_start_time / SEC,
(double) raop_rtp->client_ntp_sync / SEC, raop_rtp->rtp_sync, offset_change / SEC, str);
free(str);
}
raop_rtp_sync_clock(raop_rtp, &sync_ntp_remote, &sync_rtp64);
} else if (logger_debug) {
char *str = utils_data_to_string(packet, packetlen, 16);
logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp unknown udp control packet\n%s", str);
@@ -615,6 +570,9 @@ raop_rtp_thread_udp(void *arg)
if (FD_ISSET(raop_rtp->dsock, &rfds)) {
if (!raop_rtp->initial_sync && !video_arrival_offset) {
video_arrival_offset = raop_ntp_get_video_arrival_offset(raop_rtp->ntp);
}
//logger_log(raop_rtp->logger, LOGGER_INFO, "Would have data packet in queue");
// Receiving audio data here
saddrlen = sizeof(saddr);
@@ -633,83 +591,58 @@ raop_rtp_thread_udp(void *arg)
continue;
}
uint32_t rtp_timestamp = byteutils_get_int_be(packet, 4);
uint64_t rtp_time = rtp64_time(raop_rtp, &rtp_timestamp);
uint64_t ntp_time = 0;
if (!raop_rtp->initial_sync && raop_rtp->ct == 8 && video_arrival_offset) {
/* estimate a fake initial remote timestamp for video synchronization with AAC audio before the first rtp sync */
uint64_t ts = raop_ntp_get_local_time() - video_arrival_offset;
double delay = DELAY_AAC;
ts += (uint64_t) (delay * SEC);
raop_rtp->client_ntp_sync = ts;
raop_rtp->rtp_sync = byteutils_get_int_be(packet, 4);
raop_rtp->initial_sync = true;
}
if (packetlen == 16 && memcmp(packet + 12, no_data_marker, 4) == 0) {
/* this is a "no data" packet */
/* the first such packet could be used to provide the initial rtptime and seqnum formerly given in the RECORD request */
continue;
}
if (raop_rtp->ct == 2 && packetlen == 44) continue; /* ignore the ALAC packets with format information only. */
if (have_synced) {
ntp_time = (uint64_t) (raop_rtp->rtp_sync_offset + (int64_t) (raop_rtp->rtp_clock_rate * rtp_time));
} else if (packetlen == 16 && memcmp(packet + 12, no_data_marker, 4) == 0) {
/* use the special "no_data" packet to help determine an initial offset before the first rtp sync.
* until the first rtp sync occurs, we don't know the exact client ntp timestamp that matches the client rtp timestamp */
if (no_data_yet) {
int64_t sync_ntp = ((int64_t) raop_ntp_get_local_time(raop_rtp->ntp)) - ((int64_t) raop_rtp->ntp_start_time) ;
int64_t sync_rtp = ((int64_t) rtp_time) - ((int64_t) raop_rtp->rtp_start_time);
unsigned short seqnum = byteutils_get_short_be(packet, 2);
if (rtp_count == 0) {
sync_adjustment = ((double) sync_ntp);
rtp_count = 1;
seqnum1 = seqnum;
seqnum2 = seqnum;
}
if (seqnum2 != seqnum) { /* for AAC-ELD only use copy 1 of the 3 copies of each frame */
rtp_count++;
sync_adjustment += (((double) sync_ntp) - raop_rtp->rtp_clock_rate * sync_rtp - sync_adjustment) / rtp_count;
}
seqnum2 = seqnum1;
seqnum1 = seqnum;
}
continue;
} else {
no_data_yet = false;
}
int result = raop_buffer_enqueue(raop_rtp->buffer, packet, packetlen, &ntp_time, &rtp_time, 1);
int result = raop_buffer_enqueue(raop_rtp->buffer, packet, packetlen, 1);
assert(result >= 0);
if (raop_rtp->ct == 2 && !have_synced) {
/* in ALAC Audio-only mode wait until the first sync before dequeing */
if (!raop_rtp->initial_sync) {
/* wait until the first sync before dequeing ALAC */
continue;
} else {
// Render continuous buffer entries
void *payload = NULL;
unsigned int payload_size;
unsigned short seqnum;
uint64_t rtp64_timestamp;
uint64_t ntp_timestamp;
uint32_t rtp_timestamp;
while ((payload = raop_buffer_dequeue(raop_rtp->buffer, &payload_size, &ntp_timestamp, &rtp64_timestamp, &seqnum, no_resend))) {
while ((payload = raop_buffer_dequeue(raop_rtp->buffer, &payload_size, &rtp_timestamp, &seqnum, no_resend))) {
audio_decode_struct audio_data;
audio_data.rtp_time = rtp64_timestamp;
audio_data.rtp_time = rtp_timestamp;
audio_data.seqnum = seqnum;
audio_data.data_len = payload_size;
audio_data.data = payload;
audio_data.ct = raop_rtp->ct;
if (have_synced) {
if (ntp_timestamp == 0) {
ntp_timestamp = (uint64_t) (raop_rtp->rtp_sync_offset + (int64_t) (raop_rtp->rtp_clock_rate * rtp64_timestamp));
}
audio_data.ntp_time_remote = ntp_timestamp;
audio_data.ntp_time_local = raop_ntp_convert_remote_time(raop_rtp->ntp, audio_data.ntp_time_remote);
audio_data.sync_status = 1;
} else {
double elapsed_time = raop_rtp->rtp_clock_rate * (rtp64_timestamp - raop_rtp->rtp_start_time) + sync_adjustment
+ DELAY_AAC * SECOND_IN_NSECS;
audio_data.ntp_time_local = raop_rtp->ntp_start_time + (uint64_t) elapsed_time;
audio_data.ntp_time_remote = raop_ntp_convert_local_time(raop_rtp->ntp, audio_data.ntp_time_local);
audio_data.sync_status = 0;
audio_data.ntp_time_remote = rtp_time_to_client_ntp(raop_rtp, &rtp_timestamp);
audio_data.ntp_time_local = raop_ntp_convert_remote_time(raop_rtp->ntp, audio_data.ntp_time_remote);
if (logger_debug) {
uint64_t ntp_now = raop_ntp_get_local_time();
int64_t latency = (audio_data.ntp_time_local ? ((int64_t) ntp_now) - ((int64_t) audio_data.ntp_time_local) : 0);
logger_log(raop_rtp->logger, LOGGER_DEBUG,
"raop_rtp audio: now = %8.6f, ntp = %8.6f, latency = %9.6f, ts = %8.6f, rtp_time=%u seqnum = %u",
(double) ntp_now / SEC, (double) audio_data.ntp_time_local / SEC, (double) latency / SEC,
(double) audio_data.ntp_time_remote /SEC, rtp_timestamp, seqnum);
}
raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, raop_rtp->ntp, &audio_data);
free(payload);
if (logger_debug) {
uint64_t ntp_now = raop_ntp_get_local_time(raop_rtp->ntp);
int64_t latency = ((int64_t) ntp_now) - ((int64_t) audio_data.ntp_time_local);
logger_log(raop_rtp->logger, LOGGER_DEBUG,
"raop_rtp audio: now = %8.6f, ntp = %8.6f, latency = %8.6f, rtp_time=%u seqnum = %u",
(double) ntp_now / SEC, (double) audio_data.ntp_time_local / SEC, (double) latency / SEC,
(uint32_t) rtp64_timestamp, seqnum);
}
}
/* Handle possible resend requests */