From 7c964fbf85ec25b375eaf05faa6ed8a4e3fae07b Mon Sep 17 00:00:00 2001 From: Ranjani Sridharan Date: Thu, 6 Mar 2025 16:14:03 -0800 Subject: [PATCH 1/3] tools: plugin: tplg: Initialize IPC message to avoid garbage values in the IPC data. Signed-off-by: Ranjani Sridharan --- tools/plugin/alsaplug/tplg.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/plugin/alsaplug/tplg.c b/tools/plugin/alsaplug/tplg.c index b3f04f4ee456..8fd70c9a6798 100644 --- a/tools/plugin/alsaplug/tplg.c +++ b/tools/plugin/alsaplug/tplg.c @@ -944,7 +944,7 @@ static int plug_set_up_widget_ipc(snd_sof_plug_t *plug, struct tplg_comp_info *c static int plug_set_up_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info *pipe_info) { - struct ipc4_pipeline_create msg; + struct ipc4_pipeline_create msg = {{ 0 }}; struct ipc4_message_reply reply; int ret; From cb92bf13dc0aaf42208b697c0f38e0c9a954281d Mon Sep 17 00:00:00 2001 From: Ranjani Sridharan Date: Thu, 6 Mar 2025 16:21:01 -0800 Subject: [PATCH 2/3] tools: plugin: ctl: Fix tlv read for volume controls Report back the DBscale values in the tlv read callback for volume controls. Signed-off-by: Ranjani Sridharan --- tools/plugin/alsaplug/ctl.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tools/plugin/alsaplug/ctl.c b/tools/plugin/alsaplug/ctl.c index f45bf4f9b0e1..edcc087143e6 100644 --- a/tools/plugin/alsaplug/ctl.c +++ b/tools/plugin/alsaplug/ctl.c @@ -644,6 +644,7 @@ static int plug_ctl_write_bytes(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, return 0; } +/* TLV ops used for TLV bytes control callback */ /* TLV ops used for TLV bytes control callback */ static int plug_tlv_rw(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int op_flag, unsigned int numid, unsigned int *tlv, unsigned int tlv_size) @@ -653,6 +654,21 @@ static int plug_tlv_rw(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int op_flag, struct sof_abi_hdr *abi = (struct sof_abi_hdr *)(tlv + 2); /* skip TLV header */ int data_size; + /* only bytes and volume controls have tlv callback set */ + if (bytes_ctl->hdr.ops.info != SND_SOC_TPLG_CTL_BYTES) { + struct snd_soc_tplg_mixer_control *mixer_ctl = CTL_GET_TPLG_MIXER(ctl, key); + struct snd_soc_tplg_ctl_hdr *hdr = &mixer_ctl->hdr; + struct snd_soc_tplg_ctl_tlv *mixer_tlv = &hdr->tlv; + + /* set the dbscale values */ + tlv[0] = SND_CTL_TLVT_DB_SCALE; + tlv[1] = sizeof(int) * 2; + tlv[2] = mixer_tlv->scale.min; + tlv[3] = mixer_tlv->scale.mute << 16 | mixer_tlv->scale.step; + + return 0; + } + /* send IPC with kcontrol data if op_flag is > 0 else send IPC to get kcontrol data */ if (op_flag) { int err; From 32ddca848f807cde10b34df5fd57f6dd473d4908 Mon Sep 17 00:00:00 2001 From: Ranjani Sridharan Date: Thu, 6 Mar 2025 16:31:19 -0800 Subject: [PATCH 3/3] tools: plugin: Replace mqueue with unix domain socket Replace the mqueues with unix domain ocket for IPC communication between the plugin and sof-pipe. Since socket are full-duplex, a single socket is sufficient to handle the IPC communication. They are also faster and require no queue management unlike with mqueues. Also, unix domain sockets aloow sending larger data sizes. Signed-off-by: Ranjani Sridharan --- tools/plugin/alsaplug/CMakeLists.txt | 4 +- tools/plugin/alsaplug/ctl.c | 56 ++---- tools/plugin/alsaplug/pcm.c | 79 ++------- tools/plugin/alsaplug/plugin.c | 22 --- tools/plugin/alsaplug/plugin.h | 3 +- tools/plugin/alsaplug/tplg.c | 20 +-- tools/plugin/alsaplug/tplg_ctl.c | 1 - tools/plugin/common.c | 254 ++++++++++++++++++++++----- tools/plugin/common.h | 34 ++-- tools/plugin/pipe/CMakeLists.txt | 2 +- tools/plugin/pipe/ipc4.c | 147 +++++++--------- tools/plugin/pipe/main.c | 42 +---- tools/plugin/pipe/pipe.h | 11 +- tools/plugin/pipe/pipeline.c | 24 --- 14 files changed, 333 insertions(+), 366 deletions(-) diff --git a/tools/plugin/alsaplug/CMakeLists.txt b/tools/plugin/alsaplug/CMakeLists.txt index eddcc0759220..d60104d8245d 100644 --- a/tools/plugin/alsaplug/CMakeLists.txt +++ b/tools/plugin/alsaplug/CMakeLists.txt @@ -17,7 +17,7 @@ target_include_directories(asound_module_pcm_sof PRIVATE ${sof_source_directory}/src/audio) target_compile_options(asound_module_pcm_sof PRIVATE -DPIC -g -O3 -Wmissing-prototypes - -Wimplicit-fallthrough -DCONFIG_LIBRARY -imacros${config_h}) + -Wno-stringop-truncation -Wimplicit-fallthrough -DCONFIG_LIBRARY -imacros${config_h}) install(TARGETS asound_module_pcm_sof DESTINATION /usr/lib/x86_64-linux-gnu/alsa-lib) @@ -49,7 +49,7 @@ target_include_directories(asound_module_ctl_sof PRIVATE ${sof_source_directory}/src/audio) target_compile_options(asound_module_ctl_sof PRIVATE -DPIC -g -O3 -Wmissing-prototypes - -Wimplicit-fallthrough -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) + -Wimplicit-fallthrough -Wno-stringop-truncation -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) install(TARGETS asound_module_ctl_sof DESTINATION /usr/lib/x86_64-linux-gnu/alsa-lib) diff --git a/tools/plugin/alsaplug/ctl.c b/tools/plugin/alsaplug/ctl.c index edcc087143e6..85a31177cfca 100644 --- a/tools/plugin/alsaplug/ctl.c +++ b/tools/plugin/alsaplug/ctl.c @@ -31,8 +31,7 @@ typedef struct snd_sof_ctl { struct plug_shm_glb_state *glb; snd_ctl_ext_t ext; - struct plug_mq_desc ipc_tx; - struct plug_mq_desc ipc_rx; + struct plug_socket_desc ipc; struct plug_shm_desc shm_ctx; int subscribed; int updated[MAX_CTLS]; @@ -232,8 +231,7 @@ static int plug_ctl_read_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, long /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to set volume for control %s\n", mixer_ctl->hdr.name); @@ -325,8 +323,7 @@ static int plug_ctl_write_integer(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, lon memcpy(msg + sizeof(config), &volume, sizeof(volume)); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to set volume control %s\n", mixer_ctl->hdr.name); @@ -426,8 +423,7 @@ static int plug_ctl_read_enumerated(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to get enum items for control %s\n", enum_ctl->hdr.name); @@ -516,7 +512,7 @@ static int plug_ctl_write_enumerated(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, free(data); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, msg, msg_size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, msg_size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to set enum control %s\n", enum_ctl->hdr.name); @@ -573,8 +569,7 @@ static int plug_ctl_get_bytes_data(snd_sof_ctl_t *ctl, snd_ctl_ext_key_t key, /* send the IPC message */ memcpy(msg, &config, sizeof(config)); - err = plug_mq_cmd_tx_rx(&ctl->ipc_tx, &ctl->ipc_rx, - msg, size, reply_data, reply_data_size); + err = plug_ipc_cmd_tx_rx(&ctl->ipc, msg, size, reply_data, reply_data_size); free(msg); if (err < 0) { SNDERR("failed to get bytes data for control %s\n", bytes_ctl->hdr.name); @@ -633,9 +628,8 @@ static int plug_ctl_write_bytes(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int err; /* send IPC with kcontrol data */ - err = plug_send_bytes_data(&ctl->ipc_tx, &ctl->ipc_rx, - ctl->glb->ctl[key].module_id, ctl->glb->ctl[key].instance_id, - abi); + err = plug_send_bytes_data(&ctl->ipc, ctl->glb->ctl[key].module_id, + ctl->glb->ctl[key].instance_id, abi); if (err < 0) { SNDERR("failed to set bytes data for control %s\n", bytes_ctl->hdr.name); return err; @@ -673,8 +667,7 @@ static int plug_tlv_rw(snd_ctl_ext_t *ext, snd_ctl_ext_key_t key, int op_flag, if (op_flag) { int err; - err = plug_send_bytes_data(&ctl->ipc_tx, &ctl->ipc_rx, - ctl->glb->ctl[key].module_id, + err = plug_send_bytes_data(&ctl->ipc, ctl->glb->ctl[key].module_id, ctl->glb->ctl[key].instance_id, abi); if (err < 0) { SNDERR("failed to set bytes data for control %s\n", bytes_ctl->hdr.name); @@ -747,6 +740,7 @@ static void plug_ctl_close(snd_ctl_ext_t *ext) snd_sof_ctl_t *ctl = ext->private_data; /* TODO: munmap */ + close(ctl->ipc.socket_fd); free(ctl); } @@ -793,34 +787,17 @@ SND_CTL_PLUGIN_DEFINE_FUNC(sof) goto error; } - /* init IPC tx message queue name */ - err = plug_mq_init(&ctl->ipc_tx, "sof", "ipc-tx", 0); + /* init IPC socket name */ + err = plug_socket_path_init(&ctl->ipc, "sof", "ipc", 0); if (err < 0) { SNDERR("error: invalid name for IPC tx mq %s\n", plug->tplg_file); goto error; } - /* open the sof-pipe IPC tx message queue */ - err = plug_mq_open(&ctl->ipc_tx); + err = plug_create_client_socket(&ctl->ipc); if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - ctl->ipc_tx.queue_name, strerror(err)); - goto error; - } - - /* init IPC rx message queue name */ - err = plug_mq_init(&ctl->ipc_rx, "sof", "ipc-rx", 0); - if (err < 0) { - SNDERR("error: invalid name for IPC rx mq %s\n", plug->tplg_file); - goto error; - } - - /* open the sof-pipe IPC rx message queue */ - err = plug_mq_open(&ctl->ipc_rx); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - ctl->ipc_rx.queue_name, strerror(err)); - goto error; + SNDERR("failed to connect to SOF pipe IPC socket : %s", strerror(err)); + return -errno; } /* create a SHM mapping for low latency stream position */ @@ -847,9 +824,6 @@ SND_CTL_PLUGIN_DEFINE_FUNC(sof) strncpy(ctl->ext.mixername, "SOF", sizeof(ctl->ext.mixername) - 1); - /* polling on message queue - supported on Linux but not portable */ - ctl->ext.poll_fd = ctl->ipc_tx.mq; - ctl->ext.callback = &sof_ext_callback; ctl->ext.private_data = ctl; ctl->ext.tlv.c = plug_tlv_rw; diff --git a/tools/plugin/alsaplug/pcm.c b/tools/plugin/alsaplug/pcm.c index 5ed7d832eaa8..ed5224c829b5 100644 --- a/tools/plugin/alsaplug/pcm.c +++ b/tools/plugin/alsaplug/pcm.c @@ -8,11 +8,12 @@ #include #include #include +#include +#include #include #include #include #include -#include #include #include #include @@ -42,10 +43,6 @@ typedef struct snd_sof_pcm { /* PCM flow control */ struct plug_sem_desc ready[TPLG_MAX_PCM_PIPELINES]; struct plug_sem_desc done[TPLG_MAX_PCM_PIPELINES]; - /* pipeline IPC tx queues */ - struct plug_mq_desc pipeline_ipc_tx[TPLG_MAX_PCM_PIPELINES]; - /* pipeline IPC response queues */ - struct plug_mq_desc pipeline_ipc_rx[TPLG_MAX_PCM_PIPELINES]; struct plug_shm_desc shm_pcm; @@ -55,15 +52,15 @@ typedef struct snd_sof_pcm { static int plug_pipeline_set_state(snd_sof_plug_t *plug, int state, struct ipc4_pipeline_set_state *pipe_state, struct tplg_pipeline_info *pipe_info, - struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx) + struct plug_socket_desc *ipc) { struct ipc4_message_reply reply = {{ 0 }}; int ret; pipe_state->primary.r.ppl_id = pipe_info->instance_id; - ret = plug_mq_cmd_tx_rx(ipc_tx, ipc_rx, pipe_state, sizeof(*pipe_state), - &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(ipc, pipe_state, sizeof(*pipe_state), + &reply, sizeof(reply)); if (ret < 0) SNDERR("failed pipeline %d set state %d\n", pipe_info->instance_id, state); @@ -97,8 +94,7 @@ static int plug_pipelines_set_state(snd_sof_plug_t *plug, int state) int ret; ret = plug_pipeline_set_state(plug, state, &pipe_state, pipe_info, - &pcm->pipeline_ipc_tx[i], - &pcm->pipeline_ipc_rx[i]); + &plug->ipc); if (ret < 0) return ret; } @@ -111,7 +107,7 @@ static int plug_pipelines_set_state(snd_sof_plug_t *plug, int state) int ret; ret = plug_pipeline_set_state(plug, state, &pipe_state, pipe_info, - &pcm->pipeline_ipc_tx[i], &pcm->pipeline_ipc_rx[i]); + &plug->ipc); if (ret < 0) return ret; } @@ -482,38 +478,6 @@ static int plug_pcm_hw_params(snd_pcm_ioplug_t *io, snd_pcm_hw_params_t *params) for (i = 0; i < pipeline_list->count; i++) { struct tplg_pipeline_info *pipe_info = pipeline_list->pipelines[i]; - /* init IPC message queue name */ - err = plug_mq_init(&pcm->pipeline_ipc_tx[i], plug->tplg_file, "pcm-tx", - pipe_info->instance_id); - if (err < 0) { - SNDERR("error: invalid name for pipeline IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&pcm->pipeline_ipc_tx[i]); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - pcm->pipeline_ipc_tx[i].queue_name, strerror(err)); - return -errno; - } - - /* init IPC message queue name */ - err = plug_mq_init(&pcm->pipeline_ipc_rx[i], plug->tplg_file, "pcm-rx", - pipe_info->instance_id); - if (err < 0) { - SNDERR("error: invalid name for pipeline IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&pcm->pipeline_ipc_rx[i]); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - pcm->pipeline_ipc_tx[i].queue_name, strerror(err)); - return -errno; - } - /* init name for pipeline ready lock */ err = plug_lock_init(&pcm->ready[i], plug->tplg_file, "ready", pipe_info->instance_id); @@ -671,8 +635,6 @@ static int plug_pcm_hw_free(snd_pcm_ioplug_t *io) pipeline_list = &plug->pcm_info->playback_pipeline_list; ret = plug_free_pipelines(plug, pipeline_list, pcm->capture); - if (ret < 0) - return ret; close(pcm->shm_pcm.fd); close(plug->glb_ctx.fd); @@ -680,11 +642,10 @@ static int plug_pcm_hw_free(snd_pcm_ioplug_t *io) for (i = 0; i < pipeline_list->count; i++) { struct tplg_pipeline_info *pipe_info = pipeline_list->pipelines[i]; - mq_close(pcm->pipeline_ipc_tx[pipe_info->instance_id].mq); - mq_close(pcm->pipeline_ipc_rx[pipe_info->instance_id].mq); sem_close(pcm->ready[pipe_info->instance_id].sem); sem_close(pcm->done[pipe_info->instance_id].sem); } + close(plug->ipc.socket_fd); return 0; } @@ -957,31 +918,15 @@ static int plug_init_sof_pipe(snd_sof_plug_t *plug, snd_pcm_t **pcmp, fprintf(stdout, "topology parsing complete\n"); /* init IPC message queue name */ - err = plug_mq_init(&plug->ipc_tx, "sof", "ipc-tx", 0); - if (err < 0) { - SNDERR("error: invalid name for IPC mq %s\n", plug->tplg_file); - return err; - } - - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&plug->ipc_tx); - if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - plug->ipc_tx.queue_name, strerror(err)); - return -errno; - } - - err = plug_mq_init(&plug->ipc_rx, "sof", "ipc-rx", 0); + err = plug_socket_path_init(&plug->ipc, "sof", "ipc", 0); if (err < 0) { - SNDERR("error: invalid name for IPC mq %s\n", plug->tplg_file); + SNDERR("error: invalid name for IPC socket %s\n", plug->tplg_file); return err; } - /* open the sof-pipe IPC message queue */ - err = plug_mq_open(&plug->ipc_rx); + err = plug_create_client_socket(&plug->ipc); if (err < 0) { - SNDERR("error: failed to open sof-pipe IPC mq %s: %s", - plug->ipc_rx.queue_name, strerror(err)); + SNDERR("failed to connect to SOF pipe IPC socket : %s", strerror(err)); return -errno; } diff --git a/tools/plugin/alsaplug/plugin.c b/tools/plugin/alsaplug/plugin.c index deb767a4133f..52bbdcfc70e1 100644 --- a/tools/plugin/alsaplug/plugin.c +++ b/tools/plugin/alsaplug/plugin.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -41,27 +40,6 @@ #include "plugin.h" #include "common.h" -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen) -{ - return plug_mq_cmd_tx_rx(ipc, ipc, msg, len, reply, rlen); -} - -/* - * Open an existing message queue using IPC object. - */ -int plug_mq_open(struct plug_mq_desc *ipc) -{ - /* now open new queue for Tx and Rx */ - ipc->mq = mq_open(ipc->queue_name, O_RDWR); - if (ipc->mq < 0) { - // SNDERR("failed to open IPC queue %s: %s\n", - // ipc->queue_name, strerror(errno)); - return -errno; - } - - return 0; -} - /* * Open an existing semaphore using lock object. */ diff --git a/tools/plugin/alsaplug/plugin.h b/tools/plugin/alsaplug/plugin.h index dcc6f550ff53..7f4009267d24 100644 --- a/tools/plugin/alsaplug/plugin.h +++ b/tools/plugin/alsaplug/plugin.h @@ -39,8 +39,7 @@ typedef struct snd_sof_plug { struct list_item pcm_list; struct list_item pipeline_list; int instance_ids[SND_SOC_TPLG_DAPM_LAST]; - struct plug_mq_desc ipc_tx; - struct plug_mq_desc ipc_rx; + struct plug_socket_desc ipc; struct plug_shm_desc glb_ctx; diff --git a/tools/plugin/alsaplug/tplg.c b/tools/plugin/alsaplug/tplg.c index 8fd70c9a6798..7b05b7ead50a 100644 --- a/tools/plugin/alsaplug/tplg.c +++ b/tools/plugin/alsaplug/tplg.c @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -926,8 +925,7 @@ static int plug_set_up_widget_ipc(snd_sof_plug_t *plug, struct tplg_comp_info *c memcpy(msg, module_init, sizeof(*module_init)); memcpy(msg + sizeof(*module_init), comp_info->ipc_payload, comp_info->ipc_size); - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - msg, size, &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, msg, size, &reply, sizeof(reply)); free(msg); if (ret < 0) { SNDERR("error: can't set up widget %s\n", comp_info->name); @@ -955,8 +953,7 @@ static int plug_set_up_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info msg.primary.r.instance_id = pipe_info->instance_id; msg.primary.r.ppl_mem_size = pipe_info->mem_usage; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &msg, sizeof(msg), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &msg, sizeof(msg), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up pipeline %s\n", pipe_info->name); return ret; @@ -1113,8 +1110,7 @@ static int plug_set_up_route(snd_sof_plug_t *plug, struct tplg_route_info *route bu.extension.r.dst_queue = 0; bu.extension.r.src_queue = 0; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &bu, sizeof(bu), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &bu, sizeof(bu), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up route %s -> %s\n", src_comp_info->name, sink_comp_info->name); @@ -1174,8 +1170,8 @@ static int plug_set_up_widget(snd_sof_plug_t *plug, struct tplg_comp_info *comp_ abi = (struct sof_abi_hdr *)ctl->data; /* send IPC with kcontrol data */ - ret = plug_send_bytes_data(&plug->ipc_tx, &plug->ipc_rx, - comp_info->module_id, comp_info->instance_id, abi); + ret = plug_send_bytes_data(&plug->ipc, comp_info->module_id, + comp_info->instance_id, abi); if (ret < 0) { SNDERR("failed to set bytes data for widget %s\n", comp_info->name); return ret; @@ -1339,8 +1335,7 @@ static int plug_delete_pipeline(snd_sof_plug_t *plug, struct tplg_pipeline_info msg.primary.r.rsp = SOF_IPC4_MESSAGE_DIR_MSG_REQUEST; msg.primary.r.instance_id = pipe_info->instance_id; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &msg, sizeof(msg), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &msg, sizeof(msg), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't delete pipeline %s\n", pipe_info->name); return ret; @@ -1383,8 +1378,7 @@ static int plug_free_route(snd_sof_plug_t *plug, struct tplg_route_info *route_i bu.extension.r.dst_queue = 0; bu.extension.r.src_queue = 0; - ret = plug_mq_cmd_tx_rx(&plug->ipc_tx, &plug->ipc_rx, - &bu, sizeof(bu), &reply, sizeof(reply)); + ret = plug_ipc_cmd_tx_rx(&plug->ipc, &bu, sizeof(bu), &reply, sizeof(reply)); if (ret < 0) { SNDERR("error: can't set up route %s -> %s\n", src_comp_info->name, sink_comp_info->name); diff --git a/tools/plugin/alsaplug/tplg_ctl.c b/tools/plugin/alsaplug/tplg_ctl.c index 6424b54825ba..ad2709153806 100644 --- a/tools/plugin/alsaplug/tplg_ctl.c +++ b/tools/plugin/alsaplug/tplg_ctl.c @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include diff --git a/tools/plugin/common.c b/tools/plugin/common.c index 46629f061201..f20cb14e0b0e 100644 --- a/tools/plugin/common.c +++ b/tools/plugin/common.c @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include #include @@ -94,14 +96,10 @@ static const char *suffix_name(const char *longname) /* * Initialise the IPC object. */ -int plug_mq_init(struct plug_mq_desc *ipc, const char *tplg, const char *type, int index) +int plug_socket_path_init(struct plug_socket_desc *ipc, const char *tplg, const char *type, + int index) { - const char *name = suffix_name(tplg); - - if (!name) - return -EINVAL; - - snprintf(ipc->queue_name, NAME_SIZE, "/mq-%s-%s-%d", type, name, index); + snprintf(ipc->path, NAME_SIZE, "/tmp/%s-%s", tplg, type); return 0; } @@ -178,12 +176,45 @@ int plug_shm_open(struct plug_shm_desc *shm) return 0; } -int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - void *msg, size_t len, void *reply, size_t rlen) +static int plug_socket_timed_wait(struct plug_socket_desc *ipc, fd_set *fds, int timeout_ms, + bool write) +{ + struct timeval timeout; + int result; + + /* Set the timeout for select */ + timeout.tv_sec = 0; + timeout.tv_usec = timeout_ms * 1000; + + /* now wait for socket to be readable/writable */ + if (write) + result = select(ipc->socket_fd + 1, NULL, fds, NULL, &timeout); + else + result = select(ipc->socket_fd + 1, fds, NULL, NULL, &timeout); + + if (result == -1) { + SNDERR("error waiting for socket to be %s\n", write ? "writable" : "readable"); + return result; + } + + if (result == 0) { + SNDERR("IPC Socket %s timeout\n", write ? "write" : "read"); + return -ETIMEDOUT; + } + + /* socket ready for read/write */ + if (FD_ISSET(ipc->socket_fd, fds)) + return 0; + + /* socket not ready */ + return -EINVAL; +} + +static int plug_ipc_cmd_tx(struct plug_socket_desc *ipc, void *msg, size_t len) { - struct timespec ts; - ssize_t ipc_size; + fd_set write_fds; char mailbox[IPC3_MAX_MSG_SIZE]; + ssize_t bytes; int err; if (len > IPC3_MAX_MSG_SIZE) { @@ -193,55 +224,90 @@ int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, memset(mailbox, 0, IPC3_MAX_MSG_SIZE); memcpy(mailbox, msg, len); - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* Wait for the socket to be writable */ + FD_ZERO(&write_fds); + FD_SET(ipc->socket_fd, &write_fds); + + err = plug_socket_timed_wait(ipc, &write_fds, 20, true); + if (err < 0) + return err; + + bytes = send(ipc->socket_fd, mailbox, IPC3_MAX_MSG_SIZE, 0); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); return -errno; } - /* IPCs should be read under 10ms */ - plug_timespec_add_ms(&ts, 10); + return bytes; +} + +static int plug_ipc_cmd_rx(struct plug_socket_desc *ipc, char mailbox[IPC3_MAX_MSG_SIZE]) +{ + fd_set read_fds; + int err; + + /* Wait for the socket to be readable */ + FD_ZERO(&read_fds); + FD_SET(ipc->socket_fd, &read_fds); + + err = plug_socket_timed_wait(ipc, &read_fds, 200, false); + if (err < 0) + return err; + + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + return recv(ipc->socket_fd, mailbox, IPC3_MAX_MSG_SIZE, 0); +} + +int plug_ipc_cmd_tx_rx(struct plug_socket_desc *ipc, void *msg, size_t len, void *reply, + size_t rlen) +{ + char mailbox[IPC3_MAX_MSG_SIZE]; + ssize_t bytes; + int err; - /* now return message completion status */ - err = mq_timedsend(ipc_tx->mq, mailbox, IPC3_MAX_MSG_SIZE, 0, &ts); - if (err == -1) { - SNDERR("error: timeout can't send IPC message queue %s : %s\n", - ipc_tx->queue_name, strerror(errno)); + /* send IPC message */ + bytes = plug_ipc_cmd_tx(ipc, msg, len); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); return -errno; } - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* wait for response */ + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + bytes = plug_ipc_cmd_rx(ipc, mailbox); + if (bytes == -1) { + SNDERR("failed to read IPC message reply %s\n", strerror(errno)); return -errno; } - /* IPCs should be processed under 20ms, but wait longer as - * some can take longer especially in valgrind - */ - plug_timespec_add_ms(&ts, 20); - - ipc_size = mq_timedreceive(ipc_rx->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size == -1) { - //fprintf(stderr, "dbg: timeout can't read IPC message queue %s : %s retrying\n", - // ipc->queue_name, strerror(errno)); + /* no response or connection lost, try to restablish connection */ + if (bytes == 0) { + close(ipc->socket_fd); + err = plug_create_client_socket(ipc); + if (err < 0) { + SNDERR("failed to reestablish connection to SOF pipe IPC socket : %s", + strerror(err)); + return -errno; + } - /* ok, its a long IPC or valgrind, wait longer */ - plug_timespec_add_ms(&ts, 800); + /* send IPC message again */ + bytes = plug_ipc_cmd_tx(ipc, msg, len); + if (bytes == -1) { + SNDERR("failed to send IPC message : %s\n", strerror(errno)); + return -errno; + } - ipc_size = mq_timedreceive(ipc_rx->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size == -1) { - SNDERR("error: timeout can't read IPC message queue %s : %s\n", - ipc_rx->queue_name, strerror(errno)); + /* wait for response */ + memset(mailbox, 0, IPC3_MAX_MSG_SIZE); + bytes = plug_ipc_cmd_rx(ipc, mailbox); + if (bytes == -1) { + SNDERR("failed to read IPC message reply %s\n", strerror(errno)); return -errno; } - /* needed for valgrind to complete MQ op before next client IPC */ - ts.tv_nsec = 20 * 1000 * 1000; - ts.tv_sec = 0; - nanosleep(&ts, NULL); + /* connection lost again, quit now */ + if (bytes == 0) + return -errno; } /* do the message work */ @@ -265,8 +331,8 @@ void plug_ctl_ipc_message(struct ipc4_module_large_config *config, int param_id, config->extension.r.large_param_id = param_id; } -int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - uint32_t module_id, uint32_t instance_id, struct sof_abi_hdr *abi) +int plug_send_bytes_data(struct plug_socket_desc *ipc, uint32_t module_id, uint32_t instance_id, + struct sof_abi_hdr *abi) { struct ipc4_module_large_config config = {{ 0 }}; struct ipc4_message_reply reply; @@ -292,7 +358,7 @@ int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_r memcpy(msg + sizeof(config), abi->data, abi->size); /* send the message and check status */ - err = plug_mq_cmd_tx_rx(ipc_tx, ipc_rx, msg, msg_size, &reply, sizeof(reply)); + err = plug_ipc_cmd_tx_rx(ipc, msg, msg_size, &reply, sizeof(reply)); free(msg); if (err < 0) { SNDERR("failed to send IPC to set bytes data\n"); @@ -306,3 +372,95 @@ int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_r return 0; } + +int plug_socket_create(struct plug_socket_desc *ipc) +{ + struct sockaddr_un addr; + int sockfd; + + /* Check if the socket path already exists */ + if (access(ipc->path, F_OK) != -1) { + /* If it exists, remove it */ + if (unlink(ipc->path) == -1) { + SNDERR("unlink previous socket file"); + return -EINVAL; + } + } + + /* Create the socket */ + sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + SNDERR("failed to create new socket"); + return sockfd; + } + + ipc->socket_fd = sockfd; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ipc->path, sizeof(addr.sun_path) - 1); + + /* Bind the socket to the address */ + if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + SNDERR("failed to bind new socket for IPC path\n"); + close(sockfd); + return -EINVAL; + } + + if (listen(sockfd, MAX_IPC_CLIENTS) == -1) { + SNDERR("failed to listen on socket for IPC\n"); + return -EINVAL; + } + + return 0; +} + +static int set_socket_nonblocking(int sockfd) +{ + int flags = fcntl(sockfd, F_GETFL, 0); + + if (flags == -1) { + SNDERR("fcntl(F_GETFL) failed"); + return -EINVAL; + } + + flags |= O_NONBLOCK; + if (fcntl(sockfd, F_SETFL, flags) == -1) { + SNDERR("fcntl(F_SETFL) failed"); + return -EINVAL; + } + + return 0; +} + +int plug_create_client_socket(struct plug_socket_desc *ipc) +{ + struct sockaddr_un addr; + int sockfd; + + sockfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sockfd == -1) { + SNDERR("error: failed to create sof-pipe IPC socket\n"); + return sockfd; + } + + if (set_socket_nonblocking(sockfd) < 0) { + close(sockfd); + return -EINVAL; + } + ipc->socket_fd = sockfd; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ipc->path, sizeof(addr.sun_path) - 1); + + /* Connect to the server (non-blocking) */ + if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + if (errno != EINPROGRESS) { + SNDERR("failed to connect to ipc socket"); + return -errno; + } + } + + return sockfd; +} diff --git a/tools/plugin/common.h b/tools/plugin/common.h index 002f3843c3e7..a64100c397cc 100644 --- a/tools/plugin/common.h +++ b/tools/plugin/common.h @@ -9,7 +9,6 @@ #define __SOF_PLUGIN_COMMON_H__ #include -#include #include #include #include @@ -39,6 +38,8 @@ #define NUM_EP_CONFIGS 8 +#define MAX_IPC_CLIENTS 5 + /* * Run with valgrind * valgrind --trace-children=yes aplay -v -Dsof:blah.tplg,1,hw:1,2 -f dat /dev/zero @@ -161,11 +162,9 @@ struct plug_shm_desc { void *addr; }; -struct plug_mq_desc { - /* IPC message queue */ - mqd_t mq; - struct mq_attr attr; - char queue_name[NAME_SIZE]; +struct plug_socket_desc { + int socket_fd; + char path[NAME_SIZE]; }; struct plug_sem_desc { @@ -265,18 +264,13 @@ void plug_shm_free(struct plug_shm_desc *shm); /* * IPC */ -int plug_mq_create(struct plug_mq_desc *ipc); - -int plug_mq_open(struct plug_mq_desc *ipc); - -int plug_mq_init(struct plug_mq_desc *ipc, const char *tplg, const char *type, int index); +int plug_socket_path_init(struct plug_socket_desc *ipc, const char *tplg, const char *type, + int index); -void plug_mq_free(struct plug_mq_desc *ipc); +void plug_socket_free(struct plug_socket_desc *ipc); -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen); - -int plug_mq_cmd_tx_rx(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - void *msg, size_t len, void *reply, size_t rlen); +int plug_ipc_cmd_tx_rx(struct plug_socket_desc *ipc, void *msg, size_t len, void *reply, + size_t rlen); /* * Locking @@ -324,7 +318,11 @@ static inline void data_dump(void *vdata, size_t bytes) void plug_ctl_ipc_message(struct ipc4_module_large_config *config, int param_id, size_t size, uint32_t module_id, uint32_t instance_id, uint32_t type); -int plug_send_bytes_data(struct plug_mq_desc *ipc_tx, struct plug_mq_desc *ipc_rx, - uint32_t module_id, uint32_t instance_id, struct sof_abi_hdr *abi); +int plug_send_bytes_data(struct plug_socket_desc *ipc, uint32_t module_id, uint32_t instance_id, + struct sof_abi_hdr *abi); + +int plug_socket_create(struct plug_socket_desc *ipc); + +int plug_create_client_socket(struct plug_socket_desc *ipc); #endif diff --git a/tools/plugin/pipe/CMakeLists.txt b/tools/plugin/pipe/CMakeLists.txt index 80114d0f6d2f..244fb3199e38 100644 --- a/tools/plugin/pipe/CMakeLists.txt +++ b/tools/plugin/pipe/CMakeLists.txt @@ -15,7 +15,7 @@ target_include_directories(sof-pipe PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/.. ${sof_source_directory}/src/audio) -target_compile_options(sof-pipe PRIVATE -DPIC -g -O3 -Wall -Werror -DCONFIG_LIBRARY -imacros${config_h}) +target_compile_options(sof-pipe PRIVATE -DPIC -g -O3 -Wall -Werror -Wno-stringop-truncation -DCONFIG_LIBRARY -imacros${config_h}) target_include_directories(sof-pipe PRIVATE ${sof_install_directory}/include) target_include_directories(sof-pipe PRIVATE ${parser_install_dir}/include) diff --git a/tools/plugin/pipe/ipc4.c b/tools/plugin/pipe/ipc4.c index 632c161015b4..90157971d05e 100644 --- a/tools/plugin/pipe/ipc4.c +++ b/tools/plugin/pipe/ipc4.c @@ -14,12 +14,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -311,58 +311,36 @@ int pipe_ipc_do(struct sof_pipe *sp, void *mailbox, size_t bytes) return err; } -int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plug_mq_desc *rx_mq) +static void *handle_ipc_client(void *data) { - ssize_t ipc_size; + struct sof_pipe *sp = data; + struct plug_socket_desc *ipc_socket = &sp->ipc_socket; + struct timespec ts; + int client_id = sp->new_client_id; + int clientfd = sp->client_sock_ids[client_id]; char mailbox[IPC3_MAX_MSG_SIZE] = {0}; + ssize_t ipc_size; int err; - struct timespec ts; - /* IPC thread should not preempt processing thread */ - err = pipe_set_ipc_lowpri(sp); - if (err < 0) - fprintf(sp->log, "error: cant set PCM IPC thread to low priority"); - - /* create the IPC message queue */ - err = plug_mq_create(tx_mq); - if (err < 0) { - fprintf(sp->log, "error: can't create TX IPC message queue : %s\n", - strerror(errno)); - return -errno; - } - - /* create the IPC message queue */ - err = plug_mq_create(rx_mq); - if (err < 0) { - fprintf(sp->log, "error: can't create PCM IPC message queue : %s\n", - strerror(errno)); - return -errno; - } - - /* let main() know we are ready */ - fprintf(sp->log, "sof-pipe: IPC TX %s thread ready\n", tx_mq->queue_name); - fprintf(sp->log, "sof-pipe: IPC RX %s thread ready\n", rx_mq->queue_name); - - /* main PCM IPC handling loop */ while (1) { memset(mailbox, 0, IPC3_MAX_MSG_SIZE); - /* is client dead ? */ - if (sp->glb->state == SOF_PLUGIN_STATE_DEAD) { - fprintf(sp->log, "sof-pipe: IPC %s client complete\n", tx_mq->queue_name); + ipc_size = recv(clientfd, mailbox, IPC3_MAX_MSG_SIZE, 0); + if (ipc_size < 0) { + fprintf(sp->log, "error: can't read IPC socket %s : %s\n", + ipc_socket->path, strerror(errno)); break; } - ipc_size = mq_receive(tx_mq->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL); - if (ipc_size < 0) { - fprintf(sp->log, "error: can't read PCM IPC message queue %s : %s\n", - tx_mq->queue_name, strerror(errno)); + /* connection lost */ + if (ipc_size == 0) { + close(clientfd); break; } /* TODO: properly validate message and continue if garbage */ if (*((uint32_t *)mailbox) == 0) { - fprintf(sp->log, "sof-pipe: IPC %s garbage read\n", tx_mq->queue_name); + fprintf(sp->log, "sof-pipe: IPC %s garbage read\n", ipc_socket->path); ts.tv_sec = 0; ts.tv_nsec = 20 * 1000 * 1000; /* 20 ms */ nanosleep(&ts, NULL); @@ -371,77 +349,78 @@ int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plu /* do the message work */ //data_dump(mailbox, IPC3_MAX_MSG_SIZE); - err = pipe_ipc_do(sp, mailbox, ipc_size); if (err < 0) fprintf(sp->log, "error: local IPC processing failed\n"); /* now return message completion status found in mailbox */ - err = mq_send(rx_mq->mq, mailbox, IPC3_MAX_MSG_SIZE, 0); + err = send(clientfd, mailbox, IPC3_MAX_MSG_SIZE, 0); if (err < 0) { - fprintf(sp->log, "error: can't send PCM IPC message queue %s : %s\n", - rx_mq->queue_name, strerror(errno)); + close(clientfd); break; } } - fprintf(sp->log, "***sof-pipe: IPC %s thread finished !!\n", tx_mq->queue_name); - return 0; + close(clientfd); + sp->client_sock_ids[client_id] = 0; + return NULL; } -int plug_mq_cmd(struct plug_mq_desc *ipc, void *msg, size_t len, void *reply, size_t rlen) +int pipe_ipc_process(struct sof_pipe *sp, struct plug_socket_desc *ipc_socket) { - struct timespec ts; - ssize_t ipc_size; - char mailbox[IPC3_MAX_MSG_SIZE]; + pthread_t thread_id; int err; - if (len > IPC3_MAX_MSG_SIZE) { - SNDERR("ipc: message too big %d\n", len); - return -EINVAL; - } - memset(mailbox, 0, IPC3_MAX_MSG_SIZE); - memcpy(mailbox, msg, len); + /* IPC thread should not preempt processing thread */ + err = pipe_set_ipc_lowpri(sp); + if (err < 0) + fprintf(sp->log, "error: cant set PCM IPC thread to low priority"); - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); + /* create the IPC socket */ + err = plug_socket_create(ipc_socket); + if (err < 0) { + fprintf(sp->log, "error: can't create TX IPC socket : %s\n", + strerror(errno)); return -errno; } - /* IPCs should be read under 10ms */ - plug_timespec_add_ms(&ts, 10); + /* let main() know we are ready */ + fprintf(sp->log, "sof-pipe: IPC %s socket ready\n", ipc_socket->path); - /* now return message completion status */ - err = mq_timedsend(ipc->mq, mailbox, IPC3_MAX_MSG_SIZE, 0, &ts); - if (err < 0) { - SNDERR("error: can't send IPC message queue %s : %s\n", - ipc->queue_name, strerror(errno)); - return -errno; - } + /* main PCM IPC handling loop */ + while (1) { + int clientfd, i; - /* wait for sof-pipe reader to consume data or timeout */ - err = clock_gettime(CLOCK_REALTIME, &ts); - if (err == -1) { - SNDERR("ipc: cant get time: %s", strerror(errno)); - return -errno; - } + /* Accept a connection from a client */ + clientfd = accept(ipc_socket->socket_fd, NULL, NULL); + if (clientfd == -1) { + fprintf(sp->log, "IPC %s socket accept failed\n", ipc_socket->path); + return -errno; + } - /* IPCs should be processed under 20ms */ - plug_timespec_add_ms(&ts, 20); + for (i = 0; i < MAX_IPC_CLIENTS; i++) { + if (!sp->client_sock_ids[i]) { + sp->client_sock_ids[i] = clientfd; + sp->new_client_id = i; + break; + } + } - ipc_size = mq_timedreceive(ipc->mq, mailbox, IPC3_MAX_MSG_SIZE, NULL, &ts); - if (ipc_size < 0) { - SNDERR("error: can't read IPC message queue %s : %s\n", - ipc->queue_name, strerror(errno)); - return -errno; + /* create a new thread for each new IPC client */ + if (pthread_create(&thread_id, NULL, handle_ipc_client, sp) != 0) { + fprintf(sp->log, "Client IPC thread creation failed"); + close(clientfd); + sp->client_sock_ids[i] = 0; + continue; + } + + fprintf(sp->log, "Client IPC thread created client id %d\n", clientfd); + /* Detach the thread */ + pthread_detach(thread_id); } - /* do the message work */ - //printf("cmd got IPC %ld reply bytes\n", ipc_size); - if (rlen && reply) - memcpy(reply, mailbox, rlen); + close(ipc_socket->socket_fd); + fprintf(sp->log, "***sof-pipe: IPC %s thread finished !!\n", ipc_socket->path); return 0; } diff --git a/tools/plugin/pipe/main.c b/tools/plugin/pipe/main.c index 3d9c831dba0b..68861fc0af2a 100644 --- a/tools/plugin/pipe/main.c +++ b/tools/plugin/pipe/main.c @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include @@ -53,15 +52,12 @@ static void shutdown(struct sof_pipe *sp) pthread_cancel(pd->ipc_thread); pthread_cancel(pd->pcm_thread); - plug_mq_free(&pd->ipc_tx_mq); - plug_mq_free(&pd->ipc_rx_mq); plug_lock_free(&pd->ready); plug_lock_free(&pd->done); } /* free the sof-pipe IPC tx/rx message queues */ - plug_mq_free(&sp->ipc_tx_mq); - plug_mq_free(&sp->ipc_rx_mq); + plug_socket_free(&sp->ipc_socket); pthread_mutex_destroy(&sp->ipc_lock); @@ -211,37 +207,13 @@ void plug_shm_free(struct plug_shm_desc *shm) shm_unlink(shm->name); } -/* - * Create and open a new message queue using the IPC object. - */ -int plug_mq_create(struct plug_mq_desc *ipc) -{ - /* delete any old stale resources that use our resource name */ - mq_unlink(ipc->queue_name); - - memset(&ipc->attr, 0, sizeof(ipc->attr)); - ipc->attr.mq_msgsize = IPC3_MAX_MSG_SIZE; - ipc->attr.mq_maxmsg = 1; - - /* now open new queue for Tx/Rx */ - ipc->mq = mq_open(ipc->queue_name, O_CREAT | O_RDWR | O_EXCL, - S_IRWXU | S_IRWXG, &ipc->attr); - if (ipc->mq < 0) { - fprintf(stderr, "failed to create IPC queue %s: %s\n", - ipc->queue_name, strerror(errno)); - return -errno; - } - - return 0; -} /* * Free and delete message queue resources in IPC object. */ -void plug_mq_free(struct plug_mq_desc *ipc) +void plug_socket_free(struct plug_socket_desc *ipc) { - mq_close(ipc->mq); - mq_unlink(ipc->queue_name); + unlink(ipc->path); } /* @@ -359,16 +331,12 @@ int main(int argc, char *argv[], char *env[]) /* sofpipe is now ready */ sp.glb->state = SOF_PLUGIN_STATE_INIT; - ret = plug_mq_init(&sp.ipc_tx_mq, "sof", "ipc-tx", 0); - if (ret < 0) - goto out; - - ret = plug_mq_init(&sp.ipc_rx_mq, "sof", "ipc-rx", 0); + ret = plug_socket_path_init(&sp.ipc_socket, "sof", "ipc", 0); if (ret < 0) goto out; /* now process IPCs as they arrive from plugins */ - ret = pipe_ipc_process(&sp, &sp.ipc_tx_mq, &sp.ipc_rx_mq); + ret = pipe_ipc_process(&sp, &sp.ipc_socket); out: fprintf(sp.log, "shutdown main\n"); diff --git a/tools/plugin/pipe/pipe.h b/tools/plugin/pipe/pipe.h index 3cc95bf8401e..8f8fb26080ea 100644 --- a/tools/plugin/pipe/pipe.h +++ b/tools/plugin/pipe/pipe.h @@ -10,7 +10,6 @@ #include #include -#include #include #include @@ -29,8 +28,6 @@ struct pipethread_data { pthread_t pcm_thread; pthread_t ipc_thread; struct sof_pipe *sp; - struct plug_mq_desc ipc_tx_mq; - struct plug_mq_desc ipc_rx_mq; struct pipeline *pcm_pipeline; /* PCM flow control */ struct plug_sem_desc ready; @@ -72,8 +69,10 @@ struct sof_pipe { /* IPC */ pthread_t ipc_pcm_thread; - struct plug_mq_desc ipc_tx_mq; /* queue used by plugin to send IPCs */ - struct plug_mq_desc ipc_rx_mq; /* queue used by plugin to receive the IPC response */ + struct plug_socket_desc ipc_socket; /* queue used by plugin to send IPCs */ + int client_sock_ids[MAX_IPC_CLIENTS]; + int client_count; + int new_client_id; /* module SO handles */ struct sof_pipe_module module[MAX_MODULE_ID]; @@ -100,7 +99,7 @@ int pipe_set_rt(struct sof_pipe *sp); /* set ipc thread to low priority */ int pipe_set_ipc_lowpri(struct sof_pipe *sp); -int pipe_ipc_process(struct sof_pipe *sp, struct plug_mq_desc *tx_mq, struct plug_mq_desc *rx_mq); +int pipe_ipc_process(struct sof_pipe *sp, struct plug_socket_desc *ipc_socket); int pipe_ipc_new(struct sof_pipe *sp, int pri, int core); void pipe_ipc_free(struct sof_pipe *sp); diff --git a/tools/plugin/pipe/pipeline.c b/tools/plugin/pipe/pipeline.c index 831b60a3b6a9..2985e46d04ad 100644 --- a/tools/plugin/pipe/pipeline.c +++ b/tools/plugin/pipe/pipeline.c @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -181,12 +180,6 @@ static void *pipe_ipc_process_thread(void *arg) return NULL; } - err = pipe_ipc_process(pd->sp, &pd->ipc_tx_mq, &pd->ipc_rx_mq); - if (err < 0) { - fprintf(_sp->log, "pipe IPC thread error for pipeline %d\n", - pd->pcm_pipeline->pipeline_id); - } - return NULL; } @@ -297,18 +290,6 @@ int pipe_thread_new(struct sof_pipe *sp, struct pipeline *p) pd->sp = _sp; pd->pcm_pipeline = p; - /* initialise global IPC data */ - /* TODO: change the PCM name to tplg or make it per PID*/ - ret = plug_mq_init(&pd->ipc_tx_mq, pd->sp->topology_name, "pcm-tx", p->pipeline_id); - if (ret < 0) - return -EINVAL; - mq_unlink(pd->ipc_tx_mq.queue_name); - - ret = plug_mq_init(&pd->ipc_rx_mq, pd->sp->topology_name, "pcm-rx", p->pipeline_id); - if (ret < 0) - return -EINVAL; - mq_unlink(pd->ipc_rx_mq.queue_name); - /* init names of shared resources */ ret = plug_lock_init(&pd->ready, _sp->topology_name, "ready", p->pipeline_id); if (ret < 0) @@ -367,11 +348,6 @@ int pipe_thread_free(struct sof_pipe *sp, int pipeline_id) return -errno; } - plug_mq_free(&pd->ipc_tx_mq); - mq_unlink(pd->ipc_tx_mq.queue_name); - plug_mq_free(&pd->ipc_rx_mq); - mq_unlink(pd->ipc_rx_mq.queue_name); - plug_lock_free(&pd->ready); plug_lock_free(&pd->done);