Skip to content

Commit

Permalink
chore: polish types
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 15, 2024
1 parent d1a2376 commit d958655
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 119 deletions.
130 changes: 65 additions & 65 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,49 +667,47 @@ def _subscriber_setup_extra(self) -> "AnyDict":

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True] = False,
) -> "asyncio.Future[RecordMetadata]":
...
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True],
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
) -> "RecordMetadata":
...
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

@override
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Publish message directly.
Expand Down Expand Up @@ -842,42 +840,44 @@ async def request( # type: ignore[override]

@overload
async def publish_batch(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True] = False,
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True],
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish_batch(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

async def publish_batch(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Args:
"""Publish a message batch as a single request to broker.
Args:
*messages:
Messages bodies to send.
topic:
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def disconnect(self) -> None:
await self._producer.stop()
self._producer = EmptyProducerState()

def __bool__(self) -> None:
def __bool__(self) -> bool:
return bool(self._producer)

@property
Expand Down
6 changes: 5 additions & 1 deletion faststream/kafka/publisher/state.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import abstractmethod
from typing import TYPE_CHECKING, Protocol

from faststream.exceptions import IncorrectState
Expand All @@ -8,7 +9,10 @@

class ProducerState(Protocol):
producer: "AIOKafkaProducer"
closed: bool

@property
@abstractmethod
def closed(self) -> bool: ...

def __bool__(self) -> bool: ...

Expand Down
89 changes: 46 additions & 43 deletions faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,32 @@ def __init__(

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True] = False,
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[True],
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish(
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
self,
message: "SendableMessage",
topic: str = "",
*,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

@override
Expand All @@ -197,7 +197,9 @@ async def publish(
reply_to: str = "",
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Args:
"""Publishes a message to Kafka.
Args:
message:
Message body to send.
topic:
Expand Down Expand Up @@ -336,31 +338,30 @@ async def request(


class BatchPublisher(LogicPublisher[tuple["ConsumerRecord", ...]]):

@overload
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True] = False,
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[True],
) -> "asyncio.Future[RecordMetadata]": ...

@overload
async def publish(
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
self,
*messages: "SendableMessage",
topic: str = "",
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
no_confirm: Literal[False] = False,
) -> "RecordMetadata": ...

@override
Expand All @@ -375,7 +376,9 @@ async def publish(
correlation_id: Optional[str] = None,
no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
"""Args:
"""Publish a message batch as a single request to broker.
Args:
*messages:
Messages bodies to send.
topic:
Expand Down
4 changes: 2 additions & 2 deletions faststream/specification/asyncapi/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

def parse_handler_params(call: "CallModel", prefix: str = "") -> AnyDict:
"""Parses the handler parameters."""
model = getattr(call, "serializer", call).model # type: ignore[union-attr]
model = getattr(call, "serializer", call).model
assert model # nosec B101

body = get_model_schema(
create_model( # type: ignore[call-overload]
create_model(
model.__name__,
**{p.field_name: (p.field_type, p.default_value) for p in call.flat_params},
),
Expand Down
2 changes: 1 addition & 1 deletion faststream/specification/asyncapi/v2_6_0/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def to_yaml(self) -> str:
return self.schema.to_yaml()

@property
def schema(self) -> ApplicationSchema: # type: ignore[override]
def schema(self) -> ApplicationSchema:
return get_app_schema(
self.broker,
title=self.title,
Expand Down
2 changes: 1 addition & 1 deletion faststream/specification/asyncapi/v3_0_0/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def to_yaml(self) -> str:
return self.schema.to_yaml()

@property
def schema(self) -> ApplicationSchema: # type: ignore[override]
def schema(self) -> ApplicationSchema:
return get_app_schema(
self.broker,
title=self.title,
Expand Down
4 changes: 1 addition & 3 deletions faststream/specification/asyncapi/v3_0_0/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from faststream.specification.asyncapi.v2_6_0.schema import ServerVariable

from .channels import Channel
from .components import Components
from .contact import Contact
Expand All @@ -9,7 +7,7 @@
from .message import CorrelationId, Message
from .operations import Operation
from .schema import ApplicationSchema
from .servers import Server
from .servers import Server, ServerVariable
from .tag import Tag
from .utils import Parameter, Reference

Expand Down
6 changes: 6 additions & 0 deletions faststream/specification/asyncapi/v3_0_0/schema/servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
SecurityRequirement = list[dict[str, list[str]]]


__all__ = (
"Server",
"ServerVariable",
)


class Server(BaseModel):
"""A class to represent a server.
Expand Down
5 changes: 4 additions & 1 deletion faststream/specification/base/specification.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from abc import abstractmethod
from typing import Any, Protocol, runtime_checkable

from .schema import BaseApplicationSchema


@runtime_checkable
class Specification(Protocol):
schema: BaseApplicationSchema
@property
@abstractmethod
def schema(self) -> BaseApplicationSchema: ...

def to_json(self) -> str:
return self.schema.to_json()
Expand Down
4 changes: 3 additions & 1 deletion tests/brokers/kafka/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ async def handler(m) -> None:
async with self.patch_broker(pub_broker) as br:
await br.start()

batch_record_metadata_future = await br.publish_batch(1, "hi", topic=queue, no_confirm=True)
batch_record_metadata_future = await br.publish_batch(
1, "hi", topic=queue, no_confirm=True
)
record_metadata_future = await br.publish("", topic=queue, no_confirm=True)
assert isinstance(batch_record_metadata_future, asyncio.Future)
assert isinstance(record_metadata_future, asyncio.Future)

0 comments on commit d958655

Please sign in to comment.