diff --git a/tools/plugin/alsaplug/CMakeLists.txt b/tools/plugin/alsaplug/CMakeLists.txt index eddcc0759220..d60104d8245d 100644 --- a/tools/plugin/alsaplug/CMakeLists.txt +++ b/tools/plugin/alsaplug/CMakeLists.txt @@ -17,7 +17,7 @@ target_include_directories(asound_module_pcm_sof PRIVATE ${sof_source_directory}/src/audio) target_compile_options(asound_module_pcm_sof PRIVATE -DPIC -g -O3 -Wmissing-prototypes - -Wimplicit-fallthrough -DCONFIG_LIBRARY -imacros${config_h}) + -Wno-stringop-truncation -Wimplicit-fallthrough -DCONFIG_LIBRARY -imacros${config_h}) install(TARGETS asound_module_pcm_sof DESTINATION /usr/lib/x86_64-linux-gnu/alsa-lib) @@ -49,7 +49,7 @@ target_include_directories(asound_module_ctl_sof PRIVATE ${sof_source_directory}/src/audio) target_compile_options(asound_module_ctl_sof PRIVATE -DPIC -g -O3 -Wmissing-prototypes - -Wimplicit-fallthrough -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) + -Wimplicit-fallthrough -Wno-stringop-truncation -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) install(TARGETS asound_module_ctl_sof DESTINATION /usr/lib/x86_64-linux-gnu/alsa-lib) diff --git a/tools/plugin/alsaplug/ctl.c b/tools/plugin/alsaplug/ctl.c index f45bf4f9b0e1..85a31177cfca 100644 --- a/tools/plugin/alsaplug/ctl.c +++ b/tools/plugin/alsaplug/ctl.c @@ -31,8 +31,7 @@ typedef struct snd_sof_ctl { struct plug_shm_glb_state *glb; snd_ctl_ext_t ext; - struct plug_mq_desc ipc_tx; - struct plug_mq_desc ipc_rx; + struct plug_socket_desc ipc; struct plug_shm_desc shm_ctx; int subscribed; int updated[MAX_CTLS]; @@ -232,8 +231,7 @@ static int plug_ctl_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, long /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to set volume for control %s\n", mixer_ctl->hdr.name); @@ -325,8 +323,7 @@ static int plug_ctl_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, lon memcpy(msg + sizeof(config), &volume, sizeof(volume)); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to set volume control %s\n", mixer_ctl->hdr.name); @@ -426,8 +423,7 @@ static int plug_ctl_read_enumerated(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to get enum items for control %s\n", enum_ctl->hdr.name); @@ -516,7 +512,7 @@ static int plug_ctl_write_enumerated(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, free(data); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, msg, msg_size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, msg_size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to set enum control %s\n", enum_ctl->hdr.name); @@ -573,8 +569,7 @@ static int plug_ctl_get_bytes_data(snd_sof_ctl_t *ctl, snd_ctl_ext_key_t key, /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to get bytes data for control %s\n", bytes_ctl->hdr.name); @@ -633,9 +628,8 @@ static int plug_ctl_write_bytes(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int err; /* send IPC with kcontrol data */ - err = plug_send_bytes_data(&ctl->ipc_tx, &ctl->ipc_rx, - ctl->glb->ctl[key].module_id, ctl->glb->ctl[key].instance_id, - abi); + err = plug_send_bytes_data(&ctl->ipc, ctl->glb->ctl[key].module_id, + ctl->glb->ctl[key].instance_id, abi); if (err < 0) { SNDERR("failed to set bytes data for control %s\n", bytes_ctl->hdr.name); return err; @@ -644,6 +638,7 @@ static int plug_ctl_write_bytes(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, return 0; } +/* TLV ops used for TLV bytes control callback */ /* TLV ops used for TLV bytes control callback */ static int plug_tlv_rw(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int op_flag, unsigned int numid, unsigned int *tlv, unsigned int tlv_size) @@ -653,12 +648,26 @@ static int plug_tlv_rw(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int op_flag, struct sof_abi_hdr *abi = (struct sof_abi_hdr *)(tlv + 2); /* skip TLV header */ int data_size; + /* only bytes and volume controls have tlv callback set */ + if (bytes_ctl->hdr.ops.info != SND_SOC_TPLG_CTL_BYTES) { + struct snd_soc_tplg_mixer_control *mixer_ctl = CTL_GET_TPLG_MIXER(ctl, key); + struct snd_soc_tplg_ctl_hdr *hdr = &mixer_ctl->hdr; + struct snd_soc_tplg_ctl_tlv *mixer_tlv = &hdr->tlv; + + /* set the dbscale values */ + tlv[0] = SND_CTL_TLVT_DB_SCALE; + tlv[1] = sizeof(int) * 2; + tlv[2] = mixer_tlv->scale.min; + tlv[3] = mixer_tlv->scale.mute << 16 | mixer_tlv->scale.step; + + return 0; + } + /* send IPC with kcontrol data if op_flag is > 0 else send IPC to get kcontrol data */ if (op_flag) { int err; - err = plug_send_bytes_data(&ctl->ipc_tx, &ctl->ipc_rx, - ctl->glb->ctl[key].module_id, + err = plug_send_bytes_data(&ctl->ipc, ctl->glb->ctl[key].module_id, ctl->glb->ctl[key].instance_id, abi); if (err < 0) { SNDERR("failed to set bytes data for control %s\n", bytes_ctl->hdr.name); @@ -731,6 +740,7 @@ static void plug_ctl_close(snd_ctl_ext_t *ext) snd_sof_ctl_t *ctl = ext->private_data; /* TODO: munmap */ + close(ctl->ipc.socket_fd); free(ctl); } @@ -777,34 +787,17 @@ SND_CTL_PLUGIN_DEFINE_FUNC(sof) goto error; } - /* init IPC tx message queue name */ - err = plug_mq_init(&ctl->ipc_tx, "sof", "ipc-tx", 0); + /* init IPC socket name */ + err = plug_socket_path_init(&ctl->ipc, "sof", "ipc", 0); if (err < 0) { SNDERR("error: invalid name for IPC tx mq %s\n", plug->tplg_file); goto error; } - /* open the sof-pipe IPC tx message queue */ - err = plug_mq_open(&ctl->ipc_tx); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - ctl->ipc_tx.queue_name, strerror(err)); - goto error; - } - - /* init IPC rx message queue name */ - err = plug_mq_init(&ctl->ipc_rx, "sof", "ipc-rx", 0); - if (err < 0) { - SNDERR("error: invalid name for IPC rx mq %s\n", plug->tplg_file); - goto error; - } - - /* open the sof-pipe IPC rx message queue */ - err = plug_mq_open(&ctl->ipc_rx); + err = plug_create_client_socket(&ctl->ipc); if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - ctl->ipc_rx.queue_name, strerror(err)); - goto error; + SNDERR("failed to connect to SOF pipe IPC socket : %s", strerror(err)); + return -errno; } /* create a SHM mapping for low latency stream position */ @@ -831,9 +824,6 @@ SND_CTL_PLUGIN_DEFINE_FUNC(sof) strncpy(ctl->ext.mixername, "SOF", sizeof(ctl->ext.mixername) - 1); - /* polling on message queue - supported on Linux but not portable */ - ctl->ext.poll_fd = ctl->ipc_tx.mq; - ctl->ext.callback = &sof_ext_callback; ctl->ext.private_data = ctl; ctl->ext.tlv.c = plug_tlv_rw; diff --git a/tools/plugin/alsaplug/pcm.c b/tools/plugin/alsaplug/pcm.c index 5ed7d832eaa8..ed5224c829b5 100644 --- a/tools/plugin/alsaplug/pcm.c +++ b/tools/plugin/alsaplug/pcm.c @@ -8,11 +8,12 @@ #include #include #include +#include +#include #include #include #include #include -#include #include #include #include @@ -42,10 +43,6 @@ typedef struct snd_sof_pcm { /* PCM flow control */ struct plug_sem_desc ready[TPLG_MAX_PCM_PIPELINES]; struct plug_sem_desc done[TPLG_MAX_PCM_PIPELINES]; - /* pipeline IPC tx queues */ - struct plug_mq_desc pipeline_ipc_tx[TPLG_MAX_PCM_PIPELINES]; - /* pipeline IPC response queues */ - struct plug_mq_desc pipeline_ipc_rx[TPLG_MAX_PCM_PIPELINES]; struct plug_shm_desc shm_pcm; @@ -55,15 +52,15 @@ typedef struct snd_sof_pcm { static int plug_pipeline_set_state(snd_sof_plug_t *plug, int state, struct ipc4_pipeline_set_state *pipe_state, struct tplg_pipeline_info *pipe_info, - struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx) + struct plug_socket_desc *ipc) { struct ipc4_message_reply reply = {{ 0 }}; int ret; pipe_state->primary.r.ppl_id = pipe_info->instance_id; - ret = plug_mq_cmd_tx_rx(ipc_tx, ipc_rx, pipe_state, sizeof(*pipe_state), - &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(ipc, pipe_state, sizeof(*pipe_state), + &reply, sizeof(reply)); if (ret < 0) SNDERR("failed pipeline %d set state %d\n", pipe_info->instance_id, state); @@ -97,8 +94,7 @@ static int plug_pipelines_set_state(snd_sof_plug_t *plug, int state) int ret; ret = plug_pipeline_set_state(plug, state, &pipe_state, pipe_info, - &pcm->pipeline_ipc_tx[i], - &pcm->pipeline_ipc_rx[i]); + &plug->ipc); if (ret < 0) return ret; } @@ -111,7 +107,7 @@ static int plug_pipelines_set_state(snd_sof_plug_t *plug, int state) int ret; ret = plug_pipeline_set_state(plug, state, &pipe_state, pipe_info, - &pcm->pipeline_ipc_tx[i], &pcm->pipeline_ipc_rx[i]); + &plug->ipc); if (ret < 0) return ret; } @@ -482,38 +478,6 @@ static int plug_pcm_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params) for (i = 0; i < pipeline_list->count; i++) { struct tplg_pipeline_info *pipe_info = pipeline_list->pipelines[i]; - /* init IPC message queue name */ - err = plug_mq_init(&pcm->pipeline_ipc_tx[i], plug->tplg_file, "pcm-tx", - pipe_info->instance_id); - if (err < 0) { - SNDERR("error: invalid name for pipeline IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&pcm->pipeline_ipc_tx[i]); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - pcm->pipeline_ipc_tx[i].queue_name, strerror(err)); - return -errno; - } - - /* init IPC message queue name */ - err = plug_mq_init(&pcm->pipeline_ipc_rx[i], plug->tplg_file, "pcm-rx", - pipe_info->instance_id); - if (err < 0) { - SNDERR("error: invalid name for pipeline IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&pcm->pipeline_ipc_rx[i]); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - pcm->pipeline_ipc_tx[i].queue_name, strerror(err)); - return -errno; - } - /* init name for pipeline ready lock */ err = plug_lock_init(&pcm->ready[i], plug->tplg_file, "ready", pipe_info->instance_id); @@ -671,8 +635,6 @@ static int plug_pcm_hw_free(snd_pcm_ioplug_t *io) pipeline_list = &plug->pcm_info->playback_pipeline_list; ret = plug_free_pipelines(plug, pipeline_list, pcm->capture); - if (ret < 0) - return ret; close(pcm->shm_pcm.fd); close(plug->glb_ctx.fd); @@ -680,11 +642,10 @@ static int plug_pcm_hw_free(snd_pcm_ioplug_t *io) for (i = 0; i < pipeline_list->count; i++) { struct tplg_pipeline_info *pipe_info = pipeline_list->pipelines[i]; - mq_close(pcm->pipeline_ipc_tx[pipe_info->instance_id].mq); - mq_close(pcm->pipeline_ipc_rx[pipe_info->instance_id].mq); sem_close(pcm->ready[pipe_info->instance_id].sem); sem_close(pcm->done[pipe_info->instance_id].sem); } + close(plug->ipc.socket_fd); return 0; } @@ -957,31 +918,15 @@ static int plug_init_sof_pipe(snd_sof_plug_t *plug, snd_pcm_t **pcmp, fprintf(stdout, "topology parsing complete\n"); /* init IPC message queue name */ - err = plug_mq_init(&plug->ipc_tx, "sof", "ipc-tx", 0); - if (err < 0) { - SNDERR("error: invalid name for IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&plug->ipc_tx); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - plug->ipc_tx.queue_name, strerror(err)); - return -errno; - } - - err = plug_mq_init(&plug->ipc_rx, "sof", "ipc-rx", 0); + err = plug_socket_path_init(&plug->ipc, "sof", "ipc", 0); if (err < 0) { - SNDERR("error: invalid name for IPC mq %s\n", plug->tplg_file); + SNDERR("error: invalid name for IPC socket %s\n", plug->tplg_file); return err; } - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&plug->ipc_rx); + err = plug_create_client_socket(&plug->ipc); if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - plug->ipc_rx.queue_name, strerror(err)); + SNDERR("failed to connect to SOF pipe IPC socket : %s", strerror(err)); return -errno; } diff --git a/tools/plugin/alsaplug/plugin.c b/tools/plugin/alsaplug/plugin.c index deb767a4133f..52bbdcfc70e1 100644 --- a/tools/plugin/alsaplug/plugin.c +++ b/tools/plugin/alsaplug/plugin.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -41,27 +40,6 @@ #include "plugin.h" #include "common.h" -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen) -{ - return plug_mq_cmd_tx_rx(ipc, ipc, msg, len, reply, rlen); -} - -/* - * Open an existing message queue using IPC object. - */ -int plug_mq_open(struct plug_mq_desc *ipc) -{ - /* now open new queue for Tx and Rx */ - ipc->mq = mq_open(ipc->queue_name, O_RDWR); - if (ipc->mq < 0) { - // SNDERR("failed to open IPC queue %s: %s\n", - // ipc->queue_name, strerror(errno)); - return -errno; - } - - return 0; -} - /* * Open an existing semaphore using lock object. */ diff --git a/tools/plugin/alsaplug/plugin.h b/tools/plugin/alsaplug/plugin.h index dcc6f550ff53..7f4009267d24 100644 --- a/tools/plugin/alsaplug/plugin.h +++ b/tools/plugin/alsaplug/plugin.h @@ -39,8 +39,7 @@ typedef struct snd_sof_plug { struct list_item pcm_list; struct list_item pipeline_list; int instance_ids[SND_SOC_TPLG_DAPM_LAST]; - struct plug_mq_desc ipc_tx; - struct plug_mq_desc ipc_rx; + struct plug_socket_desc ipc; struct plug_shm_desc glb_ctx; diff --git a/tools/plugin/alsaplug/tplg.c b/tools/plugin/alsaplug/tplg.c index b3f04f4ee456..7b05b7ead50a 100644 --- a/tools/plugin/alsaplug/tplg.c +++ b/tools/plugin/alsaplug/tplg.c @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -926,8 +925,7 @@ static int plug_set_up_widget_ipc(snd_sof_plug_t *plug, struct tplg_comp_info *c memcpy(msg, module_init, sizeof(*module_init)); memcpy(msg + sizeof(*module_init), comp_info->ipc_payload, comp_info->ipc_size); - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - msg, size, &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, msg, size, &reply, sizeof(reply)); free(msg); if (ret < 0) { SNDERR("error: can't set up widget %s\n", comp_info->name); @@ -944,7 +942,7 @@ static int plug_set_up_widget_ipc(snd_sof_plug_t *plug, struct tplg_comp_info *c static int plug_set_up_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info *pipe_info) { - struct ipc4_pipeline_create msg; + struct ipc4_pipeline_create msg = {{ 0 }}; struct ipc4_message_reply reply; int ret; @@ -955,8 +953,7 @@ static int plug_set_up_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info msg.primary.r.instance_id = pipe_info->instance_id; msg.primary.r.ppl_mem_size = pipe_info->mem_usage; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &msg, sizeof(msg), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &msg, sizeof(msg), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up pipeline %s\n", pipe_info->name); return ret; @@ -1113,8 +1110,7 @@ static int plug_set_up_route(snd_sof_plug_t *plug, struct tplg_route_info *route bu.extension.r.dst_queue = 0; bu.extension.r.src_queue = 0; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &bu, sizeof(bu), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &bu, sizeof(bu), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up route %s -> %s\n", src_comp_info->name, sink_comp_info->name); @@ -1174,8 +1170,8 @@ static int plug_set_up_widget(snd_sof_plug_t *plug, struct tplg_comp_info *comp_ abi = (struct sof_abi_hdr *)ctl->data; /* send IPC with kcontrol data */ - ret = plug_send_bytes_data(&plug->ipc_tx, &plug->ipc_rx, - comp_info->module_id, comp_info->instance_id, abi); + ret = plug_send_bytes_data(&plug->ipc, comp_info->module_id, + comp_info->instance_id, abi); if (ret < 0) { SNDERR("failed to set bytes data for widget %s\n", comp_info->name); return ret; @@ -1339,8 +1335,7 @@ static int plug_delete_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info msg.primary.r.rsp = SOF_IPC4_MESSAGE_DIR_MSG_REQUEST; msg.primary.r.instance_id = pipe_info->instance_id; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &msg, sizeof(msg), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &msg, sizeof(msg), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't delete pipeline %s\n", pipe_info->name); return ret; @@ -1383,8 +1378,7 @@ static int plug_free_route(snd_sof_plug_t *plug, struct tplg_route_info *route_i bu.extension.r.dst_queue = 0; bu.extension.r.src_queue = 0; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &bu, sizeof(bu), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &bu, sizeof(bu), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up route %s -> %s\n", src_comp_info->name, sink_comp_info->name); diff --git a/tools/plugin/alsaplug/tplg_ctl.c b/tools/plugin/alsaplug/tplg_ctl.c index 6424b54825ba..ad2709153806 100644 --- a/tools/plugin/alsaplug/tplg_ctl.c +++ b/tools/plugin/alsaplug/tplg_ctl.c @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include diff --git a/tools/plugin/common.c b/tools/plugin/common.c index 46629f061201..f20cb14e0b0e 100644 --- a/tools/plugin/common.c +++ b/tools/plugin/common.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -94,14 +96,10 @@ static const char *suffix_name(const char *longname) /* * Initialise the IPC object. */ -int plug_mq_init(struct plug_mq_desc *ipc, const char *tplg, const char *type, int index) +int plug_socket_path_init(struct plug_socket_desc *ipc, const char *tplg, const char *type, + int index) { - const char *name = suffix_name(tplg); - - if (!name) - return -EINVAL; - - snprintf(ipc->queue_name, NAME_SIZE, "/mq-%s-%s-%d", type, name, index); + snprintf(ipc->path, NAME_SIZE, "/tmp/%s-%s", tplg, type); return 0; } @@ -178,12 +176,45 @@ int plug_shm_open(struct plug_shm_desc *shm) return 0; } -int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - void *msg, size_t len, void *reply, size_t rlen) +static int plug_socket_timed_wait(struct plug_socket_desc *ipc, fd_set *fds, int timeout_ms, + bool write) +{ + struct timeval timeout; + int result; + + /* Set the timeout for select */ + timeout.tv_sec = 0; + timeout.tv_usec = timeout_ms * 1000; + + /* now wait for socket to be readable/writable */ + if (write) + result = select(ipc->socket_fd + 1, NULL, fds, NULL, &timeout); + else + result = select(ipc->socket_fd + 1, fds, NULL, NULL, &timeout); + + if (result == -1) { + SNDERR("error waiting for socket to be %s\n", write ? "writable" : "readable"); + return result; + } + + if (result == 0) { + SNDERR("IPC Socket %s timeout\n", write ? "write" : "read"); + return -ETIMEDOUT; + } + + /* socket ready for read/write */ + if (FD_ISSET(ipc->socket_fd, fds)) + return 0; + + /* socket not ready */ + return -EINVAL; +} + +static int plug_ipc_cmd_tx(struct plug_socket_desc *ipc, void *msg, size_t len) { - struct timespec ts; - ssize_t ipc_size; + fd_set write_fds; char mailbox[IPC3_MAX_MSG_SIZE]; + ssize_t bytes; int err; if (len > IPC3_MAX_MSG_SIZE) { @@ -193,55 +224,90 @@ int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, memset(mailbox, 0, IPC3_MAX_MSG_SIZE); memcpy(mailbox, msg, len); - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* Wait for the socket to be writable */ + FD_ZERO(&write_fds); + FD_SET(ipc->socket_fd, &write_fds); + + err = plug_socket_timed_wait(ipc, &write_fds, 20, true); + if (err < 0) + return err; + + bytes = send(ipc->socket_fd, mailbox, IPC3_MAX_MSG_SIZE, 0); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); return -errno; } - /* IPCs should be read under 10ms */ - plug_timespec_add_ms(&ts, 10); + return bytes; +} + +static int plug_ipc_cmd_rx(struct plug_socket_desc *ipc, char mailbox[IPC3_MAX_MSG_SIZE]) +{ + fd_set read_fds; + int err; + + /* Wait for the socket to be readable */ + FD_ZERO(&read_fds); + FD_SET(ipc->socket_fd, &read_fds); + + err = plug_socket_timed_wait(ipc, &read_fds, 200, false); + if (err < 0) + return err; + + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + return recv(ipc->socket_fd, mailbox, IPC3_MAX_MSG_SIZE, 0); +} + +int plug_ipc_cmd_tx_rx(struct plug_socket_desc *ipc, void *msg, size_t len, void *reply, + size_t rlen) +{ + char mailbox[IPC3_MAX_MSG_SIZE]; + ssize_t bytes; + int err; - /* now return message completion status */ - err = mq_timedsend(ipc_tx->mq, mailbox, IPC3_MAX_MSG_SIZE, 0, &ts); - if (err == -1) { - SNDERR("error: timeout can't send IPC message queue %s : %s\n", - ipc_tx->queue_name, strerror(errno)); + /* send IPC message */ + bytes = plug_ipc_cmd_tx(ipc, msg, len); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); return -errno; } - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* wait for response */ + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + bytes = plug_ipc_cmd_rx(ipc, mailbox); + if (bytes == -1) { + SNDERR("failed to read IPC message reply %s\n", strerror(errno)); return -errno; } - /* IPCs should be processed under 20ms, but wait longer as - * some can take longer especially in valgrind - */ - plug_timespec_add_ms(&ts, 20); - - ipc_size = mq_timedreceive(ipc_rx->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size == -1) { - //fprintf(stderr, "dbg: timeout can't read IPC message queue %s : %s retrying\n", - // ipc->queue_name, strerror(errno)); + /* no response or connection lost, try to restablish connection */ + if (bytes == 0) { + close(ipc->socket_fd); + err = plug_create_client_socket(ipc); + if (err < 0) { + SNDERR("failed to reestablish connection to SOF pipe IPC socket : %s", + strerror(err)); + return -errno; + } - /* ok, its a long IPC or valgrind, wait longer */ - plug_timespec_add_ms(&ts, 800); + /* send IPC message again */ + bytes = plug_ipc_cmd_tx(ipc, msg, len); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); + return -errno; + } - ipc_size = mq_timedreceive(ipc_rx->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size == -1) { - SNDERR("error: timeout can't read IPC message queue %s : %s\n", - ipc_rx->queue_name, strerror(errno)); + /* wait for response */ + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + bytes = plug_ipc_cmd_rx(ipc, mailbox); + if (bytes == -1) { + SNDERR("failed to read IPC message reply %s\n", strerror(errno)); return -errno; } - /* needed for valgrind to complete MQ op before next client IPC */ - ts.tv_nsec = 20 * 1000 * 1000; - ts.tv_sec = 0; - nanosleep(&ts, NULL); + /* connection lost again, quit now */ + if (bytes == 0) + return -errno; } /* do the message work */ @@ -265,8 +331,8 @@ void plug_ctl_ipc_message(struct ipc4_module_large_config *config, int param_id, config->extension.r.large_param_id = param_id; } -int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - uint32_t module_id, uint32_t instance_id, struct sof_abi_hdr *abi) +int plug_send_bytes_data(struct plug_socket_desc *ipc, uint32_t module_id, uint32_t instance_id, + struct sof_abi_hdr *abi) { struct ipc4_module_large_config config = {{ 0 }}; struct ipc4_message_reply reply; @@ -292,7 +358,7 @@ int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_r memcpy(msg + sizeof(config), abi->data, abi->size); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(ipc_tx, ipc_rx, msg, msg_size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(ipc, msg, msg_size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to send IPC to set bytes data\n"); @@ -306,3 +372,95 @@ int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_r return 0; } + +int plug_socket_create(struct plug_socket_desc *ipc) +{ + struct sockaddr_un addr; + int sockfd; + + /* Check if the socket path already exists */ + if (access(ipc->path, F_OK) != -1) { + /* If it exists, remove it */ + if (unlink(ipc->path) == -1) { + SNDERR("unlink previous socket file"); + return -EINVAL; + } + } + + /* Create the socket */ + sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + SNDERR("failed to create new socket"); + return sockfd; + } + + ipc->socket_fd = sockfd; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ipc->path, sizeof(addr.sun_path) - 1); + + /* Bind the socket to the address */ + if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + SNDERR("failed to bind new socket for IPC path\n"); + close(sockfd); + return -EINVAL; + } + + if (listen(sockfd, MAX_IPC_CLIENTS) == -1) { + SNDERR("failed to listen on socket for IPC\n"); + return -EINVAL; + } + + return 0; +} + +static int set_socket_nonblocking(int sockfd) +{ + int flags = fcntl(sockfd, F_GETFL, 0); + + if (flags == -1) { + SNDERR("fcntl(F_GETFL) failed"); + return -EINVAL; + } + + flags |= O_NONBLOCK; + if (fcntl(sockfd, F_SETFL, flags) == -1) { + SNDERR("fcntl(F_SETFL) failed"); + return -EINVAL; + } + + return 0; +} + +int plug_create_client_socket(struct plug_socket_desc *ipc) +{ + struct sockaddr_un addr; + int sockfd; + + sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + SNDERR("error: failed to create sof-pipe IPC socket\n"); + return sockfd; + } + + if (set_socket_nonblocking(sockfd) < 0) { + close(sockfd); + return -EINVAL; + } + ipc->socket_fd = sockfd; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ipc->path, sizeof(addr.sun_path) - 1); + + /* Connect to the server (non-blocking) */ + if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + if (errno != EINPROGRESS) { + SNDERR("failed to connect to ipc socket"); + return -errno; + } + } + + return sockfd; +} diff --git a/tools/plugin/common.h b/tools/plugin/common.h index 002f3843c3e7..a64100c397cc 100644 --- a/tools/plugin/common.h +++ b/tools/plugin/common.h @@ -9,7 +9,6 @@ #define __SOF_PLUGIN_COMMON_H__ #include -#include #include #include #include @@ -39,6 +38,8 @@ #define NUM_EP_CONFIGS 8 +#define MAX_IPC_CLIENTS 5 + /* * Run with valgrind * valgrind --trace-children=yes aplay -v -Dsof:blah.tplg,1,hw:1,2 -f dat /dev/zero @@ -161,11 +162,9 @@ struct plug_shm_desc { void *addr; }; -struct plug_mq_desc { - /* IPC message queue */ - mqd_t mq; - struct mq_attr attr; - char queue_name[NAME_SIZE]; +struct plug_socket_desc { + int socket_fd; + char path[NAME_SIZE]; }; struct plug_sem_desc { @@ -265,18 +264,13 @@ void plug_shm_free(struct plug_shm_desc *shm); /* * IPC */ -int plug_mq_create(struct plug_mq_desc *ipc); - -int plug_mq_open(struct plug_mq_desc *ipc); - -int plug_mq_init(struct plug_mq_desc *ipc, const char *tplg, const char *type, int index); +int plug_socket_path_init(struct plug_socket_desc *ipc, const char *tplg, const char *type, + int index); -void plug_mq_free(struct plug_mq_desc *ipc); +void plug_socket_free(struct plug_socket_desc *ipc); -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen); - -int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - void *msg, size_t len, void *reply, size_t rlen); +int plug_ipc_cmd_tx_rx(struct plug_socket_desc *ipc, void *msg, size_t len, void *reply, + size_t rlen); /* * Locking @@ -324,7 +318,11 @@ static inline void data_dump(void *vdata, size_t bytes) void plug_ctl_ipc_message(struct ipc4_module_large_config *config, int param_id, size_t size, uint32_t module_id, uint32_t instance_id, uint32_t type); -int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - uint32_t module_id, uint32_t instance_id, struct sof_abi_hdr *abi); +int plug_send_bytes_data(struct plug_socket_desc *ipc, uint32_t module_id, uint32_t instance_id, + struct sof_abi_hdr *abi); + +int plug_socket_create(struct plug_socket_desc *ipc); + +int plug_create_client_socket(struct plug_socket_desc *ipc); #endif diff --git a/tools/plugin/pipe/CMakeLists.txt b/tools/plugin/pipe/CMakeLists.txt index 80114d0f6d2f..244fb3199e38 100644 --- a/tools/plugin/pipe/CMakeLists.txt +++ b/tools/plugin/pipe/CMakeLists.txt @@ -15,7 +15,7 @@ target_include_directories(sof-pipe PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/.. ${sof_source_directory}/src/audio) -target_compile_options(sof-pipe PRIVATE -DPIC -g -O3 -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) +target_compile_options(sof-pipe PRIVATE -DPIC -g -O3 -Wall -Werror -Wno-stringop-truncation -DCONFIG_LIBRARY -imacros${config_h}) target_include_directories(sof-pipe PRIVATE ${sof_install_directory}/include) target_include_directories(sof-pipe PRIVATE ${parser_install_dir}/include) diff --git a/tools/plugin/pipe/ipc4.c b/tools/plugin/pipe/ipc4.c index 632c161015b4..90157971d05e 100644 --- a/tools/plugin/pipe/ipc4.c +++ b/tools/plugin/pipe/ipc4.c @@ -14,12 +14,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -311,58 +311,36 @@ int pipe_ipc_do(struct sof_pipe *sp, void *mailbox, size_t bytes) return err; } -int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plug_mq_desc *rx_mq) +static void *handle_ipc_client(void *data) { - ssize_t ipc_size; + struct sof_pipe *sp = data; + struct plug_socket_desc *ipc_socket = &sp->ipc_socket; + struct timespec ts; + int client_id = sp->new_client_id; + int clientfd = sp->client_sock_ids[client_id]; char mailbox[IPC3_MAX_MSG_SIZE] = {0}; + ssize_t ipc_size; int err; - struct timespec ts; - /* IPC thread should not preempt processing thread */ - err = pipe_set_ipc_lowpri(sp); - if (err < 0) - fprintf(sp->log, "error: cant set PCM IPC thread to low priority"); - - /* create the IPC message queue */ - err = plug_mq_create(tx_mq); - if (err < 0) { - fprintf(sp->log, "error: can't create TX IPC message queue : %s\n", - strerror(errno)); - return -errno; - } - - /* create the IPC message queue */ - err = plug_mq_create(rx_mq); - if (err < 0) { - fprintf(sp->log, "error: can't create PCM IPC message queue : %s\n", - strerror(errno)); - return -errno; - } - - /* let main() know we are ready */ - fprintf(sp->log, "sof-pipe: IPC TX %s thread ready\n", tx_mq->queue_name); - fprintf(sp->log, "sof-pipe: IPC RX %s thread ready\n", rx_mq->queue_name); - - /* main PCM IPC handling loop */ while (1) { memset(mailbox, 0, IPC3_MAX_MSG_SIZE); - /* is client dead ? */ - if (sp->glb->state == SOF_PLUGIN_STATE_DEAD) { - fprintf(sp->log, "sof-pipe: IPC %s client complete\n", tx_mq->queue_name); + ipc_size = recv(clientfd, mailbox, IPC3_MAX_MSG_SIZE, 0); + if (ipc_size < 0) { + fprintf(sp->log, "error: can't read IPC socket %s : %s\n", + ipc_socket->path, strerror(errno)); break; } - ipc_size = mq_receive(tx_mq->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL); - if (ipc_size < 0) { - fprintf(sp->log, "error: can't read PCM IPC message queue %s : %s\n", - tx_mq->queue_name, strerror(errno)); + /* connection lost */ + if (ipc_size == 0) { + close(clientfd); break; } /* TODO: properly validate message and continue if garbage */ if (*((uint32_t *)mailbox) == 0) { - fprintf(sp->log, "sof-pipe: IPC %s garbage read\n", tx_mq->queue_name); + fprintf(sp->log, "sof-pipe: IPC %s garbage read\n", ipc_socket->path); ts.tv_sec = 0; ts.tv_nsec = 20 * 1000 * 1000; /* 20 ms */ nanosleep(&ts, NULL); @@ -371,77 +349,78 @@ int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plu /* do the message work */ //data_dump(mailbox, IPC3_MAX_MSG_SIZE); - err = pipe_ipc_do(sp, mailbox, ipc_size); if (err < 0) fprintf(sp->log, "error: local IPC processing failed\n"); /* now return message completion status found in mailbox */ - err = mq_send(rx_mq->mq, mailbox, IPC3_MAX_MSG_SIZE, 0); + err = send(clientfd, mailbox, IPC3_MAX_MSG_SIZE, 0); if (err < 0) { - fprintf(sp->log, "error: can't send PCM IPC message queue %s : %s\n", - rx_mq->queue_name, strerror(errno)); + close(clientfd); break; } } - fprintf(sp->log, "***sof-pipe: IPC %s thread finished !!\n", tx_mq->queue_name); - return 0; + close(clientfd); + sp->client_sock_ids[client_id] = 0; + return NULL; } -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen) +int pipe_ipc_process(struct sof_pipe *sp, struct plug_socket_desc *ipc_socket) { - struct timespec ts; - ssize_t ipc_size; - char mailbox[IPC3_MAX_MSG_SIZE]; + pthread_t thread_id; int err; - if (len > IPC3_MAX_MSG_SIZE) { - SNDERR("ipc: message too big %d\n", len); - return -EINVAL; - } - memset(mailbox, 0, IPC3_MAX_MSG_SIZE); - memcpy(mailbox, msg, len); + /* IPC thread should not preempt processing thread */ + err = pipe_set_ipc_lowpri(sp); + if (err < 0) + fprintf(sp->log, "error: cant set PCM IPC thread to low priority"); - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* create the IPC socket */ + err = plug_socket_create(ipc_socket); + if (err < 0) { + fprintf(sp->log, "error: can't create TX IPC socket : %s\n", + strerror(errno)); return -errno; } - /* IPCs should be read under 10ms */ - plug_timespec_add_ms(&ts, 10); + /* let main() know we are ready */ + fprintf(sp->log, "sof-pipe: IPC %s socket ready\n", ipc_socket->path); - /* now return message completion status */ - err = mq_timedsend(ipc->mq, mailbox, IPC3_MAX_MSG_SIZE, 0, &ts); - if (err < 0) { - SNDERR("error: can't send IPC message queue %s : %s\n", - ipc->queue_name, strerror(errno)); - return -errno; - } + /* main PCM IPC handling loop */ + while (1) { + int clientfd, i; - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); - return -errno; - } + /* Accept a connection from a client */ + clientfd = accept(ipc_socket->socket_fd, NULL, NULL); + if (clientfd == -1) { + fprintf(sp->log, "IPC %s socket accept failed\n", ipc_socket->path); + return -errno; + } - /* IPCs should be processed under 20ms */ - plug_timespec_add_ms(&ts, 20); + for (i = 0; i < MAX_IPC_CLIENTS; i++) { + if (!sp->client_sock_ids[i]) { + sp->client_sock_ids[i] = clientfd; + sp->new_client_id = i; + break; + } + } - ipc_size = mq_timedreceive(ipc->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size < 0) { - SNDERR("error: can't read IPC message queue %s : %s\n", - ipc->queue_name, strerror(errno)); - return -errno; + /* create a new thread for each new IPC client */ + if (pthread_create(&thread_id, NULL, handle_ipc_client, sp) != 0) { + fprintf(sp->log, "Client IPC thread creation failed"); + close(clientfd); + sp->client_sock_ids[i] = 0; + continue; + } + + fprintf(sp->log, "Client IPC thread created client id %d\n", clientfd); + /* Detach the thread */ + pthread_detach(thread_id); } - /* do the message work */ - //printf("cmd got IPC %ld reply bytes\n", ipc_size); - if (rlen && reply) - memcpy(reply, mailbox, rlen); + close(ipc_socket->socket_fd); + fprintf(sp->log, "***sof-pipe: IPC %s thread finished !!\n", ipc_socket->path); return 0; } diff --git a/tools/plugin/pipe/main.c b/tools/plugin/pipe/main.c index 3d9c831dba0b..68861fc0af2a 100644 --- a/tools/plugin/pipe/main.c +++ b/tools/plugin/pipe/main.c @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -53,15 +52,12 @@ static void shutdown(struct sof_pipe *sp) pthread_cancel(pd->ipc_thread); pthread_cancel(pd->pcm_thread); - plug_mq_free(&pd->ipc_tx_mq); - plug_mq_free(&pd->ipc_rx_mq); plug_lock_free(&pd->ready); plug_lock_free(&pd->done); } /* free the sof-pipe IPC tx/rx message queues */ - plug_mq_free(&sp->ipc_tx_mq); - plug_mq_free(&sp->ipc_rx_mq); + plug_socket_free(&sp->ipc_socket); pthread_mutex_destroy(&sp->ipc_lock); @@ -211,37 +207,13 @@ void plug_shm_free(struct plug_shm_desc *shm) shm_unlink(shm->name); } -/* - * Create and open a new message queue using the IPC object. - */ -int plug_mq_create(struct plug_mq_desc *ipc) -{ - /* delete any old stale resources that use our resource name */ - mq_unlink(ipc->queue_name); - - memset(&ipc->attr, 0, sizeof(ipc->attr)); - ipc->attr.mq_msgsize = IPC3_MAX_MSG_SIZE; - ipc->attr.mq_maxmsg = 1; - - /* now open new queue for Tx/Rx */ - ipc->mq = mq_open(ipc->queue_name, O_CREAT | O_RDWR | O_EXCL, - S_IRWXU | S_IRWXG, &ipc->attr); - if (ipc->mq < 0) { - fprintf(stderr, "failed to create IPC queue %s: %s\n", - ipc->queue_name, strerror(errno)); - return -errno; - } - - return 0; -} /* * Free and delete message queue resources in IPC object. */ -void plug_mq_free(struct plug_mq_desc *ipc) +void plug_socket_free(struct plug_socket_desc *ipc) { - mq_close(ipc->mq); - mq_unlink(ipc->queue_name); + unlink(ipc->path); } /* @@ -359,16 +331,12 @@ int main(int argc, char *argv[], char *env[]) /* sofpipe is now ready */ sp.glb->state = SOF_PLUGIN_STATE_INIT; - ret = plug_mq_init(&sp.ipc_tx_mq, "sof", "ipc-tx", 0); - if (ret < 0) - goto out; - - ret = plug_mq_init(&sp.ipc_rx_mq, "sof", "ipc-rx", 0); + ret = plug_socket_path_init(&sp.ipc_socket, "sof", "ipc", 0); if (ret < 0) goto out; /* now process IPCs as they arrive from plugins */ - ret = pipe_ipc_process(&sp, &sp.ipc_tx_mq, &sp.ipc_rx_mq); + ret = pipe_ipc_process(&sp, &sp.ipc_socket); out: fprintf(sp.log, "shutdown main\n"); diff --git a/tools/plugin/pipe/pipe.h b/tools/plugin/pipe/pipe.h index 3cc95bf8401e..8f8fb26080ea 100644 --- a/tools/plugin/pipe/pipe.h +++ b/tools/plugin/pipe/pipe.h @@ -10,7 +10,6 @@ #include #include -#include #include #include @@ -29,8 +28,6 @@ struct pipethread_data { pthread_t pcm_thread; pthread_t ipc_thread; struct sof_pipe *sp; - struct plug_mq_desc ipc_tx_mq; - struct plug_mq_desc ipc_rx_mq; struct pipeline *pcm_pipeline; /* PCM flow control */ struct plug_sem_desc ready; @@ -72,8 +69,10 @@ struct sof_pipe { /* IPC */ pthread_t ipc_pcm_thread; - struct plug_mq_desc ipc_tx_mq; /* queue used by plugin to send IPCs */ - struct plug_mq_desc ipc_rx_mq; /* queue used by plugin to receive the IPC response */ + struct plug_socket_desc ipc_socket; /* queue used by plugin to send IPCs */ + int client_sock_ids[MAX_IPC_CLIENTS]; + int client_count; + int new_client_id; /* module SO handles */ struct sof_pipe_module module[MAX_MODULE_ID]; @@ -100,7 +99,7 @@ int pipe_set_rt(struct sof_pipe *sp); /* set ipc thread to low priority */ int pipe_set_ipc_lowpri(struct sof_pipe *sp); -int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plug_mq_desc *rx_mq); +int pipe_ipc_process(struct sof_pipe *sp, struct plug_socket_desc *ipc_socket); int pipe_ipc_new(struct sof_pipe *sp, int pri, int core); void pipe_ipc_free(struct sof_pipe *sp); diff --git a/tools/plugin/pipe/pipeline.c b/tools/plugin/pipe/pipeline.c index 831b60a3b6a9..2985e46d04ad 100644 --- a/tools/plugin/pipe/pipeline.c +++ b/tools/plugin/pipe/pipeline.c @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -181,12 +180,6 @@ static void *pipe_ipc_process_thread(void *arg) return NULL; } - err = pipe_ipc_process(pd->sp, &pd->ipc_tx_mq, &pd->ipc_rx_mq); - if (err < 0) { - fprintf(_sp->log, "pipe IPC thread error for pipeline %d\n", - pd->pcm_pipeline->pipeline_id); - } - return NULL; } @@ -297,18 +290,6 @@ int pipe_thread_new(struct sof_pipe *sp, struct pipeline *p) pd->sp = _sp; pd->pcm_pipeline = p; - /* initialise global IPC data */ - /* TODO: change the PCM name to tplg or make it per PID*/ - ret = plug_mq_init(&pd->ipc_tx_mq, pd->sp->topology_name, "pcm-tx", p->pipeline_id); - if (ret < 0) - return -EINVAL; - mq_unlink(pd->ipc_tx_mq.queue_name); - - ret = plug_mq_init(&pd->ipc_rx_mq, pd->sp->topology_name, "pcm-rx", p->pipeline_id); - if (ret < 0) - return -EINVAL; - mq_unlink(pd->ipc_rx_mq.queue_name); - /* init names of shared resources */ ret = plug_lock_init(&pd->ready, _sp->topology_name, "ready", p->pipeline_id); if (ret < 0) @@ -367,11 +348,6 @@ int pipe_thread_free(struct sof_pipe *sp, int pipeline_id) return -errno; } - plug_mq_free(&pd->ipc_tx_mq); - mq_unlink(pd->ipc_tx_mq.queue_name); - plug_mq_free(&pd->ipc_rx_mq); - mq_unlink(pd->ipc_rx_mq.queue_name); - plug_lock_free(&pd->ready); plug_lock_free(&pd->done);