Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ let package = Package(
.product(name: "GRPC", package: "grpc-swift"),
.product(name: "SystemPackage", package: "swift-system"),
.product(name: "_NIOFileSystem", package: "swift-nio"),
"ContainerizationArchive",
"ContainerizationOCI",
"ContainerizationOS",
"ContainerizationIO",
Expand Down
217 changes: 194 additions & 23 deletions Sources/Containerization/LinuxContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//===----------------------------------------------------------------------===//

#if os(macOS)
import ContainerizationArchive
import ContainerizationError
import ContainerizationExtras
import ContainerizationOCI
Expand Down Expand Up @@ -123,6 +124,9 @@ public final class LinuxContainer: Container, Sendable {
// the host.
private let guestVsockPorts: Atomic<UInt32>

// Queue for copy IO.
private let copyQueue = DispatchQueue(label: "com.apple.containerization.copy")

private enum State: Sendable {
/// The container class has been created but no live resources are running.
case initialized
Expand Down Expand Up @@ -1043,52 +1047,219 @@ extension LinuxContainer {
/// Default chunk size for file transfers (1MiB).
public static let defaultCopyChunkSize = 1024 * 1024

/// Copy a file from the host into the container.
/// Copy a file or directory from the host into the container.
///
/// Data transfer happens over a dedicated vsock connection. For directories,
/// the source is archived as tar+gzip and streamed directly through vsock
/// without intermediate temp files.
public func copyIn(
from source: URL,
to destination: URL,
mode: UInt32 = 0o644,
createParents: Bool = true,
chunkSize: Int = defaultCopyChunkSize,
progress: ProgressHandler? = nil
chunkSize: Int = defaultCopyChunkSize
) async throws {
try await self.state.withLock {
let state = try $0.startedState("copyIn")

var isDirectory: ObjCBool = false
guard FileManager.default.fileExists(atPath: source.path, isDirectory: &isDirectory) else {
throw ContainerizationError(.notFound, message: "copyIn: source not found '\(source.path)'")
}
let isArchive = isDirectory.boolValue

let guestPath = URL(filePath: self.root).appending(path: destination.path)
try await state.vm.withAgent { agent in
try await agent.copyIn(
from: source,
to: guestPath,
mode: mode,
createParents: createParents,
chunkSize: chunkSize,
progress: progress
)
let port = self.hostVsockPorts.wrappingAdd(1, ordering: .relaxed).oldValue
let listener = try state.vm.listen(port)

try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await state.vm.withAgent { agent in
guard let vminitd = agent as? Vminitd else {
throw ContainerizationError(.unsupported, message: "copyIn requires Vminitd agent")
}
try await vminitd.copy(
direction: .copyIn,
guestPath: guestPath,
vsockPort: port,
mode: mode,
createParents: createParents,
isArchive: isArchive
)
}
}

group.addTask {
guard let conn = await listener.first(where: { _ in true }) else {
throw ContainerizationError(.internalError, message: "copyIn: vsock connection not established")
}
try listener.finish()

try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
self.copyQueue.async {
do {
defer { conn.closeFile() }

if isArchive {
let writer = try ArchiveWriter(configuration: .init(format: .pax, filter: .gzip))
try writer.open(fileDescriptor: conn.fileDescriptor)
try writer.archiveDirectory(source)
try writer.finishEncoding()
} else {
let srcFd = open(source.path, O_RDONLY)
guard srcFd != -1 else {
throw ContainerizationError(
.internalError,
message: "copyIn: failed to open '\(source.path)': \(String(cString: strerror(errno)))"
)
}
defer { close(srcFd) }

var buf = [UInt8](repeating: 0, count: chunkSize)
while true {
let n = read(srcFd, &buf, buf.count)
if n == 0 { break }
guard n > 0 else {
throw ContainerizationError(
.internalError,
message: "copyIn: read error: \(String(cString: strerror(errno)))"
)
}
var written = 0
while written < n {
let w = buf.withUnsafeBytes { ptr in
write(conn.fileDescriptor, ptr.baseAddress! + written, n - written)
}
guard w > 0 else {
throw ContainerizationError(
.internalError,
message: "copyIn: vsock write error: \(String(cString: strerror(errno)))"
)
}
written += w
}
}
}
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}
}

try await group.waitForAll()
}
}
}

/// Copy a file from the container to the host.
/// Copy a file or directory from the container to the host.
///
/// Data transfer happens over a dedicated vsock connection. For directories,
/// the guest archives the source as tar+gzip and streams it directly through
/// vsock. The host extracts the archive without intermediate temp files.
public func copyOut(
from source: URL,
to destination: URL,
createParents: Bool = true,
chunkSize: Int = defaultCopyChunkSize,
progress: ProgressHandler? = nil
chunkSize: Int = defaultCopyChunkSize
) async throws {
try await self.state.withLock {
let state = try $0.startedState("copyOut")

if createParents {
let parentDir = destination.deletingLastPathComponent()
try FileManager.default.createDirectory(at: parentDir, withIntermediateDirectories: true)
}

let guestPath = URL(filePath: self.root).appending(path: source.path)
try await state.vm.withAgent { agent in
try await agent.copyOut(
from: guestPath,
to: destination,
createParents: createParents,
chunkSize: chunkSize,
progress: progress
)
let port = self.hostVsockPorts.wrappingAdd(1, ordering: .relaxed).oldValue
let listener = try state.vm.listen(port)

let (metadataStream, metadataCont) = AsyncStream.makeStream(of: Vminitd.CopyMetadata.self)

try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await state.vm.withAgent { agent in
guard let vminitd = agent as? Vminitd else {
throw ContainerizationError(.unsupported, message: "copyOut requires Vminitd agent")
}
try await vminitd.copy(
direction: .copyOut,
guestPath: guestPath,
vsockPort: port,
onMetadata: { meta in
metadataCont.yield(meta)
metadataCont.finish()
}
)
}
}

group.addTask {
guard let metadata = await metadataStream.first(where: { _ in true }) else {
throw ContainerizationError(.internalError, message: "copyOut: no metadata received")
}

guard let conn = await listener.first(where: { _ in true }) else {
throw ContainerizationError(.internalError, message: "copyOut: vsock connection not established")
}
try listener.finish()

try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Error>) in
self.copyQueue.async {
do {
defer { conn.closeFile() }

if metadata.isArchive {
try FileManager.default.createDirectory(at: destination, withIntermediateDirectories: true)
let fh = FileHandle(fileDescriptor: dup(conn.fileDescriptor), closeOnDealloc: true)
let reader = try ArchiveReader(format: .pax, filter: .gzip, fileHandle: fh)
_ = try reader.extractContents(to: destination)
} else {
let destFd = open(destination.path, O_WRONLY | O_CREAT | O_TRUNC, 0o644)
guard destFd != -1 else {
throw ContainerizationError(
.internalError,
message: "copyOut: failed to open '\(destination.path)': \(String(cString: strerror(errno)))"
)
}
defer { close(destFd) }

var buf = [UInt8](repeating: 0, count: chunkSize)
while true {
let n = read(conn.fileDescriptor, &buf, buf.count)
if n == 0 { break }
guard n > 0 else {
throw ContainerizationError(
.internalError,
message: "copyOut: vsock read error: \(String(cString: strerror(errno)))"
)
}
var written = 0
while written < n {
let w = buf.withUnsafeBytes { ptr in
write(destFd, ptr.baseAddress! + written, n - written)
}
guard w > 0 else {
throw ContainerizationError(
.internalError,
message: "copyOut: write error: \(String(cString: strerror(errno)))"
)
}
written += w
}
}
}
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}
}

try await group.waitForAll()
}
}
}
Expand Down
Loading