@@ -388,11 +388,11 @@ or False otherwise.
388
388
389
389
```python
390
390
def to_topic(
391
- topic: Topic,
391
+ topic: Union[ Topic, Callable[[Any, Any, int , Any], Topic]] ,
392
392
key: Optional[Callable[[Any], Any]] = None ) -> " StreamingDataFrame"
393
393
```
394
394
395
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L670 )
395
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L684 )
396
396
397
397
Produce current value to a topic. You can optionally specify a new key.
398
398
@@ -417,13 +417,30 @@ sdf = app.dataframe(input_topic)
417
417
sdf = sdf.to_topic(output_topic_0)
418
418
# does not require reassigning
419
419
sdf.to_topic(output_topic_1, key = lambda data : data[" a_field" ])
420
+
421
+ # Dynamic topic selection based on message content
422
+ def select_topic(value, key, timestamp, headers):
423
+ if value.get(" priority" ) == " high" :
424
+ return output_topic_0
425
+ else :
426
+ return output_topic_1
427
+
428
+ sdf = sdf.to_topic(select_topic)
420
429
```
421
430
422
431
423
432
< br>
424
433
** * Arguments:***
425
434
426
- - `topic` : instance of `Topic`
435
+ - `topic` : instance of `Topic` or a callable that returns a `Topic` .
436
+ If a callable is provided, it will receive four arguments:
437
+ value, key, timestamp, and headers of the current message.
438
+ The callable must return a `Topic` object .
439
+ ** Important** : We recommend declaring all `Topic` instances before
440
+ staring the application instead of creating them dynamically
441
+ within the passed callback. Creating topics dynamically can lead
442
+ to accidentally creating numerous topics and
443
+ saturating the broker' s partitions limits.
427
444
- `key` : a callable to generate a new message key, optional.
428
445
If passed, the return type of this callable must be serializable
429
446
by `key_serializer` defined for this Topic object .
@@ -446,7 +463,7 @@ def set_timestamp(
446
463
func: Callable[[Any, Any, int , Any], int ]) -> " StreamingDataFrame"
447
464
```
448
465
449
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L715 )
466
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L753 )
450
467
451
468
Set a new timestamp based on the current message value and its metadata.
452
469
@@ -499,7 +516,7 @@ def set_headers(
499
516
) -> " StreamingDataFrame"
500
517
```
501
518
502
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L758 )
519
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L796 )
503
520
504
521
Set new message headers based on the current message value and metadata.
505
522
@@ -548,7 +565,7 @@ a new StreamingDataFrame instance
548
565
def print (pretty: bool = True , metadata: bool = False ) -> " StreamingDataFrame"
549
566
```
550
567
551
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L809 )
568
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L847 )
552
569
553
570
Print out the current message value (and optionally, the message metadata) to
554
571
@@ -611,7 +628,7 @@ def print_table(
611
628
int ]] = None ) -> " StreamingDataFrame"
612
629
```
613
630
614
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L855 )
631
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L893 )
615
632
616
633
Print a table with the most recent records.
617
634
@@ -704,7 +721,7 @@ sdf.print_table(size=5, title="Live Records", slowdown=1)
704
721
def compose(sink: Optional[VoidExecutor] = None ) -> dict[str , VoidExecutor]
705
722
```
706
723
707
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L971 )
724
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1009 )
708
725
709
726
Compose all functions of this StreamingDataFrame into one big closure.
710
727
@@ -758,7 +775,7 @@ def test(value: Any,
758
775
topic: Optional[Topic] = None ) -> List[Any]
759
776
```
760
777
761
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1005 )
778
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1043 )
762
779
763
780
A shorthand to test `StreamingDataFrame` with provided value
764
781
@@ -798,7 +815,7 @@ def tumbling_window(
798
815
) -> TumblingTimeWindowDefinition
799
816
```
800
817
801
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1044 )
818
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1082 )
802
819
803
820
Create a time- based tumbling window transformation on this StreamingDataFrame.
804
821
@@ -890,7 +907,7 @@ def tumbling_count_window(
890
907
name: Optional[str ] = None ) -> TumblingCountWindowDefinition
891
908
```
892
909
893
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1133 )
910
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1171 )
894
911
895
912
Create a count- based tumbling window transformation on this StreamingDataFrame.
896
913
@@ -963,7 +980,7 @@ def hopping_window(
963
980
) -> HoppingTimeWindowDefinition
964
981
```
965
982
966
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1183 )
983
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1221 )
967
984
968
985
Create a time- based hopping window transformation on this StreamingDataFrame.
969
986
@@ -1066,7 +1083,7 @@ def hopping_count_window(
1066
1083
name: Optional[str ] = None ) -> HoppingCountWindowDefinition
1067
1084
```
1068
1085
1069
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1286 )
1086
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1324 )
1070
1087
1071
1088
Create a count- based hopping window transformation on this StreamingDataFrame.
1072
1089
@@ -1144,7 +1161,7 @@ def sliding_window(
1144
1161
) -> SlidingTimeWindowDefinition
1145
1162
```
1146
1163
1147
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1343 )
1164
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1381 )
1148
1165
1149
1166
Create a time- based sliding window transformation on this StreamingDataFrame.
1150
1167
@@ -1242,7 +1259,7 @@ def sliding_count_window(
1242
1259
name: Optional[str ] = None ) -> SlidingCountWindowDefinition
1243
1260
```
1244
1261
1245
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1438 )
1262
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1476 )
1246
1263
1247
1264
Create a count- based sliding window transformation on this StreamingDataFrame.
1248
1265
@@ -1312,7 +1329,7 @@ sdf = (
1312
1329
def fill(* columns: str , ** mapping: Any) -> " StreamingDataFrame"
1313
1330
```
1314
1331
1315
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1491 )
1332
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1529 )
1316
1333
1317
1334
Fill missing values in the message value with a constant value.
1318
1335
@@ -1369,7 +1386,7 @@ def drop(columns: Union[str, List[str]],
1369
1386
errors: Literal[" ignore" , " raise" ] = " raise" ) -> " StreamingDataFrame"
1370
1387
```
1371
1388
1372
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1543 )
1389
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1581 )
1373
1390
1374
1391
Drop column(s) from the message value (value must support `del ` , like a dict ).
1375
1392
@@ -1413,7 +1430,7 @@ a new StreamingDataFrame instance
1413
1430
def sink(sink: BaseSink)
1414
1431
```
1415
1432
1416
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1587 )
1433
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1625 )
1417
1434
1418
1435
Sink the processed data to the specified destination.
1419
1436
@@ -1441,7 +1458,7 @@ operations, but branches can still be generated from its originating SDF.
1441
1458
def concat(other: " StreamingDataFrame" ) -> " StreamingDataFrame"
1442
1459
```
1443
1460
1444
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1625 )
1461
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1663 )
1445
1462
1446
1463
Concatenate two StreamingDataFrames together and return a new one.
1447
1464
@@ -1482,7 +1499,7 @@ def join_asof(right: "StreamingDataFrame",
1482
1499
name: Optional[str ] = None ) -> " StreamingDataFrame"
1483
1500
```
1484
1501
1485
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1661 )
1502
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1699 )
1486
1503
1487
1504
Join the left dataframe with the records of the right dataframe with
1488
1505
@@ -1565,7 +1582,7 @@ def join_interval(
1565
1582
forward_ms: Union[int , timedelta] = 0 ) -> " StreamingDataFrame"
1566
1583
```
1567
1584
1568
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1737 )
1585
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1775 )
1569
1586
1570
1587
Join the left dataframe with records from the right dataframe that fall within
1571
1588
@@ -1668,7 +1685,7 @@ def join_lookup(
1668
1685
) -> " StreamingDataFrame"
1669
1686
```
1670
1687
1671
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1842 )
1688
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1880 )
1672
1689
1673
1690
Note: This is an experimental feature, and its API is likely to change in the future.
1674
1691
@@ -1729,7 +1746,7 @@ sdf = sdf.join_lookup(lookup, fields)
1729
1746
def register_store(store_type: Optional[StoreTypes] = None ) -> None
1730
1747
```
1731
1748
1732
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1931 )
1749
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1969 )
1733
1750
1734
1751
Register the default store for the current stream_id in StateStoreManager.
1735
1752
0 commit comments