diff --git a/lib/raop_buffer.c b/lib/raop_buffer.c index 317319e..c630af4 100644 --- a/lib/raop_buffer.c +++ b/lib/raop_buffer.c @@ -207,7 +207,7 @@ raop_buffer_decrypt(raop_buffer_t *raop_buffer, unsigned char *data, unsigned ch } int -raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned short datalen, int use_seqnum) { +raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned short datalen, uint64_t timestamp, int use_seqnum) { unsigned char empty_packet_marker[] = { 0x00, 0x68, 0x34, 0x00 }; assert(raop_buffer); @@ -248,7 +248,7 @@ raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned sh /* Update the raop_buffer entry header */ entry->seqnum = seqnum; - entry->timestamp = byteutils_get_int_be(data, 4); + entry->timestamp = timestamp; entry->filled = 1; entry->payload_data = malloc(payload_size); @@ -269,7 +269,7 @@ raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned sh } void * -raop_buffer_dequeue(raop_buffer_t *raop_buffer, unsigned int *length, uint32_t *timestamp, unsigned short *seqnum, int no_resend) { +raop_buffer_dequeue(raop_buffer_t *raop_buffer, unsigned int *length, uint64_t *timestamp, unsigned short *seqnum, int no_resend) { assert(raop_buffer); /* Calculate number of entries in the current buffer */ diff --git a/lib/raop_buffer.h b/lib/raop_buffer.h index d28f3b1..0418509 100644 --- a/lib/raop_buffer.h +++ b/lib/raop_buffer.h @@ -25,8 +25,8 @@ typedef int (*raop_resend_cb_t)(void *opaque, unsigned short seqno, unsigned sho raop_buffer_t *raop_buffer_init(logger_t *logger, const unsigned char *aeskey, const unsigned char *aesiv); -int raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned short datalen, int use_seqnum); -void *raop_buffer_dequeue(raop_buffer_t *raop_buffer, unsigned int *length, uint32_t *timestamp, unsigned short *seqnum, int no_resend); +int raop_buffer_enqueue(raop_buffer_t *raop_buffer, unsigned char *data, unsigned short datalen, uint64_t timestamp, int use_seqnum); +void *raop_buffer_dequeue(raop_buffer_t *raop_buffer, unsigned int *length, uint64_t *timestamp, unsigned short *seqnum, int no_resend); void raop_buffer_handle_resends(raop_buffer_t *raop_buffer, raop_resend_cb_t resend_cb, void *opaque); void raop_buffer_flush(raop_buffer_t *raop_buffer, int next_seq); diff --git a/lib/raop_rtp.c b/lib/raop_rtp.c index 5bf32bf..bac21c9 100644 --- a/lib/raop_rtp.c +++ b/lib/raop_rtp.c @@ -41,8 +41,7 @@ typedef struct raop_rtp_sync_data_s { uint64_t ntp_time; // The local wall clock time at the time of rtp_time - uint32_t rtp_time; // The remote rtp clock time corresponding to ntp_time - uint32_t rtp_epoch; // The epoch of the remote rtp clock (rtp epoch is about 27 hours) + uint64_t rtp_time; // The remote rtp clock time corresponding to ntp_time } raop_rtp_sync_data_t; struct raop_rtp_s { @@ -164,7 +163,6 @@ raop_rtp_init(logger_t *logger, raop_callbacks_t *callbacks, raop_ntp_t *ntp, co for (int i = 0; i < RAOP_RTP_SYNC_DATA_COUNT; ++i) { raop_rtp->sync_data[i].ntp_time = 0; raop_rtp->sync_data[i].rtp_time = 0; - raop_rtp->sync_data[i].rtp_epoch = 0; } raop_rtp->dacp_id = NULL; @@ -381,15 +379,11 @@ raop_rtp_process_events(raop_rtp_t *raop_rtp, void *cb_data) return 0; } -void raop_rtp_sync_clock(raop_rtp_t *raop_rtp, uint64_t ntp_time, uint64_t ntp_start_time, uint32_t rtp_time, int shift) { - const double epoch_length = (double) 0x100000000ULL; - if (rtp_time < raop_rtp->sync_data[raop_rtp->sync_data_index].rtp_time) { - raop_rtp->rtp_sync_epoch++; - } +void raop_rtp_sync_clock(raop_rtp_t *raop_rtp, uint64_t ntp_time, uint64_t ntp_start_time, uint64_t rtp_time, int shift) { + raop_rtp->sync_data_index = (raop_rtp->sync_data_index + 1) % RAOP_RTP_SYNC_DATA_COUNT; raop_rtp->sync_data[raop_rtp->sync_data_index].rtp_time = rtp_time; raop_rtp->sync_data[raop_rtp->sync_data_index].ntp_time = ntp_time; - raop_rtp->sync_data[raop_rtp->sync_data_index].rtp_epoch = raop_rtp->rtp_sync_epoch; uint32_t valid_data_count = 0; valid_data_count = 0; @@ -398,9 +392,6 @@ void raop_rtp_sync_clock(raop_rtp_t *raop_rtp, uint64_t ntp_time, uint64_t ntp_s for (int i = 0; i < RAOP_RTP_SYNC_DATA_COUNT; ++i) { if (raop_rtp->sync_data[i].ntp_time == 0) continue; rtp_offset = (((double) raop_rtp->sync_data[i].rtp_time) + ((double) shift)); - if (raop_rtp->sync_data[i].rtp_epoch) { - rtp_offset += raop_rtp->sync_data[i].rtp_epoch * epoch_length; - } total_offsets += (int64_t) (rtp_offset / raop_rtp-> rtp_sync_scale); if (raop_rtp->sync_data[i].ntp_time > ntp_start_time) { total_offsets -= (int64_t)(raop_rtp->sync_data[i].ntp_time - ntp_start_time); @@ -413,7 +404,25 @@ void raop_rtp_sync_clock(raop_rtp_t *raop_rtp, uint64_t ntp_time, uint64_t ntp_s int64_t correction = avg_offset - raop_rtp->rtp_sync_offset; raop_rtp->rtp_sync_offset = avg_offset; - logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp sync correction=%lld", correction); + logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp sync correction=%lld, rtp_sync_offset = %8.6f ", correction, ((double) raop_rtp->rtp_sync_offset) /SEC); +} + + +uint64_t rtp32_to_64time(uint32_t *rtp32, uint64_t *rtp64_time) { + uint32_t rtp32_time = (uint32_t) (*rtp64_time); + uint64_t rtp64; + + /* convert from 32-bit to 64-bit rtp time: + * the rtp_time 32-bit epoch at 44.1kHz has length of about 27 hours + * using 64-bit rtp time avoids any epoch issues */ + + if ((rtp32_time - *rtp32) >= (*rtp32 - rtp32_time)) { + rtp64 = *rtp64_time + (uint64_t) (*rtp32 - rtp32_time); + } else { + assert (*rtp64_time >= (uint64_t) (rtp32_time - *rtp32)); + rtp64 = *rtp64_time - (uint64_t) (rtp32_time - *rtp32); + } + return rtp64; } static THREAD_RETVAL @@ -424,15 +433,24 @@ raop_rtp_thread_udp(void *arg) unsigned int packetlen; struct sockaddr_storage saddr; socklen_t saddrlen; + + /* for initial rtp to ntp conversions */ bool have_synced = false; - bool audio_data_started = false; - unsigned char empty_packet_marker[] = {0x00, 0x34, 0x68, 0x00}; - bool have_rtp_time = false; - uint64_t rtp_time = 0; /* will only change by small amounts to track rtp epoch changes */ - uint32_t prev_rtp_timestamp = 0; + int rtp_count = 0; + int64_t initial_offset = 0; + double sync_adjustment = 0; + bool have_set_delay = false; + int64_t delay = 0; + + /* store rtp time as 64 bits */ + bool rtp_clock_started = false; + uint32_t rtp_start_time = 0; + uint64_t rtp64_time = 0; + assert(raop_rtp); uint64_t ntp_start_time = raop_ntp_get_local_time(raop_rtp->ntp); logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp start_time = %8.6f (raop_rtp audio)", ((double) ntp_start_time) / SEC); + while(1) { fd_set rfds; struct timeval tv; @@ -480,10 +498,11 @@ raop_rtp_thread_udp(void *arg) unsigned char *resent_packet = &packet[4]; unsigned int resent_packetlen = packetlen - 4; unsigned short seqnum = byteutils_get_short_be(resent_packet, 2); + uint32_t timestamp = byteutils_get_int_be(resent_packet, 4); + uint64_t timestamp_64 = rtp32_to_64time(×tamp, &rtp64_time); if (resent_packetlen > 12) { logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp audio resent packet: seqnum=%u", seqnum); - int result = raop_buffer_enqueue(raop_rtp->buffer, resent_packet, resent_packetlen, 1); - assert(result >= 0); + assert(raop_buffer_enqueue(raop_rtp->buffer, resent_packet, resent_packetlen, timestamp_64, 1) >= 0); } else { /* type_c = 0x56 packets with length 8 have been reported */ char *str = utils_data_to_string(packet, packetlen, 16); @@ -502,6 +521,10 @@ raop_rtp_thread_udp(void *arg) // The unit for the rtp clock is 1 / sample rate = 1 / 44100 uint32_t sync_rtp = byteutils_get_int_be(packet, 4); + assert(rtp_clock_started); + uint64_t sync_rtp64 = rtp32_to_64time(&sync_rtp, &rtp64_time); + sync_rtp64 -= (uint64_t) rtp_start_time; + if (have_synced == false) { logger_log(raop_rtp->logger, LOGGER_DEBUG, "first audio rtp sync"); have_synced = true; @@ -519,12 +542,12 @@ raop_rtp_thread_udp(void *arg) shift = 0; /* not needed for ALAC (audio only) */ break; } - raop_rtp_sync_clock(raop_rtp, sync_ntp_local, ntp_start_time, sync_rtp, shift); char *str = utils_data_to_string(packet, packetlen, 20); logger_log(raop_rtp->logger, LOGGER_DEBUG, - "raop_rtp sync: client ntp=%8.6f, ntp = %8.6f, ntp_start_time %8.6f, sync_rtp=%u\n%s", - ((double) sync_ntp_remote) / SEC, ((double)sync_ntp_local) / SEC, ((double)ntp_start_time) / SEC, sync_rtp, str); + "raop_rtp sync: client ntp=%8.6f, ntp = %8.6f, ntp_start_time %8.6f, sync_rtp=%lu\n%s", + ((double) sync_ntp_remote) / SEC, ((double)sync_ntp_local) / SEC, ((double)ntp_start_time) / SEC, sync_rtp64, str); free(str); + raop_rtp_sync_clock(raop_rtp, sync_ntp_local, ntp_start_time, sync_rtp64, shift); } else { logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp unknown packet"); } @@ -573,75 +596,67 @@ raop_rtp_thread_udp(void *arg) //logger_log(raop_rtp->logger, LOGGER_DEBUG, "raop_rtp_thread_udp type_d 0x%02x, packetlen = %d", type_d, packetlen); if (packetlen >= 12) { int no_resend = (raop_rtp->control_rport == 0); /* true when control_rport is not set */ - if (packetlen == 16 && !memcmp(&packet[12], empty_packet_marker, 4)) { - /* skip packet: these marked empty packets occur before first time syncronization */ + uint32_t rtp_timestamp = byteutils_get_int_be(packet, 4); + if (!rtp_clock_started) { + rtp_clock_started = true; + rtp_start_time = rtp_timestamp; + rtp64_time = (uint64_t) rtp_timestamp; } else { - if (!audio_data_started) { - audio_data_started = true; - if (have_synced == false) { - /* until the first rtp sync occurs, we don't know the exact client ntp timestamp that matches the client rtp timesamp */ - uint32_t rtp_timestamp = byteutils_get_int_be(packet, 4); - uint64_t ntp_now = raop_ntp_get_local_time(raop_rtp->ntp); - int64_t ntp_time = (int64_t) (ntp_now - ntp_start_time); - int64_t latency = 0; - switch (raop_rtp->ct) { - case 0x02: - latency = -DELAY_ALAC; /* DELAY = 2000000 (2.0 sec) is empirical choice for ALAC */ - break; - case 0x08: - latency = -DELAY_AAC; /* DELAY = 500000 (0.5 sec) is empirical choice for AAC-ELD */ - break; - default: - break; - } - ntp_time -= latency; - logger_log(raop_rtp->logger, LOGGER_DEBUG, "First audio packet received, have_synced = false, using assumed latency %8.6f", - ((double)latency) / SEC); - raop_rtp->rtp_sync_offset = (int64_t) ((double) rtp_timestamp) / raop_rtp->rtp_sync_scale; - raop_rtp->rtp_sync_offset -= ntp_time; - int64_t ntp_timestamp = (int64_t) ((double) rtp_timestamp) / raop_rtp->rtp_sync_scale; - ntp_timestamp -= raop_rtp->rtp_sync_offset; - } - } - int result = raop_buffer_enqueue(raop_rtp->buffer, packet, packetlen, 1); - assert(result >= 0); + rtp64_time = rtp32_to_64time(&rtp_timestamp, &rtp64_time); } + + if (have_synced == false) { + /* until the first rtp sync occurs, we don't know the exact client ntp timestamp that matches the client rtp timesamp */ + uint64_t ntp_now = raop_ntp_get_local_time(raop_rtp->ntp); + int64_t offset = -(int64_t) (ntp_now - ntp_start_time); + if (!have_set_delay) { + switch (raop_rtp->ct) { + case 0x02: + delay = DELAY_ALAC; /* DELAY = 2000000 (2.0 sec) is empirical choice for ALAC */ + logger_log(raop_rtp->logger, LOGGER_DEBUG, "Audio is ALAC: using initial latency estimate -%8.6f sec", ((double) delay) / SEC); + break; + case 0x08: + delay = DELAY_AAC; /* DELAY = 500000 (0.5 sec) is empirical choice for AAC-ELD */ + logger_log(raop_rtp->logger, LOGGER_DEBUG, "Audio is AAC: using initial latency estimate -%8.6f sec", ((double) delay ) / SEC); + break; + default: + break; + } + have_set_delay = true; + } + offset -= delay; + if (rtp_count == 0) { + initial_offset = offset; + sync_adjustment = 0.0; + } + rtp_count++; + offset -= initial_offset; + rtp_timestamp -= rtp_start_time; + sync_adjustment += ((double) offset) + (((double) rtp_timestamp) / raop_rtp->rtp_sync_scale); + raop_rtp->rtp_sync_offset = initial_offset + (int64_t) (sync_adjustment / rtp_count); + } + assert(raop_buffer_enqueue(raop_rtp->buffer, packet, packetlen, rtp64_time, 1) >= 0); // Render continuous buffer entries void *payload = NULL; unsigned int payload_size; - uint32_t timestamp, diff1, diff2; unsigned short seqnum; - while ((payload = raop_buffer_dequeue(raop_rtp->buffer, &payload_size, ×tamp, &seqnum, no_resend))) { - if (have_rtp_time == false) { - rtp_time = timestamp; - have_rtp_time = true; - prev_rtp_timestamp = timestamp; - } - /* the 44.1 kHZ rtp_time epoch is about 27 hours; - * this code handles epoch straddling by uint32_t time pairs */ - diff1 = timestamp - prev_rtp_timestamp; - diff2 = prev_rtp_timestamp - timestamp; - if (diff1 < diff2) { - rtp_time += (int64_t) diff1; - } else { - rtp_time -= (int64_t) diff2; - } - prev_rtp_timestamp = timestamp; - int64_t ntp_timestamp = (int64_t) ((double) rtp_time) / raop_rtp->rtp_sync_scale; - ntp_timestamp -= raop_rtp->rtp_sync_offset; - audio_decode_struct audio_data; + uint64_t rtp64_timestamp; + while ((payload = raop_buffer_dequeue(raop_rtp->buffer, &payload_size, &rtp64_timestamp, &seqnum, no_resend))) { + uint64_t elapsed_time = (uint64_t) (((double) (rtp64_timestamp - (uint64_t) rtp_start_time)) / raop_rtp->rtp_sync_scale); + audio_decode_struct audio_data; audio_data.data_len = payload_size; audio_data.data = payload; - audio_data.ntp_time = ntp_start_time ; - audio_data.ntp_time += ntp_timestamp; - audio_data.rtp_time = (uint64_t) rtp_time; - audio_data.have_synced = have_synced; + audio_data.ntp_time = ntp_start_time; + audio_data.ntp_time = ntp_start_time + elapsed_time; + audio_data.ntp_time -= raop_rtp->rtp_sync_offset; + audio_data.rtp_time = rtp64_timestamp; raop_rtp->callbacks.audio_process(raop_rtp->callbacks.cls, raop_rtp->ntp, &audio_data); free(payload); uint64_t ntp_now = raop_ntp_get_local_time(raop_rtp->ntp); - int64_t latency = ((int64_t) (ntp_now - ntp_start_time)) - ntp_timestamp; - logger_log(raop_rtp->logger, LOGGER_INFO, "raop_rtp audio: now = %8.6f, npt = %8.6f, latency = %8.6f, rtp_time=%lld seqnum = %u", - ((double) ntp_now ) / SEC, ((double) audio_data.ntp_time) / SEC, ((double) latency) / SEC, rtp_time, seqnum); + int64_t latency = raop_rtp->rtp_sync_offset + (int64_t) (ntp_now - ntp_start_time); + latency -= (int64_t) elapsed_time; + logger_log(raop_rtp->logger, LOGGER_INFO, "raop_rtp audio: now = %8.6f, npt = %8.6f, latency = %8.6f, rtp_time=%lu seqnum = %u", + ((double) ntp_now ) / SEC, ((double) audio_data.ntp_time) / SEC, ((double) latency) / SEC, rtp64_timestamp, seqnum); } /* Handle possible resend requests */ diff --git a/lib/stream.h b/lib/stream.h index 1f56774..bd61587 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -30,7 +30,6 @@ typedef struct { int data_len; uint64_t ntp_time; uint64_t rtp_time; - bool have_synced; } audio_decode_struct; #endif //AIRPLAYSERVER_STREAM_H